作者 | caoyongfei
部署
1
下载解压
# 解压到 /opt/module 目录下(发行包文件名为小写)
tar -zxvf apache-seatunnel-incubating-2.3.1-bin.tar.gz -C /opt/module
2
下载对应的connector
https://repo.maven.apache.org/maven2/org/apache/seatunnel/
connector-assert-2.3.1.jar
connector-cdc-mysql-2.3.1.jar
connector-console-2.3.1.jar      # 自带的
connector-doris-2.3.1.jar
connector-elasticsearch-2.3.1.jar
connector-fake-2.3.1.jar         # 自带的
connector-file-hadoop-2.3.1.jar
connector-file-local-2.3.1.jar
connector-hive-2.3.1.jar
connector-iceberg-2.3.1.jar
connector-jdbc-2.3.1.jar
connector-kafka-2.3.1.jar
connector-redis-2.3.1.jar
配置安装SeaTunnel的插件
vim SeaTunnel-2.3.1/config/plugin_config
调用安装脚本的时候会在maven的中央仓库下载对应的jar包,尽量少放,下载太慢了,我放了这些。
--connectors-v2--
connector-assert
connector-cdc-mysql
connector-jdbc
connector-fake
connector-console
--end--
3
安装SeaTunnel
sh bin/install-plugin.sh 2.3.1
hive-exec-2.3.9.jar
# 下载链接
# https://repo.maven.apache.org/maven2/org/apache/hive/hive-exec/2.3.9/hive-exec-2.3.9.jar
# 注意这里是hive-exec-2.3.9.jar,不要从你的hive的lib目录下拷贝最新的jar包,就用这个
seatunnel-hadoop3-3.1.4-uber-2.3.1.jar
# 下载链接
# https://repo.maven.apache.org/maven2/org/apache/seatunnel/seatunnel-hadoop3-3.1.4-uber/2.3.1/seatunnel-hadoop3-3.1.4-uber-2.3.1.jar
seatunnel-hadoop3-3.1.4-uber-2.3.1-optional.jar
# 下载链接
# https://repo.maven.apache.org/maven2/org/apache/seatunnel/seatunnel-hadoop3-3.1.4-uber/2.3.1/seatunnel-hadoop3-3.1.4-uber-2.3.1-optional.jar
测试样例
1
官方demo fake to console
# SeaTunnel v2 job config (HOCON): FakeSource generates random rows, Console sink prints them.
env {
# run with 2 parallel subtasks; BATCH mode executes once and exits
execution.parallelism = 2
job.mode = "BATCH"
# checkpoint every 10s (mainly relevant for STREAMING jobs)
checkpoint.interval = 10000
}
source {
FakeSource {
parallelism = 2
# register output as table "fake" for downstream plugins
result_table_name = "fake"
# generate 16 rows matching the schema below
row.num = 16
schema = {
fields {
name = "string"
age = "int"
}
}
}
}
# print every row to stdout
sink {
Console {
}
}
运行命令
cd /opt/module/SeaTunnel-2.3.1
./bin/seatunnel.sh --config ./config/v2.batch.config.template -e local
2
MySQL to console
vim mysql_2console.conf
mysql_2console.conf
# SeaTunnel job: read from MySQL via JDBC and print the rows to the console.
env {
execution.parallelism = 2
job.mode = "BATCH"
checkpoint.interval = 10000
}
source{
Jdbc {
# NOTE(review): needs connector-jdbc plus the MySQL driver jar on the SeaTunnel classpath -- confirm
url = "jdbc:mysql://hadoop102/dim_db?useUnicode=true&characterEncoding=utf8&useSSL=false"
driver = "com.mysql.cj.jdbc.Driver"
connection_check_timeout_sec = 100
user = "root"
# placeholder credential -- replace with the real password
password = "xxxxxx"
# `date` is a varchar column (see DDL below), so this is a string comparison;
# it only orders correctly because values are zero-padded yyyy-MM-dd
query = "select * from dim_basicdata_date_a_d where date < '2010-12-31'"
}
}
sink {
Console {
}
}
-- Dimension database + calendar-date dimension table used as the JDBC source above.
CREATE DATABASE dim_db DEFAULT CHARACTER SET utf8 DEFAULT COLLATE utf8_general_ci;
-- Recreate the table idempotently.
drop table if exists dim_db.dim_basicdata_date_a_d;
-- NOTE(review): every column (including `date`) is varchar(40) and there is no
-- primary key -- presumably intentional for a bulk-loaded dimension; confirm.
create table if not exists dim_db.dim_basicdata_date_a_d
(
`date` varchar(40) comment '日期',
`year` varchar(40) comment '年',
`quarter` varchar(40) comment '季度(1/2/3/4)',
`season` varchar(40) comment '季节(春季/夏季/秋季/冬季)',
`month` varchar(40) comment '月',
`day` varchar(40) comment '日',
`week` varchar(40) comment '年内第几周',
`weekday` varchar(40) comment '周几(1-周一/2-周二/3-周三/4-周四/5-周五/6-周六/7-周日)',
`is_workday` varchar(40) comment '是否是工作日(1-是,0-否)',
`date_type` varchar(40) comment '节假日类型(工作日/法定上班[调休]/周末/节假日)',
`update_date` varchar(40) comment '更新日期'
);
cd /opt/module/SeaTunnel-2.3.1
./bin/seatunnel.sh --config ./job/mysql_2console.conf -e local
3
Hive to console
-- Hive mirror of the MySQL date dimension; used by the Hive source/sink examples below.
CREATE database db_hive;
-- Recreate the table idempotently.
drop table if exists db_hive.dim_basicdata_date_a_d;
create table if not exists db_hive.dim_basicdata_date_a_d
(
`date` string comment '日期',
`year` string comment '年',
`quarter` string comment '季度(1/2/3/4)',
`season` string comment '季节(春季/夏季/秋季/冬季)',
`month` string comment '月',
`day` string comment '日',
`week` string comment '年内第几周',
`weekday` string comment '周几(1-周一/2-周二/3-周三/4-周四/5-周五/6-周六/7-周日)',
`is_workday` string comment '是否是工作日(1-是,0-否)',
`date_type` string comment '节假日类型(工作日/法定上班[调休]/周末/节假日)',
`update_date` string comment '更新日期'
);
# SeaTunnel job: read a Hive table through the metastore and print rows to the console.
env {
execution.parallelism = 2
job.mode = "BATCH"
checkpoint.interval = 10000
}
source{
Hive {
# NOTE(review): requires connector-hive plus hive-exec-2.3.9.jar and the
# seatunnel hadoop3 uber jar in lib/ (see download notes above) -- confirm
table_name = "db_hive.dim_basicdata_date_a_d"
metastore_uri = "thrift://hadoop102:9083"
}
}
sink {
Console {
}
}
<!-- hive-site.xml fragment: for a simpler direct connection to the Hive database,
     comment out the three properties below (metastore service URI and the
     HiveServer2 bind host/port). -->
