解决 SeaTunnel 2.3.4 版本写入 S3 文件报错问题

此文件系统需要您为文件系统专用一个存储桶-您不应使用包含文件的现有存储桶,或将其他文件写入同一存储区

在使用 Apache SeaTunnel 时,我遇到了一个写入 S3 文件的报错问题。通过深入调试和分析,找到了问题所在,并提出了相应的解决方案。 17202367478258d7600dd2f37c432dfb228668c936a6d

本文将详细介绍报错情况、参考资料、解决思路以及后续研究方向,希望对大家有帮助!

一、详细报错

2024-04-12 20:44:18,647 ERROR [.c.FileSinkAggregatedCommitter] [hz.main.generic-operation.thread-43] - commit aggregatedCommitInfo error, aggregatedCommitInfo = FileAggregatedCommitInfo(transactionMap={/xugurtp/seatunnel/tmp/seatunnel/831147703474847745/476b6a6fc7/T_831147703474847745_476b6a6fc7_0_1={/xugurtp/seatunnel/tmp/seatunnel/831147703474847745/476b6a6fc7/T_831147703474847745_476b6a6fc7_0_1/NON_PARTITION/output_params_0.json=/xugurtp/seatunnel/tmp/6af80b38f3434aceb573cc65b9cd12216a/39111/output_params_0.json}}, partitionDirAndValuesMap={}) java.lang.IllegalStateException: Connection pool shut down

二、参考资料

三、解决思路

1. 远程调试

在本地 IDEA 中进行 debug 未发现报错,但在服务器上执行时却报错,因此决定进行远程 debug。执行以下命令添加 JVM 参数:

-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005

