Apache SeaTunnel 构建实时数据同步管道(最新版)

具体而言,其回捞的实际过程是舍弃掉每一个Split的高水位与低水位之间的数据,而着重获取Split与Split之间间隙的数据,通过这样的操作流程,能够捕获到在Split处理快照阶段未曾覆盖到的数据,从而保障整个数据处理过程的完整性与准确性,实现Exactly-Once的处理要求

文章作者

王海林 白鲸开源 数据集成引擎研发

Apache SeaTunnel Committer & PMC Member,Apache SkyWalking Committer,多年平台研发经验,目前专注于数据集成领域。

导读

在当今数字化快速发展的时代,数据已然成为企业决策、创新与发展的核心驱动力。实时数据同步作为数据处理流程中的关键环节,对于确保企业各系统间数据的一致性、及时性以及完整性起着举足轻重的作用。Apache SeaTunnel 作为一款开源的、分布式的数据集成平台,专注于大数据领域的数据流动,为实现高效实时数据同步提供了强有力的支持。它不仅支持流处理,还对批处理提供了良好的支持,是一个一体化的解决方案。其最大的特点就是支持多样化的数据源和数据目的地,能够轻松应对每日数以亿计的数据同步需求,已成功应用于各大企业的生产环境中。本文将以 “Apache SeaTunnel 构建实时数据同步管道”为主题,深入探讨其在数据集成领域的卓越表现与应用价值。

今天的介绍会围绕下面四点展开:

1. 背景介绍

2. 上手体验

3.挑战与解决思路

4. 未来展望


背景介绍

1.数据集成发展史 - ETL

v2-5e0afef9d7bac2e2c85dc1b03c137b43_1440w

首先,在数据集成发展早期,数据处理采用的是 ETL 模式,也就是从抽取(Extract)、经过转换(Transform)、再加载到目标(Load)的过程。主要面向结构化数据,数据主要来自于传统的 SQL Server、Oracle、MySQL 等数据库,以及结构化的系统,如 ERP、CRM 等。使用专业的数仓工具,或者一些大型的 DB 来完成存储的任务。在这一个阶段比较有代表性的数据集成工具有 Informatica、Kettle、Talend 等。

v2-4ef0207b7e7432a456e605e7d221477f_1440w

在持续发展进程中,历经如 Hadoop、Hive 或 MPP 等技术后,相较于传统 ETL,一种全新的架构开始形成。原本的转换过程被后置,以面向更贴近业务的分析场景;而 EL(抽取和加载) 过程则前置,从而更迅速地实现原始数据的搬运与加载。至于 T(转换)环节,能够与业务分析需求更为紧密地契合,可依据业务分析需求进行快速迭代响应。在此发展阶段,EL(抽取和加载) 过程主要涉及大数据处理任务以及一些大数据同步组件。在此期间,对业务流程中的转换关注相对较少,主要是将转换环节迁移至更贴近业务之处 。

v2-8c7b27b7ead5ec8d0c121a8a29536079_1440w

随着新技术与新型存储方式的不断发展,数据源正迅速变得更加复杂多样。例如,当下既有传统数据库(DB),也有湖仓存储、向量存储、云和软件即服务(SaaS)的应用程序编程接口(API),以及各类对象存储等技术纷纷涌现,这使得整个数据集成工作变得愈发复杂。

在此背景下,一个与当下实际情况更为契合的概念应运而生。EtLT 是在传统 ETL 基础上发展而来的一种结合模式。在该模式中,EL 过程实现了更为专业化的分工,这就需要更为专业的项目团队来完成兼容性更强、速度更快的 EL 处理工作。而 T(转换)环节,则依旧朝着与业务紧密结合的方向发展,以快速满足业务复杂多变的分析需求。

Apache SeaTunnel 的生态定位聚焦于 EL 环节,其中间包含一个小型的 T,主要用于满足复杂多变的数据源演变需求,能够支持诸如变更数据捕获(CDC)、文件读取、抽取软件 SaaS API,以及部分云组件等。此外,对于正在迭代发展的湖仓存储和向量存储等也提供支持。在数据加载(Load)方面,可实现高性能的数据加载,除了基本的数据加载功能外,还支持结构加载以及部分 API 的加载。在 EL 过程中,会进行轻量级的转换(Transform),能够支持行级别的数据转换或过滤操作。而整个与业务紧密相关的清洗过程,依然由大型的 Transform 模块负责,由更为专业的工具和团队进行处理。

2. 数据集成面临的痛点

v2-582c5ad0d814b8db4635673006ce84e7_1440w

数据集成面临的痛点主要有哪些呢?

首先,数据源复杂多变。如前文所述,当前技术迭代速度极快,由此带来的数据源种类增加、版本兼容性等问题层出不穷。

其次,应用场景复杂。需要支持包括离线、实时,全量、增量、变更数据捕获(CDC)等在内的一系列复杂的同步场景。而且,在整个集成与同步过程中,要尽可能降低对数据源的影响。这就要求在有限资源条件下,实现高吞吐量与低延迟,同时确保整个同步过程的数据一致性,并且该同步过程需具备可监控性与可量化性。鉴于这些特性,对整个数据集成过程提出了诸多要求:所采用的技术栈要简洁,易于维护与操作,且扩展性要强。因为企业内部时常会涌现各种独特需求,需要进行针对性的适配与扩展。

3.Apache SeaTunnel 如何解决?

v2-9cb2939fc457f4cc9e8e94fb25910661_1440w

