如何基于 Apache SeaTunnel 构建高效数据同步管道

已经提交过的这些信号全部都保存下来,这个过程会每一个节点都会给checkpointmanager来发up信号,那么我们一次成功的checkpoint,就是所有的节点都能够流通checkpoint信号,并且都回了Ack包,所有的Ack都收到之后,我们认为这个CK4就完成了,那么它就会移动到CK3后面,咱们认为CK4完全成功,然后开始下一次的CK5,那么这就是我们为了达到断点恢复或者是异常恢复所做的切换设计

大家好,我是王海林,目前是 Apache SeaTunnel 社区的 PMC Member。今天我为大家带来的主题是《Apache SeaTunnel 构建高效数据同步管道》。

172257926492535f7b2d156b842018ac9ca18f676c9bc

Apache SeaTunnel 是一站式数据集成平台,支持离线和实时数据同步,提供了灵活的扩展和高效的并行处理,确保数据一致性。

本文介绍了 SeaTunnel 的架构设计、核心功能 、最佳实践 以及如何参与社区共建,帮助用户快速上手并深入了解其强大功能。

接下来,我将从以下五个方面进行分享:

  1. 项目基本介绍

  2. 入门体验

  3. 核心设计揭秘

  4. 最佳实践推荐

  5. 参与社区共建

项目基本介绍

诞生背景

在数据集成的早期,主要以 ETL (Extract, Transform, Load)概念为主。这一时期的主要任务是从客户的生产环境中同步各种生产系统产生的业务过程数据,例如 ERPCRM 等。

1722579266346418210495f5aff3c24672cbca70f7bca

这些数据通常存储在关系数据库中,并通过专业的 ETL 工具进行抽取和同步,最终进入数据仓库,用于 BI 报表和统计分析。而这个时期代表性的 ETL 工具包括 Informatica、Talend 和 Kettle,Kettle 则是这个时期用得比较多的开源工具。

随着分布式技术的流行,如 Hadoop 和 MPP(大规模并行处理)存储的普及,数据同步逐渐向同步时进行简单抽取,完成后再进行复杂加工和计算处理的方向发展,抽取过程开始使用 MR(MapReduce)等程序框架。

近年来,新的概念 EtLT(Extract, Transform, Load, Transform)逐渐兴起。

172257926640019385db3b5ae07c4eb978faed8c1f712

这种方法将 ETL 中的转换步骤(T)进行分离和提取,抽取过程中只做轻量级转换和脏数据清洗过滤,而将复杂分析和转换放在贴近业务的地方进行。

这一阶段的数据集成更加工程化,着重于处理性能和兼容性。

在这种背景下,Apache SeaTunnel 应运而生。SeaTunnel 专注于数据的抽取、轻量级转换和高效、快速、兼容性广泛的加载(Load),在数据集成的各个阶段都具有独特优势。

用户痛点

了解了数据集成的发展历程后,我们再看看数据集成阶段用户面临的主要痛点:

172257934971857e5f31c81d0d57124573a1ec0478d45

数据源的繁多

随着存储技术的发展,数据源种类迅速增加,并且存在各种版本兼容问题,不同版本可能同时使用,增加了管理的复杂性。

集成场景的复杂性

集成场景的复杂性也在增加,除了离线加载,还有实时监听性同步、全量加载、增量加载以及 CDC(Change Data Capture)等。

降低影响及高性能

在这些场景中,还需要保证对数据源影响最小,不干扰业务使用,同时实现高吞吐、低延时 和同步过程中的指标监控与量化

数据一致性

此外,在满足多种场景的同时,必须保证数据的一致性,避免重复和丢失。这对数据集成技术提出了简单易用、易管理、易维护和可扩展的要求,以便应对复杂的集成需求和企业特有的问题。

Apache SeaTunnel 的解决方案

前面我们说到,SeaTunnel 的目标是成为一站式的数据集成平台。截止到现在,对于 SeaTunnel 的用户来说,只需编写简单的任务配置文件,通过提交客户端运行任务即可

