如何基于 Apache SeaTunnel 同步数据到 Iceberg

目前仅支持添加字段和部分类型变更iceberg.table.primary-keysstringno-表的主键配置,多个主键用quotquot分割,与quoticeberg.table.upsert-mode-enabledquot一起使用,用于同主键数据的增量更新iceberg.table.upsert-mode-enabledbooleannofalse将其设置为true以启用upsert模式,默认为false,用于Iceberg中数据的增量更新iceberg.table.partition-keysstringno-创建表时指定的分区字段,多个分区字段使用quotquot分隔

概述

Apache SeaTunnel

Apache SeaTunnel 是一个分布式、高性能、易扩展、用于海量数据(离线 & 实时)同步和转化的开源数据集成平台, 支持 spark、flink 及自研 Zeta 引擎,有庞大的用户社群.

Apache Iceberg

Apache Iceberg 是一个开源的表格格式,它旨在改善大数据生态系统中复杂的数据湖管理。作为 Apache 软件基金会的一部分,Iceberg 专为提供更强大、更灵活的数据湖表格管理功能而设计,它通过提供一种更加高效和可靠的方式来处理大规模数据集,从而解决了传统数据湖在数据可靠性、性能和可维护性方面的挑战。

主要特点

  1. 模式演变和兼容性:Iceberg 支持模式的演变,同时保证了向前和向后的兼容性。这使得在不破坏已有数据的情况下添加、删除、更新字段成为可能。

  2. 隐藏分区:分区信息作为表模式的一部分进行存储,这消除了需要手动管理分区目录的复杂性。分区对查询透明,即可进行常规查询而无需指定具体的分区。

  3. 多计算引擎支持:Iceberg 可以与现代计算引擎无缝集成,包括 Apache Spark、Apache Flink、PrestoDB 和 Trino 等。同一数据集可以被多个引擎并发访问且保持一致性。

  4. 存储引擎支持: HDFS / S3

  5. 原子操作:Iceberg 支持原子性写入操作。这意味着表更新要么全部成功,要么全部失败,确保了数据的一致性。

  6. 快照管理:支持表的快照功能,允许用户回滚到历史版本,以及进行增量读取操作。这对于数据恢复和审计尤为重要。

  7. 高效读写:通过提供文件层面的元数据,使得读写操作可以更高效地进行。该功能减少了需要扫描的数据量,改善了查询性能。

使用场景

  • 数据湖构建和管理:对于需要构建和管理大型数据湖的企业和组织,Apache Iceberg 提供了一个高效、可扩展且易于管理的解决方案。

  • 多计算引擎环境:在使用多个计算引擎进行数据处理的环境中,Iceberg 能够提供一致的数据视图和并发控制。

  • 数据科学和分析:提供了更强大且灵活的数据组织方式,使得进行复杂分析和数据科学项目更加容易。

SeaTunnel Iceberg sink

介绍

Apache SeaTunnel connector-Iceberg 是专门为 Iceberg 引擎开发的数据同步组件, 主要为了方便 SeaTunnel 用户能更加友好的使用 Iceberg 来构建企业级数据湖仓

Iceberg sink 特性

  • 支持数据批量数据写入

  • cdc 模式下的数据同步

  • 支持配置自动建表

  • 支持 schema evolution

  • 支持指定分区键

  • 支持数据提交到指定的 branch

Sink 参数配置

NameTypeRequiredDefaultDescription
catalog_namestringyesdefault用户指定的目录名称。默认为 default。
namespacestringyesdefaultIceberg 数据库名称。默认为 default
tablestringyes-Iceberg 表名称。
iceberg.catalog.configmapno-指定用于初始化 Iceberg 目录的属性,具体配置参考:Iceberg Catalog Properties
hadoop.configmapno-指定 Hadoop 配置的属性,具体配置参考: Hadoop Configuration
iceberg.hadoop-conf-pathstringno-指定加载 'core-site.xml'、'hdfs-site.xml'、'hive-site.xml' 文件的路径。
case_sensitivebooleannofalse控制是否以区分大小写的方式匹配 schema。
iceberg.table.write-propsmapno-传递给 Iceberg 写入器初始化的属性,这些属性具有优先权,可以在 Iceberg Write Properties 找到具体参数。
iceberg.table.auto-create-propsmapno-Iceberg 在自动创建表时指定的配置,具体参照: Table Behavior Properties
iceberg.table.schema-evolution-enabledbooleannofalse将其设置为 true 可以使 Iceberg 表在同步过程中支持模式演变。目前仅支持添加字段 和 部分类型变更
iceberg.table.primary-keysstringno-表的主键配置,多个主键用 "," 分割 ,与 "iceberg.table.upsert-mode-enabled" 一起使用,用于同主键数据的增量更新
iceberg.table.upsert-mode-enabledbooleannofalse将其设置为 true 以启用 upsert 模式,默认为 false, 用于 Iceberg 中数据的增量更新
iceberg.table.partition-keysstringno-创建表时指定的分区字段,多个分区字段使用 "," 分隔。
iceberg.table.commit-branchstringno-指定数据提交的分支

