作者 | 陈飞 中付支付大数据工程师
今天和大家分享一个 简单但常见的 MySQL 到 MySQL 数据同步与合并场景案例,这个案例也是我在实际工作中遇到的问题,希望能抛砖引玉,欢迎有更丰富经验的大佬一起分享交流。
版本要求:

在我们的业务系统中,存在两个 MySQL 源库:
source_asource_b这两个库中存在一张表结构相同的表,但数据来自不同的业务线,两边都会同时产生数据,因此存在 主键重复 的问题。
我们的目标是将这两个源库的表数据 合并同步到一个目标库(我们称为 C 库),以便于统一分析和查询。
我们采用了如下方式来实现这个同步与合并的方案:

data_source,用于标识数据来源(source_a 或 source_b)原主键 + data_source 作为联合主键,确保不会因为两个源的主键重复而导致冲突source_a 与 source_bdata_source下面我们直接进入实战环节,关于 SeaTunnel 的基础知识,这里就不再赘述,上一期的大佬已经讲得非常清楚了,我们直接进入正题。
上期文章链接:

要使用 mysql-cdc 连接器,有两个必要的前置条件:
binlog_format 必须设置为 ROWbinlog_row_image 设置为 FULL-- 检查当前配置
SHOWVARIABLESLIKE'binlog_format';
SHOWVARIABLESLIKE'binlog_row_image';
-- 如果未开启,可在 my.cnf 文件中添加以下配置:
[mysqld]
server-id = 1
log-bin = mysql-bin
binlog-format = ROW
binlog-row-image = FULL
以上权限说明及设置方式可以参考官网文档,文档中提供了详细的权限说明与示例,建议大家同步查阅。

-- 创建同步账号
CREATE USER 'cdc_user'@'%' IDENTIFIED BY 'your_password';
-- 授予必要权限
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'cdc_user'@'%';
FLUSH PRIVILEGES;
适合 服务器可访问外网 且 无需复杂定制 的场景。
mysql-cdc, jdbc)wget "https://archive.apache.org/dist/seatunnel/${version}/apache-seatunnel-${version}-bin.tar.gz"
config/plugin_config保留需要的插件
bin/install-plugin.sh
适合对插件有特殊需求或希望获得完整插件支持的用户。
sh ./mvnw clean install -DskipTests -Dskip.spotless=true
seatunnel-dist/target/apache-seatunnel-2.3.9-bin.tar.gz
自行编译后生成的包中默认已集成所有插件及对应依赖,无需额外操作。
本案例使用的插件:
mysql-cdc
jdbc
插件说明与驱动依赖也可参考对应的文档!
SeaTunnel 支持多种部署方式:
使用 Seatunnel 自带引擎(Zeta)
作为 Spark / Flink 作业运行
使用 Zeta 引擎时的三种模式:

集群搭建完成后,我们开始准备配置文件。
一般情况下,SeaTunnel 的配置文件可以分为以下四个部分:

parallelism:并行度,表示任务运行的并发度,数值越大越快,具体要结合资源情况设置。job.mode:作业运行模式。由于我们使用的是 mysql-cdc 插件,因此必须设置为 Streaming 模式。checkpoint.interval:检查点间隔,Streaming 模式下默认是 30 秒一次,可以根据需要调整。
使用的插件是 mysql-cdc,需要配置以下内容:
database-names 和 table-names 显式指定,也可以使用正则表达式模糊匹配。mysql-cdc 前,需要确保:binlog 功能已开启;binlog-format 设置为 ROW;binlog-row-image 设置为 FULL;binlog、主从复制、查询所有表等权限。
在本案例中,我们需要给每条数据添加一个字段,用于标识数据来源,例如:data_source 字段,值可以是 source_a 或 source_b。

这个转换过程使用 sql 插件实现,通过添加常量字段的方式,将数据来源信息加到每条数据中。
需要注意:
source_table 是保留字,表示上一个处理环节中的表名。
Sink 使用的是 jdbc 插件,配置项主要包括:
以上配置组合完成后,我们就可以实现从多个源库(如 source_a 和 source_b)实时同步数据到目标库的需求。在同步的同时,我们还增强了字段,使数据能够被标识来源并统一写入。
整个流程既支持复杂数据结构,又能灵活适配业务场景,是一个适合实际生产的数据集成案例。
在配置 Sink 时,我们还可以做一些写入性能方面的优化:
schema_save_mode:结构保存策略
data_save_mode:数据保存策略
support_upsert_by_query_primary_key_exist:是否支持根据主键做 Upsert
primary_keys:指定写入数据的主键
data_source 字段。./seatunnel.sh --config ../config/demo/collect_a.config -e cluster --cluster sz-seatunnel --name collect_a --async
./seatunnel.sh --config ../config/demo/collect_b.config -e cluster --cluster sz-seatunnel --name collect_b --async
--config:指定配置文件
-e:运行模式 cluster/local
--cluster:集群名称,部署集群时配置,默认是seatunnel
--name:任务名称
--async:后台运行
到这里,配置部分就全部完成了。接下来我们来看下实际运行的效果:

a 表和 b 表,c 表为空。a 的同步进程。c 表,已经写入了 a 表的数据,且 data_source 字段为 source_a。b 的同步进程。c 表,写入了 b 表的数据,data_source 字段为 source_b。a 表的数据。c 表。
到这里整个数据同步和合并的流程就全部完成啦!
非常感谢大家的聆听 ,希望这个案例能为大家提供一些思路,也欢迎大家分享自己在 Apache SeaTunnel 使用中的更多经验,我们一起交流学习!
Apache SeaTunnel是一个云原生的多模态、高性能海量数据集成工具。北京时间 2023 年 6 月1 日,全球最大的开源软件基金会ApacheSoftware Foundation正式宣布Apache SeaTunnel毕业成为Apache顶级项目。目前,SeaTunnel在GitHub上Star数量已达8k+,社区达到6000+人规模。SeaTunnel支持在云数据库、本地数据源、SaaS、大模型等170多种数据源之间进行数据实时和批量同步,支持CDC、DDL变更、整库同步等功能,更是可以和大模型打通,让大模型链接企业内部的数据。
同步Demo
新手入门

最佳实践

测试报告

源码解析



