Apache SeaTunnel 设置钉钉通知详细教程,亲测可用!

Seatunnel任务事件钉钉通知

https://github.com/apache/SeaTunnel

点击蓝字



关注我们

转载自 DataScientist

作者 | ts7ming

说明

背景

问题

  • 需要在任务报错或者其他关键事件发送钉钉消息通知
  • Seatunnel 本身不支持消息通知, 必须依赖 DolphinScheduler 或其他外部工具

方案

  • 利用SeaTunnel提供的事件监听器功能
  • 编写自定义插件, 捕捉报错事件, 发送消息通知
  • 群机器人配置通过命令行提交

部署

  • 如果不想写代码打包, 只需要报错通知, 跳过开发插件 步骤, 下载 jar包即可
  • 如果需要自定义通知内容和其他事件处理, 自行调整代码

开发插件

  • 可以从 https://github.com/ts7ming/SeatunnelExt 获取(或https://gitee.com/ts7ming/SeatunnelExt)
项目结构
  • 包名 com.ts7ming 自定义即可
│  pom.xml

└─src
    └─main
        ├─java
        │  └─com
        │      └─ts7ming
        │              DingTalkEventListener.java
        │
        └─resources
            └─META-INF
                └─services
                        org.apache.seatunnel.api.event.EventHandler
