随着数据技术的快速发展,了解并掌握各种工具和技术变得尤为重要。为此,我们准备在 Apache 社区发起如何使用连接器的 Demo 演示计划,邀请所有热爱数据同步技术的同学分享他们的知识和实操经验!
我们第三期主题是:如何使用 SeaTunnel 连接器从 MySQL 同步到 Hive,如果您对此计划感兴趣,也欢迎联系社区运营同学参与 Demo 录制!无论您是数据工程师、开发者还是技术爱好者,都欢迎您参与并展示您的技术才能。
敲重点~敲重点~如果你是用户,想看什么同步场景的 Demo!请下滑到最底部留言,我们优先出品呼声最高的同步场景 Demo!
我们的目标是创建一个共享和学习的平台,通过具体的 Demo 演示和对应的文档帮助社区成员更好地理解和应用各种数据连接器。这些 Demo 可以帮助新手快速学习,同时也为资深专家提供一个展示创新解决方案的舞台。
关于从 MySQL 同步到 Hive,前段时间也有用户投稿,感兴趣的同学可以搜索看看:
【最佳实践】2 个步骤教你从 Mysql 同步到 Hive
如何使用 SeaTunnel 同步 MySQL 数据到 Hive
Mysql Source 连接器相关请参考之前的教程: 全方位解读 SeaTunnel MySQL CDC 连接器:实现数据高效同步的强大工具
需要参考的文档及代码原文链接:https://seatunnel.apache.org/docs/2.3.5/connector-v2/sink/Hive (预计 2.3.6 版本才能正式使用)
将数据写入到 Hive。
要使用此连接器,您必须确保您的 Spark/Flink 集群已经集成了 Hive。
如果您使用 SeaTunnel Zeta Engine,则需要将 seatunnel-hadoop3-3.1.4-uber.jar
、hive-exec-3.1.3.jar
和 libfb303-0.9.3.jar
放置在 $SEATUNNEL_HOME/lib/
目录下。 :::
[x] 精确一次
默认情况下,我们使用两阶段提交(2PC)来确保 精确一次
。
[x] 文件格式
[x] text
[x] csv
[x] parquet
[x] orc
[x] json
[x] 压缩编码
[x] lzo
| 名称 | 类型 | 必需 | 默认值 | |-------------------------------|---------|--------|---------| | table_name | string | 是 | - | | metastore_uri | string | 是 | - | | compress_codec | string | 否 | none | | hdfs_site_path | string | 否 | - | | hive_site_path | string | 否 | - | | hive.hadoop.conf | Map | 否 | - | | hive.hadoop.conf-path | string | 否 | - | | krb5_path | string | 否 | /etc/krb5.conf | | kerberos_principal | string | 否 | - | | kerberos_keytab_path | string | 否 | - | | abort_drop_partition_metadata | boolean | 否 | true | | common-options | | 否 | - |
目标 Hive 表的名称,例如:db1.table1
。如果源是多模式的,您可以使用 ${database_name}.${table_name}
来生成表名,它会用源中生成的 CatalogTable 的值替换 ${database_name}
和 ${table_name}
。
Hive Metastore 的 URI。
hdfs-site.xml
的路径,用于加载 namenodes 的高可用配置。
hive-site.xml
的路径。
Hadoop 配置文件中的属性(core-site.xml
、hdfs-site.xml
、hive-site.xml
)。
core-site.xml
、hdfs-site.xml
、hive-site.xml
文件的指定加载路径。
krb5.conf
的路径,用于 Kerberos 认证。
Kerberos 的 principal。
Kerberos 的 keytab 路径。
决定在中止操作期间是否从 Hive Metastore 中删除分区元数据的标志。
注意:这仅影响 metastore 中的元数据,同步过程中生成的数据将始终被删除。
Sink 插件的常用参数,请参阅 Sink Common Options 获取详细信息。
Hive { table_name = "default.seatunnel_orc" metastore_uri = "thrift://namenode001:9083"}
我们有一个源表,如下所示:
create table test_hive_source( test_tinyint TINYINT, test_smallint SMALLINT, test_int INT, test_bigint BIGINT, test_boolean BOOLEAN, test_float FLOAT, test_double DOUBLE, test_string STRING, test_binary BINARY, test_timestamp TIMESTAMP, test_decimal DECIMAL(8,2), test_char CHAR(64), test_varchar VARCHAR(64), test_date DATE, test_array ARRAY<INT>, test_map MAP<STRING, FLOAT>, test_struct STRUCT<street:STRING, city:STRING, state:STRING, zip:INT>)PARTITIONED BY (test_par1 STRING, test_par2 STRING);
我们需要从源表读取数据并写入到另一个表中:
create table test_hive_sink_text_simple( test_tinyint TINYINT, test_smallint SMALLINT, test_int INT, test_bigint BIGINT, test_boolean BOOLEAN, test_float FLOAT, test_double DOUBLE, test_string STRING, test_binary BINARY, test_timestamp TIMESTAMP, test_decimal DECIMAL(8,2), test_char CHAR(64), test_varchar VARCHAR(64), test_date DATE)PARTITIONED BY (test_par1 STRING, test_par2 STRING);
作业配置文件如下:
env { parallelism = 3 job.name="test_hive_source_to_hive"}source { Hive { table_name = "test_hive.test_hive_source" metastore_uri = "thrift://ctyun7:9083" }}sink { Hive { table_name = "test_hive.test_hive_sink_text_simple" metastore_uri = "thrift://ctyun7:9083" hive.hadoop.conf = { bucket = "s3a://mybucket" } }}
1、为 EMR 的 Hive 创建 lib 目录
mkdir -p ${SEATUNNEL_HOME}/plugins/Hive/lib
2、从 Maven 中心获取 jar 到 lib 目录
cd ${SEATUNNEL_HOME}/plugins/Hive/libwget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.6.5/hadoop-aws-2.6.5.jarwget https://repo1.maven.org/maven2/org/apache/hive/hive-exec/2.3.9/hive-exec-2.3.9.jar
3、从 EMR 环境中复制 jar 到 lib 目录
cp /usr/share/aws/emr/emrfs/lib/emrfs-hadoop-assembly-2.60.0.jar ${SEATUNNEL_HOME}/plugins/Hive/libcp /usr/share/aws/emr/hadoop-state-pusher/lib/hadoop-common-3.3.6-amzn-1.jar ${SEATUNNEL_HOME}/plugins/Hive/libcp /usr/share/aws/emr/hadoop-state-pusher/lib/javax.inject-1.jar ${SEATUNNEL_HOME}/plugins/Hive/libcp /usr/share/aws/emr/hadoop-state-pusher/lib/aopalliance-1.0.jar ${SEATUNNEL_HOME}/plugins/Hive/lib
4、运行测试用例
env { parallelism = 1 job.mode = "BATCH"}source { FakeSource { schema = { fields { pk_id = bigint name = string score = int } primaryKey { name = "pk_id" columnNames = [pk_id] } } rows = [ { kind = INSERT fields = [1, "A", 100] }, { kind = INSERT fields = [2, "B", 100] }, { kind = INSERT fields = [3, "C", 100] } ] }}sink { Hive { table_name = "test_hive.test_hive_sink_on_s3" metastore_uri = "thrift://ip-192-168-0-202.cn-north-1.compute.internal:9083" hive.hadoop.conf-path = "/home/ec2-user/hadoop-conf" hive.hadoop.conf = { bucket="s3://ws-package" } }}
1、为 EMR 的 Hive 创建 lib 目录
mkdir -p ${SEATUNNEL_HOME}/plugins/Hive/lib
2、从 Maven 中心获取 jar 到 lib 目录
cd ${SEATUNNEL_HOME}/plugins/Hive/libwget https://repo1.maven.org/maven2/org/apache/hive/hive-exec/2.3.9/hive-exec-2.3.9.jar
3、从 EMR 环境中复制 jar 到 lib 目录并删除冲突的 jar
cp -r /opt/apps/JINDOSDK/jindosdk-current/lib/jindo-*.jar ${SEATUNNEL_HOME}/plugins/Hive/librm -f ${SEATUNNEL_HOME}/lib/hadoop-aliyun-*.jar
4、运行测试用例
env { parallelism = 1 job.mode = "BATCH"}source { FakeSource { schema = { fields { pk_id = bigint name = string score = int } primaryKey { name = "pk_id" columnNames = [pk_id] } } rows = [ { kind = INSERT fields = [1, "A", 100] }, { kind = INSERT fields = [2, "B", 100] }, { kind = INSERT fields = [3, "C", 100] } ] }}sink { Hive { table_name = "test_hive.test_hive_sink_on_oss" metastore_uri = "thrift://master-1-1.c-1009b01725b501f2.cn-wulanchabu.emr.aliyuncs.com:9083" hive.hadoop.conf-path = "/tmp/hadoop" hive.hadoop.conf = { bucket="oss://emr-osshdfs.cn-wulanchabu.oss-dls.aliyuncs.com" } }}
我们有多个源表,如下所示:
create table test_1()PARTITIONED BY (xx);create table test_2()PARTITIONED BY (xx);...
我们需要从这些源表读取数据并写入到其他表中:
作业配置文件如下:
env { # 在这里设置 Flink 配置 parallelism = 3 job.name="test_hive_source_to_hive"}source { Hive { tables_configs = [ { table_name = "test_hive.test_1" metastore_uri = "thrift://ctyun6:9083" }, { table_name = "test_hive.test_2" metastore_uri = "thrift://ctyun7:9083" } ] }}sink { Hive { table_name = "${database_name}.${table_name}" metastore_uri = "thrift://ctyun7:9083" }}
通过视频教程,我们探讨了如何使用 Apache SeaTunnel 的 Hive Sink Connector 将数据高效地写入 Hive 表。
无论是在本地环境还是云上部署,使用 Hive Sink Connector 都能够帮助企业构建高效、可靠的数据处理流程。希望通过本文的指导,您能更好地理解和应用这一强大的工具,以满足您的数据处理需求。
如果您对本文内容有任何疑问或建议,欢迎在评论区分享您的想法。让我们共同探讨和进步,不断推动数据技术的边界。