tidb_feature_1800x600 (1)

※このブログは2025年10月21日に公開された英語ブログHow to Stream Data from Kafka to TiDB」の拙訳です。

現代のアプリケーションは、ユーザーによる操作、トランザクション、ログ、メトリクスなど、リアルタイムに発生する膨大なイベントデータを生成しています。こうしたスケールに対応するため、多くのチームはApache Kafkaを利用しています。Kafkaはアプリケーションとデータパイプラインを疎結合にし、信頼性の高い、高スループットなデータ配信を実現する分散メッセージングシステムです。

一方、ストレージ側ではTiDBが分散型のSQLデータベースとして、水平スケール、トランザクション処理と分析処理の両立、そして高負荷下でも低レイテンシなパフォーマンスを提供します。

KafkaとTiDBを組み合わせることで、高い書き込みスループットと高速なデータ処理が求められるリアルタイムワークロードに強力な基盤を構築できます。

この2部構成のブログチュートリアルでは、KafkaとTiDBを統合する方法を解説します。第1部では、KafkaからTiDBへデータをストリームする仕組みと、このアーキテクチャがますます人気を集めている理由を取り上げます。第2部では、Kafkaが毎秒数百万メッセージを処理する状況でのTiDBのパフォーマンスや、TiDBの内部パフォーマンスを監視する方法を紹介します。

なぜデータをKafka経由でストリーミングするのか?

最近携わったある顧客のプロジェクトでは、アプリケーションがメッセージを直接Kafkaに送信する構成を採用していました。Kafkaからは、SQL ServerやCassandraなどの永続ストレージ層へとデータが流れるようになっています。

このような設計は、大量の書き込みを扱うシステムでは一般的です。高負荷時にアプリケーションからデータベースへ直接書き込みを行うと、レイテンシが長期化する問題が発生し、アプリケーション全体のパフォーマンス低下につながる可能性があります。Kafkaはアプリケーションとデータベースの間でバッファとして機能し、頻繁な書き込みを一旦受け止め、非同期で処理してからストレージ層へ届けることで、この問題を緩和します。

データの取り込みと永続化を疎結合にすることで、トラフィックが急増した場合でもKafkaは安定したパフォーマンスと信頼性を維持することができます。

How Kafka decouples application data streams.

図1:Kafkaがアプリケーションのデータストリームを疎結合化する仕組み

TiDBとは?

TiDBは、水平スケーラビリティ、強い一貫性、高可用性を備えたオープンソースの分散型SQLデータベースです。コンピュートとストレージを分離したアーキテクチャを採用しており、それぞれのレイヤーを独立してスケールさせることができます。これはコストとパフォーマンスを最適化するうえで大きな利点となります。

TiDBはMySQL互換であるため、既存のアプリケーション、ドライバー、SQL構文をほとんど変更なしでそのまま利用できます。この互換性により、MySQL、PostgreSQL、MongoDBなど他のデータベースからの移行も容易になります。

How TiDB complements Kafka for unified workloads.

図2:TiDBがKafkaを補完し統合ワークロードを実現する仕組み

まずは、クラウド上で動作しているテスト用のTiDBインスタンスの例から見てみましょう。

ankitkapoor@Ankits-MacBook-Air bin % ./mysql -uankit -hxxx -P 4000 -p


mysql> show databases;
+--------------------+
| Database           |
+--------------------+
| INFORMATION_SCHEMA |
| PERFORMANCE_SCHEMA |
| ankit              |
| kafka              |
| mysql              |
| test               |
+--------------------+
6 rows in set (0.10 sec)

TiDBからKafkaはどうなるのか?

TiDBは、TiCDCを使用してKafkaにデータをストリーミングすることも可能です。詳細はこちらのドキュメントに記載されています。

TiCDC (TiDB Change Data Capture) は、TiDBの変更をリアルタイムでキャプチャし、下流に複製するコンポーネントです。TiDBクラスタ内のすべての変更を追跡する内部的な記録であるRaftログを読み取り、それらの変更をKafkaや別のTiDBクラスタ、クラウドストレージなどの外部システムにプッシュします。

参考までに、TiDBのRaft log filesは通常このような形式になっています:

-rw-r--r--  1 ankitkapoor  cc    69B 11 Sep 02:29 0000000000000001.rewrite
-rw-r--r--  1 ankitkapoor  cc     0B 11 Sep 02:39 LOCK
-rw-r--r--  1 ankitkapoor  cc   869K 11 Sep 02:39 0000000000000001.raftlog  <— Raft log