pom.xml
  • 这里用了2.3.13 版本, 根据实际情况调整
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <groupId>com.ts7ming</groupId>
    <artifactId>SeatunnelExt</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <seatunnel.version>2.3.13</seatunnel.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.seatunnel</groupId>
            <artifactId>seatunnel-api</artifactId>
            <version>${seatunnel.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.seatunnel</groupId>
            <artifactId>seatunnel-engine-common</artifactId>
            <version>${seatunnel.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.30</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.13.2</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>
DingTalkEventListener.java
  • Seatunnel支持的事件如下
事件类型
说明
关联事件类
JOB_STATUS
作业状态变更事件
JobStateEvent
SCHEMA_CHANGE_UPDATE_COLUMNS
表结构更新事件
AlterTableColumnsEvent
SCHEMA_CHANGE_ADD_COLUMN
表添加列事件
AlterTableAddColumnEvent
SCHEMA_CHANGE_DROP_COLUMN
表删除列事件
AlterTableDropColumnEvent
SCHEMA_CHANGE_MODIFY_COLUMN
表修改列事件
AlterTableModifyColumnEvent
READER_OPEN
读取器打开事件
ReaderOpenEvent
READER_CLOSE
读取器关闭事件
ReaderCloseEvent
WRITER_OPEN
写入器打开事件
WriterOpenEvent
WRITER_CLOSE
写入器关闭事件
WriterCloseEvent
package com.ts7ming;

import lombok.extern.slf4j.Slf4j;
import org.apache.seatunnel.api.event.Event;
import org.apache.seatunnel.api.event.EventHandler;
import org.apache.seatunnel.api.event.EventType;
import org.apache.seatunnel.engine.common.job.JobStatus;
import org.apache.seatunnel.engine.common.job.JobStateEvent;
import org.apache.seatunnel.api.source.event.ReaderOpenEvent;
import org.apache.seatunnel.api.sink.event.WriterCloseEvent;
import org.apache.seatunnel.api.table.schema.event.AlterTableAddColumnEvent;
import org.apache.seatunnel.api.table.schema.event.AlterTableColumnsEvent;
import org.apache.seatunnel.api.table.schema.event.AlterTableDropColumnEvent;
import org.apache.seatunnel.api.table.schema.event.AlterTableModifyColumnEvent;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

@Slf4j
public class DingTalkEventListener implements EventHandler {
    private static final String WEBHOOK_URL = System.getProperty("dingtalk.webhook.url""https://oapi.dingtalk.com/robot/send?access_token=YOUR_ACCESS_TOKEN");
    private static final String SECRET = System.getProperty("dingtalk.secret""YOUR_SECRET");
    @Override
    public void handle(Event event) {
        EventType eventType = event.getEventType();
        if (eventType == EventType.JOB_STATUS) {
            handleJobStateEvent((JobStateEvent) event);
        } 
//        else if (eventType.name().equals("SCHEMA_CHANGE_ADD_COLUMN")) {
//            handleAddColumnEvent((AlterTableAddColumnEvent) event);
//        }
//        else if (eventType.name().equals("SCHEMA_CHANGE_UPDATE_COLUMNS")) {
//            handleUpdateColumnEvent((AlterTableColumnsEvent) event);
//        }
//        else if (eventType.name().equals("SCHEMA_CHANGE_DROP_COLUMN")) {
//            handleDropColumnEvent((AlterTableDropColumnEvent) event);
//        }
//        else if (eventType.name().equals("SCHEMA_CHANGE_MODIFY_COLUMN")) {
//            handleModifyColumnEvent((AlterTableModifyColumnEvent) event);
//        }
        else {
            log.debug("忽略未处理的事件类型: {}", eventType);
        }
    }

    private void handleJobStateEvent(JobStateEvent jobEvent) {
        String jobId = jobEvent.getJobId();
        String jobName = jobEvent.getJobName();
        JobStatus status = jobEvent.getJobStatus();
        long eventTime = jobEvent.getCreatedTime();

        switch (status) {
            case FAILED:
                sendAlert("【任务失败】jobId: " + jobId + ", jobName: " + jobName);
                break;
            case FINISHED:
                //sendAlert("任务完成: " + jobId + ", jobName: " + jobName);
                break;
            //其他需要处理的事件
            // case READER_OPEN:
            //     break;
            // case WRITER_CLOSE:
            //     break;
            default:
                log.debug("任务状态变更 | jobId: {}, 状态: {}, 时间: {}",jobId, status, eventTime);
                // 不发送通知
        }
    }

    private void handleAddColumnEvent(AlterTableAddColumnEvent event) {
        String tableName = event.getTableIdentifier().getTableName();
        String columnName = event.getColumn() != null ? event.getColumn().getName() : "未知列";
        sendAlert("【表结构变更】表名: " + tableName + ", 新增列: " + columnName);
    }
    private void handleUpdateColumnEvent(AlterTableColumnsEvent event) {
        String tableName = event.getTableIdentifier().getTableName();
        sendAlert("【表结构变更】表名: " + tableName + ", 更新内容: " + event);
    }
    private void handleDropColumnEvent(AlterTableDropColumnEvent event) {
        String tableName = event.getTableIdentifier().getTableName();
        String columnName = event.getColumn() != null ? event.getColumn() : "未知列";
        sendAlert("【表结构变更】表名: " + tableName + ", 删除列: " + columnName);
    }
    private void handleModifyColumnEvent(AlterTableModifyColumnEvent event) {
        String tableName = event.getTableIdentifier().getTableName();
        String columnName = event.getColumn() != null ? event.getColumn().getName() : "未知列";
        sendAlert("【表结构变更】表名: " + tableName + ", 修改列: " + columnName);
    }

    private void sendAlert(String content) {
          sendDingTalkMessage(content);
    }

    void sendDingTalkMessage(String message) {
        try {
            long timestamp = System.currentTimeMillis();
            String sign = generateSign(timestamp, SECRET);
            String fullUrl = WEBHOOK_URL + "&timestamp=" + timestamp + "&sign=" + sign;
            String escapedMessage = message.replace("\", "\\")
                                          .replace("
\"""\\"")
                                          .replace("
\n", "\n")
                                          .replace("
\r", "\r")
                                          .replace("
\t", "\t");
            String jsonPayload = String.format("
{\"msgtype\":\"text\",\"text\":{\"content\":\"%s\"}}", escapedMessage);
            URL url new URL(fullUrl);
            HttpURLConnection conn = (HttpURLConnection) url.openConnection();
            conn.setRequestMethod("POST");
            conn.setRequestProperty("Content-Type""application/json");
            conn.setDoOutput(true);
            conn.setConnectTimeout(5000);
            conn.setReadTimeout(5000);

            try (OutputStream os = conn.getOutputStream()) {
                os.write(jsonPayload.getBytes(StandardCharsets.UTF_8));
                os.flush();
            }
            
            int responseCode = conn.getResponseCode();
            if (responseCode == 200) {
                log.info("钉钉消息发送成功: {}", message);
            } else {
                log.error("钉钉消息发送失败,响应码: {}, 消息: {}", responseCode, message);
            }
        } catch (Exception e) {
            log.error("发送钉钉消息异常: {}", message, e);
        }
    }
    private String generateSign(long timestamp, String secret) throws Exception {
        String stringToSign = timestamp + "\n" + secret;
        Mac mac = Mac.getInstance("HmacSHA256");
        mac.init(new SecretKeySpec(secret.getBytes(StandardCharsets.UTF_8), "HmacSHA256"));
        byte[] signData = mac.doFinal(stringToSign.getBytes(StandardCharsets.UTF_8));
        return URLEncoder.encode(new String(Base64.getEncoder().encode(signData)), "UTF-8");
    }
}
org.apache.seatunnel.api.event.EventHandler
com.ts7ming.DingTalkEventListener
打包
mvn clean package

部署插件

直接下载可用 jar (如果不想自己打包)
cd /opt/apache-seatunnel/lib
wget https://github.com/ts7ming/SeatunnelExt/releases/download/v1/SeatunnelExt-1.0-SNAPSHOT.jar
# 网络差的话用 gitee
wget https://gitee.com/ts7ming/SeatunnelExt/releases/download/v1/SeatunnelExt-1.0-SNAPSHOT.jar

# 如果用其他用户下载, 注意权限
chown -R seatunnel:seatunnel /opt/apache-seatunnel
上传
  • Jar包上传到Seatunnel 根目录 lib 下 (例如: /opt/apache-seatunnel/lib/)
重启 Seatunnel 服务
systemctl stop seatunnel-master.service
systemctl stop seatunnel-worker.service

systemctl start seatunnel-master.service
systemctl start seatunnel-worker.service
检查插件是否加载
grep "DingTalk" /opt/apache-seatunnel/logs/seatunnel-engine-master.log

# 结果应该有如下信息
INFO  [o.a.s.e.s.CoordinatorService  ] [pool-4-thread-1] - [localhost]:5801 [seatunnel] [5.1] Loaded event handlers: [com.ts7ming.DingTalkEventListener@20eaeaed, org.apache.seatunnel.api.event.LoggingEventHandler@59c99cb9]

运行任务

注意: -D参数一定要在 --config前面 (SeaTunnel启动脚本解析参数的顺序不够灵活)

sh bin/seatunnel.sh --async -Ddingtalk.webhook.url="https://oapi.dingtalk.com/robot/send?access_token=钉钉群Token" -Ddingtalk.secret="钉钉群secret" --config 任务配置.conf  -n "任务名称"

Done!


Apache SeaTunnel

Apache SeaTunnel是一个云原生的多模态、高性能海量数据集成工具。北京时间 2023 年 6 月1 日,全球最大的开源软件基金会ApacheSoftware Foundation正式宣布SeaTunnel毕业成为Apache顶级项目。目前,SeaTunnel在GitHub上Star数量已达9k+,社区达到7000+人规模。SeaTunnel支持在云数据库、本地数据源、SaaS、大模型等170多种数据源之间进行数据实时和批量同步,支持CDC、DDL变更、整库同步等功能,更是可以和大模型打通,让大模型链接企业内部的数据。




同步Demo

MySQL→Doris | MySQLCDC | MySQL→Hive | HTTP → Doris  | HTTP → MySQL | MySQL→StarRocks|MySQL→Elasticsearch |Kafka→ClickHouse

新手入门

SeaTunnel 让数据集成变得 So easy!3 分钟入门指南
 0 到 1 快速入门 /初探/深入理解 
  分布式集群部署 | CDC数据同步管道 | Oracle-CDC
图片

最佳实践

中控技术天翼云多点OPPO | 清风马蜂窝孩子王哔哩哔哩唯品会众安保险兆原数通 | 亚信科技|映客|翼康济世|信也科技|华润置地|Shopee|京东科技|58同城|互联网银行|JPMorgan
图片

测试报告

SeaTunnel VS GLUE |  VS Airbyte |  VS DataX|SeaTunnel 与 DataX 、Sqoop、Flume、Flink CDC 对比
图片

源码解析

Zeta引擎源码解析(一) |(二) |(三)| API 源码解析 |2.1.1源码解析|封装 Flink 连接数据库解析





仓库地址: 
https://github.com/apache/seatunnel
网址:
https://seatunnel.apache.org/
Apache SeaTunnel 下载地址:
https://seatunnel.apache.org/download
衷心欢迎更多人加入!
我们相信,在Community Over Code(社区大于代码)、「Open and Cooperation」(开放协作)、「Meritocracy」(精英管理)、以及「多样性与共识决策」等 The Apache Way 的指引下,我们将迎来更加多元化和包容的社区生态,共建开源精神带来的技术进步!
我们诚邀各位有志于让本土开源立足全球的伙伴加入 SeaTunnel 贡献者大家庭,一起共建开源!
提交问题和建议:
https://github.com/apache/seatunnel/issues
贡献代码:
https://github.com/apache/seatunnel/pulls
订阅社区开发邮件列表 : 
dev-subscribe@seatunnel.apache.org
开发邮件列表:
dev@seatunnel.apache.org
加入 Slack:
https://join.slack.com/t/apacheseatunnel/shared_invite/zt-3uouszk3m-PtLLNyZsJVqE5Gb6gn24mA
关注 X.com: 
https://x.com/ASFSeaTunnel