Apache SeaTunnel基于JDBC连接器开发教程

MySqlTypeConverter类只存在convert和reconvert,分别对应JDBC类型转换为SEATUNNEL类型以及SEATUNNEL类型转换为JDBC类型,同时值得注意的是,在运行SEATUNNEL任务时,若存在该类中没有被定义的类型转换规则,则会抛出运行时异常UNSUPPORTEDDATATYPE,所以铁子们要想支持兼容更多的数据类型,应该从此类下手

说明

以下内容旨在帮助开发人员,快速了解熟悉SeaTunnel2.3.8程序框架,并能够进行JDBC连接器开发内容。

必要知识

在进行开发前,你一定要仔细阅读如下框架文档,它们存在于SeaTunnel源码中的Docs目录,这些文档能够帮你快速上手和规范开发,如果你已经阅读过则无需关注。

  • docs/zh/concept/config.md

  • docs/zh/concept/connector-v2-features.md

  • docs/zh/concept/JobEnvConfig.md

  • docs/zh/connector-v2/sink/Jdbc.md

  • docs/zh/connector-v2/sink-common-options.md

  • docs/zh/connector-v2/source-common-options.md

  • 连接器开发必看(modb.pro/db/447922)

SeaTunnel基础开发流程

拉取项目

 git clone https://github.com/apache/seatunnel.git

编译构建

  1. 选择Profiles

  2. mvn打包安装本地包

mvn clean install -Dmaven.test.skip=true

运行工程样例

seatunnel-examples是SeaTunnel本地环境运行模块,运行SeaTunnelEngineLocalExample的样例配置文件fake_to_console.conf,可查看SeaTunnel运行环境是否成功。

v2-e218cf0f00279f8bebe98b849633c289_1440w

打包发布

选择Profiles后,通过Dist模块可直接构建发布包。

v2-5adfb1dcf1bb67dacfba9f652b5b48f9_1440w

JDBC连接器开发

包目录介绍

v2-bb2fb0c05d9877c749c43e30cec10757_1440w

使用技巧

做JDBC连接器开发我们一般仅需要关注两部分,第一个是catalog包目录,第二个是internal中的dialect

这两部分已经提供了区别不同数据库的差异性描述,其余大部分代码是公共使用,不建议轻易修改,否则可能会影响所有引用类。

catalog中的类介绍

v2-a35787370e5ce3376e4f1f86ad5a9c9b_1440w

MySqlCatalogFactory中使用factoryIdentifier()来标识数据库类型,optionRule()用于定制连接器参数效验规则,createCatalog()是工厂类用来创建实例的方法。


@AutoService(Factory.class)public class MySqlCatalogFactory implements CatalogFactory {    @Override    public String factoryIdentifier() {        return DatabaseIdentifier.MYSQL;    }    @Override    public Catalog createCatalog(String catalogName, ReadonlyConfig options) {        String urlWithDatabase = options.get(JdbcCatalogOptions.BASE_URL);        Preconditions.checkArgument(                StringUtils.isNoneBlank(urlWithDatabase),                "Miss config <base-url>! Please check your config.");        JdbcUrlUtil.UrlInfo urlInfo = JdbcUrlUtil.getUrlInfo(urlWithDatabase);        return new MySqlCatalog(                catalogName,                options.get(JdbcCatalogOptions.USERNAME),                options.get(JdbcCatalogOptions.PASSWORD),                urlInfo);    }    @Override    public OptionRule optionRule() {        return JdbcCatalogOptions.BASE_RULE.build();    }}

MySqlCatalog 中包含了对于数据库元数据的查询,例如SELECT_DATABASE_EXISTS(库信息查询)、SELECT_TABLE_EXISTS (表信息查询),还定义了一些库表生成语句,例如获取表DDL语句,并且使用getTable()方法还能直接获取到外部表。


MysqlCreateTableSqlBuilderMysqlDataTypeConvertor是用于获取表DDL语句和类型转换器方法,较为简单这里就不一一赘述。

dialect中的类介绍

v2-4811f81b39207687b22d3a9f962c192c_1440w

MySqlDialectFactory使用工厂方式创建MySqlDialect实例,使用acceptsURL()方法判断不同JDBC URL,从而识别不同数据库类型。

create()的重载方法,主要是用于数据库端兼容多个数据库方言的场景。


MySqlDialect 的getRowConverter()getTypeConverter()用于获取行对象转换器和类型转换器。

行对象转换器主要提供JDBC数据对象 ⇔ Leftrightarrow ⇔ SeaTunnelRow对象互转方法。

类型转换器主要提供JDBC数据类型 ⇔ Leftrightarrow ⇔ SeaTunnel数据类型互转的方法,这两者共同作用于引擎侧内部对象和JDBC数据集的转入转出。

MySqlDialect中还定义了一些不通数据库操作的特性功能,例如Upsert功能实现和转义符号等。

MySqlDialect还能使用defaultParameter()方法为JDBC URL串定制一些默认的参数。


MySqlTypeConverter类只存在convertreconvert,分别对应JDBC类型转换为SEATUNNEL 类型以及SEATUNNEL类型转换为JDBC类型,同时值得注意的是,在运行SEATUNNEL任务时,若存在该类中没有被定义的类型转换规则,则会抛出运行时异常UNSUPPORTED_DATA_TYPE,所以铁子们要想支持兼容更多的数据类型,应该从此类下手。


MySqlTypeMapper 是老版本的TypeConverter,该类也是引用的MySqlTypeConverter。 MySqlVersion 是版本控制。

总结

通过上述内容,感兴趣的伙伴可以快速上手SeaTunnel 2.3.8的JDBC新连接器开发,实现高效的数据处理和集成。

SeaTunnel 社区致力于打造一个世界尖端的开源数据集成工具,如果对 SeaTunnel 源码感兴趣的伙伴也欢迎加入我们的贡献者种子群(欢迎添加我们的微信:davidzollo),专为社区新贡献者而建立,欢迎大家一起交流,一起进步,一起为社区壮大贡献自己的力量! 本文完!