Apache SeaTunnel 是如何解决这些问题的呢?

首先,在生态方面,目前它能够支持包括数据库、消息队列、文件、云存储、湖仓以及软件即服务(SaaS)的应用程序编程接口(API)等在内的多种数据源的集成场景,可见其生态体系极为丰富。

其次,从应用场景来看,Apache SeaTunnel 同时支持离线和实时的同步场景。

在资源消耗层面,Apache SeaTunnel 进行了诸多优化设计。主要通过连接共享,将大量任务以及多表操作整合为单任务集成,以此降低对数据源的影响,并减少管理复杂度。

关于数据一致性,Apache SeaTunnel 支持利用分布式快照进行状态保存,从而能够从任意快照位置实现数据恢复。

最后,在简单易用性上,Apache SeaTunnel 目前支持使用配置性语言编写配置文件,以此来提交作业。

4.Apache SeaTunnel 架构

v2-a447adfa2ce5714129c471d465259712_1440w

Apache SeaTunnel 的架构构建原理如下:

其关键在于构建了一套统一的标准 API 体系。在该体系中,Source API 主要承担数据读取功能,它被设计为能够与多种类型的数据库相兼容,通过这一 API,可将不同数据库的读取操作进行统一抽象,使其在同一逻辑层面上得以处理。不仅如此,对于云存储以及 SaaS 等多样化的数据存储形式,Source API 同样能够实现有效的抽象读取,确保数据获取的一致性与通用性。

与之相对应的是 Sink API,其主要作用是实现数据写入操作的抽象化。通过 Sink API,能够适应各种不同的数据写入场景,充分考虑到不同存储系统之间的差异,从而保证数据能够准确无误地写入到目标存储位置。

在 Source API 和 Sink API 之间或之上,设置了 Table API。这一 API 发挥着至关重要的作用,它不仅能够实现数据在不同组件之间的顺利衔接与传递,确保数据流程的连贯性,还具备传递数据结构信息的能力,使得数据在整个处理过程中能够保持结构的完整性和一致性。

除了上述针对插件的三类 API 外,还存在一类引擎 API。当前,Apache SeaTunnel 支持三种执行引擎,分别为自主研发的 Zeta 引擎,以及业界广泛应用的 Flink 和 Spark 引擎。基于此架构,Apache SeaTunnel API 的插件连接器具有显著的优势,即只需进行一次编写,便能够在多种不同的执行引擎上稳定运行,极大地提高了开发效率和系统的兼容性。

此外,在支持多执行引擎的同时,Apache SeaTunnel 架构还具备对系统运行过程中的监控信息、事件信息以及各类性能指标进行收集和汇报的功能。在这样的架构支撑下,Apache SeaTunnel 能够有效应对各种复杂的数据集成场景,成功实现对多种数据源的兼容适配,并对不同数据源的行为进行统一的抽象处理,从而为用户提供高效、稳定的数据处理服务。

5. 实时数据集成如何与Apache SeaTunnel 结合?

v2-c91d1ac2e6951796d8ed35201d07bc4d_1440w

Apache SeaTunnel 与实时数据集成的结合方式如下:

在Apache SeaTunnel 中,实现实时数据集成主要存在两种途径。其一为针对数据库变更流的处理方式,Apache SeaTunnel 配备了一组与 CDC(变更数据捕获)相关的连接器。这些连接器的核心功能在于精准捕获多种常见数据库(如 MySQL、PG、SQL Server、Oracle 等)在日志层面所产生的变更流信息。

其二是关于事件流的处理机制,在这一方面,Apache SeaTunnel 借助 MQ(消息队列)相关连接器来承担数据读取任务。例如,对于 Kafka、PaaS 等类型的消息队列,能够从中有效摄取事件流数据。

而在实时数据写入环节,由 Sink 连接器负责具体的操作。在此过程中,主要抽象出了四种关键的数据操作类型,分别为 INSERT,即向源库或存储中原有的位置插入新数据;UPDATE_BEFORE 和 UPDATE_AFTER,用于表示源库中数据发生更改的前后状态;以及 DELETE,即删除数据操作。

通过对这些数据处理行为的抽象化处理,Apache SeaTunnel 能够有序地从源库中捕获上述各类数据操作行为,并将其准确无误地存储至目标存储位置。

在数据读取与写入的过程中,实时数据流的交互依赖于统一的连接器 API。这种设计使得Apache SeaTunnel 在处理实时数据集成时,其相关操作能够如同普通连接器一般,在其他平台上稳定运行。目前,Apache SeaTunnel 支持在 Zeta 引擎以及 Flink 引擎上执行实时数据集成任务。

上手体验

在前面的背景介绍基础上,下面进行一个简单的 Demo。

1.安装 -下载安装

v2-851f63569c94de30fe3418d0aa364ae8_1440w

在进行Apache SeaTunnel 的下载安装时,鉴于其对大量插件提供支持这一特性,其官方提供的下载包并不会涵盖所有的连接器插件。主要有两方面原因:一方面是出于对安装包体积的考量,若将所有插件都囊括其中,会导致安装包过大,不利于下载与存储;另一方面则是受到相关 License 授权因素的限制,部分插件可能因授权问题无法直接集成在下载包内。

因此,在用户成功获取安装包后,需要手动对插件文档及配置文件进行编辑操作,以便从中筛选并保留自身所需的插件。完成插件筛选后,需执行相应的插件安装脚本,以确保所选插件能够正确安装并集成到系统中。