大多数常见的数据存储源都已内置集成,无需额外开发,SeaTunnel 的生态系统丰富,兼容数据库、消息队列、文件、云存储、数据湖仓和 SaaS API 等。

SeaTunnel 采用插件架构,方便用户扩展和定义自己的数据流,支持多场景,如离线同步、实时同步和 CDC。此外,SeaTunnel 还支持运行监控,能够统计任务在运行过程中的读写和性能指标。

SeaTunnel 具有 checkpoint 概念,支持分布式快照、中断暂停恢复和两阶段提交 API,保证数据的一致性。

架构设计

SeaTunnel 的架构设计基于插件化架构,从下图我们可以看到,左侧是用于读取数据的插件,右侧是用于写入数据的插件,当然我们图里面只放了一部分,更全的插件支持请访问官网查看。

172257926639700023149b8fa6b60e718b124f1881b3a

核心 API 设计主要包括 Table API、Source API 和 Sink API。数据在读取后被转换为 Table API,再通过 Sink API 写入目标存储。

SeaTunnel 支持多种引擎,包括 SeaTunnel 社区专门为数据同步场景研发的 Zeta 引擎、Flink 引擎和 Spark 引擎。用户可以将插件任务提交到不同的引擎运行。

入门体验

对 SeaTunnel 有了基本了解后,接下来,我将带大家进行一个简单的入门体验,帮助大家快速上手 Apache SeaTunnel。

安装

如果是从官网下载 SeaTunnel 安装包,下载完成后需要进行插件的安装。由于安装包不包含插件,用户需要编辑插件的配置文件,并下载安装这些插件到安装包中。

17225794052264e3f9497f1c9bff41c2352aeb3b33f4f

如果是从源码编译安装,可以从 GitHub 克隆 SeaTunnel 工程,使用 mvn 命令打包 dist 模块,构建出一个完整的安装包。

该安装包包含所有连接器插件,无需额外下载。

17225794052306b3e1ddba9b8a9c481336fd18bce6295

任务配置

安装完成后,我们需要编写一个简单的任务配置文件来提交任务。

任务配置文件主要分为三个部分:

  1. ENV:这里包含一些关于 job 的设置性配置。最主要的配置有两个,一个是 parallelism,用于设置任务运行时的并行线程数。如果没有配置,默认是单线程运行;如果数据量较大,可以将其配置为多线程以提高读写速度。另一个是 job.mode,用于标识任务是批处理(Batch)还是流处理(Stream)。

  2. Source:在这一部分配置读取数据的插件。插件的名称和内部配置都在这个区块中。例如,以下示例中使用了 JDBC 插件,配置了 result_table_name 用于标识 source 在配置文件中的定位符。其他配置项根据具体插件的文档进行设置。

  3. Sink:这一部分配置写入数据的插件。配置规则与 Source 类似,第一层是插件名称,内部配置项包括 source_table_name,用于指定要接收的数据来源。例如,如果 Source 插件配置了 my-source-1,那么 Sink 插件的 source_table_name 也应配置为 my-source-1,表示数据流从 Source 到 Sink 的连接。

以下是一个简单的任务配置示例:

1722579266397f2ae1953f22d27156c6580f2e2dc186d

任务执行

接下来,我们需要执行这个简单的任务。

这里主要介绍 Zeta 引擎 ,在 SeaTunnel 的 bin 目录下,有一个 seatunnel.sh 脚本用于提交任务,提交任务的主要参数包括任务文件的路径和运行模式。

任务文件路径是刚刚编写并保存的配置文件的路径,-m 是运行模式,运行模式可以是 cluster 或 local

1722579405247cb3c8e8ca7a4f0dc6bd71701658266e1

cluster 模式将任务提交到远程集群,这需要事先启动集群;

local 模式则在本地运行任务,适合测试和无需启动集群的情况;

以下是一个简单的启动命令示例:

bin/seatunnel.sh -c config/example-job1.conf -m local

