
https://github.com/apache/
点击蓝字
关注我们
一、概述
在现代数据架构中,实时捕获和处理数据变更是构建数据湖、实时数仓及业务分析的关键技术。Apache SeaTunnel通过读取数据库的事务日志(如MySQL的Binlog),能够高效、准确地捕获数据表的变更事件(INSERT, UPDATE, DELETE)。
Apache SeaTunnel原生支持通过元数据提取row_kind列,该列记录了每条捕捉到的数据的变更类型(信号),例如 +I(插入)、-U(更新前)、+U(更新后)、-D(删除)。这使得用户能够对变更流进行更为精细的控制,例如通过row_kind字段过滤出特定的变更事件(如仅同步新增插入的数据),从而实现高效化、定制化的实时数据管道。
该技术广泛应用于只接受追加写入的数据湖同步、下游分析系统中完整的变更历史记录保留以及流式ETL过程的精细化逻辑过滤等场景。
二、环境搭建
进行示例演示前,请提前准备好以下环境与组件。
JDK 11
Apache SeaTunnel 2.3.12
Mysql 5.7
三、SeaTunnel配置
首先,确保你的SeaTunnel能连接MySQL。
编辑 config/plugin_config 文件,添加以下两个核心连接器:
connector-cdc-mysqlconnector-jdbc保存后,执行安装脚本:
sh bin/install-plugin.sh如果在线安装网络不畅,可以直接访问 Maven仓库 手动下载对应版本的JAR包,然后放入 connectors 目录即可。
由于MySQL JDBC驱动通常不内置,需要手动下载。将 mysql-connector-java-8.0.28.jar (或你使用的版本) 放到 SeaTunnel 的 lib 目录下。
四、MySQL建表
CREATE TABLE `w` ( `id` int(11) NOT NULL, `name` varchar(50) CHARACTER SET utf8mb4 NULL DEFAULT NULL, PRIMARY KEY (`id`) USING BTREE) ENGINE = InnoDB CHARACTER SET = utf8mb4 ;CREATE TABLE `w2` ( `id` int(11) NOT NULL, `name` varchar(50) CHARACTER SET utf8mb4 NULL DEFAULT NULL, `row_kind` varchar(10) CHARACTER SET utf8mb4 NULL DEFAULT NULL) ENGINE = InnoDB CHARACTER SET = utf8mb4 ;注意:w2不能设置id为主键,否则会根据主键进行修改而不是新增。
五、SeaTunnel任务定义
env { parallelism = 1 job.mode = "STREAMING"}source { MySQL-CDC { server-id = 5000 username = "root" password = "root" table-names = ["cdc.w"] url = "jdbc:mysql://localhost:3306/cdc" }}transform { RowKindExtractor {}}sink { jdbc { url = "jdbc:mysql://localhost:3306/cdc?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true" driver = "com.mysql.cj.jdbc.Driver" username = "root" password = "root" database = cdc table = w2 generate_sink_sql = true }}
关键说明:
1.通过RowKindExtractor把数据行增加一个row_kind的标志,实现 Append-Only 模式
2.row_kind字段名可以修改
custom_field_name = "op_type" 3.数据类型有缩写和完整写法,默认缩写
transform_type = SHORT # FULL执行
bin/seatunnel.sh -c job/文件 -m local这样w表中的数据变化都会同步到w2表中。
六、测试
insert into w values(1,'张三');insert into w values(2,'李四');mysql> select * from w2;+----+--------+----------+| id | name | row_kind |+----+--------+----------+| 1 | 张三 | +I || 2 | 李四 | +I |+----+--------+----------+update w set name='王五' where id=2;delete from w where id=2;mysql> select * from w2;+----+--------+----------+| id | name | row_kind |+----+--------+----------+| 1 | 张三 | +I || 2 | 李四 | +I || 2 | 王五 | +U || 2 | 王五 | -D |+----+--------+----------+结论:所有的数据都是以插入的形式同步过来。
七、通过元数据实现变更过滤
利用row_kind元数据,可以在同步管道中轻松实现选择性同步。例如,若只需将源表的“新增插入”数据同步到另一个表w2,可以在SQL查询中添加WHERE条件对row_kind字段进行过滤。其核心原理在于行级变更事件标记:
在更新(UPDATE)操作中会输出两个连续的变更事件:-U(UPDATE_BEFORE,表示旧值)和+U(UPDATE_AFTER,表示新值)。删除(DELETE)操作则输出-D事件。通过过滤row_kind= '+I',即可精准捕获并下游传递所有插入事件,而忽略所有UPDATE和DELETE事件,从而实现诸如源流快照、仅追加(Append-Only)数据导入等业务逻辑。
技术实现示例:
transform { RowKindExtractor { plugin_input = "mysql_source" plugin_output = "trans_row" } Sql { plugin_input = "trans_row" plugin_output = "trans_sql" query = "select id,name from trans_row where row_kind = '+I'"; }}增加数据标识以后,通过sql过滤只需要新增的数据,实时写入到目标表w2中。UPDATE和DELETE事件则被该过滤条件拦截,不会传输到下游。
八、测试验证与结果分析
为了验证上述row_kind过滤逻辑的效果,我们在源表w上执行一系列操作,并观察目标表w2的数据变化。
w2就不需要row_kind字段了。
测试步骤与观察:
INSERT INTO w VALUES(1,'张三');INSERT INTO w VALUES(2,'李四');mysql> select * from w2;+----+--------+----------+| id | name | row_kind |+----+--------+----------+| 1 | 张三 | +I || 2 | 李四 | +I |+----+--------+----------+UPDATE w SET name='王五' WHERE id=2;DELETE FROM w WHERE id=2;w2表不会有变化。
Apache SeaTunnel是一个云原生的多模态、高性能海量数据集成工具。北京时间 2023 年 6 月1 日,全球最大的开源软件基金会ApacheSoftware Foundation正式宣布SeaTunnel毕业成为Apache顶级项目。目前,SeaTunnel在GitHub上Star数量已达9k+,社区达到7000+人规模。SeaTunnel支持在云数据库、本地数据源、SaaS、大模型等170多种数据源之间进行数据实时和批量同步,支持CDC、DDL变更、整库同步等功能,更是可以和大模型打通,让大模型链接企业内部的数据。
同步Demo
新手入门

最佳实践

测试报告

源码解析