特别需要注意的是,若在使用过程中涉及到数据库的连接器,用户还需将对应的数据库驱动程序放置在Apache SeaTunnel 安装目录下的 Lib 文件夹内。只有依照上述步骤逐一操作,才可以顺利完成Apache SeaTunnel 的下载安装流程。

2. **安装 -从源码编译安装**

v2-d0ab16f24be8f0915bfce04d374eccd3_1440w

此外,还可选择从源码进行安装。在采用源码安装的方式时,其主要的优势在于能够直接构建一个包含所有插件的完整安装包。这一过程是借助 dist 模块来实现组装的。与通过下载安装的方式相比,从源码安装避免了因下载安装包而产生的等待时间,在一定程度上提高了安装效率。

3. **编写任务配置文件 - CDC + DDL 同步(MySQL)**

v2-257359e4c5a1e3b58b736cae9ad2fbe4_1440w

完成安装操作后,便可着手进行任务配置的编写工作。当前存在一种任务描述语言,而任务配置模板主要由四个关键部分构成。

其一为 ENV 部分,其主要功能是对整个任务的运行环境进行设置,其中涵盖了并行线程数的设定以及任务类型的确定。在任务类型方面,既支持批处理任务,也支持流处理任务。

其二是 Source 部分,其作用在于指定数据读取所使用的连接器插件,并针对每个插件进行相应的个性化配置,因为每个插件都具有其独特的配置要求,这些配置与插件自身的特性紧密相连。

在完成 Source 部分的指定后,便可进入 Transform 部分的设置。若存在数据转换需求,可依据具体情况选择合适的 Transform 插件来执行数据转换操作;若无需进行数据转换,则可直接跳过此部分。

最后是 Sink 部分,在确定了 Source 之后,需要通过 Sink 插件来指定数据的输出存储位置。例如,在示例中,采用了 MySQL 的 source 进行数据读取,并将读取的数据写入到基于 JDBC 的 MySQL 中,在此过程中还开启了 DDL 同步功能,并且额外添加了一个转换操作,即为所有表新增一个时间列,此示例即为一个关于 CDC + DDL 同步的演示案例。

4. **提交执行任务**

v2-6ac9f4ff8c84e13bdd9abd69ca0e8bbb_1440w

在完成配置文件的编写工作之后,接下来的步骤便是提交并运行相应任务。在此过程中,涉及到一个名为 seatunnel.sh 的命令客户端。该客户端的主要参数包括两个方面:一是需要指定任务配置文件的具体路径,以便其能够加载并读取任务的描述信息;二是要确定任务的运行模式,即选择远程运行还是本地运行。目前,该系统支持两种运行模式,其中一种为 Local 模式,在此模式下,任务将在当前共享进程中直接运行;另一种则是可将任务提交到远程集群上进行异步运行。

03

挑战与解决思路

接下来将深入探究在安装和使用过程中所面临的各种挑战,以及相应的解决策略。

1. **连接器插件架构**

v2-9d35254b8d41a587b2e9d40f796f5b38_1440w

首先是关于连接器的插件架构方面。目前,Apache SeaTunnel 所支持的数据源将近百种,为了使这些数据源能够相互进行数据同步,Apache SeaTunnel 必须构建一套具有极高抽象程度的 API。该 API 的作用在于精准地描述各类存储之间的行为差异,从而实现对不同存储的统一处理,确保数据在不同数据源之间能够顺畅地同步。

v2-96eed5ac1d0fa91ebc7f1c57f837c96c_1440w

接下来,我们聚焦于该插件架构的逻辑结构。在其架构中,位于左侧的 source 插件 API 包含了三种子类 API。其一为枚举器,其主要功能是对原存储中的数据块进行划分,以便这些数据块能够在多个并发线程上运行,故而它是一种负责划分数据块的 API。其二是 Reader API,其职责是直接执行数据读取任务。其三则是状态保存 API。通过这三种 API 的有机组合,Source 插件 API 能够实现数据的拆分并行读取,并且可以随时进行状态的保存操作。

而在 Sink 方面,其同样具备类似的三类 API。其中包括用于状态同步的 API、负责具体数据写入的 API 以及最后的状态提交 API。在实际运行过程中,Sink 和 Source 通过彼此协同的状态,共同完成整个运行过程中的状态保存工作,以及数据的读取、写入与传递,这便是其逻辑结构的基本原理。

v2-ecf4c5f0563e7f357fc7ac6761040bd1_1440w

在实际运行过程中,其运行机制如下:当枚举器启动后,它会与并行的多线程读取器展开交互,将其拆分好的数据块分发给各个并行的读取器。随后,并行读取器读取数据,并将读取到的数据传递给 Writer 以执行数据写入操作。在 Writer 完成写入操作后,可能会留存一些状态信息。对于采用两阶段提交机制的连接器而言,此时可能会涉及到第二阶段的状态实际提交过程,从而存在一个从 Writer 到 Committer 的状态汇聚提交流程。以上便是连接器插件化架构的设计原理。

2. **轻量级 Transform**

v2-ee098cc9589e3d157dbcf6e0fd0bebf8_1440w

除了读写连接器插件外,Apache SeaTunnel 还具备一个轻量级的 Transform 功能。在其运行机制中,Transform 支持在 Source 与 Sink 之间进行串行化操作。具体而言,Source 所提供的数据会按照顺序从 Transform 队列的头部开始,依次流向队列的尾部,通过这一流程完成整个数据的转化任务。

