大家好,我是范佳,是 Apache 社区的 PMC member。今天给大家分享一些基于 Apache SeaTunnel 二次开发的内容。
这部分内容主要涉及代码层面的知识,如果大家有什么疑问,欢迎来社区找我交流!
大部分数据开发工程师基于 Apache SeaTunnel 的二次开发,可能做的就是任务提交,任务的一些监控,还有在任务没有跑起来之前,我们可能需要预先知道跑起来之后的可能一些结果。
基于以上内容,我将从五个部分来分享相关内容:
SeaTunnel 介绍
SeaTunnel 的启动能力
任务监控的定制化
行为预览与 Sink 的关联
Apache SeaTunnel 是一个高性能的实时和离线数据批处理平台,自加入 Apache 软件基金会以来已有两年多时间,期间社区不断发展,增加了许多新功能和特性。
感兴趣的同学可以访问官网:https://seatunnel.apache.org/
SeaTunnel 支持多种数据处理引擎,包括市场上流行的开源引擎如 Spark 和 Flink,以及 SeaTunnel 自研的 Zeta 引擎。这使得 SeaTunnel 能够灵活应对不同的数据处理需求,无论是大规模数据集还是实时数据流。
项目提供了广泛的连接器支持,使得 SeaTunnel 可以轻松接入各种数据源和目的地,从而简化了数据集成过程。这一特性对于需要将数据从多个源汇总到单一系统的企业尤为重要。
对 HTTP 的支持是 SeaTunnel 的又一亮点,特别是对于开发者来说,因为它可以显著降低适配成本。通过 HTTP 支持,开发者可以更容易地将 SeaTunnel 集成到现有的 Web 应用和服务中。
SeaTunnel 的流批一体功能确保了无缝的数据处理,无论是流处理还是批处理,都能在同一个平台上高效运行。这一特性简化了架构,减少了维护的复杂性。
作为一个数据同步引擎,SeaTunnel 提供了流速控制功能,这对保护下游系统不被过载非常关键。尤其是在上游数据量大而下游系统承载能力有限的场景中,流速控制显得尤为重要。
自动建表功能可以极大地帮助简化数据处理流程,特别是对下游系统来说。这一功能允许 SeaTunnel 根据数据自动创建表结构,减少了手动介入的需要,提高了数据管道的灵活性和效率。
一般来讲,我们基于开源软件二开,第一步就是启动,而启动 SeaTunnel 任务的第一步是准备用户界面,确保二开后的用户可以通过界面触发或定时提交任务。
一旦用户界面设置完成,以下是使用 Shell 脚本提交任务的基本步骤:
编写 Shell 脚本:创建一个 Shell 脚本,用于封装启动命令和任务参数。
执行命令:通过执行 Shell 脚本来提交任务到 SeaTunnel 引擎。
在任务提交时,我们的引擎会返回一个任务 ID。
这个 ID 在使用脚本模式启动时只会打印在日志文件中。如果需要监控任务,需要解析日志文件以获取任务 ID。
然而,这种方式比较滞后,因为 ID 是引擎端生成的,可能需要等待一段时间才能得到。
为了解决这个问题,我们新增了一个功能,允许在提交任务时配置自定义 ID。这个 ID 可以由第三方服务或集成 SeaTunnel 的平台生成,然后传递给 SeaTunnel,SeaTunnel 会使用该 ID 作为任务的唯一标识。
这项功能虽然小,但对于二次开发或集成非常有用,避免了解析日志或等待 SeaTunnel 生成 ID 的过程。
通过 Shell 脚本启动任务时,可以在日志文件中获取任务 ID。
我们也支持通过 HTTP 提交任务。这种方式无需额外启动客户端,对第三方集成更加友好。
HTTP 提交任务的方式更加自然和通用。
对于更深度、精细化和功能更强大的任务提交方式,推荐使用 SeaTunnel Client。
SeaTunnel Client 是一个核心类,通过它可以提交所有任务。无论是引擎内部代码还是外部集成代码,都可以使用这个 Client 提交任务到集群。
通过 SeaTunnel Client,我们可以在 JVM 进程中直接提交任务。例如,在一个 Spring 服务中,用户点击启动按钮后,后端可以直接使用 SeaTunnel Client 提交任务,而不需要启动一个额外的 HTTP 或 Java 进程。
这种方式的好处包括:
及时响应任务状态:任务的状态可以通过回调机制及时返回客户端。
异步操作:任务执行时,客户端会返回一个 Future,可以通过 Future 进行异步操作。例如,任务结束时注册回调函数来处理业务逻辑。
深度集成:这种方式使 SeaTunnel 与二次开发的平台集成更加深度和紧密。
启动任务后,我们需要对 SeaTunnel 进行监控,以了解任务的状态。
例如,任务是否启动成功?运行了多久?数据是否成功读取?任务是否失败?失败的原因是什么?这些都是二次开发时需要关注的内容,因为我们不能保证所有任务都能正常运行。
我们可以通过以下三种方式监控任务状态:
Shell 脚本:通过 Shell 脚本查看所有任务的状态,例如任务是正在运行、失败还是完成。
HTTP:通过 HTTP 接口获取任务状态,例如任务是否失败以及失败原因。
SeaTunnel Client:SeaTunnel Client 不仅可以提交和取消任务,还可以查询任务状态。
比如说下面的截图,这个就是一个 job result
然后这个 job result
也是我们 SeaTunnel client 返给我们的,然后我们就可以看到里面的状态。
如何调用 SeaTunnel Client?
传入任务 ID 即可获取任务状态,任务是正在运行还是失败。对于集成开发来说,获取任务异常信息非常重要。如果通过 Shell 脚本查看日志,用户需要手动解析日志文件。
这在集成的 Web 页面中并不方便。因此,我们推荐通过 HTTP 或 SeaTunnel Client 获取异常信息。
除了监控任务状态之外,我们还需要有指标。
例如,任务虽然在运行,但它是否真正读取到了数据?读取了多少数据?写入了多少数据?吞吐量是多少?这些都是需要关注的指标。
SeaTunnel 引擎内部提供了对应的指标获取方式,有以下三种方式: 1.Shell 脚本:通过 Shell 脚本可以查询任务的各项指标。 2.HTTP:通过 HTTP 接口可以获取任务的各项指标。 3.SeaTunnel Client:通过 SeaTunnel Client 可以查询任务的各项指标。
我们可以监控的核心指标包括:
●读取数量
●读取的字节数
●QPS(每秒查询率)
●每秒字节数
●写入数量
●写入的字节数
对于 CDC(Change Data Capture),我们比较关心的是 CDC 的延迟,即从 CDC 源端的数据产生到 SeaTunnel 读取到它的延迟是多少。
目前,我们的支持是每个任务级别的,但对于每个任务中的每张表的支持还比较弱,因为 SeaTunnel 支持多表任务,即一个任务可以读取和写入多张表。我们正在改进这方面的支持。
除了查询指标外,我们还可以将指标定时对外暴露,例如暴露到 Prometheus 或 SeaTunnel 的指标体系中。
目前,SeaTunnel 对这方面的支持还比较弱,但我们希望在未来能更好地支持将指标对外抛出到第三方组件,如 Prometheus,这样对用户会更友好。
我们提供的默认指标可能不能满足所有用户或开发者的需求。那么,如何定制属于内部系统或二次开发系统的指标呢?
定制化指标集成实际上很简单。可以通过我们的 context 对象来实现。这个 context 对象包含一个 MetricsContext 对象,我们可以向其中注册自定义指标。
定义指标名称:这是一个字符串,代表指标的名称。
注册指标:通过 MetricsContext 对象注册自定义指标。
更新指标值:当需要更新指标值时,通过调用提供的方法将值更新到对象中。
这样就完成了定制化指标的集成,通过这种方式注册的自定义指标,可以通过 HTTP、Shell 脚本或 SeaTunnel Client 查询和展示。
除了指标外,如果需要一些瞬发性的事件处理,例如在某些事件发生时收到通知,可以使用 SeaTunnel 内部设计的事件系统。
事件示例
SeaTunnel 的事件系统可以处理以下事件:
Reader 打开和关闭事件
Task 打开和关闭事件
自定义事件 当这些事件发生时,SeaTunnel 会将事件汇总,并发送到 Master 节点进行处理。
后续我们会实现 DDL 事件的发送功能。社区正在开发的 DDL 功能主要是为了应对 schema 变化,例如在 MySQL CDC 运行过程中,schema 发生变化会产生 DDL 事件。
我们可以将这些 DDL 事件包装成对应的事件发送出去。外部系统可以接收到这些事件,比如某个表增加了一列或删除了一列,然后进行相应的展示或处理。这是事件系统的作用。
自定义事件
就像我们可以自定义 metrics 一样,事件也可以自定义。自定义事件的方式与 metrics 非常相似。用户可以实现自己的事件来处理特定业务需求。
在 metrics 中,可以通过 context 对象获取 MetricsContext。同样地,在事件系统中,我们可以获取 EventListener,然后通过它注册和处理自定义事件。
我们提供了对应的接口 EventHandler,它是一个 SPI 实现。用户可以实现自己的 handler,然后将其放到 lib 目录下,或者打包到应用中。
有了这个 handler 之后,Master 节点会发现所有的 EventHandler,并调用它们的 handle 方法。具体的事件处理逻辑由实现的 handler 决定。
我们内部提供了一个默认的实现:JobEventHttpReporterHandler。这个 handler 会将事件通过 HTTP 接口发送到用户配置的地址。
用户可以通过这个接口接收引擎中的事件,例如任务开始、任务结束、数据到达等。
事件系统不仅用于捕获运行时的事件,还可以用于 DDL 事件。例如,MySQL CDC 运行过程中,schema 变化会产生 DDL 事件。我们可以将这些 DDL 事件包装并发送出去,外部系统可以接收到这些事件并进行相应处理,例如展示 schema 变化、执行后续操作等。
除了任务级别的监控,我们还需要关注集群节点的健康状况。作为一个集群系统,了解整个集群是否正常运作非常重要。这些信息可以通过 SeaTunnel Client 获取。
通过 SeaTunnel Client,我们可以获取到集群的一些健康信息。这些信息包括但不限于:
内存使用情况
GC 次数
RPC 操作延迟
RPC 请求次数
这些与性能和集群稳定性相关的信息能够帮助我们更好地监控和维护系统。
例如,我们可以通过 SeaTunnel Client 获取集群节点的健康状况,并在页面上展示出来。如果在 3 个节点的集群中,只有 2 个节点正常,我们可以通过接口判断并处理异常节点。
SaveMode 与 Sink 密切相关,决定了在写入数据之前执行的一系列操作。这些操作包括自动建表、表重建、数据清空或数据追加。
通过配置 schema_save_mode 和 data_save_mode,可以定义这些行为。
schema_save_mode:定义如何处理 schema。例如,是否重新创建 schema,或者在不存在时创建。
data_save_mode:定义如何处理数据。例如,是否清空数据,或者追加数据。
我们预览的核心是 SaveMode 到底会怎么操作。这一块是纯代码层面,如果要集成的话,肯定需要写代码。虽然不像 HTTP 那么简单,但它非常有用。
例如,我现在任务还没开始跑,或者即将定时运行。我想知道在配置了表重建的情况下,任务到底会创建表还是不会创建表。在任务运行前,我们可以通过行为预览确定 SaveMode 和 data SaveMode 的行为。这对于涉及表操作的情况尤其重要,因为表操作可能比较敏感。
比如说我们从 source 端读取的是 MySQL 的表,MySQL 表在二次开发中可能会涉及到一个 CatalogTable。
我们会将外部系统的表抽象成内部统一的 CatalogTable。例如,从 MySQL 读取一张表,然后转换成系统内部的 CatalogTable。
如果任务配置读取表 a,我们可以通过页面上的一些操作,预览表 a 的输出结构。
具体步骤如下:
获取 TableSourceFactory。
将配置传入,构建 TableSource。
通过 source 提供的方法获取 productCatalogTable。
这种预览在任务还没有真正跑起来时就可以执行,确保任务读取的表结构是正确的。
例如,我们有 SQL 作为 transform 操作,希望在 SQL 中改一个字段的类型,同时增加和删除一些字段。
预览功能可以在任务运行前确认这些操作是否会如期执行。
具体步骤如下:
获取 TransformFactory,构建 transform。
将配置传入,并传入 source 端生成的 CatalogTable。
获取 transform 输出的表结构,确认 transform 操作是否正确。
从 transform 输出的表结构,需要传入 Sink 进行写入操作。涉及到自动建表时,我们可以通过 SaveModeHandler 确认以下内容:
Sink 是否需要建表?
表名是什么?
字段有哪些?
SaveMode handler 会根据 schema_save_mode 和 data_save_mode 配置,以及 catalog 中的表判断是否需要建表。
当我们具体操作 Catalog,比如说 Catalog 有一个 exist 的方法去判断我们的 table pass ,根据我们的 schema_save_mode, data_save_mode 去判断我们的接下来的这一块的行为到底是什么?
SaveMode Handler 提供了能力,例如:
判断当前的 schema_save_mode 和 data_save_mode 配置。
处理表的路径(TablePath)。
调用 catalog 提供的方法判断表是否存在。
通过 SaveModeHandler 提供的能力,可以预览和确认任务在运行时是否会创建表或进行其他操作。
我们执行 SQL 时,可以提前看到将要建表的 SQL。例如:
对于某些用户来说,SQL 可能比较敏感。
预先知道 SQL 是否合理,如果不合理,可以在基础上修改后手动创建。
我们提供了 Catalog 预览功能,调用 preAction 方法可以预览建表或删除表的 SQL。
在建表时,输出表结构的类型非常重要。我们需要知道内存中看到的类型在自动建表时会被建成什么类型。
为此,SeaTunnel 内部有一套叫 TypeConverter 的接口体系。
类型转换:将 SeaTunnel 的类型转换成数据库的类型。
反向转换:读取表时,将数据库的类型转换成 SeaTunnel 的类型。
通过 TypeConverter,我们可以预览并确认 SeaTunnel 和数据库之间的字段类型交互。例如,通过转换和反向转换,我们可以知道表字段类型在 SeaTunnel 和数据库之间的具体表现。
在行为预览中,我们可以通过 TypeConverter 接口体系实现类型转换的预览。预览与实际运行时的转换结果一致,因为实际运行中也是通过这套代码进行类型转换。
通过集成 Type Converter 接口,我们可以在预览时确认建表的具体类型。
例如:
将 SeaTunnel 类型转换为数据库类型。
读取表字段时,确认数据库字段类型在 SeaTunnel 中的表现。
今天给大家主要分享了以下内容:
多种启动方式:包括三种主要的启动方式。
指标获取:如何获取指标信息。
错误信息获取:如何获取并处理错误信息。
自定义指标:如何创建和获取自定义指标。
事件系统:如何创建、读取和处理事件,包括自定义事件和现有事件的读取。
系统相关信息获取:如何获取集群和节点的健康状况等信息。
数据结构预览:如何预览数据结构、建表 SQL 和外部系统的交互类型。
SeaTunnel 内部已经实现了许多功能,通过集成这些功能,可以实现更高效、更兼容的二次开发。
希望这些接口和设计能让大家在集成和二次开发时更加简单和高效,欢迎大家基于这些标准化体系进行扩展,并将实现的功能回馈给社区,使 SeaTunnel 更加丰富和强大。
通过本文的分享,能够帮助大家对 SeaTunnel 的二次开发有更深入的了解。如果大家有任何问题,欢迎随时与我交流。谢谢大家!