必看!S3File Sink Connector 使用文档

S3File 是一个用于管理 Amazon S3(Simple Storage Service)的 Python 模块。当前,Apache SeaTunnel 已经支持 S3File Sink Connector,为了更好地使用这个 Connector,有必要看一下这篇使用文档指南。

16554572107562be03194c595c0ec13b210d62ee32323

S3File 是一个用于管理 Amazon S3(Simple Storage Service)的 Python 模块。当前,Apache SeaTunnel 已经支持 S3File Sink Connector,为了更好地使用这个 Connector,有必要看一下这篇使用文档指南。


01




描述


将数据输出到 AWS S3 文件系统。
提示:
如果您使用的是 Spark/Flink,在使用此连接器之前,必须确保您的 Spark/Flink 集群已经集成了 Hadoop。Hadoop 2.x 版本已通过测试。
如果您使用的是 SeaTunnel Engine,它会在您下载和安装 SeaTunnel Engine 时自动集成 Hadoop JAR 包。您可以在 ${SEATUNNEL_HOME}/lib 目录下确认这个 JAR 包是否存在。

02




主要特性

✅仅一次语义
默认情况下,我们使用 2PC 提交来确保 "仅一次语义"。
✅文件格式类型

    ✅文本 (text)
    CSV
    Parquet
    ORC
    JSON
    Excel

03




选项

选项
类型必需默认值 备注
path
string
-

bucket
string
-

fs.s3a.endpoint
string
-

fs.s3a.aws.credentials.provider
string
com.amazonaws.auth.InstanceProfileCredentialsProvider

access_key
string
-
仅在 fs.s3a.aws.credentials.provider = org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider 时使用
access_secret
string
-
仅在 fs.s3a.aws.credentials.provider = org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider 时使用
custom_filename
boolean
false
是否需要自定义文件名
file_name_expression
string
"${transactionId}"
仅在 custom_filename 为 true 时使用
filename_time_format
string
"yyyy.MM.dd"
仅在 custom_filename 为 true 时使用
file_format_type
string
"csv"

field_delimiter
string
'01'
仅在 file_format 为 text 时使用
row_delimiter
string
"n"
仅在 file_format 为 text 时使用
have_partition
boolean
false
是否需要处理分区
partition_by
array
-
仅在 have_partition 为 true 时使用
partition_dir_expression
string
"k0={v0}/k1={v1}/.../kn={vn}/"
仅在 have_partition 为 true 时使用
is_partition_field_write_in_file
boolean
false
仅在 have_partition 为 true 时使用
sink_columns
array

当此参数为空时,将写入所有从 "Transform" 或 "Source" 获取的字段
is_enable_transaction
boolean
true

batch_size
int
1000000

compress_codec
string
none

common-options
object
-

max_rows_in_memory
int
-
仅在 file_format 为 Excel 时使用
sheet_name
string
Sheet${Random number}
仅在 file_format 为 Excel 时使用

path [string]

目标目录路径是必需的。

bucket [string]

S3 文件系统的bucket地址,例如:s3n://seatunnel-test,如果您使用的是 s3a 协议,此参数应为 s3a://seatunnel-test

fs.s3a.endpoint [string]

fs s3a 端点

fs.s3a.aws.credentials.provider [string]

认证 s3a 的方式。目前我们仅支持 org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider 和 com.amazonaws.auth.InstanceProfileCredentialsProvider
关于凭证提供程序的更多信息,您可以参考 Hadoop AWS 文档

access_key [string]

S3 文件系统的访问密钥。如果未设置此参数,请确认凭证提供程序链可以正确验证,可参考 hadoop-aws

access_secret [string]

S3 文件系统的访问密钥。如果未设置此参数,请确认凭证提供程序链可以正确验证,可参考 hadoop-aws

hadoop_s3_properties [map]

如果需要添加其他选项,可以在这里添加并参考此 链接

hadoop_s3_properties {
     "fs.s3a.buffer.dir" = "/data/st_test/s3a"
     "fs.s3a.fast.upload.buffer" = "disk"
  }

custom_filename [boolean]

是否自定义文件名。

file_name_expression [string]

仅在 custom_filename 为 true 时使用
file_name_expression 描述了将创建到 path 中的文件表达式。我们可以在 file_name_expression 中添加变量 ${now} 或 ${uuid},例如 test_${uuid}_${now}
${now} 代表当前时间,其格式可以通过指定选项 filename_time_format 来定义。
请注意,如果 is_enable_transaction 为 true,我们将在文件名的开头自动添加${transactionId}_

filename_time_format [string]

仅在 custom_filename 为 true 时使用
当 file_name_expression 参数中的格式为 xxxx-${now} 时,filename_time_format 可以指定路径的时间格式,默认值为 yyyy.MM.dd。常用的时间格式列于下表中:

符号
描述
y
M
d
月中的天数
H
一天中的小时 (0-23)
m
小时中的分钟
s
分钟中的秒数

file_format_type [string]

我们支持以下文件类型:

  • 文本 (text)
  • JSON
  • CSV
  • ORC
  • Parquet
  • Excel

请注意,最终文件名将以文件格式的后缀结尾,文本文件的后缀是 txt

field_delimiter [string]

数据行中列之间的分隔符。仅在 file_format 为 text 时需要。

row_delimiter [string]

