一招解决SeaTunnel Excel中无法将数字类型转换成字符串类型的问题 | 附源码打包

针对SeaTunnel处理Excel数字类型强转为字符串时导致的类型异常,本文将详解如何通过修改源码,实现数字类型到数据库字符串字段的兼容推送,并通过Maven打包部署修复后的代码。

针对SeaTunnel处理Excel数字类型强转为字符串时导致的类型异常,本文将详解如何通过修改源码,实现数字类型到数据库字符串字段的兼容推送,并通过Maven打包部署修复后的代码。

需求

需要实现将Excel中数字类型的单元格向数据库中字符串类型的字段推送。

问题原因

SeaTunnel在读取字段值的时候都是使用强制类型转换的方式去获取数据的。假如实际数据类型与目标类型不一致(例如Excel数字单元格读出的Double被强转为String),直接强转就会抛出ClassCastException。

修改位置

org/apache/seatunnel/api/table/type/SeaTunnelRow.java

1

org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java

2

修改的代码

    /**
     * Binds every field of {@code row} to the matching positional parameter of
     * {@code statement}.
     *
     * <p>Fields are visited in the order of the physical row type derived from
     * {@code tableSchema}. JDBC parameters are 1-based, so field {@code i} is bound to
     * parameter {@code i + 1}. A {@code null} field value is bound with
     * {@code setObject(index, null)} regardless of its declared type.
     *
     * <p>For {@code STRING} columns the value is converted with {@code toString()} instead
     * of being cast, so a non-String value (for example a {@code Double} read from a
     * numeric Excel cell) can be written to a string column without a
     * {@link ClassCastException}.
     *
     * @param tableSchema schema whose physical row type defines field order and types
     * @param row source row holding the field values
     * @param statement prepared statement to populate
     * @return the same {@code statement} instance, with parameters bound
     * @throws SQLException if the driver rejects one of the bound values
     * @throws JdbcConnectorException if a field has type {@code MAP}, {@code ROW}, or any
     *     other type not handled here
     */
    @Override
    public PreparedStatement toExternal(
            TableSchema tableSchema, SeaTunnelRow row, PreparedStatement statement)
            throws SQLException {
        SeaTunnelRowType rowType = tableSchema.toPhysicalRowDataType();
        for (int fieldIndex = 0; fieldIndex < rowType.getTotalFields(); fieldIndex++) {
            SeaTunnelDataType<?> seaTunnelDataType = rowType.getFieldType(fieldIndex);
            int statementIndex = fieldIndex + 1;
            Object fieldValue = row.getField(fieldIndex);
            if (fieldValue == null) {
                statement.setObject(statementIndex, null);
                continue;
            }
            // fieldValue is guaranteed non-null from here on.
            switch (seaTunnelDataType.getSqlType()) {
                case STRING:
                    // A plain (String) cast throws ClassCastException for values such as
                    // Double. toString() yields the same result for a String and a usable
                    // textual form for everything else, without relying on try/catch
                    // (which would also have masked and retried a genuine SQLException).
                    statement.setString(statementIndex, fieldValue.toString());
                    break;
                case BOOLEAN:
                    statement.setBoolean(statementIndex, (Boolean) fieldValue);
                    break;
                case TINYINT:
                    statement.setByte(statementIndex, (Byte) fieldValue);
                    break;
                case SMALLINT:
                    statement.setShort(statementIndex, (Short) fieldValue);
                    break;
                case INT:
                    statement.setInt(statementIndex, (Integer) fieldValue);
                    break;
                case BIGINT:
                    statement.setLong(statementIndex, (Long) fieldValue);
                    break;
                case FLOAT:
                    statement.setFloat(statementIndex, (Float) fieldValue);
                    break;
                case DOUBLE:
                    statement.setDouble(statementIndex, (Double) fieldValue);
                    break;
                case DECIMAL:
                    statement.setBigDecimal(statementIndex, (BigDecimal) fieldValue);
                    break;
                case DATE:
                    LocalDate localDate = (LocalDate) fieldValue;
                    statement.setDate(statementIndex, java.sql.Date.valueOf(localDate));
                    break;
                case TIME:
                    writeTime(statement, statementIndex, (LocalTime) fieldValue);
                    break;
                case TIMESTAMP:
                    LocalDateTime localDateTime = (LocalDateTime) fieldValue;
                    statement.setTimestamp(
                            statementIndex, java.sql.Timestamp.valueOf(localDateTime));
                    break;
                case BYTES:
                    statement.setBytes(statementIndex, (byte[]) fieldValue);
                    break;
                case NULL:
                    statement.setNull(statementIndex, java.sql.Types.NULL);
                    break;
                case ARRAY:
                    // No null check needed: null values were bound before the switch.
                    statement.setObject(statementIndex, (Object[]) fieldValue);
                    break;
                case MAP:
                case ROW:
                default:
                    throw new JdbcConnectorException(
                            CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
                            "Unexpected value: " + seaTunnelDataType);
            }
        }
        return statement;
    }
    private int getBytesForValue(Object v, SeaTunnelDataType<?> dataType) {        if (v == null) {            return 0;        }        SqlType sqlType = dataType.getSqlType();        switch (sqlType) {            case STRING:                //region                //TODO 避免强转出现问题                try{                    return ((String) v).length();                }catch (Exception e){                    return ( v.toString()).length();                }              //endregion            case BOOLEAN:            case TINYINT:                return 1;            case SMALLINT:                return 2;            case INT:            case FLOAT:                return 4;            case BIGINT:            case DOUBLE:                return 8;            case DECIMAL:                return 36;            case NULL:                return 0;            case BYTES:                return ((byte[]) v).length;            case DATE:                return 24;            case TIME:                return 12;            case TIMESTAMP:                return 48;            case ARRAY:                return getBytesForArray(v, ((ArrayType) dataType).getElementType());            case MAP:                int size = 0;                MapType<?, ?> mapType = ((MapType<?, ?>) dataType);                for (Map.Entry<?, ?> entry : ((Map<?, ?>) v).entrySet()) {                    size +=                            getBytesForValue(entry.getKey(), mapType.getKeyType())                                    + getBytesForValue(entry.getValue(), mapType.getValueType());                }                return size;            case ROW:                int rowSize = 0;                SeaTunnelRowType rowType = ((SeaTunnelRowType) dataType);                SeaTunnelDataType<?>[] types = rowType.getFieldTypes();                SeaTunnelRow row = (SeaTunnelRow) v;                for (int i = 0; i < types.length; i++) {                    rowSize += getBytesForValue(row.fields[i], types[i]);          
      }                return rowSize;            default:                throw new UnsupportedOperationException("Unsupported type: " + sqlType);        }    }

如何源码打包

使用Maven插件打包即可,打包完成后产物位于dist目录下,如截图所示:

3

原文链接:https://blog.csdn.net/qq_47431361/article/details/143844556