【Demo 视频教程】使用SeaTunnel从MySQL同步到Doris

数据类型映射MySQL数据类型SeaTunnel数据类型BIT(1)ltbrgtINTUNSIGNEDBOOLEANTINYINTltbrgtTINYINTUNSIGNEDltbrgtSMALLINTltbrgtSMALLINTUNSIGNEDltbrgtMEDIUMINTltbrgtMEDIUMINTUNSIGNEDltbrgtINTltbrgtINTEGERltbrgtYEARINTINTUNSIGNEDltbrgtINTEGERUNSIGNEDltbrgtBIGINTBIGINTBIGINTUNSIGNEDDECIMAL(200)DECIMAL(xy)(获取指定列的指定列大小.lt38)DECIMAL(xy)DECIMAL(xy)(获取指定列的指定列大小.gt38)DECIMAL(3818)DECIMALUNSIGNEDDECIMAL(获取指定列的指定列大小+1ltbrgt(获取指定列小数点右侧的数字个数.)))FLOATltbrgtFLOATUNSIGNEDFLOATDOUBLEltbrgtDOUBLEUNSIGNEDDOUBLECHARltbrgtVARCHARltbrgtT

随着数据技术的快速发展,了解并掌握各种工具和技术变得尤为重要。为此,我们准备在 Apache SeaTunnel 社区发起如何使用连接器的 Demo 演示计划,邀请所有热爱数据同步技术的同学分享他们的知识和实操经验!

1714022144775c498402148dbdce81aa498432619596a

本期我们邀请到了社区 PMC 高俊老师,参与录制的主题是:如何从 MySQL 同步到 Doris,如果您对此计划感兴趣,也欢迎联系社区运营同学参与 Demo 录制!无论您是数据工程师、开发者还是技术爱好者,都欢迎您参与并展示您的技术才能。

敲重点 ~ 敲重点 ~ 如果你是用户,想看什么同步场景的 Demo!请下滑到最底部留言,我们优先出品呼声最高的同步场景 Demo!

Demo 计划目标

我们的目标是创建一个共享和学习的平台,通过具体的 Demo 演示和对应的文档帮助社区成员更好地理解和应用各种数据连接器。这些 Demo 可以帮助新手快速学习,同时也为资深专家提供一个展示创新解决方案的舞台。

MySQL 同步到 Doris


JDBC MySQL 连接器

描述

通过 JDBC 读取外部数据源数据。

支持的 MySQL 版本

  • 5.5/5.6/5.7/8.0

支持的引擎

Spark<br/> Flink<br/> SeaTunnel Zeta<br/>

使用依赖

Spark/Flink 引擎

  1. 您需要确保 jdbc 驱动 jar 包已放在 ${SEATUNNEL_HOME}/plugins/ 目录下。

SeaTunnel Zeta 引擎

  1. 您需要确保 jdbc 驱动 jar 包已放在 ${SEATUNNEL_HOME}/lib/ 目录下。

MySQL 连接器关键特性

支持查询 SQL 并可实现投影效果。

支持的数据源信息

数据源支持的版本驱动URLMaven 下载
MySQL不同的依赖版本有不同的驱动类。com.mysql.cj.jdbc.Driverjdbc:mysql://localhost:3306:3306/test下载

数据库依赖

请下载与 'Maven' 对应的支持列表,并复制到 '$SEATNUNNEL_HOME/plugins/jdbc/lib/' 工作目录 <br/> 例如 MySQL 数据源:cp mysql-connector-java-xxx.jar $SEATNUNNEL_HOME/plugins/jdbc/lib/

数据类型映射