该轻量级 Transform 主要作用于行级别。它能够实现行级数据的修改操作,例如对某些列进行删减或增加,从而改变整个表的结构。此外,它还具备处理轻量级 DDL 适配的能力。比如,当数据传输过程中某些列发生改变,且在此列上又出现了 DDL 处理时,Transform 能够进行相应的适配操作,确保数据处理的连贯性和准确性。

v2-8c7377d38b39dc8686df2d5a2353f68d_1440w

除了 Row 级别的适配功能外,还存在 Table 级别的适配机制。在数据读取过程中,采用的是一个任务能够运行多表读取的架构。在此架构下,Transform 并非必须对所有表都进行转换操作。当存在特定需求时,系统支持侧流操作。即对于那些不需要进行转换的表,数据能够直接跳过 Transform 环节流向目标端;而对于需要转换的表,则依旧按照常规流程,从 Transform 队列的头部依次流向队列的尾部,从而完成整个数据转换过程,以满足不同表在数据处理过程中的差异化需求。

3. **分布式快照**

v2-29e5cca0c9d2ea86f4d58d06f014cbe5_1440w

接下来探讨的是关于状态保存的相关内容。鉴于整个任务在运行时可能会在集群环境中运行,涉及多个节点同时执行,在此过程中极有可能遭遇错误、异常情况,甚至面临宕机及后续恢复等问题。

针对这些情况,在连接器的架构设计中,于 API 层面抽象出了一套完备的状态保存机制。此机制从枚举器起始,贯穿 Reader,直至 Committer,实现了对整个数据处理过程的状态保存。

尤为值得一提的是,这套状态保存的 API 已经成功适配了三个主流的执行引擎,分别为 Zeta 引擎、Flink 引擎以及 Spark 引擎,从而确保了在不同引擎环境下都能有效实现状态的稳定保存与可靠恢复。

v2-87746e202ac8ed1b842f5e30d8cb60d7_1440w

下面详细阐述状态保存的具体过程。以一个简单示例来说明,存在一个 Checkpoint Manager 专门负责状态管理工作。它会按顺序触发 checkpoint,这些 Checkpoint 以序号依次递增,例如已存在 CK1、CK2、CK3,当前正在触发的是 CK4。

其操作流程是,从整个任务的起始头部,也就是枚举器开始。当枚举器接收到 Checkpoint 信号后,会保存自身内部的状态信息,具体包括已读取的表的数量、尚未读取的表的数量以及正在读取的表的情况等,并将此状态信息反馈给 Checkpoint Manager。之后,枚举器会继续将信号传递给其下游的各个组件,对于配置了多线程的各个 Reader,它们在接收到信号后,同样会执行保存自身内部状态的操作,并将状态信息回传。接着,Reader 会将信号进一步传递到 Writer,Writer 也会进行相同的状态保存操作并回传信息。最终,所有的状态信息都会汇聚到 Committer,由 Committer 完成最后的确认操作,至此 CK4 的状态保存流程完整结束。

整个过程本质上是一个分发与计数的过程,通过这种方式能够确保信号经过的节点与最终反馈的节点保持一致,从而认定整个 Checkpoint 的状态保存是完整且有效的。一旦作业出现异常或暂停情况,就能够依据已成功保存的状态节点,即 Checkpoint 状态节点,随时进行恢复操作。这便是关于数据一致性的设计原理与实现方式。

4. **CDC **读取

v2-87746e202ac8ed1b842f5e30d8cb60d7_1440w

在 Apache SeaTunnel 体系内,CDC(变更数据捕获)的读取流程被划分为两个不同阶段。其一为快照读历史数据阶段,在此阶段,会针对主表逐一进行读取操作,并且对每个表实施 Split 切分处理,尤其是针对一些大型表,会通过切分将其转化为规模较小的数据块,以提升处理效率。其二是增量读 Binlog 阶段,在该阶段中,会读取所有表的日志信息,同时对那些不需要的表日志予以过滤,从而筛选出有价值的数据。

然而,在此过程中存在一个关键问题。由于数据库处于持续的更改状态,如何确保在数据读取过程中保持一致性,即既不多读数据,也不少读数据,成为了一个亟待解决的重要问题。

v2-a64d8666cdf73ef2be951e989fc48ae5_1440w

对于确保数据读取一致性的问题,主要从两个方向进行处理。首先是针对快照阶段的 Exactly-Once 机制。在快照阶段,完成 Split 划分后,在每一次对 Split 进行读取操作之前,均会记录低水位与高水位。当 Split 读取完毕后,立即比对所记录的高水位和低水位。若两者之间存在差异,这便表明数据库在此期间发生了变化。此时,就需要执行合并操作,即将已经读取到的 Split 数据与相应日志变更部分的数据进行合并。此外,还需对高低水位的情况进行汇报,以便系统能够及时掌握数据的动态变化情况,确保数据处理的准确性与一致性。

v2-7c627cfcc7338e9f4c4a2410468db79e_1440w

接下来阐述该合并过程的具体操作方式。在Apache SeaTunnel 中,通过构建一个内存表来实现这一过程。首先,将 Split 数据缓存至该内存表内,随后对高水位至低水位之间的数据变更操作进行重放。在重放过程中,依据数据的键(key)以及操作行为来对内存表中的数据进行合并处理。例如,若遇到 Insert 类型的操作,新数据将覆盖原有数据;若为 Delete 类型的操作,则相应的数据会被删除。通过采用这样的处理方式,能够确保在一个 Split 读取操作完成后,数据既无丢失情况发生,也不存在重复的问题,从而有效地保证了数据的完整性和准确性。

