
https://github.com/apache/
点击蓝字
关注我们
开发插件 步骤, 下载 jar包即可com.ts7ming 自定义即可│ pom.xml
│
└─src
└─main
├─java
│ └─com
│ └─ts7ming
│ DingTalkEventListener.java
│
└─resources
└─META-INF
└─services
org.apache.seatunnel.api.event.EventHandler
2.3.13 版本, 根据实际情况调整<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.ts7ming</groupId>
<artifactId>SeatunnelExt</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<seatunnel.version>2.3.13</seatunnel.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-api</artifactId>
<version>${seatunnel.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-engine-common</artifactId>
<version>${seatunnel.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.30</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
package com.ts7ming;
import lombok.extern.slf4j.Slf4j;
import org.apache.seatunnel.api.event.Event;
import org.apache.seatunnel.api.event.EventHandler;
import org.apache.seatunnel.api.event.EventType;
import org.apache.seatunnel.engine.common.job.JobStatus;
import org.apache.seatunnel.engine.common.job.JobStateEvent;
import org.apache.seatunnel.api.source.event.ReaderOpenEvent;
import org.apache.seatunnel.api.sink.event.WriterCloseEvent;
import org.apache.seatunnel.api.table.schema.event.AlterTableAddColumnEvent;
import org.apache.seatunnel.api.table.schema.event.AlterTableColumnsEvent;
import org.apache.seatunnel.api.table.schema.event.AlterTableDropColumnEvent;
import org.apache.seatunnel.api.table.schema.event.AlterTableModifyColumnEvent;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
@Slf4j
public class DingTalkEventListener implements EventHandler {
private static final String WEBHOOK_URL = System.getProperty("dingtalk.webhook.url", "https://oapi.dingtalk.com/robot/send?access_token=YOUR_ACCESS_TOKEN");
private static final String SECRET = System.getProperty("dingtalk.secret", "YOUR_SECRET");
@Override
public void handle(Event event) {
EventType eventType = event.getEventType();
if (eventType == EventType.JOB_STATUS) {
handleJobStateEvent((JobStateEvent) event);
}
// else if (eventType.name().equals("SCHEMA_CHANGE_ADD_COLUMN")) {
// handleAddColumnEvent((AlterTableAddColumnEvent) event);
// }
// else if (eventType.name().equals("SCHEMA_CHANGE_UPDATE_COLUMNS")) {
// handleUpdateColumnEvent((AlterTableColumnsEvent) event);
// }
// else if (eventType.name().equals("SCHEMA_CHANGE_DROP_COLUMN")) {
// handleDropColumnEvent((AlterTableDropColumnEvent) event);
// }
// else if (eventType.name().equals("SCHEMA_CHANGE_MODIFY_COLUMN")) {
// handleModifyColumnEvent((AlterTableModifyColumnEvent) event);
// }
else {
log.debug("忽略未处理的事件类型: {}", eventType);
}
}
private void handleJobStateEvent(JobStateEvent jobEvent) {
String jobId = jobEvent.getJobId();
String jobName = jobEvent.getJobName();
JobStatus status = jobEvent.getJobStatus();
long eventTime = jobEvent.getCreatedTime();
switch (status) {
case FAILED:
sendAlert("【任务失败】jobId: " + jobId + ", jobName: " + jobName);
break;
case FINISHED:
//sendAlert("任务完成: " + jobId + ", jobName: " + jobName);
break;
//其他需要处理的事件
// case READER_OPEN:
// break;
// case WRITER_CLOSE:
// break;
default:
log.debug("任务状态变更 | jobId: {}, 状态: {}, 时间: {}",jobId, status, eventTime);
// 不发送通知
}
}
private void handleAddColumnEvent(AlterTableAddColumnEvent event) {
String tableName = event.getTableIdentifier().getTableName();
String columnName = event.getColumn() != null ? event.getColumn().getName() : "未知列";
sendAlert("【表结构变更】表名: " + tableName + ", 新增列: " + columnName);
}
private void handleUpdateColumnEvent(AlterTableColumnsEvent event) {
String tableName = event.getTableIdentifier().getTableName();
sendAlert("【表结构变更】表名: " + tableName + ", 更新内容: " + event);
}
private void handleDropColumnEvent(AlterTableDropColumnEvent event) {
String tableName = event.getTableIdentifier().getTableName();
String columnName = event.getColumn() != null ? event.getColumn() : "未知列";
sendAlert("【表结构变更】表名: " + tableName + ", 删除列: " + columnName);
}
private void handleModifyColumnEvent(AlterTableModifyColumnEvent event) {
String tableName = event.getTableIdentifier().getTableName();
String columnName = event.getColumn() != null ? event.getColumn().getName() : "未知列";
sendAlert("【表结构变更】表名: " + tableName + ", 修改列: " + columnName);
}
private void sendAlert(String content) {
sendDingTalkMessage(content);
}
void sendDingTalkMessage(String message) {
try {
long timestamp = System.currentTimeMillis();
String sign = generateSign(timestamp, SECRET);
String fullUrl = WEBHOOK_URL + "×tamp=" + timestamp + "&sign=" + sign;
String escapedMessage = message.replace("\", "\\")
.replace("\"", "\\"")
.replace("\n", "\n")
.replace("\r", "\r")
.replace("\t", "\t");
String jsonPayload = String.format("{\"msgtype\":\"text\",\"text\":{\"content\":\"%s\"}}", escapedMessage);
URL url = new URL(fullUrl);
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestMethod("POST");
conn.setRequestProperty("Content-Type", "application/json");
conn.setDoOutput(true);
conn.setConnectTimeout(5000);
conn.setReadTimeout(5000);
try (OutputStream os = conn.getOutputStream()) {
os.write(jsonPayload.getBytes(StandardCharsets.UTF_8));
os.flush();
}
int responseCode = conn.getResponseCode();
if (responseCode == 200) {
log.info("钉钉消息发送成功: {}", message);
} else {
log.error("钉钉消息发送失败,响应码: {}, 消息: {}", responseCode, message);
}
} catch (Exception e) {
log.error("发送钉钉消息异常: {}", message, e);
}
}
private String generateSign(long timestamp, String secret) throws Exception {
String stringToSign = timestamp + "\n" + secret;
Mac mac = Mac.getInstance("HmacSHA256");
mac.init(new SecretKeySpec(secret.getBytes(StandardCharsets.UTF_8), "HmacSHA256"));
byte[] signData = mac.doFinal(stringToSign.getBytes(StandardCharsets.UTF_8));
return URLEncoder.encode(new String(Base64.getEncoder().encode(signData)), "UTF-8");
}
}
com.ts7ming.DingTalkEventListener
mvn clean package
cd /opt/apache-seatunnel/lib
wget https://github.com/ts7ming/SeatunnelExt/releases/download/v1/SeatunnelExt-1.0-SNAPSHOT.jar
# 网络差的话用 gitee
wget https://gitee.com/ts7ming/SeatunnelExt/releases/download/v1/SeatunnelExt-1.0-SNAPSHOT.jar
# 如果用其他用户下载, 注意权限
chown -R seatunnel:seatunnel /opt/apache-seatunnel
systemctl stop seatunnel-master.service
systemctl stop seatunnel-worker.service
systemctl start seatunnel-master.service
systemctl start seatunnel-worker.service
grep "DingTalk" /opt/apache-seatunnel/logs/seatunnel-engine-master.log
# 结果应该有如下信息
INFO [o.a.s.e.s.CoordinatorService ] [pool-4-thread-1] - [localhost]:5801 [seatunnel] [5.1] Loaded event handlers: [com.ts7ming.DingTalkEventListener@20eaeaed, org.apache.seatunnel.api.event.LoggingEventHandler@59c99cb9]注意: -D参数一定要在 --config前面 (SeaTunnel启动脚本解析参数的顺序不够灵活)
sh bin/seatunnel.sh --async -Ddingtalk.webhook.url="https://oapi.dingtalk.com/robot/send?access_token=钉钉群Token" -Ddingtalk.secret="钉钉群secret" --config 任务配置.conf -n "任务名称"
Apache SeaTunnel是一个云原生的多模态、高性能海量数据集成工具。北京时间 2023 年 6 月1 日,全球最大的开源软件基金会ApacheSoftware Foundation正式宣布SeaTunnel毕业成为Apache顶级项目。目前,SeaTunnel在GitHub上Star数量已达9k+,社区达到7000+人规模。SeaTunnel支持在云数据库、本地数据源、SaaS、大模型等170多种数据源之间进行数据实时和批量同步,支持CDC、DDL变更、整库同步等功能,更是可以和大模型打通,让大模型链接企业内部的数据。
同步Demo
新手入门

最佳实践

测试报告

源码解析



