Raft Engine:TiKVにおける複数Raftログのための
ログ構造化組込みストレージエンジン
重要なポイント
- TiDBはオープンソースの分散型NewSQLデータベースで、HTAP(Hybrid Transactional and Analytical Processing)ワークロードをサポートしています。TiKVは、TiDBのための行指向のキーバリューストレージエンジンを提供しています。
- TiKVの以前のバージョンでは、Raft ログの保存にRocksDBが使用されていました。しかし、ディスクI/Oが膨大になっていき、膨大なディスクI/Oは、高いコストと不安定なパフォーマンスをもたらします。
- ディスクI/Oを削減するために、BitCaskをベースにしたRaft Engineを設計・実装しています。
- Raft EngineはRaftのログをディスク上に順次的に保存し、インメモリインデックスがログを指し示します。
- BitCaskと比較して、Raftのログはディスクに書き込まれる前に圧縮され、ヒントファイルは存在しません。これらの改善により、ディスクI/Oがさらに削減されます。
- 当社の内部ベンチマークによると、Raft Engineはスループットを最大4%向上させ、テールレイテンシーを20%削減しました。また、書き込みI/Oを25%〜40%削減し、重いワークロードの下でCPU使用率を12%削減しました。
***
TiDBはオープンソースの分散型NewSQLデータベースで、HTAP(Hybrid Transactional and Analytical Processing)ワークロードをサポートしています。MySQLと互換性があり、水平方向のスケーラビリティ、強力な一貫性、高可用性を備えています。
TiKVは、TiDBのための行指向のキーバリューストレージエンジンを提供します。TiKVのデータは、レプリケーションとロードバランシングのために最小のデータ単位である複数のRegionに分割されます。高可用性(HA)を実現するために、各Regionは3回レプリケーションされ、異なるTiKVノードに分散されます。1つのRegionに含まれるレプリカはRaftグループを形成します。

TiKVのアーキテクチャ
保存する必要があるデータは、データそのものとRaftのログの2つの部分です。各TiKVノードでこれらを保存するために、2つのRocksDBインスタンスが使用されます。以前のバージョンでは、RaftのログはRocksDBに保存され、そこでキーと値のペアに変換されます。RocksDBを使用してRaftログを保存すると、書き込み増幅(WA)が発生し、膨大な量のディスクI/Oが発生します。TiDB 5.4 から、TiKVのマルチRaftログ用に、ログ構造化された組み込みストレージエンジンであるRaft Engineが導入されました。
この記事では、Raft Engineの必要性、設計と実装の方法、そしてパフォーマンス面での利点について深く掘り下げていきます。
Raftのログを保存するためにRocksDBを使用する際の問題点
RocksDBを使用してRaftのログを保存する際の最大の問題点は、大量のディスクI/Oが発生することです。RocksDBでは、ディスクI/Oは2箇所から発生します。
まずRocksDBでは、キーと値のペアがWrite Ahead Log(WAL)に挿入され、その後RocksDBのMemTableにデータが書き込まれます。MemTableのデータが一定のサイズに達すると、RocksDBはディスク内のSST(Sorted String Table)ファイルに内容をフラッシュします。つまり、同じ情報が2回ディスクに書き込まれることになります。
また、RocksDBはWA(Write Amplification)を搭載しています。WAとは、ストレージデバイスに書き込まれるデータ量と、ユーザーが書き込むデータ量との比率のことです。
Log-structured merge tree (LSM-tree)ベースのKVストアは、頻繁にコンパクションを行うため、WAが高いという批判を長年受けてきました。データセットのサイズが大きくなると、LSMツリーの深さだけでなく、全体のWAも増加します。WAの増加は、ストレージの帯域幅を消費し、フラッシュ処理と競合し、最終的にはアプリケーションのスループットを低下させることになります。
TiDBをクラウド環境でデプロイしたり、TiDB Cloud(フルマネージドTiDBサービス)を利用したりすると、RocksDBのディスクI/Oの大量発生が顕在化します。その理由は2つ挙げられます。まず、ディスクI/Oはクラウド・ベンダーによって別途請求される場合があります。ディスクI/Oが大きいということは、一般的に請求額が大きいということです。さらに、RocksDBのディスクI/Oの量は大きく変動するため、QoS(Quality of Service)に影響を与える可能性があります。
Raft Engineの設計
BitCaskにヒントを得て、TiKVに複数のRaftログを保存するために、ログ構造を持つ持続的な組み込みストレージエンジンであるRaft Engineを設計・実装しました。以下の図2は、簡略化したRaft Engineのアーキテクチャを示しています。