v2-6b0233881f39a8b46d7222a0360ff3da_1440w

除了在快照阶段需要执行上述处理操作之外,在增量阶段同样需要进行与之类似的 Exactly-Once 处理。

鉴于增量阶段涉及从某个特定启动位置开始读取并过滤数据,在此过程中,首先会从所有 Split 的高低水位当中筛选出一个最小的高水位以及一个最大的高水位,进而针对这两个水位所界定的区间数据开展过滤回捞操作。

具体而言,其回捞的实际过程是舍弃掉每一个 Split 的高水位与低水位之间的数据,而着重获取 Split 与 Split 之间间隙的数据,通过这样的操作流程,能够捕获到在 Split 处理快照阶段未曾覆盖到的数据,从而保障整个数据处理过程的完整性与准确性,实现 Exactly-Once 的处理要求。

v2-88f09041ac8b40cda37a9549039caf2a_1440w

通过前文所述的 Exactly-Once 处理方式,能够确保数据在读取过程中保持一致性,即实现不多读、不少读的精准数据获取。

此外,在 CDC(变更数据捕获)的应用场景中,对表的动态加减有着强烈需求。在实际业务中,我们常常会遇到这样的情况:当任务已经启动运行后,由于业务需求的变化,需要突然添加或删减某些表。在Apache SeaTunnel 中,当前支持一种半自动化的表动态加减操作流程。具体来说,当需要进行表的增减操作时,首先要对任务的状态进行保存,然后停止正在运行的任务。接着,对任务配置进行修改,在配置文件中相应地删减或添加表。完成配置修改后,再次启动任务,此时任务将从上一次保存的内存状态中恢复运行,从而保证任务的连续性以及数据处理的准确性。

v2-8c386b3857bc997f9641e23e9c881673_1440w

在执行动态加表操作后的恢复过程中,首先会进行流程回退。这是因为在加表之前,任务可能已经运行至增量日志状态。为了确保新添加的表能够完整地融入数据处理流程,任务需要先回退至快照状态,以便读取新加表的所有历史数据。当新建表的历史数据读取完毕后,便进入流程复位阶段。此时,任务会回到暂停前的日志位置,继续执行增量数据读取操作。在这个过程中,同样会重复之前的 Exactly-Once 处理步骤,对已经处理过的数据进行过滤,以保证数据的准确性与完整性。这就是动态加表后的恢复流程。

5. **CDC DDL **同步

v2-db232cb8baa278e1879ff13d0b9a5a93_1440w

在 CDC(变更数据捕获)过程中,另一个较为关键的挑战在于 DDL(数据定义语言)的同步。这其中主要存在两个问题:一是存在数据流与结构流混合的情况,即 DDL 操作指令混杂在数据流之中;二是 DDL 的实现与每种数据库的具体实现机制和语法紧密相关,不同数据库在 DDL 的执行方式、语法规则等方面存在显著差异。

v2-bb0646a2d55d180e8a833feafdcb42ce_1440w

因此,在Apache SeaTunnel 中实现 DDL 同步的首要步骤是进行语法解耦。具体做法是抽象出一套通用的事件结构。例如,若有一个在 AB 表中添加列 c 的操作,在Apache SeaTunnel 中会将其抽象为一个事件结构,即 “在已有的 AB 表上添加新列 c”。通过这种对事件的抽象处理,后续的整个处理流程便能够基于这种通用的事件结构展开,而无需过多考虑不同数据库的特定语法差异,从而实现更具通用性和一致性的处理过程。

v2-607cb027d2420921e16770e4383da89a_1440w

此外,还会针对 DDL 的结构流和数据流实施拆分操作。具体而言,会在 DDL 操作之前插入一个 Before 信号,同时在其之后插入一个 After 信号。

由于 DDL 的变更往往关联着数据存储结构的改变,这种改变极有可能致使老数据出现无法写入、新数据写入顺序错乱等问题。鉴于此,为了确保数据的正常写入与处理,必须要对 DDL 操作及其相关的数据操作进行隔离。在这一过程中,具体做法是为每一个连续的 DDL 操作在其前后分别插入 Before 和 After 信号,以此来实现有效的隔离,保障数据处理流程不受结构变更的不良影响。

v2-8cfdce60e7d4f6852e07392cce111373_1440w

Before 信号会执行预先处理操作。首先,其会暂停整个读取数据的流程,使得整个读取数据的过程停滞在此处。随后,依靠 Checkpoint 机制来推动 Before 信号的处理工作,该信号会按照顺序从 Source 依次传递至 Sink,将所有内存中的数据全部刷出并存储至目标存储位置,以此确保整个内存处于清空状态。

与此同时,还会暂停对整个 Checkpoint 的超时监控。这是因为后续要执行的 DDL 操作可能会耗费较长时间,而这一时间长度往往受限于目标存储的实际情况。部分数据库由于存储的数据量较多,在执行整个变更过程时,极易出现任务因超时而被迫中断的情况。所以,为避免此类情况发生,会预先对 Checkpoint 超时监控进行暂停操作,以上便是前置 Before 信号的相关处理流程及缘由。

v2-94a06f4d76951f81af1ef12692f7d106_1440w

前置信号处理完毕后,便会正式执行 DDL 的变更操作。从内存节点的流向视角来看,首先,DDL 变更指令会抵达 Source 节点,在此处,Source 中的数据结构会发生改变,比如会真正添加一列 c 。接着,该指令会沿着数据管道继续流向 Sink 节点。在 Sink 节点,同样会执行相应的更改操作,即在 Sink 的内存结构中添加 c 列。最后,这条 DDL 变更指令会流向最终的外部存储,促使外部存储也完成相应的 DDL 变更,以确保数据结构在整个数据处理链路中的一致性更新。

