※このブログは2025年10月21日に公開された英語ブログ「How to Stream Data from Kafka to TiDB」の拙訳です。
現代のアプリケーションは、ユーザーによる操作、トランザクション、ログ、メトリクスなど、リアルタイムに発生する膨大なイベントデータを生成しています。こうしたスケールに対応するため、多くのチームはApache Kafkaを利用しています。Kafkaはアプリケーションとデータパイプラインを疎結合にし、信頼性の高い、高スループットなデータ配信を実現する分散メッセージングシステムです。
一方、ストレージ側ではTiDBが分散型のSQLデータベースとして、水平スケール、トランザクション処理と分析処理の両立、そして高負荷下でも低レイテンシなパフォーマンスを提供します。
KafkaとTiDBを組み合わせることで、高い書き込みスループットと高速なデータ処理が求められるリアルタイムワークロードに強力な基盤を構築できます。
この2部構成のブログチュートリアルでは、KafkaとTiDBを統合する方法を解説します。第1部では、KafkaからTiDBへデータをストリームする仕組みと、このアーキテクチャがますます人気を集めている理由を取り上げます。第2部では、Kafkaが毎秒数百万メッセージを処理する状況でのTiDBのパフォーマンスや、TiDBの内部パフォーマンスを監視する方法を紹介します。
なぜデータをKafka経由でストリーミングするのか?
最近携わったある顧客のプロジェクトでは、アプリケーションがメッセージを直接Kafkaに送信する構成を採用していました。Kafkaからは、SQL ServerやCassandraなどの永続ストレージ層へとデータが流れるようになっています。
このような設計は、大量の書き込みを扱うシステムでは一般的です。高負荷時にアプリケーションからデータベースへ直接書き込みを行うと、レイテンシが長期化する問題が発生し、アプリケーション全体のパフォーマンス低下につながる可能性があります。Kafkaはアプリケーションとデータベースの間でバッファとして機能し、頻繁な書き込みを一旦受け止め、非同期で処理してからストレージ層へ届けることで、この問題を緩和します。
データの取り込みと永続化を疎結合にすることで、トラフィックが急増した場合でもKafkaは安定したパフォーマンスと信頼性を維持することができます。

図1:Kafkaがアプリケーションのデータストリームを疎結合化する仕組み
TiDBとは?
TiDBは、水平スケーラビリティ、強い一貫性、高可用性を備えたオープンソースの分散型SQLデータベースです。コンピュートとストレージを分離したアーキテクチャを採用しており、それぞれのレイヤーを独立してスケールさせることができます。これはコストとパフォーマンスを最適化するうえで大きな利点となります。
TiDBはMySQL互換であるため、既存のアプリケーション、ドライバー、SQL構文をほとんど変更なしでそのまま利用できます。この互換性により、MySQL、PostgreSQL、MongoDBなど他のデータベースからの移行も容易になります。