MySQL 数据类型SeaTunnel 数据类型
BIT(1)<br/>INT UNSIGNEDBOOLEAN
TINYINT<br/>TINYINT UNSIGNED<br/>SMALLINT<br/>SMALLINT UNSIGNED<br/>MEDIUMINT<br/>MEDIUMINT UNSIGNED<br/>INT<br/>INTEGER<br/>YEARINT
INT UNSIGNED<br/>INTEGER UNSIGNED<br/>BIGINTBIGINT
BIGINT UNSIGNEDDECIMAL(20,0)
DECIMAL (x,y)(获取指定列的指定列大小.<38)DECIMAL(x,y)
DECIMAL (x,y)(获取指定列的指定列大小.>38)DECIMAL(38,18)
DECIMAL UNSIGNEDDECIMAL (获取指定列的指定列大小 + 1,<br/>(获取指定列小数点右侧的数字个数.)))
FLOAT<br/>FLOAT UNSIGNEDFLOAT
DOUBLE<br/>DOUBLE UNSIGNEDDOUBLE
CHAR<br/>VARCHAR<br/>TINYTEXT<br/>MEDIUMTEXT<br/>TEXT<br/>LONGTEXT<br/>JSONSTRING
DATEDATE
TIMETIME
DATETIME<br/>TIMESTAMPTIMESTAMP
TINYBLOB<br/>MEDIUMBLOB<br/>BLOB<br/>LONGBLOB<br/>BINARY<br/>VARBINAR<br/>BIT(n)BYTES
GEOMETRY<br/>UNKNOWN尚未支持

数据源选项

名称类型是否必须默认值描述
urlString-JDBC 连接的 URL。示例: jdbc:mysql://localhost:3306:3306/test
driverString-用于连接远程数据源的 jdbc 类名,如果使用 MySQL,值为 com.mysql.cj.jdbc.Driver
userString-连接实例的用户名
passwordString-连接实例的密码
queryString-查询语句
connection_check_timeout_secInt30等待用于验证连接的数据库操作完成的时间(秒)
partition_columnString-用于并行处理的分区列,仅支持数字类型主键,且只能配置一个列。
partition_lower_boundBigDecimal-扫描的分区列最小值,如果未设置,SeaTunnel 将查询数据库获取最小值。
partition_upper_boundBigDecimal-扫描的分区列最大值,如果未设置,SeaTunnel 将查询数据库获取最大值。
partition_numInt任务并行度分区数,仅支持正整数,默认值为任务并行度。
fetch_sizeInt0对于返回大量对象的查询,您可以配置查询中使用的行取回大小以通过减少满足选择条件所需的数据库访问次数来提高性能。<br/> 零表示使用 jdbc 默认值。
propertiesMap-额外的连接配置参数,当 properties 和 URL 有相同的参数时,具体优先权由驱动实现决定。例如,在 MySQL 中,properties 优先于 URL。
table_pathInt0表的完整路径,您可以使用这个配置替代 query。例如:<br/>mysql: "testdb.table1" <br/>oracle: "test_schema.table1" <br/>sqlserver: "testdb.test_schema.table1" <br/>postgresql: "testdb.test_schema.table1"
table_listArray0要读取的表列表,您可以使用这个配置替代 table_path 示例:[{ table_path = "testdb.table1"}, {table_path = "testdb.table2", query = "select * id, name from testdb.table2"}]
where_conditionString-所有表 / 查询的通用行过滤条件,必须以 where 开始。例如 where id > 100
split.sizeInt8096表的分割大小(行数),读取表时会将表分割成多个分片。
split.even-distribution.factor.lower-boundDouble0.05分块键分布因子的下界。此因子用于确定表数据是否均匀分布。如果计算出的分布因子大于或等于此下界,则表块将被优化以均匀分布。否则,如果分布因子较小,则表将被视为分布不均,如果估计的分片数超过 sample-sharding.threshold 指定的值,则会使用基于抽样的分片策略。默认值为 0.05。
split.even-distribution.factor.upper-boundDouble100分块键分布因子的上界。此因子用于确定表数据是否均匀分布。如果计算出的分布因子小于或等于此上界,则表块将被优化以均匀分布。否则,如果分布因子较大,则表将被视为分布不均,并且如果估计的分片数超过 sample-sharding.threshold 指定的值,则会使用基于抽样的分片策略。默认值为 100.0。
split.sample-sharding.thresholdInt10000此配置指定触发基于抽样的分片策略的估计分片数阈值。当分布因子超出 chunk-key.even-distribution.factor.upper-bound 和 chunk-key.even-distribution.factor.lower-bound 指定的范围,并且估计的分片数(计算为近似行数 / 块大小)超过此阈值时,将使用基于抽样的分片策略。这可以帮助更有效地处理大型数据集。默认值为 1000 个分片。
split.inverse-sampling.rateInt1000在基于抽样的分片策略中使用的抽样率的倒数。例如,如果此值设置为 1000,意味着在抽样过程中应用了 1/1000 的抽样率。此选项提供了控制抽样粒度的灵活性,从而影响最终分片数。在处理非常大的数据集时尤其有用,此时可能更倾向于较低的抽样率。默认值为 1000。
common-options-数据源插件通用参数,请参阅 Source Common Options 了解详情。