v2-67af44b4baa0eb5ad039eb1b68bb56f1_1440w

在 DDL 执行完毕后,紧接着会出现 After 信号。After 信号的主要作用是对该时间点所有节点的内存状态进行保存。这一操作主要是为后续可能出现的异常恢复情况做准备。由于 DDL 涉及数据结构的改变,这与普通数据处理不同,它不能像常规数据处理那样随意中断并恢复,而是必须依据信号切换的间隔来进行恢复操作,只有这样,才能保证在恢复后,老数据不会出现问题。

当后置的 After 信号处理完成后,整个 DDL 状态变更过程的状态得以保存,变更过程也宣告完成,此时便可以进行下一步的正常恢复处理,即恢复到 DDL 变更前的正常数据处理流程。

v2-c7fa2644d9229e4a48e96fc869bfc37a_1440w

首先,需退出数据读取空转机制,使得数据能够从 Source 端开始正式向后继续读取。随后,要恢复之前关闭的 Checkpoint 超时监控功能。在正常的数据读取过程中,会存在一些 Checkpoint 超时监控处理流程,这与 DDL 执行时的情况不同,在正常数据读取时,不会像 DDL 执行那样需要等待较长时间 。

6. **CDC **资源优化

v2-cccfb68a9b8769a4261a69dc233077dc_1440w

在 CDC(变更数据捕获)过程中,除了 DDL(数据定义语言)变更这一关键环节外,还有一个较为重要且常见的现象。在初始阶段,我们往往需要同步海量的历史数据。

此时,所需的资源量较大。为了提高数据读取效率,我们可能会开启多个并行线程进行数据读取操作。每个线程不仅会占用数据库的客户端连接,以便与数据库进行数据交互,同时也会占用集群内部的运行资源,例如计算资源、连接资源等,以保障数据读取任务的顺利执行。

v2-410af51c34ede475ce563655dcf731a7_1440w

当进入到增量阶段后,数据量相较于历史数据阶段通常会有所减少,不再有历史数据那般庞大的规模。而且,增量阶段采用的是单并行的读取方式,在此情况下,诸多并行资源便处于空闲状态,其中涵盖了数据库连接资源也处于闲置未被使用的状态。

Apache SeaTunnel 针对这一情况,引入了运行时的 idle 状态管理机制。通过这一机制,会对正在运行的任务组进行状态判断,对于那些暂时没有数据读取操作的任务组,便会将其标记为 Idle 状态,以便后续能够依据此标记对资源进行合理调配与管理。

v2-7b97c519a838a0c12487454a73b9a39e_1440w

进入 Idle 状态的任务组,将由状态管理器接管驱动,并对其进行资源回收。最终,仅保留正常读取 Binlog 的任务组,此时,整个系统将仅持有一个与 CDC 相关的数据库连接,以此实现对连接资源的优化配置 。

7. **CDC **写入

v2-b4dd0eac58008d8c5c6998a9761c25eb_1440w

如前文所提及,在 CDC(变更数据捕获)中存在并行读取的情况,实际上,在写入端同样会进行并行处理。例如,对于包含同一个 ID 的插入(insert)、更新(update)和删除(delete)操作,以及多个不同 ID 之间混合的数据流,在进行并行化处理时,会针对同一个 ID 的数据进行分区。通过这种方式,确保同一个 ID 的数据能够维持其原有的顺序。而多个不同 ID 的数据,则可以在保证数据完整性和一致性的前提下,以并行化的方式进行写入操作,从而提高数据写入的效率。

v2-cb1fdc7b5569cf39d07e63dfe7839ae5_1440w

在数据写入过程中,确实会面临 Exactly-Once 问题。部分连接器能够支持两阶段提交机制,借助 Checkpoint 进行状态保存与提交操作,在这种情况下,数据处理较为顺畅。然而,并非所有存储系统都支持两阶段提交。

为此,系统还提供了对 Upsert(插入或更新)处理的支持。若目标存储支持 Upsert 操作,便可以将插入(insert)和更新(update)操作统一转换为更新(update)进行处理。

需要注意的是,有些数据库或存储可能并不支持 Upsert 操作。针对这种情况,系统具备一种兼容机制,即先对存储的特性进行查询,再据此决定具体的写入行为。诚然,这种方式可能相对低效,但其优势在于能够兼容各类存储在功能上的差异,从而确保在不同存储环境下均能实现 Exactly-Once 的写入效果,保证数据的一致性与准确性。

8. **多表同步**

v2-e5e967e939692e98437f482d9e3280d7_1440w

此外,关于多表同步,如前文已提及的连接优化问题,其中一个优化方向是在任务层面进行连接优化,而另一个方向则是针对读取资源的连接优化。若一个任务能够支持多表读取,那么就可以复用连接,提高连接的使用效率。

因此,在Apache SeaTunnel 中进行了多表读取的设计。在数据源(source)端,能够支持多表混合的数据管道。其会逐个读取各表的数据,并且每个数据都会附带其所属表的 ID。在数据接收端(sink),也具备所有表的数据结构,这样数据就能与相应结构进行匹配,从而执行多表的读写流程。