<!-- address of the Hive metastore service -->
<property>
<name>hive.metastore.uris</name>
<value>thrift://hadoop102:9083</value>
</property>
<!-- host that HiveServer2 binds to -->
<property>
<name>hive.server2.thrift.bind.host</name>
<value>hadoop102</value>
</property>
<!-- thrift port HiveServer2 listens on -->
<property>
<name>hive.server2.thrift.port</name>
<value>10000</value>
</property>
cd /opt/module/SeaTunnel-2.3.1
./bin/seatunnel.sh --config ./job/hive_2console.conf -e local
4
MySQL to hive
dim_basicdate_mysql_2hive.conf
# SeaTunnel job: copy the MySQL date dimension into the Hive table of the same name.
env {
execution.parallelism = 2
job.mode = "BATCH"
checkpoint.interval = 10000
}
source{
Jdbc {
url = "jdbc:mysql://hadoop102/dim_db?useUnicode=true&characterEncoding=utf8&useSSL=false"
driver = "com.mysql.cj.jdbc.Driver"
connection_check_timeout_sec = 100
user = "root"
# NOTE(review): differs from the "xxxxxx" placeholder in the earlier example -- confirm credential
password = "111111"
# full-table copy, no filter
query = "select * from dim_basicdata_date_a_d"
}
}
sink {
Hive {
# target table must already exist in Hive (created by the DDL earlier in this doc)
table_name = "db_hive.dim_basicdata_date_a_d"
metastore_uri = "thrift://hadoop102:9083"
}
}
cd /opt/module/SeaTunnel-2.3.1
./bin/seatunnel.sh --config ./job/dim_basicdate_mysql_2hive.conf -e local