解决 SeaTunnel 2.3.4 版本写入 S3 文件报错问题

此文件系统需要您为文件系统专用一个存储桶-您不应使用包含文件的现有存储桶,或将其他文件写入同一存储区

在使用 Apache SeaTunnel 时,我遇到了一个写入 S3 文件的报错问题。通过深入调试和分析,找到了问题所在,并提出了相应的解决方案。 17202367478258d7600dd2f37c432dfb228668c936a6d

本文将详细介绍报错情况、参考资料、解决思路以及后续研究方向,希望对大家有帮助!

一、详细报错

2024-04-12 20:44:18,647 ERROR [.c.FileSinkAggregatedCommitter] [hz.main.generic-operation.thread-43] - commit aggregatedCommitInfo error, aggregatedCommitInfo = FileAggregatedCommitInfo(transactionMap={/xugurtp/seatunnel/tmp/seatunnel/831147703474847745/476b6a6fc7/T_831147703474847745_476b6a6fc7_0_1={/xugurtp/seatunnel/tmp/seatunnel/831147703474847745/476b6a6fc7/T_831147703474847745_476b6a6fc7_0_1/NON_PARTITION/output_params_0.json=/xugurtp/seatunnel/tmp/6af80b38f3434aceb573cc65b9cd12216a/39111/output_params_0.json}}, partitionDirAndValuesMap={}) java.lang.IllegalStateException: Connection pool shut down

二、参考资料

三、解决思路

1. 远程调试

在本地 IDEA 中进行 debug 未发现报错,但在服务器上执行时却报错,因此决定进行远程 debug。执行以下命令添加 JVM 参数:

-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005