図2:TiDBがKafkaを補完し統合ワークロードを実現する仕組み
まずは、クラウド上で動作しているテスト用のTiDBインスタンスの例から見てみましょう。
ankitkapoor@Ankits-MacBook-Air bin % ./mysql -uankit -hxxx -P 4000 -p
mysql> show databases;
+--------------------+
| Database |
+--------------------+
| INFORMATION_SCHEMA |
| PERFORMANCE_SCHEMA |
| ankit |
| kafka |
| mysql |
| test |
+--------------------+
6 rows in set (0.10 sec)
TiDBからKafkaはどうなるのか?
TiDBは、TiCDCを使用してKafkaにデータをストリーミングすることも可能です。詳細はこちらのドキュメントに記載されています。
TiCDC (TiDB Change Data Capture) は、TiDBの変更をリアルタイムでキャプチャし、下流に複製するコンポーネントです。TiDBクラスタ内のすべての変更を追跡する内部的な記録であるRaftログを読み取り、それらの変更をKafkaや別のTiDBクラスタ、クラウドストレージなどの外部システムにプッシュします。
参考までに、TiDBのRaft log filesは通常このような形式になっています:
-rw-r--r-- 1 ankitkapoor cc 69B 11 Sep 02:29 0000000000000001.rewrite
-rw-r--r-- 1 ankitkapoor cc 0B 11 Sep 02:39 LOCK
-rw-r--r-- 1 ankitkapoor cc 869K 11 Sep 02:39 0000000000000001.raftlog <— Raft log
TiCDCはTiDBからKafkaへのデータを扱いますが、本記事ではその逆、KafkaからTiDBへのデータについて焦点を当てます。
KafkaからTiDBへ:概要
KafkaからTiDBへのデータストリーミングは、スケーラブルで信頼性の高いデータパイプラインを構築するためのオープンソースフレームワークであるKafka Connectを使って行われることが多いです。PySparkのような他のツールでも実現可能ですが、Kafka Connectは特に本番環境において、より簡単で高パフォーマンスな方法を提供します。
TiDBはMySQL互換であるため、既存のMySQL JDBCドライバーを利用して、KafkaとTiDB間のデータストリームを構築することができます。
必要要件
本ガイドに従うには、以下のコンポーネントが必要です:
- Kafka
- Zookeeper
- Kafka-topics
- Kafka-console-producer
- Kafka-console-consumer
- Kafka Sinkコネクタ
- MySQLクライアント
- TiDBクラスタ
テスト環境
- ローカルマシン:macOS 15.6.1
- MySQLクライアント:9.4.0 (最近のバージョンであれば問題ありません)
- データベース:TiDB Cloud Serverless (一般利用可能)
本ブログで扱わない内容
本ブログでは、ZooKeeper、Kafkaトピック、messages、ストリーミングの基本概念など、Kafkaの基礎知識を理解していることを前提としています。これらのトピックは公式のKafkaドキュメントに詳しく記載されているため、ここでは繰り返し説明しません。
はじめに
ステップ1:Kafkaのインストール
brew install kafka
表示されるメッセージ:
To start kafka now and restart at login:
brew services start kafka
Or, if you don't want/need a background service you can just run:
/opt/homebrew/opt/kafka/bin/kafka-server-start /opt/homebrew/etc/kafka/server.properties
Linuxの場合は、公式のセットアップガイドをご参照ください。
ステップ2:ZooKeeperのインストール
brew install zookeeper
表示されるメッセージ:
To start zookeeper now and restart at login:
brew services start zookeeper
Or, if you don't want/need a background service you can just run:
SERVER_JVMFLAGS="-Dapple.awt.UIElement=true" /opt/homebrew/opt/zookeeper/bin/zkServer start-foreground
ZooKeeperの起動:
brew services start zookeeper
ステップ3:必要なファイルのダウンロード
以下をダウンロードします:
- Kafka Connect JDBC: confluentinc-kafka-connect-jdbc-10.8.4
- MySQL JDBCコネクタ:mysql-connector-j-9.4.0
MySQLコネクタのJARをConfluentのライブラリに移動し、以下の2つの設定ファイルを作成します:
- Connect-standalone.properties
- Mysql-sink-connector.properties
ステップ4:Kafka Connectの設定
設定ファイル connect-standalone.properties
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
plugin.path=/pathto_sink_jdbc_connector/
設定ファイル mysql-sink-connector.properties
name=jdbc-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=kafka_to_TiDB ( one which we will be creating later, you can choose your desired name )
connection.url=jdbc:hostname:4000/yourdatabase
connection.user=user_name
connection.password=password
auto.create=false
auto.evolve=false
insert.mode=insert
pk.mode=none
table.name.format=tb_kafka_to_TiDB
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
transforms=filter
transforms.filter.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.filter.include=id,user
ステップ5:対象のTiDBテーブルを作成
CREATE TABLE `tb_kafka_to_TiDB` (
`id` int DEFAULT NULL,
`user` char(255) DEFAULT NULL
)
ステップ6:Kafka Connectの起動
設定ファイルと同じディレクトリで、以下のコマンドを実行します:
connect-standalone connect-standalone.properties mysql-sink-connector.properties
注意:このコマンドは、2つの設定ファイルConnect-standalone.propertiesとMysql-sink-connector.propertiesを作成したフォルダで実行してください。
正常に起動した場合のログには、以下の内容が含まれます:
kafka_to_TiDB-0 (org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerInvoker:58)
[2025-08-18 19:58:35,603] INFO [jdbc-sink|task-0] [Consumer clientId=connector-consumer-jdbc-sink-0, groupId=connect-jdbc-sink] Found no committed offset for partition kafka_to_TiDB-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1508)
[2025-08-18 19:58:35,607] INFO [jdbc-sink|task-0] [Consumer clientId=connector-consumer-jdbc-sink-0, groupId=connect-jdbc-sink] Resetting offset for partition kafka_to_TiDB-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 1 rack: null isFenced: false)], epoch=0}}. (org.apache.kafka.clients.consumer.internals.SubscriptionState:447)
[2025-08-18 19:58:46,968] INFO [jdbc-sink|task-0] JdbcDbWriter Connected (io.confluent.connect.jdbc.sink.JdbcDbWriter:57)
ステップ7:Kafkaトピックの作成とテスト
トピックを作成し、プロデューサーを起動します:
kafka-console-producer --bootstrap-server localhost:9092 --topic kafka_to_TiDB --property parse.key=false --property "key.separator=:"
その後、コンシューマーを起動して、メッセージが解析されていること確認します:
kafka-console-consumer --bootstrap-server localhost:9092 --topic kafka_to_TiDB --from-beginning
hello
kafka
whats goin on
man
"User signed up"
"User signed up"
"User signed up"
{"id": 123, "status": "active"}
{"temperature": 25.4}
ステップ8:Kafkaへのメッセージ送信
kafka-console-producer --bootstrap-server localhost:9092 --topic kafka_to_TiDB --property parse.key=false --property "key.separator=:"
>{"schema":{"type":"struct","fields":[{"field":"id","type":"int32"},{"field":"user","type":"string"}],"optional":false,"name":"kafka_to_TiDB"},"payload":{"id":1,"user":"Ankit"}}
Kafka Connectのログから、書き込みが正常に行われたことが確認できます:
[2025-08-18 19:58:48,424] INFO [jdbc-sink|task-0] Setting metadata for table "ankit"."kafka_to_TiDB" to Table{name='"ankit"."kafka_to_TiDB"', type=TABLE columns=[Column{'id', isPrimaryKey=false, allowsNull=true, sqlType=INT}, Column{'user', isPrimaryKey=false, allowsNull=true, sqlType=CHAR}]} (io.confluent.connect.jdbc.util.TableDefinitions:64)
[2025-08-18 19:58:48,725] INFO [jdbc-sink|task-0] Completed write operation for 1 records to the database (io.confluent.connect.jdbc.sink.JdbcDbWriter:100)
[2025-08-18 19:58:48,726] INFO [jdbc-sink|task-0] Successfully wrote 1 records. (io.confluent.connect.jdbc.sink.JdbcSinkTask:91)
TiDBで確認
最後に、MySQLクライアントを使ってTiDBに接続します:
./mysql -u 'ankit' -hhostname -P 4000 -p
その後、テーブルをクエリします:
mysql> select * from ankit.kafka_to_TiDB;
+------+-------+
| id | user |
+------+-------+
| 1 | Ankit |
+------+-------+
データが正常に挿入され、KafkaからTiDBへのイベントストリーミングが行われていることが確認できます。
結論
KafkaからTiDBへのデータストリーミングを活用することで、組織はKafkaの大量イベント処理能力を利用しつつ、TiDBの分散型SQL機能によるスケーラブルでリアルタイムなデータ処理を実現できます。この構成により、レイテンシの低減、書き込みのボトルネック回避、そして高負荷時でもアプリケーションのパフォーマンスを安定して維持することが可能です。
本ブログチュートリアルの第2部では、パフォーマンステストと可観測性に焦点を当て、毎秒数百万メッセージの負荷下でこのアーキテクチャがどのように動作するか、またTiDBのパフォーマンスを効果的に監視する方法を詳しく解説します。
実際に試してみたい方は、TiDB Cloud Quick Start LabでTiDBの動作を体験できます。さらに分散型SQLについて深く学びたい場合は、TiDB Universityのセルフペースコースがおすすめです。TiDBの基本から高度なパフォーマンスチューニング、実際のストリーミング統合まで、幅広く学習できます。
TiDB Cloud Dedicated
TiDB Cloudのエンタープライズ版。
専用VPC上に構築された専有DBaaSでAWSとGoogle Cloudで利用可能。
TiDB Cloud Starter
TiDB Cloudのライト版。
TiDBの機能をフルマネージド環境で使用でき無料かつお客様の裁量で利用開始。