簡略化したRaft Engineのアーキテクチャ
ディスク上では、書き込みリクエストは、キーと実際のデータの両方が、アクティブなアペンドのみのログファイルに順次的に書き込まれます。設定可能な閾値に達すると、新しいファイルが作成されます。
MemTableは、メモリ上のハッシュテーブルです。ログのキーとそのキーのページ位置(オフセット)が格納されています。MemTableは、ディスク上のストレージのインデックスになっています。MemTableはいくつかの部分に分かれており、読み込みや書き込みのリクエストが来たとき、MemTableルーターはそのリクエストを対応するMemTableに誘導します。次のコードは、MemTableのデータ構造を示しています。
pub struct MemTable { /// The ID of the current Raft Group. region_id: u64, /// Container of entries. Incoming entries are pushed to the back with /// ascending log indexes. entry_indexes: VecDeque<ThinEntryIndex>, /// The log index of the first entry. first_index: u64, /// The amount of rewritten entries. Rewritten entries are the oldest /// entries and stored at the front of the container. rewrite_count: usize, /// A map of active key value pairs. kvs: BTreeMap<Vec<u8>, (Vec<u8>, FileId)>, /// Shared statistics. global_stats: Arc<GlobalStats>, }
これまでの説明で、Raft Engineのハイレベルな設計を見てきました。ストレージエンジンとして、読み込み、挿入、更新、削除のリクエストにどのように対応するかが極めて重要です。以下では、TiKVからのログファイルの読み取り、挿入、更新、削除のリクエストに対して、Raft Engineがどのように応答するのかを詳しく説明します。
ログファイルの読み込み
ログファイルを読み込む際、リーダーはまずMemTableルータにアクセスし、対応するMemTableを探します。リーダーは、ログのキーに基づいてMemTableからログのページ位置を取得します。その情報が得られると、エンジンはディスク上のファイルから1回のディスクリードを実行します。engine.rsからログファイルを読み込むサンプルコードを用意しました。
pub fn get_message<S: Message>(&self, region_id: u64, key: &[u8]) -> Result<Option<S>> { let _t = StopWatch::new(&ENGINE_READ_MESSAGE_DURATION_HISTOGRAM); if let Some(memtable) = self.memtables.get(region_id) { if let Some(value) = memtable.read().get(key) { return Ok(Some(parse_from_bytes(&value)?)); } } Ok(None) }
ログファイルの挿入
TiKVからの挿入リクエストに応答するために、エンジンは以下の処理を行います。
- アクティブなデータファイル(ストリーム)にエントリとキー値を追加します。
- MemTableにエントリーを作成します。
この2つの操作はアトミックに実行されます。新しいレコードの挿入は、1回のディスク書き込みのみで、ディスクシークは行われません。engine.rsからログファイルを書き込むサンプルコードを用意しました。
また、Raft Engineは、lz4圧縮に対応しています。ディスクに書き込む前にデータを圧縮するオプションがあります。この圧縮は書き込みI/Oを減らすのに役立ちます。MemTableのレコードは圧縮されません。
pub fn write(&self, log_batch: &mut LogBatch, mut sync: bool) -> Result<usize> { let start = Instant::now(); let len = log_batch.finish_populate(self.cfg.batch_compression_threshold.0 as usize)?; let block_handle = { let mut writer = Writer::new(log_batch, sync, start); if let Some(mut group) = self.write_barrier.enter(&mut writer) { let now = Instant::now(); let _t = StopWatch::new_with(&ENGINE_WRITE_LEADER_DURATION_HISTOGRAM, now); for writer in group.iter_mut() { ENGINE_WRITE_PREPROCESS_DURATION_HISTOGRAM.observe( now.saturating_duration_since(writer.start_time) .as_secs_f64(), ); sync |= writer.sync; let log_batch = writer.get_payload(); let res = if !log_batch.is_empty() { self.pipe_log .append(LogQueue::Append, log_batch.encoded_bytes()) } else { Ok(FileBlockHandle { id: FileId::new(LogQueue::Append, 0), offset: 0, len: 0, }) }; writer.set_output(res); } if let Err(e) = self.pipe_log.maybe_sync(LogQueue::Append, sync) { panic!( "Cannot sync {:?} queue due to IO error: {}", LogQueue::Append, e ); } } writer.finish()? };
ログファイルの更新
ログファイルを更新するリクエストを提供することは、新しいKVペアを挿入することと似ています。違いは、MemTableに新しいエントリを作成する代わりに、既存のレコードが新しいページの位置で更新されることです。ディスク内の古い値はぶら下がり、パージ・フェーズのガベージ・コレクションで削除されます。
ログファイルの削除
TiKVがログファイルを削除したい場合、エンジンは2つのアトミック操作を実行します。それは :
- アクティブなデータストリームに新しいエントリを追加し、その値がtombstoneと等しくなるようにします。
- MemTableのエントリを削除します。
古いレコードがぶら下がったままになっているので、パージ・フェーズのガベージ・コレクションで削除されることになります。
Raft Engineの復旧
Raft Engineは再起動すると、ディスク上のログのキーを読み込み、それを元にMemTableを構築します。この点は、BitCaskと異なります。BitCaskでは、ディスク上にヒントファイルが保存されており、復旧に必要なデータが保持されています。Raft Engineでは、バックグラウンドで余分な書き込みI/Oが発生するため、ヒントファイルを利用しないようにしました。Raft Engineは、PingCAPの内部ベンチマークでは依然として非常に高速に復旧しています。Raft Engineに保存されたデータが10GB未満の場合、通常2秒未満で復旧することができます。
パージ
パージフェーズでは、ディスク上の古いエントリを永久に削除します。パージはRocksDBのコンパクションに似ていますが、Raft Engineのパージはより効率的です。
RocksDBのコンパクションフェーズでは、すべてのデータファイルをメモリに読み込んで結合し、ディスクにフラッシュする必要があります。Raft EngineはMemTableの全レコードのインデックスを持っているので、最新のデータを読み込んでファイルに結合し、ディスクにフラッシュするだけでよいのです。古くなったページは破棄されます。
データをいつパージするかを決めるのはユーザーです。Raft Engineは、ユーザーが`purge_expired_files()`ルーチンを呼び出したときだけ、ログファイルを統合して削除します。デフォルトでは、TiKVは10秒ごとにパージ関数を呼び出します。
また、Raft Engineは有用なフィードバックをユーザーに送信します。`purge_expired_files()` ルーチンが呼び出されるたびに、Raft Engineは特に古いログエントリを保持するRaft Groupsのリストを返します。これらのログエントリはガベージコレクションをブロックするので、ユーザーによってコンパクトにする必要があります。
設計上の考慮点
なぜ、Raft EngineがRocksDBよりも我々のニーズに合っているのでしょうか?
前述の通り、今回のシナリオではRocksDBは膨大な量のディスクI/Oを発生させます。RocksDBの値はキーでソートされているため、レンジクエリを実行する際に非常に良いパフォーマンスをもたらしますが、今回のケースではその必要はありません。キーでソートすると、どうしても書き込みの増幅が大きくなってしまいます。
Raft Engineにはこの機能がないため、ディスクI/Oの発生が少なくなっています。
なぜ複数のMemTableが存在するのでしょうか?
MemTableからのデータの読み取り、またはMemTableへのデータの書き込みが行われた場合、テーブルをロックする必要があります。後続のリクエストは、現在の操作が終了するまでキューで待機する必要があります。複数のMemTableを使用する場合、2つの要求が異なるMemTableにアクセスしても、互いにブロックされることはありません。
パフォーマンス評価
このセクションでは、TPC-Cに基づく内部ベンチマークの結果を示します。TiDBクラスタは以下のように設定しました。
- TiDBとPlacement Driver(PD)サーバーは、AWSのc5.4xlargeインスタンス上に配置しました。
- TiKVサーバーはr5.4xlargeインスタンスで、1TBのgp3 EBSボリュームを使用しています。
- TPC-Cベンチマークでは、5,000ウェアハウスを使用しています。
- 50スレッドと800スレッドで実験を行いました。
各実験は2時間です。


上の表から、RaftログのストレージとしてRocksDBを使用したTiKVと比較して、Raft EngineはQPSを最大4%向上させ、テールレイテンシーを20%削減していることがわかります。書き込みI/Oは25%〜40%削減されました。重いワークロードの場合、CPU使用率は12%減少しました。
実行例
試してみたいという方に、Raft EngineのGitHubリポジトリに実行例を用意しています。これは、TiKVが送信する書き込みリクエストを模倣したものです。このコードでは、まずRaft Engineのインスタンスを初期化し、そこにいくつかのログを書き込んでいます。また、ログを圧縮し、パージフェーズを初期化します。
このサンプルは以下の方法で実行できます。
$ RUST_LOG=debug cargo run --release --example append-compact-purge // Copyright (c) 2017-present, PingCAP, Inc. Licensed under Apache-2.0. use kvproto::raft_serverpb::RaftLocalState; use raft::eraftpb::Entry; use raft_engine::{Config, Engine, LogBatch, MessageExt, ReadableSize}; use rand::thread_rng; use rand_distr::{Distribution, Normal}; #[derive(Clone)] pub struct MessageExtTyped; impl MessageExt for MessageExtTyped { type Entry = Entry; fn index(e: &Self::Entry) -> u64 { e.index } } // How to run the example: // $ RUST_LOG=debug cargo run --release --example append-compact-purge fn main() { env_logger::init(); let config = Config { dir: "append-compact-purge-data".to_owned(), purge_threshold: ReadableSize::gb(2), batch_compression_threshold: ReadableSize::kb(0), ..Default::default() }; let engine = Engine::open(config).expect("Open raft engine"); let compact_offset = 32; // In src/purge.rs, it's the limit for rewrite. let mut rand_regions = Normal::new(128.0, 96.0) .unwrap() .sample_iter(thread_rng()) .map(|x| x as u64); let mut rand_compacts = Normal::new(compact_offset as f64, 16.0) .unwrap() .sample_iter(thread_rng()) .map(|x| x as u64); let mut batch = LogBatch::with_capacity(256); let mut entry = Entry::new(); entry.set_data(vec![b'x'; 1024 * 32].into()); let init_state = RaftLocalState { last_index: 0, ..Default::default() }; loop { for _ in 0..1024 { let region = rand_regions.next().unwrap(); let state = engine .get_message::<RaftLocalState>(region, b"last_index") .unwrap() .unwrap_or_else(|| init_state.clone()); let mut e = entry.clone(); e.index = state.last_index + 1; batch.add_entries::<MessageExtTyped>(region, &[e]).unwrap(); batch .put_message(region, b"last_index".to_vec(), &state) .unwrap(); engine.write(&mut batch, false).unwrap(); if state.last_index % compact_offset == 0 { let rand_compact_offset = rand_compacts.next().unwrap(); if state.last_index > rand_compact_offset { let compact_to = state.last_index - rand_compact_offset; engine.compact_to(region, compact_to); println!("[EXAMPLE] compact {} to {}", region, compact_to); } } } for region in engine.purge_expired_files().unwrap() { let state = engine .get_message::<RaftLocalState>(region, b"last_index") .unwrap() .unwrap(); engine.compact_to(region, state.last_index - 7); println!( "[EXAMPLE] force compact {} to {}", region, state.last_index - 7 ); } } }