※このブログは2016年11月9日に公開された英語ブログ「A Deep Dive into TiKV」の拙訳です。
Table of Content
- About TiKV
- Architecture
- Protocol
- Raft
- Placement Driver (PD)
- Transaction
- Coprocessor
- Key processes analysis
TiKVについて
TiKV (発音: /’taɪkeɪvi:/ tai-K-V, 語源: titanium) は、Google Spanner、F1、HBaseの設計に基づく分散Key-Valueデータベースですが、分散ファイルシステムには依存せずよりシンプルな構成になっています。
アーキテクチャ
- Placement Driver (PD):PDはTiKVシステムの頭脳であり、ノード、ストア、リージョンのマッピングに関するメタデータを管理し、データ配置と負荷分散の決定を行います。PDは定期的にレプリケーションの制約をチェックし、負荷とデータのバランスを自動的に調整します。
- ノード:クラスタ内の物理ノード。各ノード内には、1つまたは複数のストアが存在します。各ストア内には、多数のリージョンが存在します。
- ストア:各ストアの中にRocksDBがあり、ローカルディスクにデータを保存します。
- リージョン:リージョンはKey-Valueデータ移動の基本単位であり、ストア内のデータ範囲に相当します。それぞれのリージョンは複数のノードにレプリケートされます。これらの複数のレプリカはRaftグループを形成します。リージョンのレプリカはピアと呼ばれます。
プロトコル
TiKVは異なるコンポーネント間の相互作用にProtocol Buffer プロトコルを使用しています。Rustは当時gRPC をサポートしていないため、以下の形式の独自のプロトコルを使用します。
Message: Header + Payload
Header: | 0xdaf4(2 bytes magic value) | 0x01(version 2 bytes) | msg_len(4 bytes) | msg_id(8 bytes) |
Protocol BufferのデータはメッセージのPayload部に格納されます。ネットワークレベルでは、まず16バイトのHeaderを読み出します。Headerに含まれるメッセージ長 (msg_len
)情報により、実際のメッセージ長を計算し、対応するデータを読み出してデコードします。
TiKVのインタラクションプロトコルは kvproto
プロジェクトに、プッシュダウンに対応するプロトコルは tipb
プロジェクトにあります。ここでは、kvproto
プロジェクトにのみ注目しましょう。
以下は、kvproto
プロジェクト内のプロトコルファイルです:
msgpb.proto
: プロトコルのやり取りは、すべて同じメッセージ構造になっています。メッセージを受信したら、そのメッセージのMessageType
に従って処理します。metapb.proto
: ストア、リージョン、ピアなどのパブリックメタデータを定義します。raftpb.proto
: Raftの内部用のプロトコルです。etcdからポートされており、etcdとの整合性が必要です。raft_serverpb.proto
: Raftノード間のインタラクション用のプロトコルです。raft_cmdpb.proto
: Raftが適用されたときに実際に実行されるコマンドです。pdpb.proto
: TiKVとPD間の相互作用のためのプロトコルです。kvrpcpb.proto
: トランザクションをサポートするKey-Valueプロトコルです。mvccpb.proto
:内部のMVCC (Multi-Version Concurrency Control) 用のプロトコルです。coprocessor.proto
: Push-Down操作に対応するためです。
外部アプリケーションからTiKVに接続するには、以下の方法があります。
- 単純なKey-Value機能のみを使用する場合は、
raft_cmdpb.proto
を実装してください。 - Transactional Key-Value機能については、
kvrpcpb.proto
を実装してください。 - Push-Down機能については、
coprocessor.proto
を実装してください。Push-Downプロトコルの詳細については、tipbを参照してください。
Raft
TiKVは、分散システムにおけるデータの一貫性を確保するためにRaftアルゴリズムを使用しています。詳細については、The Raft Consensus Algorithmを参照してください。
TiKVのRaftは、etcdから完全に移行されています。etcd Raftを選択した理由は、実装が非常に簡単で移行も容易であり、運用実績があるためです。
TiKVのRaftの実装は独立して使用することができます。お客様のプロジェクトに直接適用することができます。
Raftの使用方法については、以下をご参照ください。
- 独自のストレージを定義し、Raft Storage traitを実装します。以下のStorage traitのインターフェースをご参照ください。
// initial_state returns the information about HardState and ConfState in Storage fn initial_state(&self) -> Result<RaftState>; // return the log entries in the [low, high] range fn entries(&self, low: u64, high: u64, max_size: u64) -> Result<Vec<Entry>>; // get the term of the log entry according to the corresponding log index fn term(&self, idx: u64) -> Result<u64>; // get the index from the first log entry at the current position fn first_index(&self) -> Result<u64>; // get the index from the last log entry at the current position fn last_index(&self) -> Result<u64>; // generate a current snapshot fn snapshot(&self) -> Result<Snapshot>;
- rawノードオブジェクトを作成し、対応する構成とカスタマイズされたストレージインスタンスをオブジェクトに渡します。構成については、
election_tick
とheartbeat_tick
に注目する必要があります。Raftのロジックの中には、周期的なTickでステップを踏むものがあります。Tickごとに、Leaderはheartbeatの経過の頻度がheartbeat_tick
の頻度を超えているかどうかを判断します。もしそうなら、LeaderはFollowerにheartbeatを送り、elapseをリセットします。Followerについては、election elapsingの頻度がelection_tick
の頻度を超えた場合、Followerは選挙を開始します。 - rawノードが作成されると、rawノードのtickインターフェースが定期的に(100ms毎など)呼び出され、内部のRaft Step関数を実行します。
- Raftがデータを書き込む場合は、Proposeインターフェースが直接呼び出されます。Proposeインターフェースのパラメータは任意のバイナリデータであり、Raftが複製するデータの厳密なな内容は気にしないことを意味します。データをどのように扱うかは、完全に外部ロジックに任されています。
- メンバーシップの変更処理であれば、rawノードの
propose_conf_change
インターフェースを呼び出し、あるノードを追加/削除するためのConfChangeオブジェクトを送信することができます。 - RawノードのTickやProposeなどの関数が呼び出された後、RaftはReady状態を開始します。ここでは、Ready状態の詳細について説明します。
Ready状態には3つの部分があります。
- Raftのストレージに保存する必要がある部分:エントリ、ハードステート、スナップショット。
- 他のRaftノードに送信する必要がある部分:メッセージ。
- 他のステートマシンに適用する必要がある部分:commit_entries。
Ready状態を処理した後、Advance関数を呼び出してRaftに次のReady処理を通知する必要があります。
TiKVでは、以下のような流れでmioを通してRaftを使用します。
- ベースとなるRaftのティックタイマーを登録する(通常100ms)。タイマーがタイムアウトするたびにrawノードのTickが呼び出され、タイマーの再登録が行われます。
- mioのnotify関数で外部からのコマンドを受信し、Proposeまたは
propose_conf_change
インターフェースを呼び出します。 - mio tickのコールバックでRaftの準備ができたかどうかを判断します(※mio tickは各イベントループの最後で呼び出され、Raftのtickとは異なります)。ReadyであればReadyの処理に進みます。
上記の説明では、1つのRaftのみを使用する方法を取り上げました。しかし、TiKVでは、複数のRaftグループがあります。これらのRaftグループは互いに独立しているため、同じ手法で処理することができます。
TiKVでは、各Raftグループが1つのリージョンに対応します。TiKVでは、最初のうちは範囲(-inf, +inf)を担当するリージョンが1つだけ存在します。より多くのデータが入ってきて、リージョンがその閾値(当時は64MB)に達すると、リージョンは2つのリージョンに分割されます。TiKVのデータはすべてキーによってソートされているため、Split Keyを選択してリージョンを分割すると非常に便利です。詳しい分割の方法はSplitを参照してください。
もちろん、SplitがあるところにはMergeもあります。隣接する2つのリージョンにほとんどデータがない場合、これら2つのリージョンは1つの大きなリージョンにマージすることができます。Region MergeはTiKVのロードマップにありますが、まだ実装されていません。(※記022年現在は実装されています)
Placement Driver
Placement Driver (PD) は、TiKVクラスタ全体の管理とスケジューリングを担当しています。中核的なサービスであるため、高い可用性と安定性を確保する必要があります。
まず解決しなければならないのは、PDの単一障害点です。私たちの解決策は、複数のPDサーバを起動することです。これらのサーバーは、etcdの選挙メカニズムによってリーダーを選出し、リーダーは外部にサービスを提供します。リーダーがダウンした場合は、再度選挙を行い、新しいリーダーを選出し、サービスを提供します。
2つ目の問題は、PDに保存されているデータの整合性です。あるPDがダウンした場合、新たに選出されたPDが一貫性のあるデータを持つようにするにはどうすればよいでしょうか。これもPDのデータをetcdに置くことで解決します。etcdは分散型の一貫したKey-Valueストアなので、そこに格納されているデータの一貫性を確保することができるのです。新しいPDが起動したら、etcdからデータを読み込むだけでいいのです。
当初は独立した外部のetcdサービスを使っていましたが、現在はPDをetcdに組み込み、PD自体がetcdであることを意味しています。埋め込みによって、サービスが1つ減るので、デプロイが簡単になります。また、PDやetcdがカスタマイズしやすくなるため、パフォーマンスも向上します。
現在のPDの機能は以下の通りです。
- Timestamp Oracle (TSO) サービス:TiDBが分散トランザクションを実装するためのグローバルにユニークなタイムスタンプを提供します。
- グローバルユニークIDの生成:TiKVが新しいリージョンとストアにユニークIDを生成できるようにします。
- TiKVクラスタのauto-balance:TiKVでは、データ移動の基本単位はリージョンなので、PD auto-balanceはリージョンを自動的にバランスさせます。リージョンのスケジューリングのトリガーには2つの方法があります。
1). ハートビートトリガー:リージョンは定期的にPDに現在の状態を報告します。もしPDがあるリージョンにレプリカが足りない、あるいは多すぎると判断した場合、PDはこのリージョンにメンバーシップの変更を開始するように通知します。
2). 定期的なトリガーリング:PDは、システム全体が定期的にスケジューリングを必要としているかどうかをチェックします。もしPDが、あるストアに十分なスペースがない、あるいはあるストアにリーダーリージョンが多すぎて負荷が高いことを発見したら、PDはそのストアからリージョンを選択し、レプリカを他のストアに移動させます。
トランザクション
TiKVのトランザクションモデルはGoogle PercolatorとThemis from Xiaomiにヒントを得て、以下の最適化を行っています。
- Percolatorのようなシステムでは、単調増加するタイムスタンプを割り当てるために、Timestamp Oracle (TSO) と呼ばれるグローバルにユニークなタイムサービスが必要です。TSOの機能はTiKVではPDで提供されています。PDでのTSOの生成は純粋なメモリ操作であり、PDが再起動してもTSOが単調増加するように、定期的にTSO情報をetcdに保存します。
- Percolatorが特定の行にカラムを追加してLockなどの情報を保存するのに対して、TiKVではRocksDBのカラムファミリー(CF)を使ってLockに関する情報をすべて扱えるようにしています。大量データの場合、同時トランザクションでロックされる行はそれほど多くありません。そこで、最適化された追加のCFにLockを配置することで、処理速度を大幅に向上させることができます。
- また、追加のCFを使用することで、残ったLockの後始末が容易になるというメリットもあります。ある行のLockをトランザクションが取得した後、スレッドのクラッシュなどによりクリーンアップされず、そのLockにアクセスする後続のトランザクションが存在しない場合、Lockは残されたままになります。CFをスキャンすることで、これらのLockを簡単に発見し、クリーンアップすることができます。
分散トランザクションの実装は、TSOサービスと、TiDBに実装されている対応するトランザクションアルゴリズムをカプセル化したクライアントに依存します。単調増加するタイムスタンプは同時進行するトランザクションの時系列を設定でき、外部クライアントはトランザクションの競合や予期せぬ終了を解決するための調整役として機能します。
それでは、トランザクションがどのように実行されるかを見てみよう。
- トランザクションの開始フェーズ。トランザクションが開始するとき、クライアントはTSOから現在のタイムスタンプ(startTS)を取得しなければなりません。TSOはタイムスタンプの単調増加を保証しているので、startTSはトランザクションの時系列を識別するために使用することができます。
- トランザクションの処理中フェーズ。トランザクションの間、すべての読み取り操作は、TiKVにRPCリクエストを送る間に
startTS
を運ばなければならず、TiKVはMVCCを使用して、startTS
以前に書き込まれたデータを確実に返すようにします。書き込み操作については、TiKVは楽観的同時実行制御を使用します。これは、現在のトランザクションが他のトランザクションに影響を与えないと仮定して、実際のデータをサーバーに書き込むのではなく、クライアント上にキャッシュすることを意味します。 - トランザクションのコミットフェーズ。TiKVは2フェーズコミットアルゴリズムを使用します。一般的な2フェーズコミットとの違いは、独立したトランザクションマネージャが存在しないことです。トランザクションのコミット状態は、コミットすべきキーの1つから選択された
PrimaryKey
のコミット状態によって識別されます。1).
Prewrite
フェーズでは、クライアントは書き込むべきデータを複数のTiKVサーバーに提出します。サーバーにデータが格納されると、サーバーは対応するKeyをLockedに設定し、そのトランザクションのPrimaryKeyを記録します。いずれかのノードで書き込みの競合が発生した場合、トランザクションは中断されロールバックされます。2).
Prewrite
が終了すると、TSOから新しいタイムスタンプが取得され、commitTSとして設定されます。3). コミットフェーズでは、
PrimaryKey
でTiKVサーバーにリクエストを送信します。TiKVのコミット処理の流れは、PrimaryKeyフェーズのLocksをクリーンアップし、commitTSで対応するコミットレコードを書き込みます。PrimaryKey
のコミットが終了すると、そのトランザクションはコミットされます。他のKeyに残っているLockは、Primarykey
の状態を取得することで、コミット状態とそれに対応するcommitTSを取得することができます。しかし、その後のLockの後始末のコストを削減するために、現実的にはトランザクションに関与する全てのKeyをバックエンドで非同期的に投入することになります。
Coprocessor
HBaseと同様に、TiKVはCoprocessorをサポートしています。しかし、今のところCoprocessorは動的にロードすることができず、静的にコードにコンパイルする必要があります。(※2022年現在はCoprocessorは動的にロードできます)
現在、TiKVのCoprocessorは主にSplitとpush-downの2つの場面で使用されており、どちらもTiDBにサービスを提供するために使用されています。
- Splitについては、リージョンの分割要求が本当に提案される前に、その分割キーが合法かどうかをチェックする必要があります。例えば、TiDBのある行について、TiKVにはV1、V2、V3といった多くのバージョンが存在し、V3が最新バージョンとします。仮にV2が選択された分割キーであるとすると、行のデータは2つの異なるリージョンに分割される可能性があり、行のデータをアトミックに扱うことができません。そこで、分割Coprocessorは、分割キーをV1に調整します。こうすることで、この行のデータは、分割中も同じリージョンに存在することになります。
- Push-downでは、TiDBのパフォーマンスを向上させるためにCoprocessorが使用されます。select count (*) のような一部の操作では、TiDBがまず行から行へデータを取得し、その後カウントする必要はありません。より速い方法は、TiDBがこれらの操作を対応するTiKVノードにpush-downし、TiKVノードが計算を行い、TiDBが最終結果をコンソリデーションする方法です。
完全なpush-down処理がどのように機能するかを示すために、select count(*) from t1
を例にとって説明しましょう。
- TiDBはSQL文を解析した後、t1テーブルの範囲に基づいて、t1のすべてのデータがTiKV上のリージョン1とリージョン2にあることを発見し、TiDBはpush-downコマンドをリージョン1とリージョン2に送信します。
- リージョン1とリージョン2はpush-downコマンドを受け取った後、Raft処理によって別々にデータのスナップショットを取得します。
- リージョン1とリージョン2はそれぞれのスナップショットをスキャンして、対応するデータを取得し、
count()
を計算します。 - 各リージョンは
count()
の結果をTiDBに返し、TiDBはその結果を集約して合計値を出力します。
キープロセス分析
Key-Valueオペレーション
TiKVにGetやPutのリクエストが送られたとき、TiKVはどのように処理するのでしょうか。
前述したように、TiKVは単純なKey-Value、トランザクションKey-Value、push-downといった機能を提供しています。しかし、トランザクションKey-Valueであろうとpush-downであろうと、TiKVでは単純なKey-Value操作に変換されることになります。そこで、単純なKey-Value操作を例にして、TiKVがどのようにリクエストを処理するのかを紹介します。TiKVがどのようにトランザクションKey-Valueとpush-downサポートを実装しているかについては、後で説明します。
Put
を例にして、完全なKey-Value処理がどのように行われるかを紹介します。
- クライアントはTiKVに
put k1 v1のようなPUT
コマンドを送信します。まず、クライアントはPDからk1キーのリージョンIDとリージョンピアのリーダーを取得します。次に、クライアントは対応するTiKVノードにPut要求を送信します。 - TiKVサーバーはリクエストを受信すると、mioチャネルを通じて内部のRaftStoreスレッドに通知し、コールバック関数を受け取ります。
RaftStore
スレッドがリクエストを受け取ると、まずリクエストが合法かどうか、リクエストが合法なエポックであるかどうかを含めてチェックします。リクエストが合法で、かつ相手がリージョンのリーダーであれば、RaftStoreスレッドはリクエストをバイナリ配列にエンコードし、Proposeを呼び出してRaft処理を開始します。- ハンドルが準備できた段階で、新たに生成されたエントリーはまずRaftのログに追記され、同時に他のフォロワーに送信されます。
- リージョンの大半のノードがログにエントリーを追記した時点で、エントリーはコミットされます。次のReady処理では、
committed_entries
からエントリーを取得し、デコードして対応するコマンドを実行することができます。RocksDBでは、このようにしてput k1 v1
コマンドが実行されます。 - リーダーによってエントリーのログが適用されると、そのエントリーのコールバックが呼ばれ、クライアントに応答が返されます。
また、Getについても同様の処理が行われ、Raftによって大半のノードにレプリケートされるまでは全てのリクエストが処理されないということになります。もちろん、これも分散システムにおけるデータの線形化可能性を確保するためです。
もちろん、以下のような観点で読み取り要求の最適化を行い、パフォーマンスを向上させます。
- リーダーにリースを導入します。リースの中では、リーダーが有効であると仮定することができるので、リーダーは直接読み取りサービスを提供することができ、Raftの複製ログを経由する必要はありません。
- フォロワーは読み込みサービスを提供します。
これらの最適化はRaftの論文で言及されており、etcdでもサポートされています。今後、TiKVにも同様に導入していく予定です。(※2022年現在は導入されています)
Membership Change
データの安全性を確保するために、異なるストアに複数のレプリカが存在します。各レプリカは他のレプリカのピアです。あるリージョンのレプリカの数が足りない場合、新しいレプリカを追加し、逆にあるリージョンのレプリカの数が閾値を超えた場合、いくつかのレプリカを削除することになります。
TiKVでは、Raft Membership Changeによって、リージョンのレプリカの変更が完了します。しかし、いつどのようにリージョンのMembership Changeするかは、PDによってスケジューリングされます。ここでは、レプリカの追加を例にして、全体の流れを説明します。
- リージョンは定期的にPDにハートビートを送信します。ハートビートには、このリージョンに関する相対的な情報、例えばピアの情報などが含まれます。
- PDはハートビートを受信すると、このリージョンのレプリカの数が設定と一致しているかどうかを調べます。仮にこのリージョンにはレプリカが2つしかなく、セットアップでは3つのレプリカがあるとして、PDは適切なストアを探し、リージョンにChangePeerコマンドを返します。
- リージョンはChangePeerコマンドを受け取った後、別のストアにレプリカを追加する必要があると判断すると、RaftプロセスでChangePeerリクエストを送信します。ログが適用されると、新しいピア情報がリージョンメタに更新され、メンバシップの変更が完了します。
なお、Membership Changeが完了しても、それはリージョンによってレプリカの情報がメタに追加されただけであることに注意する必要があります。その後、リーダーが新しいフォロワーにデータがないことを発見した場合、そのフォロワーにスナップショットを送信することになります。
また、TiKVやetcdにおけるMembership Changeの実装は、Raftの論文にあるものとは異なっていることに注意する必要があります。Raft論文では、新しいピアが追加された場合、ProposeコマンドでRegion metaに追加されます。しかし、簡略化のため、TiKVとetcdはログが適用されるまでRegion metaにピア情報を追加しません。
Split
最初のうちは、リージョンは1つだけです。データが大きくなるにつれて、リージョンは分割される必要があります。
TiKVでは、リージョンが分割された場合、2つの新しいリージョンが作成されます。左のリージョンは古いリージョンのIDを全て使用します。リージョンはその範囲を変更するだけと考えることができます。右のリージョンはPDによって新しいIDを取得します。以下は簡単な例です。
Region 1 [a, c) -> Region 1 [a, b) + Region 2 [b, c)
リージョン1の元の範囲は(a, c)で、b点で分割した後、左のリージョンはリージョン1のままですが、範囲は(a, b)になります。右のリージョンは新しいリージョン、リージョン2となり、その範囲は(b, c)です。
リージョン1の基本サイズが64MBだと仮定します。完全な分割処理は次のようになります。
- ある期間内に、リージョン1のデータの累積サイズが閾値(例えば 8MB)を超えると、リージョン1は分割チェッカーに通知してリージョン1をチェックさせます。
- 分割チェッカーは、リージョン1を順次スキャンします。あるキーの累積サイズが 64MBを超えていることを発見すると、そのキーの記録を残し、それを分割キーとします。一方、分割チェッカーはスキャンを続け、あるキーの累積サイズが閾値(例えば96MB)を超えたことを発見すると、このリージョンは分割可能だと判断し、RaftStoreスレッドに通知します。
- RaftStoreスレッドはこのメッセージを受け取ると、PDにAskSplitコマンドを送信し、PDに対して新たに生成されたPD、例えばリージョン2に対する新しいIDを割り当てるよう要求します。
- PDでIDが生成されると、Admin SplitRequestが生成され、RaftStoreスレッドに送信されます。
- RaftStoreがAdmin SplitRequestを提案する前に、Coprocessorはコマンドを前処理して、分割キーが適切かどうかを判断します。分割キーが適切でない場合、Coprocessorは分割キーを適切なものに調整します。
- 分割要求は、Raftプロセスを通じて送信され、その後適用されます。TiKVの場合、リージョンの分割は、元のリージョンの範囲を変更し、別のリージョンを作成します。これらの変更はすべて、Region metaの変更のみを含み、内部の実データは移動しないので、TiKVでのリージョン分割は非常に高速に行われます。
- 分割が完了すると、TiKVは左リージョンと右リージョンに関する最新の情報をPDに送信します。
TiDB Cloud Dedicated
TiDB Cloudのエンタープライズ版。
専用VPC上に構築された専有DBaaSでAWSとGoogle Cloudで利用可能。
TiDB Cloud Serverless
TiDB Cloudのライト版。
TiDBの機能をフルマネージド環境で使用でき無料かつお客様の裁量で利用開始。