HTTP接口数据也能定时同步入湖?用DolphinScheduler×SeaTunnel快速搞定!

我们之前曾评估使用过SeaTunnel做CDC入湖验证:SeaTunnel-CDC入湖实践,这些场景都是能直连数据库的场景,业务需求中经常会出现无法直连数据库做CDC进行数据同步的场景,而这些场景就需要使用API进行数据对接,用Apache DolphinScheduler定时同步数据。

背景与目标

我们之前曾评估使用过SeaTunnel做CDC入湖验证:SeaTunnel-CDC入湖实践,这些场景都是能直连数据库的场景,业务需求中经常会出现无法直连数据库做CDC进行数据同步的场景,而这些场景就需要使用API进行数据对接,用Apache DolphinScheduler定时同步数据。

举个实际中的例子:

  • ERP(SAP)的库存数据进行同步入湖仓做库存分析

同时,本次目标希望其他同事能依样画葫芦,在以后的对接http接口到湖仓的时候能够自行完成,而非每遇到一个对接需求,就需要通过代码方式进行对接。

准备工作

  • seatunnel 2.3.10

首先,您需要在${SEATUNNEL_HOME}/config/plugin_config文件中加入连接器名称,然后,执行命令来安装连接器,确认连接器在${SEATUNNEL_HOME}/connectors/目录下即可。

本例中我们会用到:connector-jdbcconnector-paimon

写入StarRocks也可以使用connector-starrocks,本例中的场景比较适合用connector-jdbc,所以使用connector-jdbc

# 配置连接器名称--connectors-v2--connector-jdbcconnector-starrocksconnector-paimon--end--
# 安装连接器sh bin/install-plugin.sh 2.3.10

SeaTunnel任务

我们先至少保证能在本地完成SeaTunnel任务,再完成对Apache DolphinScheduler的对接。

  • http to starRocks
    example/http2starrocks

env {  parallelism = 1  job.mode = "BATCH"}source {  Http {    plugin_output = "stock"    url = "https://ip/http/prd/query_sap_stock"    method = "POST"    headers {      Authorization = "Basic XXX"      Content-Type = "application/json"    }    body = """{"IT_WERKS": [{"VALUE": "1080"}]}"""    format = "json"    content_field = "$.ET_RETURN.*"    schema {      fields {        MATNR = "string"        MAKTX = "string"        WERKS = "string"        NAME1 = "string"        LGORT = "string"        LGOBE = "string"        CHARG = "string"        MEINS = "string"        LABST = "double"        UMLME = "double"        INSME = "double"        EINME = "double"        SPEME = "double"        RETME = "double"      }    }  }}# 此转换操作主要用于字段从命名等方便用途transform {  Sql {    plugin_input = "stock"    plugin_output = "stock-tf-out"    query = "select MATNR, MAKTX, WERKS,NAME1,LGORT,LGOBE,CHARG,MEINS,LABST,UMLME,INSME,EINME,SPEME,RETME from stock"  }}# 连接starRocks 进行数据分区覆写,本例适用starRocks建表,按照分区insert overwrite 覆写sink {    jdbc {        plugin_input = "stock-tf-out"        url = "jdbc:mysql://XXX:9030/scm?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"        driver = "com.mysql.cj.jdbc.Driver"        user = "lab"        password = "XXX"        compatible_mode="starrocks"        query = """insert overwrite ods_sap_stock PARTITION (WERKS='1080') (MATNR, MAKTX, WERKS,NAME1,LGORT,LGOBE,CHARG,MEINS,LABST,UMLME,INSME,EINME,SPEME,RETME) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?)"""        }}# connector-starrocks进行对接 (未看到支持sql语句进行数据insert overwrite,本例子场景不适合),比较适合表数据全部删除重建场景// sink {//   StarRocks {//     plugin_input = "stock-tf-out"//     nodeUrls = ["ip:8030"]//     base-url = "jdbc:mysql://ip:9030/"//     username = "lab"//     password = "XXX"//     database = "scm"//     table = "ods_sap_stock"//     batch_max_rows = 1000//     data_save_mode="DROP_DATA"//     starrocks.config = {//       format = "JSON"//       strip_outer_array = true//     }//     schema_save_mode = "RECREATE_SCHEMA"//     save_mode_create_template="""//       CREATE TABLE IF NOT EXISTS `scm`.`ods_sap_stock` (//         MATNR STRING COMMENT '物料',//         WERKS STRING COMMENT '工厂',//         LGORT STRING COMMENT '库存地点',//         MAKTX STRING COMMENT '物料描述',//         NAME1 STRING COMMENT '工厂名称',//         LGOBE STRING COMMENT '地点描述',//         CHARG STRING COMMENT '批次编号',//         MEINS STRING COMMENT '单位',//         LABST DOUBLE COMMENT '非限制使用库存',//         UMLME DOUBLE COMMENT '在途库存',//         INSME DOUBLE COMMENT '质检库存',//         EINME DOUBLE COMMENT '受限制使用的库存',//         SPEME DOUBLE COMMENT '已冻结的库存',//         RETME DOUBLE COMMENT '退货'//       ) ENGINE=OLAP//       PRIMARY KEY ( MATNR,WERKS,LGORT)//       COMMENT 'sap库存'//       DISTRIBUTED BY HASH (WERKS) PROPERTIES (//       "replication_num" = "1"//       )//     """//   }// }
  • http to paimon
    example/http2paimon

