SeaTunnel 增强对 Excel 读取能力,支持xlsx、xls、公式单元格

org.apache.seatunnel.connectors.seatunnel.file.source.reader.ExcelReadStrategy@SneakyThrowsprivateObjectconvert(ObjectfieldSeaTunnelDataTypeltgtfieldType){if(field==null){returnquotquot}SqlTypesqlType=fieldType.getSqlType()switch(sqlType){caseMAPcaseARRAYreturnobjectMapper.readValue((String)fieldfieldType.getTypeClass())caseSTRING这里改使用toString方法,避免后续类型转换出错returnfield.toString()caseDOUBLE使用工具方法returnStrToNumberUtil.str2Double(field.toString())caseBOOLEANreturnBoolean.parseBoolean(field.toString()

概述

在数据集成场景中,Excel 文件作为常见的数据来源,其格式多样化和功能复杂性常常给开发者带来一定挑战。

v2-344c639ea0752a902a8de2dbf5612f68_1440w

本次修改基于 SeaTunnel-2.3.4 版本,包括:

  • 自动识别 .xlsx 和 .xls 文件类型,不再依赖文件后缀名判断;

  • 新增对公式单元格的值解析支持;

  • 优化数据类型转换的容错性。

修改完之后,增强对 Excel 的读取能力,自动识别 xlsx、xls,支持读取公式单元格的值,进一步提升了 SeaTunnel 在 Excel 数据处理场景中的易用性和稳定性。

相关模块

本次对 Excel 的支持都在 connector-file-base 模块

增强判断 xls、xlsx 文件能力

SeaTunnel 对 Excel 的读取,都在 ExcelReadStrategy 类中,原版对 Excel 的读取是根据文件名后缀来判断当前文件是 xlsx 还是 xls,一旦遇到文件名随机生成或没有后缀时,会导致解析失败,原版如下:

// org.apache.seatunnel.connectors.seatunnel.file.source.reader.ExcelReadStrategy    @SneakyThrows    @Override    public void read(String path, String tableId, Collector<SeaTunnelRow> output) {        Map<String, String> partitionsMap = parsePartitionsByPath(path);        FSDataInputStream file = hadoopFileSystemProxy.getInputStream(path);        Workbook workbook;        // 根据文件名后缀判断文件类型        if (path.endsWith(".xls")) {            workbook = new HSSFWorkbook(file);        } else if (path.endsWith(".xlsx")) {            workbook = new XSSFWorkbook(file);        } else {            throw new FileConnectorException(                    CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,                    "Only support read excel file");        }        Sheet sheet =                pluginConfig.hasPath(BaseSourceConfigOptions.SHEET_NAME.key())                        ? workbook.getSheet(                                pluginConfig.getString(BaseSourceConfigOptions.SHEET_NAME.key()))                        : workbook.getSheetAt(0);        cellCount = seaTunnelRowType.getTotalFields();        cellCount = partitionsMap.isEmpty() ? cellCount : cellCount + partitionsMap.size();        ......        ......        ......    }

通过引入 Apache POI 的 WorkbookFactory.create 方法来创建 workbook,让 POI 自己判断当前是什么文件类型即可,而无需依赖文件名后缀。这种方式对文件名格式不敏感,增强了兼容性。

// org.apache.seatunnel.connectors.seatunnel.file.source.reader.ExcelReadStrategy    @SneakyThrows    @Override    public void read(String path, String tableId, Collector<SeaTunnelRow> output) {        Map<String, String> partitionsMap = parsePartitionsByPath(path);        FSDataInputStream file = hadoopFileSystemProxy.getInputStream(path);        Workbook workbook;        // 让poi自己判断文件类型,创建对应的 workbook        workbook = WorkbookFactory.create(file);        Sheet sheet =                pluginConfig.hasPath(BaseSourceConfigOptions.SHEET_NAME.key())                        ? workbook.getSheet(                                pluginConfig.getString(BaseSourceConfigOptions.SHEET_NAME.key()))                        : workbook.getSheetAt(0);        cellCount = seaTunnelRowType.getTotalFields();        cellCount = partitionsMap.isEmpty() ? cellCount : cellCount + partitionsMap.size();        ......        ......        ......    }

支持读取公式单元格值

读取单元格值的方法在 ExcelReadStrategy 的 getCellValue 方法内,原版如下:

// org.apache.seatunnel.connectors.seatunnel.file.source.reader.ExcelReadStrategy    private Object getCellValue(CellType cellType, Cell cell) {        switch (cellType) {            case STRING:                return cell.getStringCellValue();            case BOOLEAN:                return cell.getBooleanCellValue();            case NUMERIC:                if (DateUtil.isCellDateFormatted(cell)) {                    DataFormatter formatter = new DataFormatter();                    return formatter.formatCellValue(cell);                }                return cell.getNumericCellValue();            case ERROR:                break;            default:                throw new FileConnectorException(                        CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,                        String.format("[%s] type not support ", cellType));        }        return null;    }

可以看到,它只对 String、Boolean、Numeric、Error 做了处理,一旦遇到公式类型,直接报错

所以我们可以增加一个判断,遇到 FORMULA 类型,就使用 poi 提供的 FormulaEvaluator 来读取公式单元格的结果值的类型

修改 getCellValue 方法,让它能从调用方处接收一个 FormulaEvaluator 的参数,避免内部多次 new

针对 FORMULA 类型,根据结果值的类型,调用 cell 不同的方法获取值。

// org.apache.seatunnel.connectors.seatunnel.file.source.reader.ExcelReadStrategy    private Object getCellValue(CellType cellType, Cell cell, FormulaEvaluator evaluator) {        switch (cellType) {            case STRING:                return cell.getStringCellValue();            case BOOLEAN:                return cell.getBooleanCellValue();            case NUMERIC:                if (DateUtil.isCellDateFormatted(cell)) {                    DataFormatter formatter = new DataFormatter();                    return formatter.formatCellValue(cell);                }                return cell.getNumericCellValue();            case FORMULA:                CellType formulaResultType = evaluator.evaluateFormulaCell(cell);                switch (formulaResultType) {                    case STRING:                        return cell.getStringCellValue();                    case BOOLEAN:                        return cell.getBooleanCellValue();                    case NUMERIC:                        if (DateUtil.isCellDateFormatted(cell)) {                            DataFormatter formatter = new DataFormatter();                            return formatter.formatCellValue(cell);                        }                        return cell.getNumericCellValue();                    case BLANK:                    case ERROR:                        break;                    default:                        throw new FileConnectorException(                                CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,                                String.format("[%s] formula result type not support ", formulaResultType));                }                break;            case BLANK:            case ERROR:                break;            default:                throw new FileConnectorException(                        CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,                        String.format("[%s] type not support ", cellType));        }        return null;    }

既然 getCellValue 方法需要接收 FormulaEvaluator 类型的参数,那么在调用方 read 方法中就得 new 一个 FormulaEvaluator.

接下来就再回到 read 方法,new 一个 FormulaEvaluator,在调用 getCellValue 的地方传进去.

// org.apache.seatunnel.connectors.seatunnel.file.source.reader.ExcelReadStrategy    @SneakyThrows    @Override    public void read(String path, String tableId, Collector<SeaTunnelRow> output) {        Map<String, String> partitionsMap = parsePartitionsByPath(path);        FSDataInputStream file = hadoopFileSystemProxy.getInputStream(path);        Workbook workbook;        // 让poi自己判断文件类型,创建对应的 workbook        workbook = WorkbookFactory.create(file);        // 创建FormulaEvaluator,调用的地方在最下面        FormulaEvaluator evaluator = workbook.getCreationHelper().createFormulaEvaluator();        Sheet sheet =                pluginConfig.hasPath(BaseSourceConfigOptions.SHEET_NAME.key())                        ? workbook.getSheet(                                pluginConfig.getString(BaseSourceConfigOptions.SHEET_NAME.key()))                        : workbook.getSheetAt(0);        cellCount = seaTunnelRowType.getTotalFields();        cellCount = partitionsMap.isEmpty() ? cellCount : cellCount + partitionsMap.size();        ......        ......        ......        IntStream.range((int) skipHeaderNumber, rowCount)                .mapToObj(sheet::getRow)                .filter(Objects::nonNull)                .forEach(                        rowData -> {                            int[] cellIndexes =                                    indexes == null                                            ? IntStream.range(0, cellCount).toArray()                                            : indexes;                            int z = 0;                            SeaTunnelRow seaTunnelRow = new SeaTunnelRow(cellCount);                            for (int j : cellIndexes) {                                Cell cell = rowData.getCell(j);                                seaTunnelRow.setField(                                        z++,                                        cell == null                                                ? null                                                : convert(                                                        // 在这里将 FormulaEvaluator 传进去                                                        getCellValue(cell.getCellType(), cell, evaluator),                                                        fieldTypes[z - 1]));                            }                            if (isMergePartition) {                                int index = seaTunnelRowType.getTotalFields();                                for (String value : partitionsMap.values()) {                                    seaTunnelRow.setField(index++, value);                                }                            }                            seaTunnelRow.setTableId(tableId);                            output.collect(seaTunnelRow);                        });    }

到此,Apache SeaTunnel 就很好的支持了 xlsx、xls 的读取,然而,继续再看看 ExcelReadStrategy 类的 convert 方法,会发现它对于各个类型的解析是直接用 xxx.parseXXX来做转换的,一旦单元格的内容不符合转换的格式,会抛出异常,导致整个任务失败,比如浮点型字符串前后有空白符,转换就会失败,再比如浮点型转成 Integer,也会失败,容错率不高。

所以咱们再增强一下它对于类型转换的容错率。

增强类型转换容错率

根据上面所说的,咱们看看 convert 方法,原版对于 DOUBLE、BIGINT 等类型是直接用类的 parse 方法转换的,容错率不太友好。

// org.apache.seatunnel.connectors.seatunnel.file.source.reader.ExcelReadStrategy    @SneakyThrows    private Object convert(Object field, SeaTunnelDataType<?> fieldType) {        if (field == null) {            return "";        }        SqlType sqlType = fieldType.getSqlType();        switch (sqlType) {            case MAP:            case ARRAY:                return objectMapper.readValue((String) field, fieldType.getTypeClass());            case STRING:                // 这里也改成调一次 toString()                 // 之前偶发出现过一次,虽然判断是STRING类型,但field实际却是double类型的情况,具体原因没找到                // 调用一次toString,确保返回的肯定是string                return field.toString();            case DOUBLE:                return Double.parseDouble(field.toString());            case BOOLEAN:                return Boolean.parseBoolean(field.toString());            case FLOAT:                return (float) Double.parseDouble(field.toString());            case BIGINT:                return (long) Double.parseDouble(field.toString());            case INT:                return (int) Double.parseDouble(field.toString());            case TINYINT:                return (byte) Double.parseDouble(field.toString());            case SMALLINT:                return (short) Double.parseDouble(field.toString());            case DECIMAL:                return BigDecimal.valueOf(Double.parseDouble(field.toString()));            ...            ...            ...        }    }

我们对它进行一点小改造,让它类型转换的时候兼容性高一点

先在 org.apache.seatunnel.connectors.seatunnel.file 包下新增一个包 utils,在里面新增一个工具类 StrToNumberUtil,提供字符串转成各种类型的工具方法。

package org.apache.seatunnel.connectors.seatunnel.file.utils;import org.apache.commons.lang3.StringUtils;import java.math.BigDecimal;import java.util.Optional;public class StrToNumberUtil {    // 字符串转double    public static Double str2Double(String str) {        if (StringUtils.isBlank(str)) {            return null;        }        try {            return Double.parseDouble(str.trim());        } catch (Exception e) {            return null;        }    }    // 字符串转long    public static Long str2Long(String str) {        if (StringUtils.isBlank(str)) {            return null;        }        str = str.trim();        // 多个小数点,不是数字,pass        if (str.indexOf('.') != str.lastIndexOf('.')) {            return null;        }        // 取整数位        String sub = str.indexOf('.') >= 0 ? str.substring(0, str.indexOf('.')) : str;        try {            return Long.parseLong(sub);        } catch (Exception e) {            return null;        }    }    // 字符串转byte    public static Byte str2Byte(String s) {        return Optional.ofNullable(str2Long(s)).map(Long::byteValue).orElse(null);    }    // 字符串转short    public static Short str2Short(String s) {        return Optional.ofNullable(str2Long(s)).map(Long::shortValue).orElse(null);    }    // 字符串转integer    public static Integer str2Int(String s) {        return Optional.ofNullable(str2Long(s)).map(Long::intValue).orElse(null);    }    // 字符串转float    public static Float str2Float(String s) {        return Optional.ofNullable(str2Double(s)).map(Double::floatValue).orElse(null);    }    // 字符串转BigDecimal    public static BigDecimal str2BigDecimal(String s) {        if (StringUtils.isBlank(s)) {            return null;        }        try {            return new BigDecimal(s.trim());        } catch (Exception e) {            return null;        }    }}

然后在 ExcelReadStrategy 类的 convert 方法中使用这些方法替代原版的类型转化逻辑。

// org.apache.seatunnel.connectors.seatunnel.file.source.reader.ExcelReadStrategy    @SneakyThrows    private Object convert(Object field, SeaTunnelDataType<?> fieldType) {        if (field == null) {            return "";        }        SqlType sqlType = fieldType.getSqlType();        switch (sqlType) {            case MAP:            case ARRAY:                return objectMapper.readValue((String) field, fieldType.getTypeClass());            case STRING:                // 这里改使用 toString 方法,避免后续类型转换出错                return field.toString();            case DOUBLE:                // 使用工具方法                return StrToNumberUtil.str2Double(field.toString());            case BOOLEAN:                return Boolean.parseBoolean(field.toString());            case FLOAT:                // 使用工具方法                return StrToNumberUtil.str2Float(field.toString());            case BIGINT:                // 使用工具方法                return StrToNumberUtil.str2Long(field.toString());            case INT:                // 使用工具方法                return StrToNumberUtil.str2Int(field.toString());            case TINYINT:                // 使用工具方法                return StrToNumberUtil.str2Byte(field.toString());            case SMALLINT:                // 使用工具方法                return StrToNumberUtil.str2Short(field.toString());            case DECIMAL:                // 使用工具方法                return StrToNumberUtil.str2BigDecimal(field.toString());            ...            ...            ...        }    }

到此,SeaTunnel 对 Excel 读取的支持又增强了一些!

最后打包

全局生效

如果想让其他基于 connector-file-base 的插件都生效,那就打包 connector-file-base 模块.

mvn clean package -DskipTests=true -pl seatunnel-connectors-v2/connector-file/connector-file-base -am

然后将 connector-file-base 的 jar 放进 SeaTunnel 部署目录的 lib 目录下,所有基于 connector-file-base 的插件都会生效.

部分插件生效

想让某些基于 connector-file-base 的插件生效,那就只重新打包那一个插件即可~

mvn clean package -DskipTests=true -pl [你想要生效的插件路径] -am

用新 Jar 替换旧 Jar 即可

其他

打包的时候可能会因为什么原因导致 maven 的 spotless 插件报错,试试先跑一下 mvn spotless:apply,再去跑打包!

此次改进充分体现了 SeaTunnel 在数据集成场景中的灵活性和持续优化能力。通过更强的文件识别逻辑、更智能的单元格处理方式以及更高的容错率,SeaTunnel 为开发者提供了更可靠的 Excel 数据读取支持。

未来,SeaTunnel 将继续致力于优化数据集成体验,推动多源异构数据的高效整合。如果你有任何建议或问题,欢迎加入社区与我们交流,共同完善这个优秀的开源项目

本文由 白鲸开源科技 提供发布支持!