是一款易用且高性能的分布式数据集成平台,支持实时海量数据同步,具备稳定、高效的处理能力,每天可同步数百亿级别的数据,现已在国内 3000+ 企业的生产环境中广泛应用。
Databend 则是一款云原生的存算分离数据平台,具备弹性、高并发等特性,适用于现代数据处理需求。
本文将聚焦分析 SeaTunnel 中 MySQL-CDC 插件及其 Sink 输出的数据格式,并进一步探讨在实际场景中将 SeaTunnel 与 Databend 进行集成的可行性与实现路径。
SeaTunnel 整体是一个标准的数据同步工具:
SeaTunnel 的 MySQL CDC 连接器允许从 MySQL 数据库中读取快照数据和增量数据。根据不同的 sink 端,观察 MySQL-CDC 输出的数据是否可以直接被 Databend 使用。
从测试来看,SeaTunnel 所用的 MySQL 同步组件应该是 debezium-mysql-connector(Kafka Connect 也调用该组件)。
任务设定是通过 SeaTunnel 从 MySQL 中同步 wubx.t01 表。
配置文件 v2.mysql.streaming.conf
# v2.mysql.streaming.confenv{ parallelism = 1 job.mode = "STREAMING" checkpoint.interval = 2000}source { MySQL-CDC { base-url="jdbc:mysql://192.168.1.100:3306/wubx" username="wubx" password="wubxwubx" table-names=["wubx.t01"] startup.mode="initial" }}sink { Console { }}
./bin/seatunnel.sh --config ./config/v2.mysql.streaming.conf -m local
观察终端上日志
SELECT * FROM `wubx`.`t01`
获取到的数据如下:
2025-05-07 14:28:21,914 INFO [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0 rowIndex=1: SeaTunnelRow#tableId=wubx.t01 SeaTunnelRow#kind=INSERT : 1, databend2025-05-07 14:28:21,914 INFO [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0 rowIndex=2: SeaTunnelRow#tableId=wubx.t01 SeaTunnelRow#kind=INSERT : 3, MySQL2025-05-07 14:28:21,914 INFO [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0 rowIndex=3: SeaTunnelRow#tableId=wubx.t01 SeaTunnelRow#kind=INSERT : 4, Setunnel01
全量同步完成
insert into t01 values(5,'SeaTunnel');
在 SeaTunnel 中可以直接捕获到增量数据,对应的动作为 kind=INSERT。
2025-05-07 14:35:48,520 INFO [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0 rowIndex=4: SeaTunnelRow#tableId=wubx.t01 SeaTunnelRow#kind=INSERT : 5, SeaTunnel
update t01 set c1='MySQL-CDC' where id=5;
2025-05-07 14:36:47,455 INFO [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0 rowIndex=5: SeaTunnelRow#tableId=wubx.t01 SeaTunnelRow#kind=UPDATE_BEFORE : 5, SeaTunnel2025-05-07 14:36:47,455 INFO [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0 rowIndex=6: SeaTunnelRow#tableId=wubx.t01 SeaTunnelRow#kind=UPDATE_AFTER : 5, MySQL-CDC
delete from t01 where id=5;
2025-05-07 14:37:33,082 INFO [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0 rowIndex=7: SeaTunnelRow#tableId=wubx.t01 SeaTunnelRow#kind=DELETE : 5, MySQL-CDC
从 console 输出的日志格式较为清晰,非常利于排查问题和后续使用。
通过上述 MySQL-CDC 输出终端的测试,可以确认 insert、update、delete 操作均能被正确捕获和处理。接下来我们测试 MySQL-CDC -> MySQL,对应的配置文件 v2.mysql.streaming.m.conf 如下:
#v2.mysql.streaming.m.confenv{ parallelism = 1 job.mode = "STREAMING" checkpoint.interval = 2000}source { MySQL-CDC { base-url="jdbc:mysql://192.168.1.100:3306/wubx" username="wubx" password="wubxwubx" table-names=["wubx.t01"] startup.mode="initial" }}sink { jdbc { url = "jdbc:mysql://192.168.1.100:3306/wubx?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true" driver = "com.mysql.cj.jdbc.Driver" user = "wubx" password = "wubxwubx" generate_sink_sql = true # You need to configure both database and table database = wubx table = s_t01 primary_keys = ["id"] schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST" data_save_mode="APPEND_DATA" }}
./bin/seatunnel.sh --config ./config/v2.mysql.streaming.m.conf -m local
观察终端上日志
全量同步语句:
2025-05-07 14:56:01,024 INFO [e.IncrementalSourceScanFetcher] [debezium-snapshot-reader-0] - Start snapshot read task for snapshot split: SnapshotSplit(tableId=wubx.t01, splitKeyType=ROW<id INT>, splitStart=null, splitEnd=null, lowWatermark=null, highWatermark=null) exactly-once: false2025-05-07 14:56:01,026 INFO [f.s.MySqlSnapshotSplitReadTask] [debezium-snapshot-reader-0] - Snapshot step 1 - Determining low watermark {ts_sec=0, file=mysql-bin.000058, pos=7737, gtids=12b437c2-ba62-11ec-a554-b4b5b694bca5:1-2215900, row=0, event=0} for split SnapshotSplit(tableId=wubx.t01, splitKeyType=ROW<id INT>, splitStart=null, splitEnd=null, lowWatermark=null, highWatermark=null)2025-05-07 14:56:01,028 INFO [f.s.MySqlSnapshotSplitReadTask] [debezium-snapshot-reader-0] - Snapshot step 2 - Snapshotting data2025-05-07 14:56:01,028 INFO [f.s.MySqlSnapshotSplitReadTask] [debezium-snapshot-reader-0] - Exporting data from split 'wubx.t01:0' of table wubx.t012025-05-07 14:56:01,028 INFO [f.s.MySqlSnapshotSplitReadTask] [debezium-snapshot-reader-0] - For split 'wubx.t01:0' of table wubx.t01 using select statement: 'SELECT * FROM `wubx`.`t01`'2025-05-07 14:56:01,032 INFO [f.s.MySqlSnapshotSplitReadTask] [debezium-snapshot-reader-0] - Finished exporting 3 records for split 'wubx.t01:0', total duration '00:00:00.004'2025-05-07 14:56:01,033 INFO [f.s.MySqlSnapshotSplitReadTask] [debezium-snapshot-reader-0] - Snapshot step 3 - Determining high watermark {ts_sec=0, file=mysql-bin.000058, pos=7737, gtids=12b437c2-ba62-11ec-a554-b4b5b694bca5:1-2215900, row=0, event=0} for split SnapshotSplit(tableId=wubx.t01, splitKeyType=ROW<id INT>, splitStart=null, splitEnd=null, lowWatermark=null, highWatermark=null)2025-05-07 14:56:01,519 INFO [o.a.s.c.s.c.s.r.f.SplitFetcher] [Source Data Fetcher for BlockingWorker-TaskGroupLocation{jobId=972391330309210113, pipelineId=1, taskGroupId=2}] - Finished reading from splits [wubx.t01:0]
sink 端写数据对应的 prepare 语句
2025-05-07 14:56:01,708 INFO [.e.FieldNamedPreparedStatement] [st-multi-table-sink-writer-1] - PrepareStatement sql is:INSERT INTO `wubx`.`s_t01` (`id`, `c1`) VALUES (?, ?) ON DUPLICATE KEY UPDATE `id`=VALUES(`id`), `c1`=VALUES(`c1`)2025-05-07 14:56:01,709 INFO [.e.FieldNamedPreparedStatement] [st-multi-table-sink-writer-1] - PrepareStatement sql is:DELETE FROM `wubx`.`s_t01` WHERE `id` = ?
从上述语句可以看出,对应的 binlog event 可以直接处理:
insert ,update 可以直接用 INSERT INTO wubx
.s_t01
(id
, c1
) VALUES (?, ?) ON DUPLICATE KEY UPDATE id
=VALUES(id
), c1
=VALUES(c1
) 处理
delete 使用: DELETE FROM wubx
.s_t01
WHERE id
= ? 处理
SeaTunnel MySQL-CDC 这块应该比较稳定,底层数据读取使用的是 debezium,这是一个非常成熟的工具,值得信赖。
本节也重点关注在云环境下的数据同步基座,尤其是如何以最低成本完成数据同步工作。在云上进行数据同步时,需要考虑如何以最低成本完成这项工作。在海外项目中,开发者更倾向于使用 kafka-connect,通常先将数据 Sink 到 S3 中,然后批量处理 S3 中的文件,最终得到一份完整的数据。
直接使用配置文件 v2.mysql.streaming.s3.conf:
env{ parallelism = 1 job.mode = "STREAMING" checkpoint.interval = 2000}source { MySQL-CDC { base-url="jdbc:mysql://192.168.1.100:3306/wubx" username="wubx" password="wubxwubx" table-names=["wubx.t01"] startup.mode="initial" }}sink { S3File { bucket = "s3a://mystage" tmp_path = "/tmp/SeaTunnel/${table_name}" path="/mysql/${table_name}" fs.s3a.endpoint="http://192.168.1.100:9900" fs.s3a.aws.credentials.provider="org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider" access_key = "minioadmin" secret_key = "minioadmin" file_format_type="json" schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST" data_save_mode="APPEND_DATA" }}
首先使用 json 格式进行 sink
./bin/seatunnel.sh --config ./config/v2.mysql.streaming.s3.conf -m local
观察终端上日志
2025-05-07 15:14:41,430 INFO [.c.s.f.h.HadoopFileSystemProxy] [hz.main.generic-operation.thread-42] - rename file :[/tmp/SeaTunnel/t01/SeaTunnel/972396021571125249/c679929b12/T_972396021571125249_c679929b12_0_1/NON_PARTITION/T_972396021571125249_c679929b12_0_1_0.json] to [/mysql/t01/T_972396021571125249_c679929b12_0_1_0.json] finish
/mysql/t01/T_972396021571125249_c679929b12_0_1_0.json 内容:
{"id":1,"c1":"databend"}{"id":3,"c1":"MySQL"}{"id":4,"c1":"Setunnel01"}{"id":5,"c1":"SeaTunnel"}
看到这里感觉有些失望,似乎缺少了 kind 和时间字段。
接下来
insert into t01 values(6,'SeaTunnel01');
2025-05-07 15:18:59,380 INFO [.c.s.f.h.HadoopFileSystemProxy] [hz.main.generic-operation.thread-16] - rename file :[/tmp/SeaTunnel/t01/SeaTunnel/972396021571125249/c679929b12/T_972396021571125249_c679929b12_0_130/NON_PARTITION/T_972396021571125249_c679929b12_0_130_0.json] to [/mysql/t01/T_972396021571125249_c679929b12_0_130_0.json] finish
T_972396021571125249_c679929b12_0_130_0.json 内容为:
{"id":6,"c1":"SeaTunnel01"}
update t01 set c1='MySQL-CDC' where id=5;
2025-05-07 15:20:15,386 INFO [.c.s.f.h.HadoopFileSystemProxy] [hz.main.generic-operation.thread-9] - rename file :[/tmp/SeaTunnel/t01/SeaTunnel/972396021571125249/c679929b12/T_972396021571125249_c679929b12_0_168/NON_PARTITION/T_972396021571125249_c679929b12_0_168_0.json] to [/mysql/t01/T_972396021571125249_c679929b12_0_168_0.json] finish
T_972396021571125249_c679929b12_0_168_0.json 对应内容:
{"id":5,"c1":"SeaTunnel"}{"id":5,"c1":"MySQL-CDC"}
一个 update 操作在 json 文件中记录了两条数据,但由于缺少操作类型(kind)和时间字段,难以准确还原数据变更过程。如果包含时间字段,还可以选择保留最新的一条记录。
delete from t01 where id=5;
2025-05-07 15:22:53,392 INFO [.c.s.f.h.HadoopFileSystemProxy] [hz.main.generic-operation.thread-6] - rename file :[/tmp/SeaTunnel/t01/SeaTunnel/972396021571125249/c679929b12/T_972396021571125249_c679929b12_0_247/NON_PARTITION/T_972396021571125249_c679929b12_0_247_0.json] to [/mysql/t01/T_972396021571125249_c679929b12_0_247_0.json] finish
T_972396021571125249_c679929b12_0_247_0.json 对应的内容
{"id":5,"c1":"MySQL-CDC"}
delete 操作同样缺少操作类型(kind),仅记录了一行原始数据,因此难以用于后续的数据处理和溯源。
因此,利用 SeaTunnel 的 S3File sink 以 json 格式进行数据溯源目前并不可行。建议 S3File sink 增加对 maxwell_json 和 debezium_json 格式的支持。
期待这一功能的完善,这样 SeaTunnel 就可以将所有数据同步到 S3,让 S3 承担消息队列的功能。
开源世界非常有趣,如果一个功能无法实现,总会有其他替代方案。
因为 MySQL-CDC 底层基于 Debezium,应该支持 Debezium format。
https://SeaTunnel.apache.org/docs/2.3.10/connector-v2/formats/debezium-json
而且还支持
https://SeaTunnel.apache.org/docs/2.3.10/connector-v2/formats/maxwell-json
也就是说,SeaTunnel 为了保持与 debezium 和 maxwell 的兼容性,支持在 sink 到 Kafka 时选择这两种格式。
{ "before": { "id": 111, "name": "scooter", "description": "Big 2-wheel scooter ", "weight": 5.18 }, "after": { "id": 111, "name": "scooter", "description": "Big 2-wheel scooter ", "weight": 5.17 }, "source": { "version": "1.1.1.Final", "connector": "mysql", "name": "dbserver1", "ts_ms": 1589362330000, "snapshot": "false", "db": "inventory", "table": "products", "server_id": 223344, "gtid": null, "file": "mysql-bin.000003", "pos": 2090, "row": 0, "thread": 2, "query": null }, "op": "u", "ts_ms": 1589362330904, "transaction": null}
上述格式的数据在 Databend 或 Snowflake 中都比较容易处理,可以根据对应的
"op": "u", "ts_ms": 1589362330904,
使用 merge into + stream 的方式把数据合并到目标表中。
{ "database":"test", "table":"product", "type":"insert", "ts":1596684904, "xid":7201, "commit":true, "data":{ "id":111, "name":"scooter", "description":"Big 2-wheel scooter ", "weight":5.18 }, "primary_key_columns":[ "id" ]}
这个 json 体中包含了 type、ts 和主键字段,后续利用 SQL 进行 ELT 处理也非常方便。
也就是说,如果想用 SeaTunnel 输出这种标准的 CDC 格式日志,还需要引入类似 Kafka 的架构:
与群里的小伙伴交流后发现,确实有人这么做,从 Kafka 中将消息同步到 OSS 上。
创建一个用于记录 binlog 消息体明细的 update 表,用于记录明细
create table t01_update( database varchar, table varchar, type varchar, ts bigint, xid bigint, commit boolean, data variant, primary_key_columns array(varchar));
该表数据源可以从 S3 获取,利用 copy into 可以把对应的数据近实时的加载到 t01_update 这张表里
创建一个目标表: t01
id int,name varchar,description varchar,weight double);
对 t01_update 表创建一个 stream 用于记录该表的增量
create stream stream_t01_update on table t01_update;
在 Databend 实现该数据合并到目标表中
MERGE INTO t01 AS aUSING ( SELECT data:id AS id, data:name AS name, data:description AS description, data:weight AS weight, ts, type FROM stream_t01_update QUALIFY ROW_NUMBER() OVER (PARTITION BY id ORDER BY ts DESC) = 1) AS bON a.id = b.idWHEN MATCHED AND b.type = 'update' THEN UPDATE SET a.name = b.name, a.description = b.description, a.weight = b.weightWHEN MATCHED AND b.type = 'delete' THEN DELETEWHEN NOT MATCHED THEN INSERT (id, name, description, weight) VALUES (b.id, b.name, b.description, b.weight);
通过该 SQL,可以实现以窗口去重的方式,将 binlog 原始数据合并到目标表。
通过对 MySQL-CDC 输出形态的分析,目前有三种方案可以实现 SeaTunnel 和 Databend 的整合:
第一种方式是直接开发 Databend 的 SeaTunnel connector,支持 sink 和 source。这种方式实现简单
第二种方式是在 S3File 中增加对 debezium-json 和 maxwell-json 格式的支持,这是一种较为优雅的方案,后续增量数据可以基于 Databend Stream 提供,方便外部数据源直接获取
第三种方式是引入 Kafka 作为 SeaTunnel 的 Sink,这样可以直接使用 debezium-json 和 maxwell-json 格式的消息体,通过数据治理实现 MySQL-CDC 到 Databend 的同步。这种方式方便多个下游系统订阅 Kafka 中的增量数据。
通过对 SeaTunnel 多种格式输出及行为的测试,我们初步了解了 SeaTunnel MySQL-CDC 的能力,为后续与 Databend 的整合做了准备。SeaTunnel 结合 Spark、Flink 等生态,已经可以胜任大型 CDC 任务。如果有相关实践,欢迎与作者交流分享。