通过这个命令,可以在本地运行我们刚刚配置的任务。

核心功能特性探讨

在前面简单体验了 SeaTunnel 之后,我们接下来探讨其核心功能设计和原理,以更好地理解 SeaTunnel 如何完成复杂的数据集成任务。

连接器的插件架构

SeaTunnel 的连接器插件架构是其核心设计之一。

从下面这张图左侧来看的话,首先就是 config,我们会编写任务的配置文件,然后 config 提交到引擎上面运行,从配置文件提交任务开始,SeaTunnel 会首先解析配置文件,找到 Source 区块并对应到具体的 Source 插件代码。

17225792664135e2481fbb84b7c99a353033def36bb7e

每个 Source 插件包含固定的结构,包括 Enumerator API(枚举器)、Reader API(读取器)和 State API(状态管理)。

Sink 部分(图最右侧)与 Source 类似,也包含 State APIWriter API(写入器)和 Committer API(两阶段提交)。通过这种插件化的设计,SeaTunnel 能够灵活应对各种数据源和目标存储的需求。

逻辑结构与运行结构

SeaTunnel 的逻辑结构和运行结构通过图示可以更清晰地理解,黑色的线主要是逻辑结构,彩色的是运行结构。

17225792664374fd9110e0df7177e50e6839b6956d771

逻辑结构上,配置文件提交后,会创建 Enumerator(枚举器)用于划分数据区块,然后通过 Reader API 创建读取器, Enumerator 会将数据块均匀发送给所有 Reader

Reader 数量与配置的并行线程数(parallelism)相对应,以实现并行数据读取。

17225792670409bcf94218e9de21506eae4b3fb7113b4

在 Sink 部分,配置文件会创建与 Reader 数量相同的 Writer 并一一对应自动绑定进行数据写入,同时创建 Committer 用于所有 Writer 的状态提交。

通过这种逻辑转化和运行结构,SeaTunnel 实现了数据从读取到写入的高效处理。

状态 API 的设计

在数据读写过程中,数据会从 Enumerator 开始流动到 reader 到 writer 到 committer ,那么这整个过程我们要实现中断恢复或者是异常情况的恢复,能够重新回到原来的起点去,接着读数据。

172257926708633f036cda79fcda53d75bc9568922255

这个过程中,SeaTunnel 通过状态 API(State API)实现中断恢复和异常恢复。

每个运行节点会不断触发 checkpoint(检查点),保存内存状态并持久化。

无论是 Zeta、Flink 还是 Spark 引擎,都需要通过 splitEnumerator(枚举器)的 snapshotState 作为入口,将状态传递到 ReaderWriter 和 Committer

整个过程就是在不断地去保存所有运行节点的内存状态,然后每一次保存成功之后都能够持久化地存储下来。

接下来是运行时的逻辑结构

最下面我们可以看到分布式快照管理器(checkpoint manager)会不断触发 checkpoint,保存所有运行节点的内存状态并持久化。这样,在发生中断或异常时,可以从上一次成功的 checkpoint 恢复,配置触发的间隔,继续数据处理。

17225792671012a4385efb19d0d9e9f111e374e3028fd

图里面我们可以看到在右侧,实际上是一些触发成功的,在左边是一些正在触发的和待触发的,那以 CK4 为例子看的话,它首先会从一开始发到枚举器 Enumerator ,它在这个时候会保存自己内部的状态,有多少 split 没有发出去,有多少已经发出去了?然后枚举器会把这个 CK4 继续往后发到所有管理的 reader 上面,也就是有多少并行度,这边就会发多少个出去,然后每个 reader 会去存储自身的状态,读了哪个 split?还剩多少?会把这些状态给存下来。

然后 reader 继续往后发,会发到对应的 writer,我们可以看到 reader 各自会发各自的 writer, writer 接收到了数据,如果有缓存的话,也会去保存一些状态,这个状态保存完之后,会把 CK4 的信号继续往后发,直到 committer 收到所有 writer 的 CK4 信号,最终把这个 checkpoint 处理完,把它自身的还没有提交的状态保存下来。

