Goでのスケーラブルなキーバリューストアの構築
James Reed
Infrastructure Engineer · Leapcell

Goにおける分散キーバリューストアの紹介
データの指数関数的な増加と、高可用性およびスケーラブルなシステムへの需要の高まりにより、従来のモノリシックなデータベースではしばしば不十分になっています。ソーシャルメディアプラットフォームからEコマースサイトまで、現代のアプリケーションは、複数のマシンにわたって膨大な量のデータを迅速かつ確実に保存および取得するメカニズムを必要としています。そこで、分散キーバリューストアが活躍します。これらは、リレーショナルデータベースよりもシンプルなデータモデルを提供し、パフォーマンスと水平スケーラビリティに焦点を当てているため、今日のクラウドネイティブアーキテクチャの基盤となっています。Goは、優れた並行処理プリミティブ、堅牢な標準ライブラリ、および強力なパフォーマンス特性により、そのようなシステムを構築するための理想的な言語です。この記事では、Goを使用したシンプルな分散キーバリューストアの開発プロセスを、その基本原則と実践的な実装に触れながらガイドします。
コアコンセプトと実装の詳細
コードに飛び込む前に、分散キーバリューストアの理解の中心となるいくつかの重要な用語を明確にしましょう。
- キーバリューストア: データストレージの最も基本的な単位。各データ(値)は、キーによって一意に識別されます。辞書やハッシュマップのようなものと考えてください。
- 分散: スケーラビリティと耐障害性を実現するために、データはクラスター内の複数のノード(サーバー)に分散されます。
- 一貫性: すべてのクライアントが同時に同じデータを見るという保証を指します。さまざまな分散システムがさまざまな一貫性モデル(例:強い一貫性、結果整合性)を提供します。私たちのシンプルなストアでは、基本的なレベルの一貫性を目指します。
- レプリケーション: 一部のノードが失敗した場合でも利用可能性を確保するために、異なるノードにデータの複数のコピーを保存すること。
- ハッシュ/シャーディング: 特定のキーバリューストアがどのノードに配置されるかを決定するために使用されるメカニズム。ノードが追加または削除されたときにデータ移動を最小限に抑えるために、一貫性ハッシュは一般的な手法です。
- RPC(リモートプロシージャコール): あるコンピュータ上のプログラムが、あたかもローカル呼び出しであるかのように別のコンピュータ上のコードを実行できるようにする通信プロトコル。Goの
net/rpc
パッケージやgRPCが一般的な選択肢です。
私たちのシンプルな分散キーバリューストアは、次の側面に焦点を当てます。RPCを使用したクライアントサーバー通信、各ノードでの基本的なキーバリューストレージ、および基本的な分散戦略。
基本的なノード構造
私たちの分散キーバリューストアの各サーバーは「ノード」となります。各ノードは独自のローカルキーバリューストレージを維持します。簡単にするために、メモリ内のmap[string]string
を使用します。実際には、これはRocksDB、LevelDB、またはディスクファイルのような永続ストレージメカニズムによってバックアップされるでしょう。
ノード構造と、RPC経由で公開するメソッドを定義しましょう。
// node.go package main import ( "fmt" "log" "net" "net/rpc" "sync" ) // KVStoreはノード上のローカルキーバリューストレージを表します。 type KVStore struct { mu sync.RWMutex store map[string]string } // NewKVStoreはKVStoreの新しいインスタンスを作成します。 func NewKVStore() *KVStore { return &KVStore{ store: make(map[string]string), } } // Getは指定されたキーの値を取得します。 func (kv *KVStore) Get(key string, reply *string) error { kv.mu.RLock() defer kv.mu.RUnlock() if val, ok := kv.store[key]; ok { *reply = val return nil } return fmt.Errorf("key '%s' not found", key) } // Putはキーバリューストアのペアを保存します。 func (kv *KVStore) Put(pair map[string]string, reply *bool) error { kv.mu.Lock() defer kv.mu.Unlock() for key, value := range pair { kv.store[key] = value } *reply = true return nil } // Nodeは私たちの分散キーバリューストアのサーバーを表します。 type Node struct { id string address string kvStore *KVStore } // NewNodeは新しいNodeインスタンスを作成します。 func NewNode(id, address string) *Node { return &Node{ id: id, address: address, kvStore: NewKVStore(), } } // ServeはノードのRPCサーバーを起動します。 func (n *Node) Serve() { rpc.Register(n.kvStore) // KVStoreをRPC経由で公開するために登録します listener, err := net.Listen("tcp", n.address) if err != nil { log.Fatalf("Error listening on %s: %v", n.address, err) } log.Printf("Node %s listening on %s", n.id, n.address) rpc.Accept(listener) }
このnode.go
ファイルでは:
KVStore
はメモリ内キーバリューストアを管理し、同時アクセスを安全にするためにsync.RWMutex
を使用しています。Get
とPut
はRPCで公開されるメソッドです。結果を返すためにreply
引数がどのように使用されているかに注意してください。Node
はノードのIDとそのKVStore
をカプセル化します。Serve
はRPCサーバーをセットアップし、KVStore
メソッドをリモートからアクセス可能にします。
クライアントインタラクション
クライアントはGet
またはPut
操作を実行するために、ノードのいずれかに接続する必要があります。簡単にするために、私たちのクライアントは特定のノードに直接接続します。より高度なシステムでは、ディスカバリサービスまたはロードバランサーが必要になります。
// client.go package main import ( "fmt" "log" "net/rpc" ) // Clientはキーバリューストアのクライアントを表します。 type Client struct { nodeAddress string rpcClient *rpc.Client } // NewClientは特定のノードに接続された新しいClientを作成します。 func NewClient(nodeAddress string) (*Client, error) { client, err := rpc.DialHTTP("tcp", nodeAddress) if err != nil { return nil, fmt.Errorf("error dialing RPC server at %s: %v", nodeAddress, err) } return &Client{ nodeAddress: nodeAddress, rpcClient: client, }, } // GetはノードへのリモートGetメソッドを呼び出します。 func (c *Client) Get(key string) (string, error) { var reply string err := c.rpcClient.Call("KVStore.Get", key, &reply) if err != nil { return "", fmt.Errorf("error calling Get for key '%s': %v", key, err) } return reply, nil } // PutはノードへのリモートPutメソッドを呼び出します。 func (c *Client) Put(key, value string) error { args := map[string]string{key: value} var reply bool err := c.rpcClient.Call("KVStore.Put", args, &reply) if err != nil { return fmt.Errorf("error calling Put for key '%s': %v", key, err) } if !reply { return fmt.Errorf("put operation failed for key '%s'", key) } return nil } // CloseはRPCクライアント接続を閉じます。 func (c *Client) Close() error { return c.rpcClient.Close() }
client.go
では:
NewClient
は特定のノードアドレスへのRPC接続を確立します。Get
とPut
はRPC呼び出しをラップし、リモート通信をユーザーから抽象化します。
複数のノードのオーケストレーション
「分散」の側面を実証するには、複数のノードを実行し、それらと対話する方法が必要です。この例では、同じmain
関数内で別々のゴルーチンとして実行します。実際のデプロイメントでは、これらは異なるマシン上の別々のプロセスになります。
// main.go package main import ( "log" "time" ) func main() { // ノード1を起動 node1 := NewNode("node-1", ":8001") go node1.Serve() time.Sleep(time.Millisecond * 100) // ノードが起動するのを少し待つ // ノード2を起動(将来の分散ロジックを実証するため) node2 := NewNode("node-2", ":8002") go node2.Serve() time.Sleep(time.Millisecond * 100) // ノードが起動するのを少し待つ // --- ノード1とのクライアントインタラクション --- log.Println("--- Client interacting with Node 1 ---") client1, err := NewClient(":8001") if err != nil { log.Fatalf("Failed to create client for Node 1: %v", err) } defer client1.Close() // データをPutする err = client1.Put("name", "Alice") if err != nil { log.Printf("Error putting 'name': %v", err) } else { log.Println("Put 'name: Alice' successful on Node 1") } err = client1.Put("city", "New York") if err != nil { log.Printf("Error putting 'city': %v", err) } else { log.Println("Put 'city: New York' successful on Node 1") } // データをGetする val, err := client1.Get("name") if err != nil { log.Printf("Error getting 'name': %v", err) } else { log.Printf("Got 'name': %s from Node 1", val) } val, err = client1.Get("country") if err != nil { log.Printf("Error getting 'country': %v", err) } else { log.Printf("Got 'country': %s from Node 1", val) } // --- ノード2とのクライアントインタラクション(最初は空) --- log.Println("\n--- Client interacting with Node 2 ---") client2, err := NewClient(":8002") if err != nil { log.Fatalf("Failed to create client for Node 2: %v", err) } defer client2.Close() val, err = client2.Get("name") // このキーはノード1にPutされた if err != nil { log.Printf("Error getting 'name' from Node 2 (expected): %v", err) } else { log.Printf("Got 'name': %s from Node 2", val) } err = client2.Put("language", "Go") if err != nil { log.Printf("Error putting 'language': %v", err) } else { log.Println("Put 'language: Go' successful on Node 2") } val, err = client2.Get("language") if err != nil { log.Printf("Error getting 'language' from Node 2: %v", err) } else { log.Printf("Got 'language': %s from Node 2", val) } log.Println("\n--- Operations complete. Press Ctrl+C to exit ---") select {} // mainゴルーチンを維持 }
main.go
では:
- 異なるポートでリッスンする2つのノード、
node-1
とnode-2
を起動します。 - 次に、各ノードと個別にやり取りするためのクライアントを作成します。
node-1
に書き込まれたデータは、node-2
で自動的に利用可能にならないことに注意してください。これは、分散戦略の必要性を強調しています。
真の分散ストアのための次のステップ
現在のセットアップは、基本的なRPC通信とローカルストレージを示しています。機能的な分散キーバリューストアにするには、次のものを追加する必要があります。
- 分散レイヤー: 特定のキーがどのノードに保存されるべきかを決定するコンポーネント(例:コーディネーターまたは一貫性ハッシュリング)。クライアントが
Put
またはGet
を実行すると、このレイヤーはリクエストを正しいノードに転送します。 - レプリケーション: データが保存されると、耐障害性を確保するためにいくつかの他のノードにレプリケートされるべきです。
node-1
がダウンした場合、それが保持していたデータはレプリカからアクセス可能であるべきです。 - 一貫性プロトコル: レプリケートされたデータが、特に書き込みや障害の際に、複数のノード間で一貫性を保つことを保証するプロトコル(RaftやPaxosのような)を実装します。
- 障害検出と回復: ノードがオフラインになったときに検出し、そのデータを自動的に復元またはクラスターを再バランスするためのメカニズム。
- 永続化: ノードが再起動したときにデータが失われないように、キーバリューストアをディスクに保存します。
アプリケーションシナリオ
このようなシンプルな分散キーバリューストアは、多くの実際のアプリケーションの基盤を形成します。
- キャッシング: データベースの負荷を軽減するために、頻繁にアクセスされるデータを格納します。
- セッション管理: 複数のWebサーバー間でユーザーセッションデータを格納します。
- 構成管理: アプリケーション構成をさまざまなサービスに分散します。
- リーダー選出: 分散システムでプライマリノードを選択するために使用されます。
- シンプルなデータストレージ: 高い読み取り/書き込みスループットを必要とし、複雑なトランザクション能力やクエリパターンを必要としないアプリケーション。
結論
Goで分散キーバリューストアを構築することは、並行処理とネットワークに適した言語を使用して、分散システムのコアコンセプトを探求する素晴らしい機会を提供します。私たちの例は初歩的ですが、関係する複雑さの基礎を築いています。RPCと並行処理のためにGoの堅牢な標準ライブラリを活用することで、スケーラブルで回復力のあるデータストレージソリューションを作成できます。真に本番環境対応の分散キーバリューストアは、これらの基本を基盤とし、分散、一貫性、および耐障害性のための洗練された機能を追加することになります。本質的に、分散キーバリューストアは、ネットワーク全体に分散された、非常にスケーラブルで耐障害性のあるハッシュテーブルです。