env {  parallelism = 1  job.mode = "BATCH"}source {  Http {    plugin_output = "stock"    url = "https://ip/http/prd/query_sap_stock"    method = "POST"    headers {      Authorization = "Basic XXX"      Content-Type = "application/json"    }    body = """{"IT_WERKS": [{"VALUE": "1080"}]}"""    format = "json"    content_field = "$.ET_RETURN.*"    schema {      fields {        MATNR = "string"        MAKTX = "string"        WERKS = "string"        NAME1 = "string"        LGORT = "string"        LGOBE = "string"        CHARG = "string"        MEINS = "string"        LABST = "double"        UMLME = "double"        INSME = "double"        EINME = "double"        SPEME = "double"        RETME = "double"      }    }  }}# 此转换操作主要用于字段从命名等方便用途transform {  Sql {    plugin_input = "stock"    plugin_output = "stock-tf-out"    query = "select MATNR, MAKTX, WERKS,NAME1,LGORT,LGOBE,CHARG,MEINS,LABST,UMLME,INSME,EINME,SPEME,RETME from stock"  }}# 连接paimon进行数据同步,paimon 暂时 未看到有支持 insert overwrite 分区覆写,此例仅作为参考,不适用本此例子需求sink {  Paimon {    warehouse = "s3a://test/"    database = "sap"    table = "ods_sap_stock"    paimon.hadoop.conf = {        fs.s3a.access-key=XXX        fs.s3a.secret-key=XXX        fs.s3a.endpoint="http://minio:9000"        fs.s3a.path.style.access=true        fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider    }  }}

DolphinScheduler集成SeaTunnel

  • 制作worker镜像

FROM dolphinscheduler.docker.scarf.sh/apache/dolphinscheduler-worker:3.2.2RUN mkdir /opt/seatunnelRUN mkdir /opt/seatunnel/apache-seatunnel-2.3.10# 容器集成seatunnelCOPY apache-seatunnel-2.3.10/ /opt/seatunnel/apache-seatunnel-2.3.10/

打包镜像,推送到镜像仓库

docker build --platform=linux/amd64 -t apache/dolphinscheduler-worker:3.2.2-seatunnel .
  • 使用新镜像部署一个worker,此处修改docker-compose.yaml,增加一个dolphinscheduler-worker-seatunnel节点。

...  dolphinscheduler-worker-seatunnel:    image: xxx/dolphinscheduler-worker:3.2.2-seatunnel    profiles: ["all"]    env_file: .env    healthcheck:      test: [ "CMD", "curl", "http://localhost:1235/actuator/health" ]      interval: 30s      timeout: 5s      retries: 3    depends_on:      dolphinscheduler-zookeeper:        condition: service_healthy    volumes:      - ./dolphinscheduler-worker-seatunnel-data:/tmp/dolphinscheduler      - ./dolphinscheduler-logs:/opt/dolphinscheduler/logs      - ./dolphinscheduler-shared-local:/opt/soft      - ./dolphinscheduler-resource-local:/dolphinscheduler    networks:      dolphinscheduler:        ipv4_address: 172.15.0.18...
  • DolphinScheduler配置SeaTunnel分组及环境配置

    • 安全中心-Worker分组管理,创建一个这个节点ip的分组,用于以后需要seatunnel的任务跑该分组
      17467594366894093287a6fbc3cfc394879150387231b

    • 环境管理-创建环境,增加一个用于执行seatunnel的环境,同时需要绑定Worker分组为上一步创建的seatunnel分组
      1746759436686e818d740f6298af1dcac1da7126c3fc2

    • 创建工作流定义,把上面的seatunnel任务配置填写上
      1746759436687bb094b1d607f7c1c306038be72f7461e

    • 运行时候,选择seatunnel的worker分组和环境即可跑在这个集成了seatunnel的环境上
      174675943668507fff2fc3004d2810f8ffcba85089180

转载自俊瑶先森
原文链接:https://junyao.tech/posts/9c6867c5.html