已经提交过的这些信号全部都保存下来,这个过程会每一个节点都会给 checkpoint manager 来发 up 信号,那么我们一次成功的 checkpoint,就是所有的节点都能够流通 checkpoint 信号,并且都回了 Ack 包,所有的 Ack 都收到之后,我们认为这个 CK4 就完成了,那么它就会移动到 CK3 后面,咱们认为 CK4 完全成功,然后开始下一次的 CK5,那么这就是我们为了达到断点恢复或者是异常恢复所做的切换设计。

类型映射与自动建表

在数据同步的过程中我们可能经常会遇到一个问题,就是有很多目标存储要去同步的话,那首先要去把表建起来,那如果表特别多的话就比较麻烦了,就会有很大量的操作工作。

还有一个就是我们源和 source 它们是不同的存储,数据类型是完全不一样的,也需要进行统一的兼容转换,SeaTunnel 则提供了类型映射和自动建表功能,以解决异构数据类型的兼容和大量表的自动创建。

1722579267123848d8f0c99b25ec103a4b0853230f8ab

通过 TypeConvert API,Source 将外部系统类型导入到 SeaTunnel 类型,Sink 将 SeaTunnel 类型导出到目标存储类型,实现统一的类型转换。

1722579267147ed6c9f2b0537b91a74fe91ced67949c3

自动建表通过 SchemaSaveModel 和 DataSaveModel 定义表结构和数据处理策略。

1722579267176f836bba89796f6f506326849de7606a9

例如,如果表已经存在,是否需要重建;如果表中已有数据,是否需要删除或执行自定义清理操作。每个连接器在其内部实现和处理这些 API。

多表同步

在同步数据的过程中如果表很多的话,会对管理维护造成很大的一个困扰,对运维的负担也很大,甚至是对连接资源的消耗,如果都是独立地去做同步任务的话,那么表非常多的时候连接可能就没那么多,可能也不够用!

所以这块 SeaTunnel 支持在一个任务中进行多表同步,以减少管理和维护负担,并节省连接资源。

1722579267788123162999c702ccb4e4a392c239e7368

对于 Source,通过枚举器(Enumerator)划分数据区块并分配给读取器(Reader),支持动态分配不同的表。

172257926494397799b01358168f0ec44d65d82fb563f

对于 Sink,定义了 SupportMultiTableSink 接口,通过实现该接口,可以动态生成多表写入的执行计划。

1722579264984c71a00f86a5012b7620a3b03b5013f92

在运行时,Sink 为每张表生成子 Sink(Sub Sink),将其包裹在一个综合接口中,形成复杂的 MultiTableSink

这种设计不仅简化了任务管理,也优化了连接资源的使用。

共享缓存

在一些场景中,需要将一个存储中的数据写入多个存储。SeaTunnel 通过共享缓存机制,避免重复读取数据。

17225792649580ba2ebc371001d83ef549b113c88a7de

Source 读取数据后放入缓存,不同的 Sink 可以从缓存中获取数据,减少对源端数据源的影响。

尽管共享缓存会带来写入速率的相互影响,但整体上优化了数据传输和资源使用。

CDC(Change Data Capture)设计流程

CDC(Change Data Capture)设计流程分为快照读取和增量读取两个阶段。

快照读取阶段处理已有表的历史数据,增量读取阶段通过 binlog 进行数据监听和读取。 1722579264966322870fd70a94ed4b0ac89f48d4aa22d

快照读取

首先,我们处理已有表的存量历史数据,这称为快照读取。在快照读取阶段,每个 split(划分的最小粒度)对应一个表的一部分数据范围(从某个键到某个键)。

17225792656698ce112344b8f4818780560f72a8b1404