并行读取器

JDBC 源连接器支持从表中并行读取数据。SeaTunnel 将使用某些规则来分割表中的数据,这些数据将交给读取器进行读取。读取器的数量由 parallelism 选项决定。

分割键规则:

  1. 如果 partition_column 不为 null,将使用它来计算分割。该列必须在支持分割的数据类型中。

  2. 如果 partition_column 为 null,seatunnel 将读取表的架构并获取主键和唯一索引。如果主键和唯一索引中有多于一个列,将使用第一个在支持分割的数据类型中的列来分割数据。例如,表有主键 (nn guid, name varchar),因为 guid 不在支持分割的数据类型中,所以将使用列 name 来分割数据。

支持分割的数据类型:

  • 字符串

  • 数字 (int, bigint, decimal, ...)

  • 日期

与分割相关的选项

split.size

一个分割中的行数,读取表时会将表分割成多个分割。

split.even-distribution.factor.lower-bound

不推荐使用

分块键分布因子的下界。此因子用于确定表数据是否均匀分布。如果计算出的分布因子大于或等于此下界,则表块将被优化以均匀分布。否则,如果分布因子较小,则表将被视为分布不均,并且如果估计的分片数超过 sample-sharding.threshold 指定的值,则会使用基于抽样的分片策略。默认值为 0.05。

split.even-distribution.factor.upper-bound

不推荐使用

分块键分布因子的上界。此因子用于确定表数据是否均匀分布。如果计算出的分布因子小于或等于此上界,则表块将被优化以均匀分布。否则,如果分布因子较大,则表将被视为分布不均,并且如果估计的分片数超过 sample-sharding.threshold 指定的值,则会使用基于抽样的分片策略。默认值为 100.0。

split.sample-sharding.threshold

此配置指定触发基于抽样的分片策略的估计分片数阈值。当分布因子超出 chunk-key.even-distribution.factor.upper-bound 和 chunk-key.even-distribution.factor.lower-bound 指定的范围,并且估计的分片数(计算为近似行数 / 块大小)超过此阈值时,将使用基于抽样的分片策略。这可以帮助更有效地处理大型数据集。默认值为 1000 个分片。

split.inverse-sampling.rate

在基于抽样的分片策略中使用的抽样率的倒数。例如,如果此值设置为 1000,意味着在抽样过程中应用了 1/1000 的抽样率。此选项提供了控制抽样粒度的灵活性,从而影响最终分片数。在处理非常大的数据集时尤其有用,此时可能更倾向于较低的抽样率。默认值为 1000。

partition_column [string]

用于分割数据的列名。

partition_upper_bound [BigDecimal]

扫描的分区列最大值,如果未设置,SeaTunnel 将查询数据库获取最大值。

partition_lower_bound [BigDecimal]

扫描的分区列最小值,如果未设置,SeaTunnel 将查询数据库获取最小值。

partition_num [int]

不推荐使用,正确的做法是通过 split.size 控制分割数

我们需要将其分割成多少分割,只支持正整数。默认值是任务并行度。

小贴士

如果表不能分割 (例如,表没有主键或唯一索引,并且未设置 partition_column),它将以单一并发运行。

使用 table_path 替换 query 以读取单个表。如果需要读取多个表,请使用 table_list

任务示例