实际命令是:

 java -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005 -Dhazelcast.client.config=/opt/module/seatunnel-2.3.4/config/hazelcast-client.yaml -Dseatunnel.config=/opt/module/seatunnel-2.3.4/config/seatunnel.yaml -Dhazelcast.config=/opt/module/seatunnel-2.3.4/config/hazelcast.yaml -Dlog4j2.configurationFile=/opt/module/seatunnel-2.3.4/config/log4j2_client.properties -Dseatunnel.logs.path=/opt/module/seatunnel-2.3.4/logs -Dseatunnel.logs.file_name=seatunnel-starter-client -Xms1024m -Xmx1024m -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/seatunnel/dump/zeta-client -XX:MaxMetaspaceSize=1g -XX:+UseG1GC -cp /opt/module/seatunnel-2.3.4/lib/*:/opt/module/seatunnel-2.3.4/starter/seatunnel-starter.jar org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient -e local --config job/s3_sink.conf -cn xxx

2. 定位问题

通过调试发现问题出在 hadoop-aws 使用的缓存连接池对象。关键在于 if 判断部分,如果上游传递了 fs.s3a.impl.disable.cache=true,则不使用缓存。深入 debug 发现:有时 hadoopConf.getSchema 获取的不是 s3a 而是 s3n

s3 和 s3n /s3a 的区别

  • s3:基于块的文件系统

  • s3n:基于对象存储的文件系统,支持高达 5GB 的对象

  • s3a:基于对象存储的文件系统,支持高达 5TB 的对象,并具有更高的性能

在配置文件中设置的是 s3a,但实际获取到的是 s3n,这显然不合理。

3. 深入挖掘

我仔细看了一下报错的截图发现:

172023706252871df30737227fbe3d4dbef9ece220750

确实是 commit 期间报的错:那么也就是说 commit 初始化 s3conf 并没有走 buildWithConfig 方法,而是用的默认值,而且我根本没找到 commit 里面有 new s3Conf 的代码,再次 debug 看看谁去重新初始化了 S3Conf

17202370625402d52b7efd3bdae908c7232b69b413a3e

定位到这里就很头疼了,已经涉及到引擎层而非插件层面了,涉及到 classloader 的使用以及反序列化操作:

1720237062480947f16742091f04de844a40538010f45

反序列化代码:

        logicalDag =                CustomClassLoadedObject.deserializeWithCustomClassLoader(                        nodeEngine.getSerializationService(),                        classLoader,                        jobImmutableInformation.getLogicalDag());

很明显可以看出,S3Conf(静态类) 被重新初始化了,导致 SHEMA 被重新赋值成 s3n

1720237062508490c6e8423d3a6e27638e1cad155dbb0

因为 s3conf 它本身的属性都是静态的,而对 classloader 反序列化是时会重新加载静态属性的,所以导致 shema 被重新赋值为默认 s3n

综上所述

除了 sourcesink 阶段,AggregatedCommit 操作也会写入 s3File。错误发生在 commit 期间,说明初始化 S3Conf 时并没有走 buildWithConfig 方法,而是使用了默认值。

由于 S3Conf 类的属性是静态的,反序列化时会重新加载静态属性,导致 SCHEMA 被重新赋值为默认的 s3n

资料参考https://wiki.apache.org/hadoop/AmazonS3

s3:基于 Block 块的文件系统

S3 Block FileSystem(URI scheme:s3)由 S3 支持的基于块的文件系统。 文件存储为块,就像 HDFS 一样。 这样可以有效地实现重命名。 此文件系统需要您为文件系统专用一个存储桶 - 您不应使用包含文件的现有存储桶,或将其他文件写入同一存储区。 此文件系统存储的文件大于 5GB,但不能与其他 S3 工具进行互操作。

s3n:基于对象存储的文件系统

S3 Native FileSystem(URI scheme:s3n)用于在 S3 上读取和写入常规文件的本机文件系统。 这个文件系统的优点是您可以访问使用其他工具编写的 S3 上的文件。 相反,其他工具可以访问使用 Hadoop 编写的文件。 缺点是 S3 的文件大小限制为 5GB。

s3a:基于对象存储的文件系统

S3A(URI 方案:s3a)是 S3 Native,s3n fs 的继承者,S3a:系统使用 Amazon 的库与 S3 进行交互。 这允许 S3A 支持较大的文件(不超过 5GB 的限制),更高的性能操作等等。 文件系统旨在替代 S3 Native:从 s3n:// URL 可访问的所有对象也应该通过替换 URL 模式从 s3a 访问。

public class S3Conf extends HadoopConf {    private static final String HDFS_S3N_IMPL = "org.apache.hadoop.fs.s3native.NativeS3FileSystem";    private static final String HDFS_S3A_IMPL = "org.apache.hadoop.fs.s3a.S3AFileSystem";    private static final String S3A_SCHEMA = "s3a";    private static final String DEFAULT_SCHEMA = "s3n";    private static String SCHEMA = DEFAULT_SCHEMA;    [@Override](https://my.oschina.net/u/1162528)    public String getFsHdfsImpl() {        return switchHdfsImpl();    }    [@Override](https://my.oschina.net/u/1162528)    public String getSchema() {        return SCHEMA;    }    private S3Conf(String hdfsNameKey) {        super(hdfsNameKey);    }    public static HadoopConf buildWithConfig(Config config) {        HadoopConf hadoopConf = new S3Conf(config.getString(S3ConfigOptions.S3_BUCKET.key()));        String bucketName = config.getString(S3ConfigOptions.S3_BUCKET.key());        if (bucketName.startsWith(S3A_SCHEMA)) {            SCHEMA = S3A_SCHEMA;        }        HashMap<String, String> s3Options = new HashMap<>();        putS3SK(s3Options, config);        if (CheckConfigUtil.isValidParam(config, S3ConfigOptions.S3_PROPERTIES.key())) {            config.getObject(S3ConfigOptions.S3_PROPERTIES.key())                    .forEach((key, value) -> s3Options.put(key, String.valueOf(value.unwrapped())));        }        s3Options.put(                S3ConfigOptions.S3A_AWS_CREDENTIALS_PROVIDER.key(),                config.getString(S3ConfigOptions.S3A_AWS_CREDENTIALS_PROVIDER.key()));        s3Options.put(                S3ConfigOptions.FS_S3A_ENDPOINT.key(),                config.getString(S3ConfigOptions.FS_S3A_ENDPOINT.key()));        hadoopConf.setExtraOptions(s3Options);        return hadoopConf;    }    public static HadoopConf buildWithReadOnlyConfig(ReadonlyConfig readonlyConfig) {        Config config = readonlyConfig.toConfig();        HadoopConf hadoopConf = new S3Conf(readonlyConfig.get(S3ConfigOptions.S3_BUCKET));        String bucketName = readonlyConfig.get(S3ConfigOptions.S3_BUCKET);        if (bucketName.startsWith(S3A_SCHEMA)) {            SCHEMA = S3A_SCHEMA;        }        HashMap<String, String> s3Options = new HashMap<>();        putS3SK(s3Options, config);        if (CheckConfigUtil.isValidParam(config, S3ConfigOptions.S3_PROPERTIES.key())) {            config.getObject(S3ConfigOptions.S3_PROPERTIES.key())                    .forEach((key, value) -> s3Options.put(key, String.valueOf(value.unwrapped())));        }        s3Options.put(                S3ConfigOptions.S3A_AWS_CREDENTIALS_PROVIDER.key(),                readonlyConfig.get(S3ConfigOptions.S3A_AWS_CREDENTIALS_PROVIDER).getProvider());        s3Options.put(                S3ConfigOptions.FS_S3A_ENDPOINT.key(),                readonlyConfig.get(S3ConfigOptions.FS_S3A_ENDPOINT));        hadoopConf.setExtraOptions(s3Options);        return hadoopConf;    }    private String switchHdfsImpl() {        switch (SCHEMA) {            case S3A_SCHEMA:                return HDFS_S3A_IMPL;            default:                return HDFS_S3N_IMPL;        }    }    private static void putS3SK(Map<String, String> s3Options, Config config) {        if (!CheckConfigUtil.isValidParam(config, S3ConfigOptions.S3_ACCESS_KEY.key())                && !CheckConfigUtil.isValidParam(config, S3ConfigOptions.S3_SECRET_KEY.key())) {            return;        }        String accessKey = config.getString(S3ConfigOptions.S3_ACCESS_KEY.key());        String secretKey = config.getString(S3ConfigOptions.S3_SECRET_KEY.key());        if (S3A_SCHEMA.equals(SCHEMA)) {            s3Options.put("fs.s3a.access.key", accessKey);            s3Options.put("fs.s3a.secret.key", secretKey);            return;        }        // default s3n        s3Options.put("fs.s3n.awsAccessKeyId", accessKey);        s3Options.put("fs.s3n.awsSecretAccessKey", secretKey);    }}

参考了反序列的知识才了解到这个情况:

当对一个包含静态成员的类进行反序列化时,静态成员不会恢复为之前的状态,而是保持在其初始状态。任何静态变量的值都是与该类本身相关的,

4. 解决方案

  • 1. 去掉 stastic 修饰,把有参构造换成无参构造和静态工厂方法:

  • 2. 保留 stastic 静态方法,使用 getSchema 方法代替静态属性调用:

由此可见,代码中的细节问题,即使看似微不足道,也可能引发严重的后果。一个简单的静态修饰符的误用,不仅能导致程序行为异常,更可能导致系统稳定性和安全性的大问题。

相关的 issues 已提交,大家有兴趣可以查看:

四、有待研究

1. 为什么只有 local 模式会报错:

推测可能是 cluster 模式是分布式的,每个算子分布在不同的机器上,所以本地缓存不会被使用,类似于没有走缓存。

2. 为什么本地 IDEA 执行 local 模式却没问题

可能是 Windows 和 Linux 的线程调度机制不同导致的。

结论

通过这次对 Apache SeaTunnel S3 File 写入报错问题的分析与解决,希望这些经验能帮助到遇到类似问题的开发者,同时也提醒大家在处理分布式系统时注意细节问题,以免引发不必要的故障。