JDBC SQL Server Source Connector: 一览与实践

本文档将指导您如何配置 Apache SeaTunnel,使用 JDBC SQL Server Source Connector 来实现数据的有效同步。


在快速发展的数据驱动业务环境中,确保数据在各个系统间高效、准确地同步至关重要。为了进一步的数据处理和分析,经常需要将这些数据同步到其他数据处理系统。Apache SeaTunnel 提供了一个强大而灵活的数据集成框架,使得从 SQL Server 到其他系统的数据同步变得简单且高效。

本文档将指导您如何配置 Apache SeaTunnel,使用 JDBC SQL Server Source Connector 来实现数据的有效同步。

SQL Server

JDBC SQL Server Source Connector

支持 SQL Server 版本

  • 服务器:2008(或更高版本,仅供信息参考)

支持以下引擎

Spark <br/> Flink <br/> Seatunnel Zeta <br/>

主要特点

描述

通过 JDBC 读取外部数据源数据。

支持的数据源信息

数据源 支持的版本 驱动 URL Maven
SQL Server 支持版本 >= 2008 com.microsoft.sqlserver.jdbc.SQLServerDriver jdbc:sqlserver://localhost:1433 下载

数据库依赖

请下载与 'Maven' 对应的支持列表,并将其复制到 '$SEATNUNNEL_HOME/plugins/jdbc/lib/' 工作目录<br/> 例如 SQL Server 数据源:cp mssql-jdbc-xxx.jar $SEATNUNNEL_HOME/plugins/jdbc/lib/

数据类型映射

SQL Server 数据类型 Seatunnel 数据类型
BIT BOOLEAN
TINYINT<br/>SMALLINT SHORT
INTEGER INT
BIGINT LONG
DECIMAL<br />NUMERIC<br />MONEY<br />SMALLMONEY DECIMAL((指定列的指定列大小)+1,<br/>(获取指定列的小数点右边的数字的数量。)))
REAL FLOAT
FLOAT DOUBLE
CHAR<br />NCHAR<br />VARCHAR<br />NTEXT<br />NVARCHAR<br />TEXT STRING
DATE LOCAL_DATE
TIME LOCAL_TIME
DATETIME<br />DATETIME2<br />SMALLDATETIME<br />DATETIMEOFFSET LOCAL_DATE_TIME
TIMESTAMP<br />BINARY<br />VARBINARY<br />IMAGE<br />UNKNOWN 尚不支持

源选项

名称 类型 必需 默认值 描述
url 字符串 - JDBC 连接的 URL。例如:jdbc:sqlserver://127.0.0.1:1434;database=TestDB
driver 字符串 - 用于连接到远程数据源的 JDBC 类名,如果使用 SQL Server,则值为 com.microsoft.sqlserver.jdbc.SQLServerDriver
user 字符串 - 连接实例的用户名
password 字符串 - 连接实例的密码
query 字符串 - 查询语句
connection_check_timeout_sec 整数 30 等待用于验证连接的数据库操作完成的秒数
partition_column 字符串 - 并行处理的分区列,仅支持数值类型。
partition_lower_bound 长整数 - 用于扫描的 partition_column 最小值,如果未设置,SeaTunnel 将查询数据库获取最小值。
partition_upper_bound 长整数 - 用于扫描的 partition_column 最大值,如果未设置,SeaTunnel 将查询数据库获取最大值。
partition_num 整数 作业并行度 分区计数的数量,仅支持正整数。默认值为作业并行度。
fetch_size 整数 0 对返回大量对象的查询,您可以配置查询中使用的行抓取大小,以减少满足选择条件所需的数据库命中次数,从而提高性能。<br/>零表示使用 JDBC 默认值。
common-options
- 源插件的常见参数,请参阅 源常用选项 以获取详细信息。

提示

如果未设置 partition_column,则将以单一并发运行;如果设置了 partition_column,则将根据任务的并发度进行并行执行。

任务示例

简单:

简单的单一任务以读取数据表

# 定义运行时环境env {# 您可以在此处设置 Flink 配置execution.parallelism = 1job.mode = "BATCH"}source{Jdbc {driver = com.microsoft.sqlserver.jdbc.SQLServerDriverurl = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"user = SApassword = "Y.sa123456"query = "select * from full_types_jdbc"}}transform { # 如果您想要获取有关如何配置 seatunnel 和查看变换插件的完整列表的更多信息, # 请转到 [seatunnel.apache.org/docs/transform-v2/sql](https://seatunnel.apache.org/docs/transform-v2/sql)}sink {Console {}}

