SeaTunnel DB2 Source Connector 使用文档(含详细操作步骤)

DB2是IBM的一款关系型数据库管理系统,JDBC DB2 Source Connector是一个用于通过JDBC读取外部数据源数据的连接器。Apache SeaTunnel如何通过DB2 Source Connector支持DB2数据同步?本文档有详细操作指南供参考。

1712046840013c890515f1055143925ae4fb85b86ec70

DB2是IBM的一款关系型数据库管理系统,JDBC DB2 Source Connector是一个用于通过JDBC读取外部数据源数据的连接器。Apache SeaTunnel如何通过DB2 Source Connector支持DB2数据同步?本文档有详细操作指南供参考。

支持引擎

Spark
Flink
SeaTunnel Zeta引擎

主要特性

  • 批处理(batch)
  • 精确一次(exactly-once)
  • 列投影(column projection)
  • 并行处理(parallelism)
  • 支持用户自定义分割(support user-defined split)
支持查询SQL并可以实现投影效果。

描述

通过 JDBC 读取外部数据源数据。

支持数据源

数据源支持版本驱动程序URLMaven
DB2
不同的依赖版本有不同的驱动程序类。
com.ibm.db2.jdbc.app.DB2Driver
jdbc:db2://127.0.0.1:50000/dbname
Download

数据库依赖

数据库依赖需要下载对应'Maven'的支持列表并复制到'$SEATNUNNEL_HOME/plugins/jdbc/lib/'工作目录。
例如,DB2数据源:
cp db2-connector-java-xxx.jar $SEATNUNNEL_HOME/plugins/jdbc/lib/

数据类型映射

DB2数据类型SeaTunnel数据类型
BOOLEAN
BOOLEAN
SMALLINT
SHORT
INT
INTEGER
INTEGER
BIGINT
LONG
DECIMAL
DEC
NUMERIC
NUM
DECIMAL(38,18)
REAL
FLOAT
FLOAT
DOUBLE
DOUBLE PRECISION
DECFLOAT
DOUBLE
CHAR
VARCHAR
LONG VARCHAR
CLOB
GRAPHIC
VARGRAPHIC
LONG VARGRAPHIC
DBCLOB
STRING
BLOB
BYTES
DATE
DATE
TIME
TIME
TIMESTAMP
TIMESTAMP
ROWID
XML
Not supported yet

源选项

名称
类型
必需
默认
描述
url
String
Yes
-
JDBC连接的URL。参考案例:jdbc:db2:127.0.0.1:50000/dbname
driver
String
Yes
-
用于连接到远程数据源的jdbc类名。如果你使用db2,值是com.ibm.db2.jdbc.app.DB2Driver
user
String
No
-
连接实例用户名
password
String
No
-
连接实例密码
query
String
Yes
-
查询语句
connection_check_timeout_sec
Int
No
30
用于验证连接到完成以等待数据库操作的时间(以秒为单位)
partition_column
String
No
-
用于并行处理的分区列名,仅支持数值类型,仅支持数值类型主键,并且只能配置一个列。
partition_lower_bound
Long
No
-
partition_column的扫描最小值,如果未设置,SeaTunnel将查询数据库获取最小值。
partition_upper_bound
Long
No
-
partition_column的扫描最大值,如果未设置,SeaTunnel将查询数据库获取最大值。
partition_num
Int
可选
作业并行度
分区计数的数量,仅支持正整数。默认值为作业的并行度。
fetch_size
Int
可选
0
对于返回大量对象的查询,您可以配置在查询中使用的行抓取大小,以提高性能,减少满足选择条件所需的数据库访问次数。零表示使用 JDBC 的默认值。
common-options

可选
-
源插件的通用参数,请参考 Source Common Options 获取详细信息。

提示

如果未设置 partition_column,则将以单一并发方式运行;如果设置了 partition_column,则根据任务的并发度并行执行。

任务示例

简单示例:

此示例在单一并发模式下查询您的测试数据库中类型为 'table' 的 16 条数据,并查询其所有字段。您还可以指定要查询的最终输出到控制台的字段。
# Defining the runtime environment
env {
 # You can set flink configuration here
 execution.parallelism = 2
 job.mode = "BATCH"
}
source{
   Jdbc {
       url = "jdbc:db2://127.0.0.1:50000/dbname"
       driver = "com.ibm.db2.jdbc.app.DB2Driver"
       connection_check_timeout_sec = 100
       user = "root"
       password = "123456"
       query = "select * from table_xxx"
   }
}

transform {
   # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
   # please go to https://seatunnel.apache.org/docs/transform-v2/sql
}

sink {
   Console {}
}

并行示例:

如果想要读取整个表,可以根据您配置的分片字段和分片数据,在并行方式下读取查询表。

source {
   Jdbc {
       url = "jdbc:db2://127.0.0.1:50000/dbname"
       driver = "com.ibm.db2.jdbc.app.DB2Driver"
       connection_check_timeout_sec = 100
       user = "root"
       password = "123456"
       # Define query logic as required
       query = "select * from type_bin"
       # Parallel sharding reads fields
       partition_column = "id"
       # Number of fragments
       partition_num = 10
   }
}

并行边界:

在查询中指定数据的上限和下限边界更加高效。根据您配置的上限和下限边界来读取您的数据源,效率更高。

source {
   Jdbc {
       url = "jdbc:db2://127.0.0.1:50000/dbname"
       driver = "com.ibm.db2.jdbc.app.DB2Driver"
       connection_check_timeout_sec = 100
       user = "root"
       password = "123456"
       # Define query logic as required
       query = "select * from type_bin"
       partition_column = "id"
       # Read start boundary
       partition_lower_bound = 1
       # Read end boundary
       partition_upper_bound = 500
       partition_num = 10
   }
}