快照读取的目的是确保数据的一致性,这里有一个 Exactly-Once 的处理方法:

  1. 记录水位线:在每个 split 读取前后,分别记录 low-watermark 和 high-watermark(日志上的水位线)。

  2. 比对水位线:读取完成后,比对水位线。如果水位线发生变化,说明数据在读取过程中被更改,需要合并数据并重读。

  3. 内存表处理:在读取时,先将数据放入内存表,并按键(key)索引。然后,从日志中捞取数据,并按键覆盖内存表中的数据。这样,即使数据发生变化,按键覆盖也能确保数据的一致性。如果数据被删除,内存表也会同步删除。

通过这种方式,快照读取阶段可以确保数据不重复、不丢失。

增量读取

在快照读取完成后,进入增量 binlog 读取阶段。

172257926565939c8602e20e17b93d99bb7351e96cd24

此时,处理的原则如下:

  1. 选择日志起点:从所有 split 中最小的 high-watermark 位置开始读取 binlog。

  2. 过滤区间数据:过滤 split 与 split 之间的间隙数据,丢弃每个 split 的 low-watermark 和 high-watermark 之间的数据,保留 high-watermark 之后的数据。

  3. 确保一致性:通过过滤和回捞处理,确保增量读取的数据不重不丢。

1722579265698c3f5f6b5eeea762b5c25e2c0eea8088c

数据一致性保证

从快照阶段到增量阶段,通过上述 Exactly-Once 的处理方式,可以保证数据一致性。每个阶段都通过记录和比对水位线,处理内存表,确保数据的准确性和完整性。

这种设计确保了在处理 CDC 读取时,即使数据在读取过程中被修改,也能通过严格的水位线管理和内存表合并,保证数据的一致性和准确性。

CDC 读取的动态加减表

在任务运行过程中,可能会需要动态地增加或减少同步的表。在传统设计中,这通常通过暂停任务、修改配置然后恢复任务来实现。

而在 SeaTunnel 中,这一过程与 checkpoint 机制密切相关。

17225792656640bc5be83aee2593b38fefad0f440acc6

SeaTunnel 会在状态切换时保存当前的任务状态,从而允许任务在特定状态下暂停并在恢复时继续执行,确保数据一致性。

CDC 写入的顺序保持

为了在并行读写过程中保持数据顺序,SeaTunnel 采用了 key 拆分的方式。如果并行处理时同一个 key 的顺序被打乱,写入的数据就可能会发生错乱。

1722579265658b56df477f9fe6929438b588e06ecf5c31722579265658b56df477f9fe6929438b588e06ecf5c3

为了解决这个问题,SeaTunnel 会将相同 key 的数据(如 insert、update、delete)分配到同一写入线程中,确保这些操作按顺序执行。

例如,在日志中,i 代表 insert,u 代表 update,d 代表 delete

如果一个 key 对应的数据依次发生了 insert、update、delete 操作,通过 key 拆分,这些操作会被分配到同一个写入线程,从而保持顺序一致性,避免数据错乱。

CDC 写入的 Exactly-Once

为了实现写入的 Exactly-Once 语义,SeaTunnel 在写入过程中也做了一些特殊处理。由于状态的暂停和恢复,可能会发生重读或重放某个 checkpoint 的数据。 1722579265676e67ef150c4b12cb82683a9f9c45b9d8e

这时,Sink 可以通过支持 upsert 操作来处理重复数据。具体来说,如果数据库支持 upsert 写入,那么 insert 和 update 操作都将被处理为 upsert,确保数据的一致性。

CDC 资源优化

在 CDC 读取过程中,资源优化是一个重要的考虑因素。通常情况下,CDC 读取会先读取历史数据,然后再读取增量的 binlog 数据。

历史数据量可能非常大,需要较多的并行读取线程;而 binlog 数据量相对较少,不需要那么多的读取线程。

17225792677772714c64bb5a45bd998d171d8a261c198

为了有效地管理资源,SeaTunnel 引入了 idle 状态的概念。

以下是具体的处理方式:

17225792677648a3f30e99456b8e1ce295cb30e60e7b3

  • 读取历史数据:在读取历史数据阶段,需要较多的并行读取线程以提高处理速度。

  • 进入 binlog 阶段:一旦开始读取 binlog 数据,数据量通常较少,只需要一个并行任务去读取 binlog 数据。

  • 报告 idle 状态:当某些读取线程处于空闲状态时,这些线程会向枚举器报告 idle 状态。

  • 动态调整线程:枚举器接收到 idle 状态报告后,会动态地关掉所有处于 idle 状态的 task group,只保留一个读取线程继续处理 binlog 数据。

  • 释放资源:被关闭的 task group 会释放其占用的数据库连接资源,从而减少资源占用。

17225792677836e1c01c36cd2fac7bc29c50dcca7f9c9

最佳实践

env 配置

并行线程 (parallelism)
  • parallelism 表示任务并行线程的数量,即创建多少对 reader 和 writer 的组合。例如,设置 parallelism 为 3,就会有三对 reader 和 writer 并行读写。

  • 需要注意,并不是所有连接器插件都支持并行处理。在配置之前,要确保所使用的连接器支持该功能。

  • 确认连接器是否支持数据切分。如果连接器的枚举器不支持数据切分,例如某些文件连接器,那么无论并行线程设置多少,其他线程都会空闲。例如,一个文件只能在一个线程中处理,其他线程则会处于空状态。

checkpoint.interval
  • checkpoint.interval 主要用于流任务。对于批任务,该设置默认关闭。如果要配置流任务的 checkpoint.interval,建议不要设置得过短,以免影响数据读写同步的效率。

  • 某些 Sink 插件在写数据时,会与 checkpoint 挂钩。checkpoint 触发时,可能会刷新内存中的缓存数据。如果出现数据延迟,可能与 checkpoint.interval 的设置有关。

  • 常见的触发错误是 CheckpointCoordinator inside have error,通常表明在 checkpoint 执行时,writer 或 committer 在将数据写入目标库时失败或超时。可以通过调整 checkpoint 执行的超时时间来解决此问题。

Source 分片注意事项

Source 任务配置中的分片是提高并行读写效率的关键。要确保 Source 支持分片,并了解其配置方法。

不同的连接器有不同的分片方式,配置分片键的类型、大小和数量也会影响读取效率。

此外,数据的分布情况也会导致倾斜问题,需要根据具体场景进行优化。

Sink 的多表配置

用户反馈比较多的一个问题是多表配置。

目前,多表配置在文档上还有一些未完善的地方。Source 的表到 Sink 是迭代的,Sink 可以通过变量提取动态分配表信息,如 ${database_name}、${table_name} 等。变量可以组合使用,添加前后缀,例如 ${database_name}_xyz、${table_name}_abc 等。

像 Paimon、Iceberg、Doris 、Starrocks 、Hive 等连接器都可以通过这种方式提取多表信息进行写入,具体配置方式根据场景和数据库有所不同。

以下是一些代码示例:

172257926778758e22332dd7e7cda2fd4db613bdb74a517225792678663a85c9fd558f657ab202f0b94eeb328a172257926839239a14d55426f90916a4318677ff43af81722579268482280ee8ea55b54a1f0f6a7d65c63a185f1722579268458922a47b7f6817ad3b8f1d89e97bfb51a1722579268477d6131ac06bbb2349e7848ac2f028cd91

参与社区共建

欢迎大家参与 SeaTunnel 社区的共建。我们需要大家一起来共同维护和丰富我们的连接器生态。可以通过官网的 DOWNLOAD 链接选择合适的版本进行试用。

在试用过程中,如果发现问题,可以在 GitHub Issue 上反馈。如果想参与新功能的开发或 Bug 修复,也可以在 Issue 中找到需要帮助的问题。

参与贡献https://github.com/apache/seatunnel/issues?q=is:open+is:issue+label:"help+wanted"

平时的交流和寻求帮助,可以通过邮件列表或者关注社区公众号。感谢大家的支持和参与!