在数据集成场景中,Excel 文件作为常见的数据来源,其格式多样化和功能复杂性常常给开发者带来一定挑战。
本次修改基于 -2.3.4 版本,包括:
自动识别
.xlsx
和.xls
文件类型,不再依赖文件后缀名判断;新增对公式单元格的值解析支持;
优化数据类型转换的容错性。
修改完之后,增强对 Excel 的读取能力,自动识别 xlsx、xls,支持读取公式单元格的值,进一步提升了 SeaTunnel 在 Excel 数据处理场景中的易用性和稳定性。
本次对 Excel 的支持都在 connector-file-base
模块
SeaTunnel 对 Excel 的读取,都在 ExcelReadStrategy
类中,原版对 Excel 的读取是根据文件名后缀来判断当前文件是 xlsx 还是 xls,一旦遇到文件名随机生成或没有后缀时,会导致解析失败,原版如下:
// org.apache.seatunnel.connectors.seatunnel.file.source.reader.ExcelReadStrategy @SneakyThrows @Override public void read(String path, String tableId, Collector<SeaTunnelRow> output) { Map<String, String> partitionsMap = parsePartitionsByPath(path); FSDataInputStream file = hadoopFileSystemProxy.getInputStream(path); Workbook workbook; // 根据文件名后缀判断文件类型 if (path.endsWith(".xls")) { workbook = new HSSFWorkbook(file); } else if (path.endsWith(".xlsx")) { workbook = new XSSFWorkbook(file); } else { throw new FileConnectorException( CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION, "Only support read excel file"); } Sheet sheet = pluginConfig.hasPath(BaseSourceConfigOptions.SHEET_NAME.key()) ? workbook.getSheet( pluginConfig.getString(BaseSourceConfigOptions.SHEET_NAME.key())) : workbook.getSheetAt(0); cellCount = seaTunnelRowType.getTotalFields(); cellCount = partitionsMap.isEmpty() ? cellCount : cellCount + partitionsMap.size(); ...... ...... ...... }
通过引入 Apache POI 的 WorkbookFactory.create
方法来创建 workbook
,让 POI
自己判断当前是什么文件类型即可,而无需依赖文件名后缀。这种方式对文件名格式不敏感,增强了兼容性。
// org.apache.seatunnel.connectors.seatunnel.file.source.reader.ExcelReadStrategy @SneakyThrows @Override public void read(String path, String tableId, Collector<SeaTunnelRow> output) { Map<String, String> partitionsMap = parsePartitionsByPath(path); FSDataInputStream file = hadoopFileSystemProxy.getInputStream(path); Workbook workbook; // 让poi自己判断文件类型,创建对应的 workbook workbook = WorkbookFactory.create(file); Sheet sheet = pluginConfig.hasPath(BaseSourceConfigOptions.SHEET_NAME.key()) ? workbook.getSheet( pluginConfig.getString(BaseSourceConfigOptions.SHEET_NAME.key())) : workbook.getSheetAt(0); cellCount = seaTunnelRowType.getTotalFields(); cellCount = partitionsMap.isEmpty() ? cellCount : cellCount + partitionsMap.size(); ...... ...... ...... }
读取单元格值的方法在 ExcelReadStrategy
的 getCellValue
方法内,原版如下:
// org.apache.seatunnel.connectors.seatunnel.file.source.reader.ExcelReadStrategy private Object getCellValue(CellType cellType, Cell cell) { switch (cellType) { case STRING: return cell.getStringCellValue(); case BOOLEAN: return cell.getBooleanCellValue(); case NUMERIC: if (DateUtil.isCellDateFormatted(cell)) { DataFormatter formatter = new DataFormatter(); return formatter.formatCellValue(cell); } return cell.getNumericCellValue(); case ERROR: break; default: throw new FileConnectorException( CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE, String.format("[%s] type not support ", cellType)); } return null; }
可以看到,它只对 String、Boolean、Numeric、Error
做了处理,一旦遇到公式类型,直接报错
所以我们可以增加一个判断,遇到 FORMULA
类型,就使用 po
i 提供的 FormulaEvaluator
来读取公式单元格的结果值的类型
修改 getCellValue
方法,让它能从调用方处接收一个 FormulaEvaluator
的参数,避免内部多次 new
。
针对 FORMULA
类型,根据结果值的类型,调用 cell
不同的方法获取值。
// org.apache.seatunnel.connectors.seatunnel.file.source.reader.ExcelReadStrategy private Object getCellValue(CellType cellType, Cell cell, FormulaEvaluator evaluator) { switch (cellType) { case STRING: return cell.getStringCellValue(); case BOOLEAN: return cell.getBooleanCellValue(); case NUMERIC: if (DateUtil.isCellDateFormatted(cell)) { DataFormatter formatter = new DataFormatter(); return formatter.formatCellValue(cell); } return cell.getNumericCellValue(); case FORMULA: CellType formulaResultType = evaluator.evaluateFormulaCell(cell); switch (formulaResultType) { case STRING: return cell.getStringCellValue(); case BOOLEAN: return cell.getBooleanCellValue(); case NUMERIC: if (DateUtil.isCellDateFormatted(cell)) { DataFormatter formatter = new DataFormatter(); return formatter.formatCellValue(cell); } return cell.getNumericCellValue(); case BLANK: case ERROR: break; default: throw new FileConnectorException( CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE, String.format("[%s] formula result type not support ", formulaResultType)); } break; case BLANK: case ERROR: break; default: throw new FileConnectorException( CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE, String.format("[%s] type not support ", cellType)); } return null; }
既然 getCellValue
方法需要接收 FormulaEvaluator
类型的参数,那么在调用方 read
方法中就得 new
一个 FormulaEvaluator
.
接下来就再回到 read
方法,new
一个 FormulaEvaluator
,在调用 getCellValue
的地方传进去.
// org.apache.seatunnel.connectors.seatunnel.file.source.reader.ExcelReadStrategy @SneakyThrows @Override public void read(String path, String tableId, Collector<SeaTunnelRow> output) { Map<String, String> partitionsMap = parsePartitionsByPath(path); FSDataInputStream file = hadoopFileSystemProxy.getInputStream(path); Workbook workbook; // 让poi自己判断文件类型,创建对应的 workbook workbook = WorkbookFactory.create(file); // 创建FormulaEvaluator,调用的地方在最下面 FormulaEvaluator evaluator = workbook.getCreationHelper().createFormulaEvaluator(); Sheet sheet = pluginConfig.hasPath(BaseSourceConfigOptions.SHEET_NAME.key()) ? workbook.getSheet( pluginConfig.getString(BaseSourceConfigOptions.SHEET_NAME.key())) : workbook.getSheetAt(0); cellCount = seaTunnelRowType.getTotalFields(); cellCount = partitionsMap.isEmpty() ? cellCount : cellCount + partitionsMap.size(); ...... ...... ...... IntStream.range((int) skipHeaderNumber, rowCount) .mapToObj(sheet::getRow) .filter(Objects::nonNull) .forEach( rowData -> { int[] cellIndexes = indexes == null ? IntStream.range(0, cellCount).toArray() : indexes; int z = 0; SeaTunnelRow seaTunnelRow = new SeaTunnelRow(cellCount); for (int j : cellIndexes) { Cell cell = rowData.getCell(j); seaTunnelRow.setField( z++, cell == null ? null : convert( // 在这里将 FormulaEvaluator 传进去 getCellValue(cell.getCellType(), cell, evaluator), fieldTypes[z - 1])); } if (isMergePartition) { int index = seaTunnelRowType.getTotalFields(); for (String value : partitionsMap.values()) { seaTunnelRow.setField(index++, value); } } seaTunnelRow.setTableId(tableId); output.collect(seaTunnelRow); }); }
到此,Apache SeaTunnel 就很好的支持了 xlsx、xls
的读取,然而,继续再看看 ExcelReadStrategy
类的 convert
方法,会发现它对于各个类型的解析是直接用 xxx.parseXXX
来做转换的,一旦单元格的内容不符合转换的格式,会抛出异常,导致整个任务失败,比如浮点型字符串前后有空白符,转换就会失败,再比如浮点型转成 Integer
,也会失败,容错率不高。
所以咱们再增强一下它对于类型转换的容错率。
根据上面所说的,咱们看看 convert
方法,原版对于 DOUBLE、BIGINT
等类型是直接用类的 parse
方法转换的,容错率不太友好。
// org.apache.seatunnel.connectors.seatunnel.file.source.reader.ExcelReadStrategy @SneakyThrows private Object convert(Object field, SeaTunnelDataType<?> fieldType) { if (field == null) { return ""; } SqlType sqlType = fieldType.getSqlType(); switch (sqlType) { case MAP: case ARRAY: return objectMapper.readValue((String) field, fieldType.getTypeClass()); case STRING: // 这里也改成调一次 toString() // 之前偶发出现过一次,虽然判断是STRING类型,但field实际却是double类型的情况,具体原因没找到 // 调用一次toString,确保返回的肯定是string return field.toString(); case DOUBLE: return Double.parseDouble(field.toString()); case BOOLEAN: return Boolean.parseBoolean(field.toString()); case FLOAT: return (float) Double.parseDouble(field.toString()); case BIGINT: return (long) Double.parseDouble(field.toString()); case INT: return (int) Double.parseDouble(field.toString()); case TINYINT: return (byte) Double.parseDouble(field.toString()); case SMALLINT: return (short) Double.parseDouble(field.toString()); case DECIMAL: return BigDecimal.valueOf(Double.parseDouble(field.toString())); ... ... ... } }
我们对它进行一点小改造,让它类型转换的时候兼容性高一点
先在 org.apache.seatunnel.connectors.seatunnel.file
包下新增一个包 utils
,在里面新增一个工具类 StrToNumberUtil
,提供字符串转成各种类型的工具方法。
package org.apache.seatunnel.connectors.seatunnel.file.utils;import org.apache.commons.lang3.StringUtils;import java.math.BigDecimal;import java.util.Optional;public class StrToNumberUtil { // 字符串转double public static Double str2Double(String str) { if (StringUtils.isBlank(str)) { return null; } try { return Double.parseDouble(str.trim()); } catch (Exception e) { return null; } } // 字符串转long public static Long str2Long(String str) { if (StringUtils.isBlank(str)) { return null; } str = str.trim(); // 多个小数点,不是数字,pass if (str.indexOf('.') != str.lastIndexOf('.')) { return null; } // 取整数位 String sub = str.indexOf('.') >= 0 ? str.substring(0, str.indexOf('.')) : str; try { return Long.parseLong(sub); } catch (Exception e) { return null; } } // 字符串转byte public static Byte str2Byte(String s) { return Optional.ofNullable(str2Long(s)).map(Long::byteValue).orElse(null); } // 字符串转short public static Short str2Short(String s) { return Optional.ofNullable(str2Long(s)).map(Long::shortValue).orElse(null); } // 字符串转integer public static Integer str2Int(String s) { return Optional.ofNullable(str2Long(s)).map(Long::intValue).orElse(null); } // 字符串转float public static Float str2Float(String s) { return Optional.ofNullable(str2Double(s)).map(Double::floatValue).orElse(null); } // 字符串转BigDecimal public static BigDecimal str2BigDecimal(String s) { if (StringUtils.isBlank(s)) { return null; } try { return new BigDecimal(s.trim()); } catch (Exception e) { return null; } }}
然后在 ExcelReadStrategy
类的 convert
方法中使用这些方法替代原版的类型转化逻辑。
// org.apache.seatunnel.connectors.seatunnel.file.source.reader.ExcelReadStrategy @SneakyThrows private Object convert(Object field, SeaTunnelDataType<?> fieldType) { if (field == null) { return ""; } SqlType sqlType = fieldType.getSqlType(); switch (sqlType) { case MAP: case ARRAY: return objectMapper.readValue((String) field, fieldType.getTypeClass()); case STRING: // 这里改使用 toString 方法,避免后续类型转换出错 return field.toString(); case DOUBLE: // 使用工具方法 return StrToNumberUtil.str2Double(field.toString()); case BOOLEAN: return Boolean.parseBoolean(field.toString()); case FLOAT: // 使用工具方法 return StrToNumberUtil.str2Float(field.toString()); case BIGINT: // 使用工具方法 return StrToNumberUtil.str2Long(field.toString()); case INT: // 使用工具方法 return StrToNumberUtil.str2Int(field.toString()); case TINYINT: // 使用工具方法 return StrToNumberUtil.str2Byte(field.toString()); case SMALLINT: // 使用工具方法 return StrToNumberUtil.str2Short(field.toString()); case DECIMAL: // 使用工具方法 return StrToNumberUtil.str2BigDecimal(field.toString()); ... ... ... } }
到此,SeaTunnel 对 Excel 读取的支持又增强了一些!
如果想让其他基于 connector-file-base
的插件都生效,那就打包 connector-file-base
模块.
mvn clean package -DskipTests=true -pl seatunnel-connectors-v2/connector-file/connector-file-base -am
然后将 connector-file-base
的 jar
放进 SeaTunnel 部署目录的 lib
目录下,所有基于 connector-file-base
的插件都会生效.
想让某些基于 connector-file-base
的插件生效,那就只重新打包那一个插件即可~
mvn clean package -DskipTests=true -pl [你想要生效的插件路径] -am
用新 Jar 替换旧 Jar 即可
打包的时候可能会因为什么原因导致 maven
的 spotless
插件报错,试试先跑一下 mvn spotless:apply
,再去跑打包!
此次改进充分体现了 SeaTunnel 在数据集成场景中的灵活性和持续优化能力。通过更强的文件识别逻辑、更智能的单元格处理方式以及更高的容错率,SeaTunnel 为开发者提供了更可靠的 Excel 数据读取支持。
未来,SeaTunnel 将继续致力于优化数据集成体验,推动多源异构数据的高效整合。如果你有任何建议或问题,欢迎加入社区与我们交流,共同完善这个优秀的开源项目!
本文由 科技 提供发布支持!