不过,在 Sink 端,编码逻辑实际上始终是面向单表的。所以在运行阶段,单表的 Sink API 会根据 Source 传入的多表数据进行迭代,动态生成多个子任务(subtask-sink)。这种方式一方面降低了编码难度,另一方面在资源规划上也实现了更好的优化,例如可以实现动态任务的启动、关闭,或者采用延迟加载等策略。

v2-462d336c1a2ba2ef19968b7d8f235220_1440w

在数据源(source)端,如前所述,会对主表进行分片处理 ,随后将每个分片数据发送至读取器。在读取器中,会记录每个数据所属表的 ID。经由这一流程,多个表的数据能够在同一管道流中进行传输。直到数据抵达数据接收端(sink)时,系统才会依据数据所携带的表 ID,识别其具体的表结构,进而执行相应的写入操作。

v2-4d6286eeb133ccca896ab0839ebde6f3_1440w

在数据接收端(sink),系统会针对数据源(source)传递过来的静态表结构进行表的划分操作,为每个表初始化一个独立的 sink 实例。值得注意的是,每个表在进行数据处理时也支持并行处理模式。

从整体逻辑结构来看,对表的处理过程存在一种运行时的抽象概念,我们将其称为 “Table Writer”。它具备处理多个表同时进行并行数据写入的能力,其中的工作线程(worker)是被所有表共享的,即所有表的数据都可以通过这些共享的 Worker 进行写入操作。

另外,在状态提交方面,也进行了多表融合的抽象处理。会有一个主提交管理器负责托管所有的子任务(sub task),也就是各个表的提交操作,最终通过这个管理器来实现主表的提交工作。

通过上述一系列设计与处理方式,在整个 Sink 端能够实现资源的灵活缩放调配,以及连接资源的有效共享,从而提升系统在多表数据处理过程中的性能与效率。

9. **一读多写**

v2-0eb5d96840db5d00b625bb30483a7cda_1440w

另一个在进行 CDC 数据读取时常见的需求场景是,需要将一次读取的数据同步到多种存储介质中。例如,在将数据写入数据库(DB)的同时,把日志记录备份到消息队列(MQ),或者同步导出到文件等。在Apache SeaTunnel 中,为满足这类需求,在数据源(source)和数据接收端(sink)之间设计了一个共享的内存消息队列。在这种架构下,sink 不再直接与 source 建立连接,而是与该内存消息队列(Q)进行连接。

通过这种方式,从 Source 发布的数据能够通过 Q 共享给多种不同类型的 Sink,从而避免了频繁从 Source 读取数据,减少了重复的数据拉取操作,实现了性能优化。然而,这种设计也存在一定的缺陷。由于所有 Sink 共享同一个 Q,Sink 的写入速率可能会相互影响,整体性能会受到写入速率最低的 Sink 的限制。

10. **连接器 ClassLoader 隔离**

v2-2533aba3f9c2ce98f33fce5e739502fb_1440w

在前面探讨连接器插件架构时,可以发现针对各类数据存储,我们抽象出了一套通用的 API。这意味着在集群环境中,能够自由地组合不同的数据存储来提交任务。然而,这种灵活性可能引发两个层面的类加载器(classloader)冲突,这在 Java 应用中是一个值得关注的要点。

一类是任务(task)级别的冲突。例如,数据源(source)和数据接收端(sink)自身存在类加载器冲突,这可能源于某些软件包的版本不一致问题。比如,针对同一数据存储的多个不同版本软件包,它们之间可能存在冲突,进而引发 Task 级别的冲突。另一类是作业(job)级别的冲突。假设不同的 job 之间存在冲突,而这些 Job 又需要在同一个集群上运行。

在Apache SeaTunnel 中,针对上述问题实施了一层类加载器隔离机制。对于数据源 Source 及其在运行期间所扮演的角色,我们抽象出一个数据源类加载器(source classloader)。该类加载器主要包含数据源的连接器软件包、对应的数据库驱动,以及转换(transform)连接器的软件包。实际上,它与读取器(reader)采用相同的类加载器,这样能够确保它们加载共享的类。

在数据接收端 Sink 方面,使用了另一个独立的 Sink 类加载器(sink class loader)。通过这种方式,实现了 Sink 与 Source 之间的隔离,从而避免二者之间产生冲突。而且,这个类加载器属于作业(job)级别,具有特定的软件包路径特征。因此,不同的 Job 之间可以通过合理配置,实现类加载器资源的共享。

通过对 Sink 和 Source 的类加载器进行隔离,能够有效解决 Job 之间因 JAR 包引起的类加载器冲突问题,以及 Task 之间的类冲突问题。这使得整个集群能够兼容各种不同类型的数据存储,并顺利执行相关任务 。

11. **类型映射与自动建表**

v2-9ad84d6d2489d354ff093c4dd6bf3d48_1440w

在数据同步进程中,另一个极为关键的议题是结构同步。为此,构建了一套抽象的类型体系以及建表 API。该体系和 API 主要用于将原始存储中的数据类型进行提升,使其转换为我们所定义的抽象类型。

在此过程中,数据源(source)负责将这些抽象类型传递至数据接收端(sink)。随后,Sink 会依据建表的具体需求,把接收到的抽象类型再映射为目标存储所适配的类型。通过这一系列连贯的操作流程,实现了异构类型之间的同步,确保了不同数据存储系统在结构层面的一致性与兼容性 。

v2-c3a618403a2fc032e2aa347b3028bb38_1440w

此处以从 MySQL 到 Oracle 的数据同步过程为例加以说明。