此示例查询测试 "数据库" 中的 type_bin 'table' 16 数据,并以单一并行方式查询其所有字段。您还可以指定要查询的字段以最终输出到控制台。

定义运行时环境

env {parallelism = 4job.mode = "BATCH"}source{Jdbc {url = "jdbc:mysql://localhost:3306/test?serverTimezone=GMT%2b8&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"driver = "com.mysql.cj.jdbc.Driver"connection_check_timeout_sec = 100user = "root"password = "123456"query = "select * from type_bin limit 16"}}transform {# 如果您想了解更多关于如何配置seatunnel和查看transform插件完整列表的信息,# 请访问 https://seatunnel.apache.org/docs/transform-v2/sql}sink {Console {}}

按主键或唯一索引并行

配置 table_path 将开启自动分割,您可以配置 split.* 来调整分割策略

env {  parallelism = 4  job.mode = "BATCH"}source {    Jdbc {        url = "jdbc:mysql://localhost/test?serverTimezone=GMT%2b8"        driver = "com.mysql.cj.jdbc.Driver"        connection_check_timeout_sec = 100        user = "root"        password = "123456"        query = "select * from type_bin"        partition_column = "id"        split.size = 10000        # Read start boundary        #partition_lower_bound = ...        # Read end boundary        #partition_upper_bound = ...    }}sink {  Console {}}

指定查询边界并行

指定查询数据的上下边界更为高效

env {  parallelism = 4  job.mode = "BATCH"}source {    Jdbc {        url = "jdbc:mysql://localhost/test?serverTimezone=GMT%2b8"        driver = "com.mysql.cj.jdbc.Driver"        connection_check_timeout_sec = 100        user = "root"        password = "123456"        table_path = "testdb.table1"        query = "select * from testdb.table1"        split.size = 10000    }}sink {  Console {}}

读取多个表

配置 table_list 将开启自动分割,您可以配置 split.* 来调整分割策略

env {  job.mode = "BATCH"  parallelism = 4}source {  Jdbc {    url = "jdbc:mysql://localhost/test?serverTimezone=GMT%2b8"    driver = "com.mysql.cj.jdbc.Driver"    connection_check_timeout_sec = 100    user = "root"    password = "123456"    table_list = [      {        table_path = "testdb.table1"      },      {        table_path = "testdb.table2"        # 使用查询文件过滤行和列        query = "select id, name from testdb.table2 where id > 100"      }    ]    #where_condition= "where id > 100"    #split.size = 8096    #split.even-distribution.factor.upper-bound = 100    #split.even-distribution.factor.lower-bound = 0.05    #split.sample-sharding.threshold = 1000    #split.inverse-sampling.rate = 1000  }}sink {  Console {}}

多表读取

配置 table_list 将开启自动分割,您可以配置 split.* 来调整分割策略

env {  job.mode = "BATCH"  parallelism = 4}source {  Jdbc {    url = "jdbc:mysql://localhost/test?serverTimezone=GMT%2b8"    driver = "com.mysql.cj.jdbc.Driver"    connection_check_timeout_sec = 100    user = "root"    password = "123456"    table_list = [      {        table_path = "testdb.table1"      },      {        table_path = "testdb.table2"        # 使用查询文件过滤行和列        query = "select id, name from testdb.table2 where id > 100"      }    ]    #where_condition= "where id > 100"    #split.size = 8096    #split.even-distribution.factor.upper-bound = 100    #split.even-distribution.factor.lower-bound = 0.05    #split.sample-sharding.threshold = 1000    #split.inverse-sampling.rate = 1000  }}sink {  Console {}}

以上是视频中出现的代码及文档说明,大家可以结合视频中的讲解进行实操,最后如果您对录制感兴趣,请继续往下阅读

如何参与 Demo 录制?

提交您的 Demo

  1. 准备您的演示:选择一个您熟悉的连接器,准备一个 5 到 10 分钟的视频演示。确保视频清晰展示了如何配置和使用该连接器,解决了什么问题,以及可能的最佳实践。

  2. 提交视频:请添加社区同学微信 18819063834 上传您的视频和相关描述。