TiCDCはTiDBからKafkaへのデータを扱いますが、本記事ではその逆、KafkaからTiDBへのデータについて焦点を当てます。

KafkaからTiDBへ:概要

KafkaからTiDBへのデータストリーミングは、スケーラブルで信頼性の高いデータパイプラインを構築するためのオープンソースフレームワークであるKafka Connectを使って行われることが多いです。PySparkのような他のツールでも実現可能ですが、Kafka Connectは特に本番環境において、より簡単で高パフォーマンスな方法を提供します。

TiDBはMySQL互換であるため、既存のMySQL JDBCドライバーを利用して、KafkaとTiDB間のデータストリームを構築することができます。

必要要件

本ガイドに従うには、以下のコンポーネントが必要です:

  1. Kafka
  2. Zookeeper
  3. Kafka-topics
  4. Kafka-console-producer
  5. Kafka-console-consumer
  6. Kafka Sinkコネクタ
  7. MySQLクライアント
  8. TiDBクラスタ

テスト環境

  • ローカルマシン:macOS 15.6.1
  • MySQLクライアント:9.4.0 (最近のバージョンであれば問題ありません)
  • データベース:TiDB Cloud Serverless (一般利用可能)

本ブログで扱わない内容

本ブログでは、ZooKeeperKafkaトピックmessages、ストリーミングの基本概念など、Kafkaの基礎知識を理解していることを前提としています。これらのトピックは公式のKafkaドキュメントに詳しく記載されているため、ここでは繰り返し説明しません。

はじめに

ステップ1:Kafkaのインストール

​​brew install kafka

表示されるメッセージ:

To start kafka now and restart at login:
  brew services start kafka
Or, if you don't want/need a background service you can just run:
  /opt/homebrew/opt/kafka/bin/kafka-server-start /opt/homebrew/etc/kafka/server.properties

Linuxの場合は、公式のセットアップガイドをご参照ください。

ステップ2:ZooKeeperのインストール

brew install zookeeper

表示されるメッセージ:

To start zookeeper now and restart at login:
  brew services start zookeeper
Or, if you don't want/need a background service you can just run:
  SERVER_JVMFLAGS="-Dapple.awt.UIElement=true" /opt/homebrew/opt/zookeeper/bin/zkServer start-foreground

ZooKeeperの起動:

brew services start zookeeper

ステップ3:必要なファイルのダウンロード

以下をダウンロードします:

MySQLコネクタのJARをConfluentのライブラリに移動し、以下の2つの設定ファイルを作成します:

  1. Connect-standalone.properties
  2. Mysql-sink-connector.properties

ステップ4:Kafka Connectの設定

設定ファイル connect-standalone.properties

bootstrap.servers=localhost:9092

key.converter=org.apache.kafka.connect.json.JsonConverter

value.converter=org.apache.kafka.connect.json.JsonConverter

key.converter.schemas.enable=false

value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect.offsets

plugin.path=/pathto_sink_jdbc_connector/

設定ファイル mysql-sink-connector.properties

name=jdbc-sink

connector.class=io.confluent.connect.jdbc.JdbcSinkConnector

tasks.max=1

topics=kafka_to_TiDB ( one which we will be creating later, you can choose your desired name )

connection.url=jdbc:hostname:4000/yourdatabase

connection.user=user_name

connection.password=password

auto.create=false

auto.evolve=false

insert.mode=insert

pk.mode=none

table.name.format=tb_kafka_to_TiDB

key.converter=org.apache.kafka.connect.json.JsonConverter

value.converter=org.apache.kafka.connect.json.JsonConverter

key.converter.schemas.enable=true

value.converter.schemas.enable=true

transforms=filter

transforms.filter.type=org.apache.kafka.connect.transforms.ReplaceField$Value

transforms.filter.include=id,user

ステップ5:対象のTiDBテーブルを作成

CREATE TABLE `tb_kafka_to_TiDB` (

  `id` int DEFAULT NULL,

  `user` char(255) DEFAULT NULL

)

ステップ6:Kafka Connectの起動

設定ファイルと同じディレクトリで、以下のコマンドを実行します:

connect-standalone connect-standalone.properties mysql-sink-connector.properties

注意:このコマンドは、2つの設定ファイルConnect-standalone.propertiesとMysql-sink-connector.propertiesを作成したフォルダで実行してください。

正常に起動した場合のログには、以下の内容が含まれます:

kafka_to_TiDB-0 (org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerInvoker:58)

