SeaTunnel × Gravitino:Schema URL 驱动的表结构自动感知方案

https://github.com/apache/seatunnel点击蓝字关注我们作者 | chl-wxp

https://github.com/apache/SeaTunnel

点击蓝字



关注我们

作者 | chl-wxp

不久前,社区发布了一篇题为《告别手敲 Schema!SeaTunnel 集成 Gravitino 元数据 RestApi 这个新动作有点酷》的文章,引起了小伙伴们的强烈反响,纷纷表示这真是个好东西啊!

Apache SeaTunnel Gravitino 元数据 RestApi 的贡献者行动力也超强,估计不久就能跟大家见面了(“可靠”消息,预计将在 3.0.0 版本得到支持)。为了让大家更加了解这个功能,贡献者还贴心地为社区小伙伴们写了一篇详细的文章,介绍了初版 Gravitino RestApi 的能力以及如何使用它,快来一睹为快吧!

1. 背景与要解决的问题

在使用 Apache SeaTunnel 进行批处理或同步任务时,当source是非结构化或者半结构化的类型时,Source 侧通常需要显式定义 schema(字段名、类型、顺序)。

在真实生产环境中,这会带来几个典型问题:

  • 表结构字段多、类型复杂,手工维护 schema 成本高且易出错
  • 上游表结构发生变更(加字段、改类型)时,需要同步修改 SeaTunnel 作业
  • 对于已有存量表,仅为了同步数据却需要重复描述元数据,存在明显冗余

因此,核心诉求是:

能否让 SeaTunnel 直接复用已有元数据系统中的表结构定义,而不是在作业中重复声明 schema?

本功能正是为了解决这一问题而引入。

2. Gravitino 能力简介

(与本功能相关部分)

Gravitino 是一个统一的元数据管理与访问服务,提供了标准化的 REST API,用于管理和暴露以下对象:

  • Metalake(逻辑隔离单元)
  • Catalog(如 MySQL、Hive、Iceberg 等)
  • Schema / Database
  • Table 及其字段定义

通过 Gravitino:

  • 表结构可以被集中管理
  • 下游系统可以通过 HTTP API 动态获取表的 schema 定义
  • 不再需要在每个计算/同步任务中重复维护字段信息

本次在 SeaTunnel 中引入的能力,正是:

支持在 Source 的 schema 定义中,通过 Gravitino 提供的 schema_url 自动拉取表结构

3. 本地测试环境准备

3.1 准备mysql环境

3.1.1 创建目标表

MySQL 中提前创建好目标表 test.demo_user,建表语句如下:

CREATE TABLE `demo_user` (  `id` bigint unsigned NOT NULL AUTO_INCREMENT,  `user_code` varchar(32) NOT NULL,  `user_name` varchar(64) DEFAULT NULL,  `password` varchar(128) DEFAULT NULL,  `email` varchar(128) DEFAULT NULL,  `phone` varchar(20) DEFAULT NULL,  `gender` tinyint DEFAULT NULL,  `age` int DEFAULT NULL,  `status` tinyint DEFAULT NULL,  `level` int DEFAULT NULL,  `score` decimal(10,2) DEFAULT NULL,  `balance` decimal(12,2) DEFAULT NULL,  `is_deleted` tinyint DEFAULT NULL,  `register_ip` varchar(45) DEFAULT NULL,  `last_login_ip` varchar(45) DEFAULT NULL,  `login_count` int DEFAULT NULL,  `remark` varchar(255) DEFAULT NULL,  `ext1` varchar(100) DEFAULT NULL,  `ext2` varchar(100) DEFAULT NULL,  `ext3` varchar(100) DEFAULT NULL,  `ext4` varchar(100) DEFAULT NULL,  `ext5` varchar(100) DEFAULT NULL,  `created_by` varchar(64) DEFAULT NULL,  `updated_by` varchar(64) DEFAULT NULL,  `created_time` datetime DEFAULT NULL,  `updated_time` datetime DEFAULT NULL,  `birthday` date DEFAULT NULL,  `last_login_time` datetime DEFAULT NULL,  `version` int DEFAULT NULL,  PRIMARY KEY (`id`),  UNIQUE KEY `uk_user_code` (`user_code`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

3.1.2 创建要同步的表结构

在实际应用中,我们把表结构统一管理起来,可能管理在paimonhivehudi等元数据组件中,但在这里为了方便,测试用的表结构信息指向测试的目标表,也就是上一个步骤创建的test.demo_user

3.2 注册该表结构到Gravitino中

Gravitino支持直连数据库,并会扫描库下所有表

img

该表已经作为 local-mysql catalog 下的一个 table 被 Gravitino 管理。

img_1

Metalake:test_Metalake

3.3 表结构访问关系说明

Gravitino 中表结构可以通过如下 REST API 访问:

http://localhost:8090/api/metalakes/test_Metalake/catalogs/${catalog}/schemas/${schema}/tables/${table}

在本次测试中,实际使用的 schema_url 为:

http://localhost:8090/api/metalakes/test_Metalake/catalogs/local-mysql/schemas/test/tables/demo_user

该接口返回的 JSON 中,包含了 demo_user 表的完整字段定义。

img_2

3.4 本地部署seatunnel

由于该功能还并未发版,需要手动编译最新的seatunnel的dev分支代码,并部署到本地。

3.5 准备数据文件

本次测试用例是csv作为数据文件,总共是2000条数据。

img_3

4. SeaTunnel 作业配置说明

4.1 核心配置示例

env {  parallelism = 1  job.mode = "BATCH"}source {  LocalFile {    path = "/Users/wangxuepeng/Desktop/seatunnel/apache-seatunnel-2.3.13-SNAPSHOT/test_data"    file_format_type = "csv"    schema {      schema_url = "http://localhost:8090/api/metalakes/test_Metalake/catalogs/local-mysql/schemas/test/tables/demo_user"    }  }}sink {  jdbc {    url = "jdbc:mysql://localhost:3306/test"    driver = "com.mysql.cj.jdbc.Driver"    username = "root"    password = "123456"    database = "test"    table = "demo_user"    generate_sink_sql = true  }}

4.2 配置要点说明

  • schema.schema_url

    • 指向 Gravitino 中的表元数据 REST 接口
    • SeaTunnel 在任务启动时会自动拉取表结构
    • 无需在作业中手工声明字段列表
  • generate_sink_sql = true

    • Sink 侧根据解析后的 schema 自动生成 INSERT SQL

5. 数据与任务执行结果


日志截图 :

img_4

数据库截图:

img_5

任务运行过程中:

  • Source 根据 schema_url 自动解析字段结构
  • CSV 文件字段与表结构自动对齐
  • 数据成功写入 MySQL demo_user 

6. 问题解答

6.1 功能支持的范围

该功能在dev分支目前是已经支持文件类型的连接器,包括localhdfss3等。

6.2 使用schema_url是否支持多表

改功能的引入并不影响多表的功能,甚至可以混合使用,比如:

source {  LocalFile {    tables_configs = [      {        path = "/seatunnel/read/metalake/table1"        file_format_type = "csv"        field_delimiter = ","        row_delimiter = "\n"        skip_header_row_number = 1        schema {          table = "db.table1"          fields {            c_string = string            c_int = int            c_boolean = boolean            c_double = double          }        }      },      {        path = "/seatunnel/read/metalake/table2"        file_format_type = "csv"        field_delimiter = ","        row_delimiter = "\n"        skip_header_row_number = 1        schema {          table = "db.table2"          schema_url = "http://gravitino:8090/api/metalakes/test_metalake/catalogs/test_catalog/schemas/test_schema/tables/table2"        }      }    ]  }}

7. 功能总结

通过引入 基于 Gravitino schema_url 的 schema 自动解析能力,SeaTunnel 在数据同步场景中具备了以下优势:

  • 消除重复 schema 定义,降低作业配置复杂度
  • 复用统一的元数据管理系统,提升一致性
  • 表结构变更对作业更加友好,维护成本显著降低

该能力非常适合:

  • 已有完善元数据平台的企业场景
  • 大表、多字段、频繁变更 schema 的同步任务
  • 希望提升 SeaTunnel 作业可维护性的用户

8. 参考链接

  • 代码 PR
    https://github.com/apache/seatunnel/pull/10402

  • schema_url 配置说明
    https://seatunnel.apache.org/zh-CN/docs/introduction/concepts/schema-feature#schema_url

Apache SeaTunnel

Apache SeaTunnel是一个云原生的多模态、高性能海量数据集成工具。北京时间 2023 年 6 月1 日,全球最大的开源软件基金会ApacheSoftware Foundation正式宣布SeaTunnel毕业成为Apache顶级项目。目前,SeaTunnel在GitHub上Star数量已达9.1k+,社区达到7000+人规模。SeaTunnel支持在云数据库、本地数据源、SaaS、大模型等170多种数据源之间进行数据实时和批量同步,支持CDC、DDL变更、整库同步等功能,更是可以和大模型打通,让大模型链接企业内部的数据。




同步Demo

MySQL→Doris | MySQLCDC | MySQL→Hive | HTTP → Doris  | HTTP → MySQL | MySQL→StarRocks|MySQL→Elasticsearch |Kafka→ClickHouse

新手入门

SeaTunnel 让数据集成变得 So easy!3 分钟入门指南
 0 到 1 快速入门 /初探/深入理解 
  分布式集群部署 | CDC数据同步管道 | Oracle-CDC
图片

最佳实践

中控技术天翼云多点OPPO | 清风马蜂窝孩子王哔哩哔哩唯品会众安保险兆原数通 | 亚信科技|映客|翼康济世|信也科技|华润置地|Shopee|京东科技|58同城|互联网银行|JPMorgan
图片

测试报告

SeaTunnel VS GLUE |  VS Airbyte |  VS DataX|SeaTunnel 与 DataX 、Sqoop、Flume、Flink CDC 对比
图片

源码解析

Zeta引擎源码解析(一) |(二) |(三)| API 源码解析 |2.1.1源码解析|封装 Flink 连接数据库解析





仓库地址: 
https://github.com/apache/seatunnel
网址:
https://seatunnel.apache.org/
Apache SeaTunnel 下载地址:
https://seatunnel.apache.org/download
衷心欢迎更多人加入!
我们相信,在Community Over Code(社区大于代码)、「Open and Cooperation」(开放协作)、「Meritocracy」(精英管理)、以及「多样性与共识决策」等 The Apache Way 的指引下,我们将迎来更加多元化和包容的社区生态,共建开源精神带来的技术进步!
我们诚邀各位有志于让本土开源立足全球的伙伴加入 SeaTunnel 贡献者大家庭,一起共建开源!
提交问题和建议:
https://github.com/apache/seatunnel/issues
贡献代码:
https://github.com/apache/seatunnel/pulls
订阅社区开发邮件列表 : 
dev-subscribe@seatunnel.apache.org
开发邮件列表:
dev@seatunnel.apache.org
加入 Slack:
https://join.slack.com/t/apacheseatunnel/shared_invite/zt-1kcxzyrxz-lKcF3BAyzHEmpcc4OSaCjQ
关注 X.com: 
https://x.com/ASFSeaTunnel