打开实时数据同步新思路:SeaTunnel 选择性捕获数据库变更怎么用?

在现代数据架构中,实时捕获和处理数据变更是构建数据湖、实时数仓及业务分析的关键技术。

https://github.com/apache/SeaTunnel

点击蓝字



关注我们


一、概述

在现代数据架构中,实时捕获和处理数据变更是构建数据湖、实时数仓及业务分析的关键技术。Apache SeaTunnel通过读取数据库的事务日志(如MySQL的Binlog),能够高效、准确地捕获数据表的变更事件(INSERT, UPDATE, DELETE)。

Apache SeaTunnel原生支持通过元数据提取row_kind列,该列记录了每条捕捉到的数据的变更类型(信号),例如 +I(插入)、-U(更新前)、+U(更新后)、-D(删除)。这使得用户能够对变更流进行更为精细的控制,例如通过row_kind字段过滤出特定的变更事件(如仅同步新增插入的数据),从而实现高效化、定制化的实时数据管道。

该技术广泛应用于只接受追加写入的数据湖同步、下游分析系统中完整的变更历史记录保留以及流式ETL过程的精细化逻辑过滤等场景。

二、环境搭建

进行示例演示前,请提前准备好以下环境与组件。

  • JDK 11

  • Apache SeaTunnel 2.3.12

  • Mysql 5.7

    三、SeaTunnel配置

1. SeaTunnel连接器插件准备


首先,确保你的SeaTunnel能连接MySQL。

编辑 config/plugin_config 文件,添加以下两个核心连接器:

connector-cdc-mysqlconnector-jdbc

保存后,执行安装脚本:

sh bin/install-plugin.sh

如果在线安装网络不畅,可以直接访问 Maven仓库 手动下载对应版本的JAR包,然后放入 connectors 目录即可。

2. MySQL驱动放置


由于MySQL JDBC驱动通常不内置,需要手动下载。将 mysql-connector-java-8.0.28.jar (或你使用的版本) 放到 SeaTunnel 的 lib 目录下。

四、MySQL建表