[2025-08-18 19:58:35,603] INFO [jdbc-sink|task-0] [Consumer clientId=connector-consumer-jdbc-sink-0, groupId=connect-jdbc-sink] Found no committed offset for partition kafka_to_TiDB-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1508)

[2025-08-18 19:58:35,607] INFO [jdbc-sink|task-0] [Consumer clientId=connector-consumer-jdbc-sink-0, groupId=connect-jdbc-sink] Resetting offset for partition kafka_to_TiDB-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 1 rack: null isFenced: false)], epoch=0}}. (org.apache.kafka.clients.consumer.internals.SubscriptionState:447)

[2025-08-18 19:58:46,968] INFO [jdbc-sink|task-0] JdbcDbWriter Connected (io.confluent.connect.jdbc.sink.JdbcDbWriter:57)

ステップ7:Kafkaトピックの作成とテスト

トピックを作成し、プロデューサーを起動します:

kafka-console-producer --bootstrap-server localhost:9092 --topic kafka_to_TiDB --property parse.key=false --property "key.separator=:"

その後、コンシューマーを起動して、メッセージが解析されていること確認します:

kafka-console-consumer --bootstrap-server localhost:9092 --topic kafka_to_TiDB --from-beginning

hello

kafka

whats goin on

man

"User signed up"

"User signed up"

"User signed up"

{"id": 123, "status": "active"}

{"temperature": 25.4}

ステップ8:Kafkaへのメッセージ送信

kafka-console-producer --bootstrap-server localhost:9092 --topic kafka_to_TiDB --property parse.key=false --property "key.separator=:"

>{"schema":{"type":"struct","fields":[{"field":"id","type":"int32"},{"field":"user","type":"string"}],"optional":false,"name":"kafka_to_TiDB"},"payload":{"id":1,"user":"Ankit"}}

Kafka Connectのログから、書き込みが正常に行われたことが確認できます:

[2025-08-18 19:58:48,424] INFO [jdbc-sink|task-0] Setting metadata for table "ankit"."kafka_to_TiDB" to Table{name='"ankit"."kafka_to_TiDB"', type=TABLE columns=[Column{'id', isPrimaryKey=false, allowsNull=true, sqlType=INT}, Column{'user', isPrimaryKey=false, allowsNull=true, sqlType=CHAR}]} (io.confluent.connect.jdbc.util.TableDefinitions:64)

[2025-08-18 19:58:48,725] INFO [jdbc-sink|task-0] Completed write operation for 1 records to the database (io.confluent.connect.jdbc.sink.JdbcDbWriter:100)

[2025-08-18 19:58:48,726] INFO [jdbc-sink|task-0] Successfully wrote 1 records. (io.confluent.connect.jdbc.sink.JdbcSinkTask:91)

TiDBで確認

最後に、MySQLクライアントを使ってTiDBに接続します:


./mysql -u 'ankit' -hhostname -P 4000 -p

その後、テーブルをクエリします:


mysql> select * from ankit.kafka_to_TiDB;
+------+-------+
| id   | user  |
+------+-------+
|    1 | Ankit |
+------+-------+

データが正常に挿入され、KafkaからTiDBへのイベントストリーミングが行われていることが確認できます。

結論

KafkaからTiDBへのデータストリーミングを活用することで、組織はKafkaの大量イベント処理能力を利用しつつ、TiDBの分散型SQL機能によるスケーラブルでリアルタイムなデータ処理を実現できます。この構成により、レイテンシの低減、書き込みのボトルネック回避、そして高負荷時でもアプリケーションのパフォーマンスを安定して維持することが可能です。

本ブログチュートリアルの第2部では、パフォーマンステストと可観測性に焦点を当て、毎秒数百万メッセージの負荷下でこのアーキテクチャがどのように動作するか、またTiDBのパフォーマンスを効果的に監視する方法を詳しく解説します。

実際に試してみたい方は、TiDB Cloud Quick Start LabでTiDBの動作を体験できます。さらに分散型SQLについて深く学びたい場合は、TiDB Universityのセルフペースコースがおすすめです。TiDBの基本から高度なパフォーマンスチューニング、実際のストリーミング統合まで、幅広く学習できます。


Have questions? Let us know how we can help.

Contact Us

TiDB Cloud Dedicated

TiDB Cloudのエンタープライズ版。
専用VPC上に構築された専有DBaaSでAWSとGoogle Cloudで利用可能。

TiDB Cloud Starter

TiDB Cloudのライト版。
TiDBの機能をフルマネージド環境で使用でき無料かつお客様の裁量で利用開始。