同步模式

批处理

  • 批模式数据导入,append 模式,不进行数据的增量更新

  • 支持 flink , spark ,zeta 引擎

env {  parallelism = 1  job.mode = "BATCH"  # You can set spark configuration here  spark.app.name = "SeaTunnel"  spark.executor.instances = 2  spark.executor.cores = 1  spark.executor.memory = "1g"  spark.master = local}source {  FakeSource {    row.num = 100    schema = {      fields {        c_map = "map<string, string>"        c_array = "array<int>"        c_string = string        c_boolean = boolean        c_tinyint = tinyint        c_smallint = smallint        c_int = int        c_bigint = bigint        c_float = float        c_double = double        c_decimal = "decimal(30, 8)"        c_bytes = bytes        c_date = date        c_timestamp = timestamp      }    }    result_table_name = "fake"  }}transform {}sink {  Iceberg {    catalog_name="seatunnel_test"    iceberg.catalog.config={      "type"="hadoop"      "warehouse"="file:///tmp/seatunnel/iceberg/hadoop-sink/"    }    namespace="seatunnel_namespace"    table="iceberg_sink_table"    iceberg.table.write-props={      write.format.default="parquet"      write.target-file-size-bytes=10    }    iceberg.table.partition-keys="c_timestamp"    case_sensitive=true  }}

流写入 (CDC)

  • 配置 mysql cdc 进行数据的增量采集

  • Sink 指定 iceberg.table.primary-keys 和 iceberg.table.upsert-mode-enabled=true 进行数据增量写入

  • 配置 iceberg.table.schema-evolution-enabled=true 支持 schema 的演进(当前仅支持增加字段和部分类型变更)

  • 支持 flink /zeta 引擎的数据同步,不支持 spark

env {  parallelism = 1  job.mode = "STREAMING"  checkpoint.interval = 5000}source {  MySQL-CDC {    result_table_name="customer_result_table"    catalog {      factory = Mysql    }    debezium = {      # include ddl      "include.schema.changes" = true    }    database-names=["mysql_cdc"]    table-names = ["mysql_cdc.mysql_cdc_e2e_source_table"]    format=DEFAULT    username = "st_user"    password = "seatunnel"    base-url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc"  }}transform {}sink {  Iceberg {    catalog_name="seatunnel_test"    iceberg.catalog.config={      "type"="hadoop"      "warehouse"="file:///tmp/seatunnel/iceberg/hadoop-cdc-sink/"    }    namespace="seatunnel_namespace"    table="iceberg_sink_table"    iceberg.table.write-props={      write.format.default="parquet"      write.target-file-size-bytes=10    }    iceberg.table.primary-keys="id"    iceberg.table.partition-keys="f_datetime"    iceberg.table.upsert-mode-enabled=true    iceberg.table.schema-evolution-enabled=true    case_sensitive=true  }}

总结

基于 Apache SeaTunnel 来构建数据湖项目, 我们可以直接引用 SeaTunnel 强大的组件生态,不用独立构造新的项目来实现业务需求,同时 Apache SeaTunnel 的标准的架构设计也为熟悉开源的朋友提供了快速独立扩展的机会,可以在此基础上快速扩展自己的需求,做出符合自己业务需要的组件, 也欢迎大家试用 Iceberg-connect , 希望能帮大家真正解决实际生产场景中遇到的问题,

也希望大家能积极反馈使用中的问题,并贡献场景,大家共同来解决,并促进 Iceberg-connect 组件的完善, 一起共创数据开发的新场景. > 本文由 白鲸开源科技 提供发布支持!