首先,在数据源(source)端,会对数据表的结构进行加载操作,进而组合形成一个抽象的 “Catalogtable”。该 “Catalogtable” 中抽象了ApacheSeaTunnel 的数据类型,此即为数据类型的提升过程。其中,数据的每一行(row)同样也是由抽象类型所代表,体现了抽象的表结构。

当这一抽象的表结构传输至数据接收端(sink)后,便会开展相应的转化工作。例如,会依据 Oracle 的转换器(converter)进行反向转化,对该表结构实施本地化适配处理,之后再进行建表操作。不仅如此,其数据流到达 sink 端时,也会按照目标存储(此处为 Oracle)上的具体数据类型进行二次转化。不过,并非所有的数据类型都存在这样的转化需求,通常主要是一些特殊类型才可能涉及到数据层面的转化操作。

v2-8fbb9895c133533f1106e6f87d004591_1440w

在数据同步过程中涉及建表的相关环节,我们构建并实现了一系列用于通用行为的 API。具体而言,这些 API 涵盖了多种不同的建表策略,例如:其一,可实现对所有表结构进行全部重建;其二,当目标表不存在时,执行新建操作,而若目标表已然存在,则直接忽略该操作;其三,倘若目标表不存在,直接触发报错机制。

与此同时,针对数据处理方面,同样也抽象出了若干通用行为。比如,能够删除目标存储内的数据;或者仅采用追加写入的方式,不对目标存储的数据做额外处理;又或者提供自定义的处理过程,该过程可由每个连接器负责实现,其具体形式既可以是一段 SQL 语句,也可以是一段脚本,最终由相应的连接器来完成加载并执行相关操作。

04

未来展望

在最后一个章节中将介绍Apache SeaTunnel 未来的发展规划。

v2-11ef469a29dc37821852d77f482a08b1_1440w

首先,在引擎方面,当前支持实时 CDC(变更数据捕获)DDL 同步的引擎为 Zeta。而在未来的规划中,有可能会引入 Flink 来实现 DDL(数据定义语言)的同步功能,同时也会对 Spark 在 CDC 同步方面的应用展开探索,探寻其应用方向与潜力。

其次,在连接器层面,会进一步扩充现有的 DDL 处理功能。就数据源(source)读取这一环节而言,目前已经支持从 MySQL 中读取 DDL,也能够实现从 Oracle 中读取 DDL。在数据接收端(sink)方面,现阶段已涵盖了基于 JDBC 的 MySQL、Oracle 等相关组件。不过,这一范围仍处于持续扩充之中,当前计划对大家常用的组件进行拓展,预计会有将近二十个左右的 sink 组件,并且这些组件很有可能会逐步增加对 DDL 写入的支持功能。

除此之外,还会持续强化在消息队列(MQ)处理 CDC 数据方面的功能。因为除了直接从数据源读取数据后传输至数据接收端这种常规模式外,在部分场景下,中间可能需要通过 MQ 进行缓存处理。所以,针对从 MQ 中同步 CDC 数据这一环节的处理能力,会不断加以增强。甚至在未来,还可能会支持从 MQ 中直接进行 DDL 的同步操作。

再者,就转换(transform)功能而言,目前已经有一部分开始支持 DDL 的适配工作了。在未来,有可能会继续扩充关于 DDL 的适配范围,并且还会专门衍生出一类专注于 DDL 处理的 transform,用于对 DDL 进行编辑操作。另外,关于分库分表相关的 transform 功能推进工作也会持续开展下去,以不断完善和拓展Apache SeaTunnel 的功能体系。

以上就是本次分享的内容。

v2-c3fb979aedf9e88a8e905aae286300bd_1440w

如果对Apache SeaTunnel 抱有兴趣,欢迎大家进行试用,也欢迎加入社区贡献自己的力量。另外,如果大家想要参与其他有关Apache SeaTunnel 的讨论,可以前往我们的邮件组,也可以通过其他诸如论坛、公众号等渠道,与众多志同道合的伙伴一同展开交流探讨。

以上就是本次分享的内容,谢谢大家。

白鲸开源

白鲸开源是一家开源原生的 dataops 商业公司,由多个 Apache Foundation Member成立,80%员工都是 Apache Committer,运营2 个全球 Apache 开源项目(DolphinScheduler, SeaTunnel),同时根据全球最佳实践发布商业版版本WhaleScheduler和WhaleTunnel。我们致力于打造下一代开源原生的DataOps 平台,助力企业在大数据和云时代,智能化地完成多数据源、多云及信创环境的数据集成、调度开发和治理,以提高企业解决数据问题的效率,提升企业分析洞察能力和决策能力。

社区介绍

Apache SeaTunnel是一个云原生的高性能海量数据集成工具。北京时间 2023 年 6 月1 日,全球最大的开源软件基金会Apache Software Foundation正式宣布Apache SeaTunnel毕业成为Apache顶级项目。目前,SeaTunnel在GitHub上Star数量已达 8k+,社区达到6000+人规模。SeaTunnel支持在云数据库、本地数据源、SaaS、大模型等130多种数据源之间进行数据实时和批量同步,支持CDC、DDL变更、整库同步等功能,更是可以和大模型打通,让大模型链接企业内部的数据。

白鲸开源已基于Apache SeaTunnel 开发的并推出了商业版软件 WhaleTunnel,提供企业级功能增强、服务、运维、Debug、定期漏洞扫描和修复,无论是产品功能、稳定性、兼容性、速度还是安全性,都比开源版 Apache SeaTunnel 有巨大的进步!