如何基于Apache SeaTunnel 读取Oracle的数据

env{parallelism=4job.mode=quotBATCHquot}source{Jdbc{url=quotjdbcoraclethin@datasource011523xequotdriver=quotoracle.jdbc.OracleDriverquotuser=quotrootquotpassword=quot123456quotquery=quotSELECT*FROMTESTTABLEquot}}transform{#Ifyouwouldliketogetmoreinformationabouthowtoconfigureseatunnelandseefulllistoftransformplugins#pleasegotohttpsseatunnel.apache.orgdocstransform-v2sql}sink{Console{}}按partitioncolumn并行读取使用配置的分片字段和分片数据并行读取查询表,如果想读取整个表,可以使用此方法

引言

在大数据时代,企业面临着数据的快速增长和多样化需求,如何高效地处理和整合来自不同数据源的数据成为了关键问题。

v2-993c1944e1d096e2382b4d338d0842e6_b

Apache SeaTunnel作为一款开源数据集成工具,提供了灵活的数据处理和实时数据同步能力,广泛应用于数据仓库、数据湖及实时分析场景中。与此同时,Oracle 数据库以其高性能和可靠性,成为许多企业数据存储的首选。结合 Apache SeaTunnel 与 Oracle 数据库,可以实现高效的数据迁移与转换。

Apache SeaTunnel

Apache SeaTunnel 是一款数据集成工具,支持批处理和流处理。它允许用户通过简单的配置实现数据的抽取、转换和加载(ETL)。SeaTunnel 提供了多种连接器,能够轻松集成不同的数据源和目标,包括关系型数据库、NoSQL 数据库、文件系统等。其灵活性和扩展性使其成为企业数据集成的重要选择。

Oracle 数据库

Oracle 数据库是一款广泛使用的关系型数据库管理系统,以其强大的事务处理能力和安全性而著称。它适用于处理大型企业级应用程序的数据存储需求。Oracle 支持多种数据类型和复杂查询,能够高效地管理和分析大量数据,适合用于金融、电信、医疗等行业。

Oracle JDBC 连接器概述

连接器描述

Oracle JDBC 连接器通过 JDBC 方式读取外部数据源的数据,支持 Apache SeaTunnel 的多种引擎,包括 Spark、Flink 和 SeaTunnel Zeta。

使用依赖

Spark/Flink 引擎

  1. 确保将 JDBC 驱动 jar 包 放置在 ${SEATUNNEL_HOME}/plugins/ 目录中。

  2. 为支持国际化字符集,复制 orai18n.jar 到 ${SEATUNNEL_HOME}/plugins/ 目录。

SeaTunnel Zeta 引擎

  1. 确保将 JDBC 驱动 jar 包 放置在 ${SEATUNNEL_HOME}/lib/ 目录中。

  2. 为支持国际化字符集,复制 orai18n.jar 到 ${SEATUNNEL_HOME}/lib/ 目录。

主要特性

  • 批处理支持

  • 精确一次(exactly-once)语义

  • 列投影(column projection)

  • 并行处理(parallelism)

  • 支持用户定义的分割(user-defined split)

支持的数据源信息

数据库依赖

请下载对应于 Maven 的支持列表,并将其复制到 ${SEATUNNEL_HOME}/plugins/jdbc/lib/ 工作目录中。

例如 Oracle 数据源:

cp ojdbc8-xxxxxx.jar $SEATUNNEL_HOME/lib/

为支持国际化字符集,复制 orai18n.jar 到 ${SEATUNNEL_HOME}/lib/ 目录中。

数据类型映射

源选项

并行读取

JDBC 源连接器支持从表中并行读取数据。SeaTunnel 将使用一定规则拆分表中的数据,并将其交给读取器进行读取。读取器的数量由 parallelism 选项决定。

拆分键规则

  1. 如果 partition_column 不为空,则使用该列进行计算。该列必须为 支持的拆分数据类型

  2. 如果 partition_column 为空,SeaTunnel 将读取表的模式并获取主键和唯一索引。如果主键和唯一索引中有多个列,则使用第一个支持的拆分数据类型列进行拆分。例如,表的主键为 (guid, name),因为 guid 不在 支持的拆分数据类型 中,因此使用 name 列进行拆分。

支持的拆分数据类型

  • String

  • Number (int, bigint, decimal, ...)

  • Date

拆分相关选项

split.size

每个拆分中包含的行数,捕获的表在读取时被拆分为多个部分。

split.even-distribution.factor.lower-bound

不推荐使用

块键分布因子的下限。用于判断表数据是否均匀分布。如果计算出的分布因子大于或等于该下限(即 (MAX(id) - MIN(id) + 1) / row count),则表块将被优化为均匀分布。否则,如果分布因子小于下限,则表将被视为不均匀分布,如果估计的分片数量超过 sample-sharding.threshold 指定的值,将使用基于采样的分片策略。默认值为 0.05。

split.even-distribution.factor.upper-bound

不推荐使用

块键分布因子的上限。用于判断表数据是否均匀分布。如果计算出的分布因子小于或等于该上限(即 (MAX(id) - MIN(id) + 1) / row count),则表块将被优化为均匀分布。否则,如果分布因子大于上限,则表将被视为不均匀分布,如果估计的分片数量超过 sample-sharding.threshold 指定的值,将使用基于采样的分片策略。默认值为 100.0。

split.sample-sharding.threshold

此配置指定估计分片数量的阈值,以触发样本分片策略。当分布因子超出指定的上下限时,且估计的分片数量(计算为近似行数 / 块大小)超过此阈值时,将使用样本分片策略。这可以帮助更高效地处理大型数据集。默认值为 1000 个分片。

split.inverse-sampling.rate

样本分片策略中使用的采样率的反值。例如,如果该值设置为 1000,则在采样过程中应用 1/1000 的采样率。此选项提供灵活性,以控制采样的粒度,从而影响最终的分片数量。尤其在处理非常大的数据集时,较低的采样率更为合适。默认值为 1000。

partition_column [string]

拆分数据的列名。

partition_upper_bound [BigDecimal]

扫描的 partition_column 最大值,如果未设置,SeaTunnel 将查询数据库获取最大值。

partition_lower_bound [BigDecimal]

扫描的 partition_column 最小值,如果未设置,SeaTunnel 将查询数据库获取最小值。

partition_num [int]

不推荐使用,正确的方法是通过 split.size 控制拆分的数量。

我们需要拆分成多少个块,仅支持正整数,默认值为作业并行性。

小贴士

如果表无法拆分(例如,表没有主键或唯一索引,且未设置 partition_column),则将以单一并发执行。

使用 table_path 替代 query 进行单表读取。如果需要读取多个表,使用 table_list

示例任务

以下是一些使用 JDBC 源连接器的任务示例:

简单示例

该示例在 TEST_TABLE 表中查询所有字段,您还可以指定要查询哪些字段以最终输出到控制台

env {  parallelism = 4  job.mode = "BATCH"}source{    Jdbc {        url = "jdbc:oracle:thin:@datasource01:1523:xe"        driver = "oracle.jdbc.OracleDriver"        user = "root"        password = "123456"        query = "SELECT * FROM TEST_TABLE"    }}transform {    # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,    # please go to https://seatunnel.apache.org/docs/transform-v2/sql}sink {    Console {}}

按 partition_column 并行读取

使用配置的分片字段和分片数据并行读取查询表,如果想读取整个表,可以使用此方法。

env {  parallelism = 4  job.mode = "BATCH"}source {    Jdbc {        url = "jdbc:oracle:thin:@datasource01:1523:xe"        driver = "oracle.jdbc.OracleDriver"        user = "root"        password = "123456"        partition_column = "ID"        partition_num = 10    }}sink {  Console {}}

按主键或唯一索引并行读取

通过配置 table_path 开启自动分片,可以配置 split.* 来调整分片策略。

env {  parallelism = 4  job.mode = "BATCH"}source {    Jdbc {        url = "jdbc:oracle:thin:@datasource01:1523:xe"        driver = "oracle.jdbc.OracleDriver"        user = "root"        password = "123456"        table_path = "DA.SCHEMA1.TABLE1"        query = "select * from SCHEMA1.TABLE1"        split.size = 10000    }}sink {  Console {}}

并行读取

根据配置的上下限更高效地读取数据。

source {    Jdbc {        url = "jdbc:oracle:thin:@datasource01:1523:xe"        driver = "oracle.jdbc.OracleDriver"        user = "root"        password = "123456"        partition_column = "ID"        partition_lower_bound = 1        partition_upper_bound = 500        partition_num = 10    }}

多表读取

配置 table_list 将开启自动拆分,以配置split.来调整分割策略*。

env {  job.mode = "BATCH"  parallelism = 4}source {  Jdbc {    url = "jdbc:oracle:thin:@datasource01:1523:xe"    driver = "oracle.jdbc.OracleDriver"    connection_check_timeout_sec = 100    user = "root"    password = "123456"    "table_list"=[        {            "table_path"="XE.TEST.USER_INFO"        },        {            "table_path"="XE.TEST.YOURTABLENAME"        }    ]    #where_condition= "where id > 100"    split.size = 10000    #split.even-distribution.factor.upper-bound = 100    #split.even-distribution.factor.lower-bound = 0.05    #split.sample-sharding.threshold = 1000    #split.inverse-sampling.rate = 1000  }}sink {  Console {}}

总结

Apache SeaTunnel为连接和集成Oracle数据库提供了灵活的解决方案。通过简单的配置,用户可以高效地从Oracle数据库读取和处理数据,满足不同的业务需求。

无论是在实时数据处理还是批量数据集成场景中,SeaTunnel都能为用户带来显著的便利和高效。

本文由 白鲸开源科技 提供发布支持!