实际命令是:

 java -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005 -Dhazelcast.client.config=/opt/module/seatunnel-2.3.4/config/hazelcast-client.yaml -Dseatunnel.config=/opt/module/seatunnel-2.3.4/config/seatunnel.yaml -Dhazelcast.config=/opt/module/seatunnel-2.3.4/config/hazelcast.yaml -Dlog4j2.configurationFile=/opt/module/seatunnel-2.3.4/config/log4j2_client.properties -Dseatunnel.logs.path=/opt/module/seatunnel-2.3.4/logs -Dseatunnel.logs.file_name=seatunnel-starter-client -Xms1024m -Xmx1024m -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/seatunnel/dump/zeta-client -XX:MaxMetaspaceSize=1g -XX:+UseG1GC -cp /opt/module/seatunnel-2.3.4/lib/*:/opt/module/seatunnel-2.3.4/starter/seatunnel-starter.jar org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient -e local --config job/s3_sink.conf -cn xxx

2. 定位问题

通过调试发现问题出在 hadoop-aws 使用的缓存连接池对象。关键在于 if 判断部分,如果上游传递了 fs.s3a.impl.disable.cache=true,则不使用缓存。深入 debug 发现:有时 hadoopConf.getSchema 获取的不是 s3a 而是 s3n

s3 和 s3n /s3a 的区别

  • s3:基于块的文件系统

  • s3n:基于对象存储的文件系统,支持高达 5GB 的对象

  • s3a:基于对象存储的文件系统,支持高达 5TB 的对象,并具有更高的性能

在配置文件中设置的是 s3a,但实际获取到的是 s3n,这显然不合理。

3. 深入挖掘

我仔细看了一下报错的截图发现:

172023706252871df30737227fbe3d4dbef9ece220750

确实是 commit 期间报的错:那么也就是说 commit 初始化 s3conf 并没有走 buildWithConfig 方法,而是用的默认值,而且我根本没找到 commit 里面有 new s3Conf 的代码,再次 debug 看看谁去重新初始化了 S3Conf

17202370625402d52b7efd3bdae908c7232b69b413a3e

定位到这里就很头疼了,已经涉及到引擎层而非插件层面了,涉及到 classloader 的使用以及反序列化操作:

1720237062480947f16742091f04de844a40538010f45

反序列化代码:

        logicalDag =                CustomClassLoadedObject.deserializeWithCustomClassLoader(                        nodeEngine.getSerializationService(),                        classLoader,                        jobImmutableInformation.getLogicalDag());

很明显可以看出,S3Conf(静态类) 被重新初始化了,导致 SHEMA 被重新赋值成 s3n 了

1720237062508490c6e8423d3a6e27638e1cad155dbb0

因为 s3conf 它本身的属性都是静态的,而对 classloader 反序列化是时会重新加载静态属性的,所以导致 shema 被重新赋值为默认 s3n 了

综上所述

除了 source 和 sink 阶段,AggregatedCommit 操作也会写入 s3File。错误发生在 commit 期间,说明初始化 S3Conf 时并没有走 buildWithConfig 方法,而是使用了默认值。

由于 S3Conf 类的属性是静态的,反序列化时会重新加载静态属性,导致 SCHEMA 被重新赋值为默认的 s3n

资料参考 :https://wiki.apache.org/hadoop/AmazonS3

s3:基于 Block 块的文件系统

S3 Block FileSystem(URI scheme:s3)由 S3 支持的基于块的文件系统。 文件存储为块,就像 HDFS 一样。 这样可以有效地实现重命名。 此文件系统需要您为文件系统专用一个存储桶 - 您不应使用包含文件的现有存储桶,或将其他文件写入同一存储区。 此文件系统存储的文件大于 5GB,但不能与其他 S3 工具进行互操作。

s3n:基于对象存储的文件系统

S3 Native FileSystem(URI scheme:s3n)用于在 S3 上读取和写入常规文件的本机文件系统。 这个文件系统的优点是您可以访问使用其他工具编写的 S3 上的文件。 相反,其他工具可以访问使用 Hadoop 编写的文件。 缺点是 S3 的文件大小限制为 5GB。

s3a:基于对象存储的文件系统

S3A(URI 方案:s3a)是 S3 Native,s3n fs 的继承者,S3a:系统使用 Amazon 的库与 S3 进行交互。 这允许 S3A 支持较大的文件(不超过 5GB 的限制),更高的性能操作等等。 文件系统旨在替代 S3 Native:从 s3n:// URL 可访问的所有对象也应该通过替换 URL 模式从 s3a 访问。

public class S3Conf extends HadoopConf {    private static final String HDFS_S3N_IMPL = "org.apache.hadoop.fs.s3native.NativeS3FileSystem";    private static final String HDFS_S3A_IMPL = "org.apache.hadoop.fs.s3a.S3AFileSystem";    private static final String S3A_SCHEMA = "s3a";    private static final String DEFAULT_SCHEMA = "s3n";    private static String SCHEMA = DEFAULT_SCHEMA;    [@Override](https://my.oschina.net/u/1162528)    public String getFsHdfsImpl() {        return switchHdfsImpl();    }    [@Override](https://my.oschina.net/u/1162528)    public String getSchema() {        return SCHEMA;    }    private S3Conf(String hdfsNameKey) {        super(hdfsNameKey);    }    public static HadoopConf buildWithConfig(Config config) {        HadoopConf hadoopConf = new S3Conf(config.getString(S3ConfigOptions.S3_BUCKET.key()));        String bucketName = config.getString(S3ConfigOptions.S3_BUCKET.key());        if (bucketName.startsWith(S3A_SCHEMA)) {            SCHEMA = S3A_SCHEMA;        }        HashMap<String, String> s3Options = new HashMap<>();        putS3SK(s3Options, config);        if (CheckConfigUtil.isValidParam(config, S3ConfigOptions.S3_PROPERTIES.key())) {            config.getObject(S3ConfigOptions.S3_PROPERTIES.key())                    .forEach((key, value) -> s3Options.put(key, String.valueOf(value.unwrapped())));        }        s3Options.put(                S3ConfigOptions.S3A_AWS_CREDENTIALS_PROVIDER.key(),                config.getString(S3ConfigOptions.S3A_AWS_CREDENTIALS_PROVIDER.key()));        s3Options.put(                S3ConfigOptions.FS_S3A_ENDPOINT.key(),                config.getString(S3ConfigOptions.FS_S3A_ENDPOINT.key()));        hadoopConf.setExtraOptions(s3Options);        return hadoopConf;    }    public static HadoopConf buildWithReadOnlyConfig(ReadonlyConfig readonlyConfig) {        Config config = readonlyConfig.toConfig();        HadoopConf hadoopConf = new S3Conf(readonlyConfig.get(S3ConfigOptions.S3_BUCKET));        String bucketName = readonlyConfig.get(S3ConfigOptions.S3_BUCKET);        if (bucketName.startsWith(S3A_SCHEMA)) {            SCHEMA = S3A_SCHEMA;        }        HashMap<String, String> s3Options = new HashMap<>();        putS3SK(s3Options, config);        if (CheckConfigUtil.isValidParam(config, S3ConfigOptions.S3_PROPERTIES.key())) {            config.getObject(S3ConfigOptions.S3_PROPERTIES.key())                    .forEach((key, value) -> s3Options.put(key, String.valueOf(value.unwrapped())));        }        s3Options.put(                S3ConfigOptions.S3A_AWS_CREDENTIALS_PROVIDER.key(),                readonlyConfig.get(S3ConfigOptions.S3A_AWS_CREDENTIALS_PROVIDER).getProvider());        s3Options.put(                S3ConfigOptions.FS_S3A_ENDPOINT.key(),                readonlyConfig.get(S3ConfigOptions.FS_S3A_ENDPOINT));        hadoopConf.setExtraOptions(s3Options);        return hadoopConf;    }    private String switchHdfsImpl() {        switch (SCHEMA) {            case S3A_SCHEMA:                return HDFS_S3A_IMPL;            default:                return HDFS_S3N_IMPL;        }    }    private static void putS3SK(Map<String, String> s3Options, Config config) {        if (!CheckConfigUtil.isValidParam(config, S3ConfigOptions.S3_ACCESS_KEY.key())                && !CheckConfigUtil.isValidParam(config, S3ConfigOptions.S3_SECRET_KEY.key())) {            return;        }        String accessKey = config.getString(S3ConfigOptions.S3_ACCESS_KEY.key());        String secretKey = config.getString(S3ConfigOptions.S3_SECRET_KEY.key());        if (S3A_SCHEMA.equals(SCHEMA)) {            s3Options.put("fs.s3a.access.key", accessKey);            s3Options.put("fs.s3a.secret.key", secretKey);            return;        }        // default s3n        s3Options.put("fs.s3n.awsAccessKeyId", accessKey);        s3Options.put("fs.s3n.awsSecretAccessKey", secretKey);    }}

参考了反序列的知识才了解到这个情况:

当对一个包含静态成员的类进行反序列化时,静态成员不会恢复为之前的状态,而是保持在其初始状态。任何静态变量的值都是与该类本身相关的,

4. 解决方案

  • 1. 去掉 stastic 修饰,把有参构造换成无参构造和静态工厂方法:

  • 2. 保留 stastic 静态方法,使用 getSchema 方法代替静态属性调用:

由此可见,代码中的细节问题,即使看似微不足道,也可能引发严重的后果。一个简单的静态修饰符的误用,不仅能导致程序行为异常,更可能导致系统稳定性和安全性的大问题。

相关的 issues 已提交,大家有兴趣可以查看:

四、有待研究

1. 为什么只有 local 模式会报错:

推测可能是 cluster 模式是分布式的,每个算子分布在不同的机器上,所以本地缓存不会被使用,类似于没有走缓存。

2. 为什么本地 IDEA 执行 local 模式却没问题

可能是 Windows 和 Linux 的线程调度机制不同导致的。

结论

通过这次对 Apache SeaTunnel S3 File 写入报错问题的分析与解决,希望这些经验能帮助到遇到类似问题的开发者,同时也提醒大家在处理分布式系统时注意细节问题,以免引发不必要的故障。