CREATE TABLE `w`  (  `id` int(11NOT NULL,  `name` varchar(50CHARACTER SET utf8mb4 NULL DEFAULT NULL,  PRIMARY KEY (`id`) USING BTREE) ENGINE = InnoDB CHARACTER SET = utf8mb4 ;CREATE TABLE `w2`  (  `id` int(11NOT NULL,  `name` varchar(50CHARACTER SET utf8mb4 NULL DEFAULT NULL,  `row_kind` varchar(10CHARACTER SET utf8mb4 NULL DEFAULT NULL) ENGINE = InnoDB CHARACTER SET = utf8mb4 ;

注意:w2不能设置id为主键,否则会根据主键进行修改而不是新增。

五、SeaTunnel任务定义

env {  parallelism = 1  job.mode = "STREAMING"}source {  MySQL-CDC {    server-id = 5000    username = "root"    password = "root"    table-names = ["cdc.w"]    url = "jdbc:mysql://localhost:3306/cdc"  }}transform {  RowKindExtractor {
  }}sink {  jdbc {        url = "jdbc:mysql://localhost:3306/cdc?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"        driver = "com.mysql.cj.jdbc.Driver"        username = "root"        password = "root"        database = cdc        table = w2        generate_sink_sql = true  }}

关键说明:

1.通过RowKindExtractor把数据行增加一个row_kind的标志,实现 Append-Only 模式

2.row_kind字段名可以修改

custom_field_name = "op_type"    

3.数据类型有缩写和完整写法,默认缩写

transform_type = SHORT     # FULL

执行

bin/seatunnel.sh -c job/文件 -m local

这样w表中的数据变化都会同步到w2表中。

六、测试

1、插入数据


insert into w values(1,'张三');insert into w values(2,'李四');mysql> select * from w2;+----+--------+----------+| id | name   | row_kind |+----+--------+----------+|  1 | 张三   | +I       ||  2 | 李四   | +I       |+----+--------+----------+

2、修改数据


update w set name='王五' where id=2;delete from w where id=2;mysql> select * from w2;+----+--------+----------+| id | name   | row_kind |+----+--------+----------+|  1 | 张三   | +I       ||  2 | 李四   | +I       ||  2 | 王五   | +U       ||  2 | 王五   | -D       |+----+--------+----------+

结论:所有的数据都是以插入的形式同步过来。

七、通过元数据实现变更过滤

利用row_kind元数据,可以在同步管道中轻松实现选择性同步。例如,若只需将源表的“新增插入”数据同步到另一个表w2,可以在SQL查询中添加WHERE条件对row_kind字段进行过滤。其核心原理在于行级变更事件标记:

在更新(UPDATE)操作中会输出两个连续的变更事件:-UUPDATE_BEFORE,表示旧值)和+UUPDATE_AFTER,表示新值)。删除(DELETE)操作则输出-D事件。通过过滤row_kind= '+I',即可精准捕获并下游传递所有插入事件,而忽略所有UPDATE和DELETE事件,从而实现诸如源流快照、仅追加(Append-Only)数据导入等业务逻辑。

技术实现示例:

transform {  RowKindExtractor {    plugin_input = "mysql_source"    plugin_output = "trans_row"  }  Sql {    plugin_input = "trans_row"    plugin_output = "trans_sql"    query = "select id,name from trans_row where row_kind = '+I'";  }}

增加数据标识以后,通过sql过滤只需要新增的数据,实时写入到目标表w2中。UPDATE和DELETE事件则被该过滤条件拦截,不会传输到下游。

八、测试验证与结果分析

为了验证上述row_kind过滤逻辑的效果,我们在源表w上执行一系列操作,并观察目标表w2的数据变化。

w2就不需要row_kind字段了。

测试步骤与观察:

1.插入数据:


INSERT INTO w VALUES(1,'张三');INSERT INTO w VALUES(2,'李四');mysql> select * from w2;+----+--------+----------+| id | name   | row_kind |+----+--------+----------+|  1 | 张三   | +I       ||  2 | 李四   | +I       |+----+--------+----------+

2.更新数据:


UPDATE w SET name='王五' WHERE id=2;DELETE FROM w WHERE id=2;

w2表不会有变化。

Apache SeaTunnel

Apache SeaTunnel是一个云原生的多模态、高性能海量数据集成工具。北京时间 2023 年 6 月1 日,全球最大的开源软件基金会ApacheSoftware Foundation正式宣布SeaTunnel毕业成为Apache顶级项目。目前,SeaTunnel在GitHub上Star数量已达9k+,社区达到7000+人规模。SeaTunnel支持在云数据库、本地数据源、SaaS、大模型等170多种数据源之间进行数据实时和批量同步,支持CDC、DDL变更、整库同步等功能,更是可以和大模型打通,让大模型链接企业内部的数据。




同步Demo

MySQL→Doris | MySQLCDC | MySQL→Hive | HTTP → Doris  | HTTP → MySQL | MySQL→StarRocks|MySQL→Elasticsearch |Kafka→ClickHouse

新手入门

SeaTunnel 让数据集成变得 So easy!3 分钟入门指南
 0 到 1 快速入门 /初探/深入理解 
  分布式集群部署 | CDC数据同步管道 | Oracle-CDC
图片

最佳实践

中控技术天翼云多点OPPO | 清风马蜂窝孩子王哔哩哔哩唯品会众安保险兆原数通 | 亚信科技|映客|翼康济世|信也科技|华润置地|Shopee|京东科技|58同城|互联网银行|JPMorgan
图片

测试报告

SeaTunnel VS GLUE |  VS Airbyte |  VS DataX|SeaTunnel 与 DataX 、Sqoop、Flume、Flink CDC 对比
图片

源码解析

Zeta引擎源码解析(一) |(二) |(三)| API 源码解析 |2.1.1源码解析|封装 Flink 连接数据库解析





仓库地址: 
https://github.com/apache/seatunnel
网址:
https://seatunnel.apache.org/
Apache SeaTunnel 下载地址:
https://seatunnel.apache.org/download
衷心欢迎更多人加入!
我们相信,在Community Over Code(社区大于代码)、「Open and Cooperation」(开放协作)、「Meritocracy」(精英管理)、以及「多样性与共识决策」等 The Apache Way 的指引下,我们将迎来更加多元化和包容的社区生态,共建开源精神带来的技术进步!
我们诚邀各位有志于让本土开源立足全球的伙伴加入 SeaTunnel 贡献者大家庭,一起共建开源!
提交问题和建议:
https://github.com/apache/seatunnel/issues
贡献代码:
https://github.com/apache/seatunnel/pulls
订阅社区开发邮件列表 : 
dev-subscribe@seatunnel.apache.org
开发邮件列表:
dev@seatunnel.apache.org
加入 Slack:
https://join.slack.com/t/apacheseatunnel/shared_invite/zt-3uouszk3m-PtLLNyZsJVqE5Gb6gn24mA
关注 X.com: 
https://x.com/ASFSeaTunnel