并行:

使用您配置的分片字段和分片数据并行读取您的查询表,如果您希望读取整个表,可以这样做:

env {  # 您可以在此处设置 Flink 配置  execution.parallelism = 10  job.mode = "BATCH"}source {    Jdbc {        driver = com.microsoft.sqlserver.jdbc.SQLServerDriver        url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"        user = SA        password = "Y.sa123456"        # 根据需要定义查询逻辑        query = "select * from full_types_jdbc"        # 并行分片读取字段        partition_column = "id"        # 片段数量        partition_num = 10    }}transform {    # 如果您想要获取有关如何配置 Seatunnel 和查看转换插件的完整列表的更多信息,    # 请转到 https://seatunnel.apache.org/docs/transform-v2/sql}sink {    Console {}}

并行:

使用您配置的分片字段和分片数据并行读取您的查询表,如果您希望读取整个表,可以这样做:

env {  # 您可以在此处设置 Flink 配置  execution.parallelism = 10  job.mode = "BATCH"}source {    Jdbc {        driver = com.microsoft.sqlserver.jdbc.SQLServerDriver        url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"        user = SA        password = "Y.sa123456"        # 根据需要定义查询逻辑        query = "select * from full_types_jdbc"        # 并行分片读取字段        partition_column = "id"        # 片段数量        partition_num = 10    }}transform {    # 如果您想要获取有关如何配置 Seatunnel 和查看转换插件的完整列表的更多信息,    # 请转到 https://seatunnel.apache.org/docs/transform-v2/sql}sink {    Console {}}

并行:

使用您配置的分片字段和分片数据并行读取您的查询表,如果您希望读取整个表,可以这样做:

env {  # 您可以在此处设置 Flink 配置  execution.parallelism = 10  job.mode = "BATCH"}source {    Jdbc {        driver = com.microsoft.sqlserver.jdbc.SQLServerDriver        url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"        user = SA        password = "Y.sa123456"        # 根据需要定义查询逻辑        query = "select * from full_types_jdbc"        # 并行分片读取字段        partition_column = "id"        # 片段数量        partition_num = 10    }}transform {    # 如果您想要获取有关如何配置 Seatunnel 和查看转换插件的完整列表的更多信息,    # 请转到 https://seatunnel.apache.org/docs/transform-v2/sql}sink {    Console {}}

并行:

使用您配置的分片字段和分片数据并行读取您的查询表,如果您希望读取整个表,可以这样做:

env {  # 您可以在此处设置 Flink 配置  execution.parallelism = 10  job.mode = "BATCH"}source {    Jdbc {        driver = com.microsoft.sqlserver.jdbc.SQLServerDriver        url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"        user = SA        password = "Y.sa123456"        # 根据需要定义查询逻辑        query = "select * from full_types_jdbc"        # 并行分片读取字段        partition_column = "id"        # 片段数量        partition_num = 10    }}transform {    # 如果您想要获取有关如何配置 Seatunnel 和查看转换插件的完整列表的更多信息,    # 请转到 https://seatunnel.apache.org/docs/transform-v2/sql}sink {    Console {}}

分段并行读取示例:

这是一个快速并行读取数据的分片示例

env {  # 您可以在此处设置引擎配置  execution.parallelism = 10}source {  # 这是一个示例源插件,仅用于测试和展示源插件的功能  Jdbc {    driver = com.microsoft.sqlserver.jdbc.SQLServerDriver    url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"    user = SA    password = "Y.sa123456"    query = "select * from column_type_test.dbo.full_types_jdbc"    # 并行分片读取字段    partition_column = "id"    # 片段数量    partition_num = 10  }  # 如果您想要获取有关如何配置 Seatunnel 和查看源插件的完整列表的更多信息,  # 请转到 https://seatunnel.apache.org/docs/connector-v2/source/Jdbc}transform {  # 如果您想要获取有关如何配置 Seatunnel 和查看转换插件的完整列表的更多信息,  # 请转到 https://seatunnel.apache.org/docs/transform-v2/sql}sink {  Console {}  # 如果您想要获取有关如何配置 Seatunnel 和查看接收插件的完整列表的更多信息,  # 请转到 https://seatunnel.apache.org/docs/connector-v2/sink/Jdbc}