文件中行之间的分隔符。仅在 file_format 为 text 时需要。

have_partition [boolean]

是否需要处理分区。

partition_by [array]

仅在 have_partition 为 true 时使用。
基于选定字段对分区数据进行分区。

partition_dir_expression [string]

仅在 have_partition 为 true 时使用。
如果指定了 partition_by,我们将根据分区信息生成相应的分区目录,并将最终文件放在分区目录中。
默认的 partition_dir_expression 是 ${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/k0 是第一个分区字段,v0 是第一个分区字段的值。

is_partition_field_write_in_file [boolean]

仅在 have_partition 为 true 时使用。
如果 is_partition_field_write_in_file 为 true,分区字段及其值将写入数据文件中。
例如,如果您想要写入 Hive 数据文件,其值应为 false

sink_columns [array]

需要写入文件的哪些列,默认值为从 "Transform" 或 "Source" 获取的所有列。
字段的顺序决定了实际写入文件的顺序。

is_enable_transaction [boolean]

如果 is_enable_transaction 为 true,我们将确保在写入目标目录时数据不会丢失或重复。
请注意,如果 is_enable_transaction 为 true,我们将在文件头部自动添加 ${transactionId}_
目前仅支持 true

batch_size [int]

文件中的最大行数。对于 SeaTunnel Engine,文件中的行数由 batch_size 和 checkpoint.interval 共同决定。如果 checkpoint.interval 的值足够大,当文件中的行数大于 batch_size 时,写入器将写入文件。如果 checkpoint.interval较小,则在新的检查点触发时,写入器将创建一个新文件。

compress_codec [string]

文件的压缩编解码器及其支持的详细信息如下:

  • txt: lzo none
  • JSON: lzo none
  • CSV: lzo none
  • ORC: lzo snappy lz4 zlib none
  • Parquet: lzo snappy lz4 gzip brotli zstd none

提示:Excel 类型不支持任何压缩格式。

常见选项

请参考 Sink Common Options 获取 Sink 插件的常见参数详细信息。

max_rows_in_memory [int]

当文件格式为 Excel 时,可以缓存在内存中的数据项的最大数量。

sheet_name [string]

工作簿的工作表名称。

044




示例

对于文本文件格式,具有 have_partitioncustom_filenamesink_columns 和 com.amazonaws.auth.InstanceProfileCredentialsProvider 的配置示例:

 S3File {
   bucket = "s3a://seatunnel-test"
   tmp_path = "/tmp/seatunnel"
   path="/seatunnel/text"
   fs.s3a.endpoint="s3.cn-north-1.amazonaws.com.cn"
   fs.s3a.aws.credentials.provider="com.amazonaws.auth.InstanceProfileCredentialsProvider"
   file_format_type = "text"
   field_delimiter = "t"
   row_delimiter = "n"
   have_partition = true
   partition_by = ["age"]
   partition_dir_expression = "${k0}=${v0}"
   is_partition_field_write_in_file = true
   custom_filename = true
   file_name_expression = "${transactionId}_${now}"
   filename_time_format = "yyyy.MM.dd"
   sink_columns = ["name","age"]
   is_enable_transaction=true
   hadoop_s3_properties {
     "fs.s3a.buffer.dir" = "/data/st_test/s3a"
     "fs.s3a.fast.upload.buffer" = "disk"
   }
 }

对于 Parquet 文件格式,仅需用 org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider进行配置:

 S3File {
   bucket = "s3a://seatunnel-test"
   tmp_path = "/tmp/seatunnel"
   path="/seatunnel/parquet"
   fs.s3a.endpoint="s3.cn-north-1.amazonaws.com.cn"
   fs.s3a.aws.credentials.provider="org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
   access_key = "xxxxxxxxxxxxxxxxx"
   secret_key = "xxxxxxxxxxxxxxxxx"
   file_format_type = "parquet"
   hadoop_s3_properties {
     "fs.s3a.buffer.dir" = "/data/st_test/s3a"
     "fs.s3a.fast.upload.buffer" = "disk"
   }
 }

对于 orc 文件仅需配置 org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider

 S3File {
   bucket = "s3a://seatunnel-test"
   tmp_path = "/tmp/seatunnel"
   path="/seatunnel/orc"
   fs.s3a.endpoint="s3.cn-north-1.amazonaws.com.cn"
   fs.s3a.aws.credentials.provider="org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
   access_key = "xxxxxxxxxxxxxxxxx"
   secret_key = "xxxxxxxxxxxxxxxxx"
   file_format_type = "orc"
 }


05




更新日志


2.3.0-beta 2022-10-20

  • 添加 S3File Sink 连接器

2.3.0 2022-12-30

  • Bug修复
    • 当上游字段为空时会抛出 NullPointerException
    • Sink 列映射失败
    • 从状态中恢复写入器时直接获取事务失败 (3258)
    • 修复了以下导致数据写入文件失败的错误:
  • 功能
    • 允许用户添加额外的 Hadoop-S3 参数
    • 允许使用 S3A 协议
    • 解耦 Hadoop-AWS 依赖
    • 支持 S3A 协议 (3632)
    • 支持设置每个文件的批处理大小 (3625)
    • 设置 S3 AK 为可选项 (3688)

06




下一版本

  • [优化]支持文件压缩(3699)