Author: Fengbiao Liang (TiDB Cloud Solution Engineer at PingCAP)
Editors: Calvin Weng, Tom Dewan
TiDB is a distributed SQL database that supports Hybrid Transactional and Analytical Processing (HTAP) workloads. Apache Flink is a popular open-source stream-processing framework that provides high-throughput, low-latency data computing with exactly-once semantics.
If you use TiDB with Flink, you may have had to quickly ingest streaming data from Apache Flink to TiDB. However, if you use Apache Flink’s default configuration, data ingestion performance will be limited. To achieve higher performance, you must:
- Enable bulk insert.
- Establish a high concurrency connection.
- Use the SQL ON DUPLICATE KEY UPDATE clause.
In this article, we share two ways you can achieve high-performance data ingestion by adjusting some parameters and code.
Test environment
Our test environment was:
- Apache Flink: V1.13.5
- Programming language: Scala V2.12
- MySQL JDBC Connector: V8.0.27
- Database: TiDB V5.3.0
Sample test cases
There are different approaches to achieving high performance in this scenario. We will recommend two of them, both of which enable BULK INSERT. The sample code is written in Scala.
Solution 1: Use the Flink SQL statement
The Flink SQL statement is the easiest way to enable BULK INSERT from other data sources to TiDB.
- Create the SQL table:
val createTableStatement = """
  | CREATE TABLE order_item (
  |   id BIGINT,
  |   productId INT,
  |   name STRING,
  |   price DECIMAL(8,2),
  |   cnt INT,
  |   PRIMARY KEY (`id`) NOT ENFORCED
  | ) WITH (
  |   'connector' = 'jdbc',
  |   'url' = 'jdbc:mysql://<tidb_addr>:4000/?useServerPrepStmts=true&cachePrepStmts=true&rewriteBatchedStatements=true',
  |   'driver' = 'com.mysql.cj.jdbc.Driver',
  |   'table-name' = 't_order_item',
  |   'username' = 'root',
  |   'password' = '',
  |   'sink.buffer-flush.max-rows' = '200',
  |   'sink.buffer-flush.interval' = '3',
  |   'sink.parallelism' = '200' )
  |""".stripMargin
The SQL statement above creates a table in Apache Flink that maps to the TiDB table. For better performance, note the following parameters:
Parameter | Comment |
rewriteBatchedStatements | Rewrites single statements into a batch; set to true |
sink.buffer-flush.max-rows | Batch size of a bulk insert |
sink.parallelism | Number of concurrent connections to TiDB |
- Execute the insert SQL statement.
You can execute the following code to insert data into TiDB. The first line creates a table using the SQL statement from step 1. The second line inserts the data from a data source table named mock_orderitem. When the code runs, it generates the insert SQL statement. For example:
insert into t (a) values (10), (11), (12) on duplicate key update a = values(a);
tableEnv.executeSql(createTableStatement)
tableEnv.executeSql("""INSERT INTO order_item (
  | id, productId, name, price, cnt)
  | SELECT id, productId, name, price, cnt
  | FROM mock_orderitem""".stripMargin)
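The shape of the example statement above can be reproduced in a few lines of plain Scala. This is only an illustrative sketch of what a multi-row rewrite produces. It is not the code that the MySQL driver or Flink actually runs, and `BulkInsertSketch` and `multiRowUpsert` are hypothetical names. The values are inlined as numbers purely for display; real code should keep using `?` placeholders.

```scala
object BulkInsertSketch {
  // Hypothetical helper: folds many single-row values into one multi-row upsert.
  def multiRowUpsert(table: String, column: String, values: Seq[Long]): String = {
    require(values.nonEmpty, "at least one row is needed")
    // Each value becomes its own "(v)" row in the VALUES list.
    val rows = values.map(v => s"($v)").mkString(", ")
    s"insert into $table ($column) values $rows " +
      s"on duplicate key update $column = values($column)"
  }
}
```

Calling `BulkInsertSketch.multiRowUpsert("t", "a", Seq(10L, 11L, 12L))` yields the same text as the example, without the trailing semicolon. One statement of this kind replaces three round trips to TiDB.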
Solution 2: Use JDBC sink in your code
Developers favor this solution because it’s highly flexible. You can customize the data source, structure, and format. The sample code shown below:
- Sets TiDB connection parameters.
- Uses “JdbcStatementBuilder” to generate SQL statements dynamically.
- Executes the SQL statement with multiple threads.
import java.sql.PreparedStatement
import org.apache.flink.connector.jdbc.{JdbcConnectionOptions, JdbcExecutionOptions, JdbcSink, JdbcStatementBuilder}
// Flush a batch every 3 ms or every 200 rows, whichever comes first.
val executionOptions = JdbcExecutionOptions.builder.withBatchIntervalMs(3).withBatchSize(200).build
val connectionOptions = (new JdbcConnectionOptions.JdbcConnectionOptionsBuilder)
  .withUrl("jdbc:mysql://<tidb_addr>:4000/?useServerPrepStmts=true&cachePrepStmts=true&rewriteBatchedStatements=true")
  .withDriverName("com.mysql.cj.jdbc.Driver").withUsername("root").withPassword("").build
val insertSQL = "INSERT INTO t_order_item (id, productId, name, price, cnt) values (?, ?, ?, ?, ?) ON DUPLICATE KEY UPDATE id=values(id)"
// Binds one OrderItem to the placeholders of insertSQL.
val sb: JdbcStatementBuilder[OrderItem] = new JdbcStatementBuilder[OrderItem] {
  override def accept(ps: PreparedStatement, t: OrderItem): Unit = {
    ps.setLong(1, t.id)
    ps.setInt(2, t.productId)
    ps.setString(3, t.name)
    ps.setInt(4, t.price) // assumes OrderItem.price is an Int; use setBigDecimal if it is a BigDecimal
    ps.setInt(5, t.cnt)
  }
}
val mySink = JdbcSink.sink(insertSQL, sb, executionOptions, connectionOptions)
// 200 parallel sink tasks, each with its own connection to TiDB.
sourceDataStream.addSink(mySink).setParallelism(200)
env.execute("TiDB Bulk Insert Job")
Note the following parameters in your code:
Parameter | Comment |
withBatchIntervalMs(ms) | Interval for bulk insert, in milliseconds |
withBatchSize(number) | Batch size of a bulk insert |
setParallelism(number) | Number of concurrent connections to TiDB |
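To show how the batch size and the batch interval interact, here is a simplified model of size-or-interval batching in plain Scala. It is a sketch for intuition only, not Flink's implementation: `BatchBuffer` and `flushNow` are hypothetical names, and the timer that would call `flushNow` when the interval elapses is left out.

```scala
import scala.collection.mutable.ArrayBuffer

// Simplified model of size-or-interval batching; not Flink's implementation.
final class BatchBuffer[T](batchSize: Int, flush: Seq[T] => Unit) {
  require(batchSize > 0, "batchSize must be positive")
  private val buffer = ArrayBuffer.empty[T]

  // Buffer one record and flush as soon as the batch is full.
  def add(record: T): Unit = {
    buffer += record
    if (buffer.size >= batchSize) flushNow()
  }

  // In a real sink a timer would call this when the batch interval elapses,
  // so a partially filled batch does not wait indefinitely.
  def flushNow(): Unit = if (buffer.nonEmpty) {
    flush(buffer.toList)
    buffer.clear()
  }
}
```

With a batch size of 3, adding seven records triggers two full flushes, and the seventh record stays buffered until the interval-driven `flushNow()` runs. A very short interval therefore tends to produce small batches when the input rate is low, which is worth keeping in mind when you tune the two values together.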
Note:
If you use the ON DUPLICATE KEY UPDATE clause in your SQL statement, pay attention to the statement syntax. Use the VALUES(col_name) function to refer to the column values of the row being inserted. For example:
INSERT INTO table (id, name) values (?, ?) ON DUPLICATE KEY UPDATE id=values(id)
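When a table has many columns, writing the update list by hand is error-prone. The following sketch generates a parameterized statement of the same form. `UpsertSql.build` is a hypothetical helper, not part of Flink or the JDBC driver. Unlike the one-column example above, it updates every non-key column, which is an assumption about what an upsert usually needs. It also does not quote identifiers, so it is only suitable for trusted table and column names.

```scala
object UpsertSql {
  // Builds a parameterized upsert that refreshes every non-key column
  // with the incoming row's value through VALUES(col_name).
  def build(table: String, columns: Seq[String], keyColumns: Set[String]): String = {
    val updatable = columns.filterNot(keyColumns.contains)
    require(updatable.nonEmpty, "need at least one non-key column to update")
    // One "?" placeholder per inserted column.
    val placeholders = columns.map(_ => "?").mkString(", ")
    val updates = updatable.map(c => s"$c=VALUES($c)").mkString(", ")
    s"INSERT INTO $table (${columns.mkString(", ")}) VALUES ($placeholders) " +
      s"ON DUPLICATE KEY UPDATE $updates"
  }
}
```

For example, `UpsertSql.build("t_order_item", Seq("id", "name"), Set("id"))` returns a statement whose update list is `name=VALUES(name)`. The resulting string can be passed to a JDBC sink in place of a handwritten statement.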
Summary
If you are writing a high-performance data ingestion program, we recommend that you test the parameters described in this article carefully. To check whether BULK INSERT works, enable the TiDB general log and view the SQL statements that TiDB receives.
If you run into issues, feel free to join our community on Slack and TiDB Internals to share them with us. You can also request a demo for these methods from PingCAP.
Keep reading:
Analytics on TiDB Cloud with Databricks
Data Transformation on TiDB Made Easier
Using Airbyte to Migrate Data from TiDB Cloud to Snowflake