需要实现将Excel中的数字类型的单元格像数据库中字符串类型的字段中推送。
SeaTunnel在读取字段类型的时候都是使用强转的形式去获取数据的。假如数据类型不一样的话,直接强转就会报错。
org/apache/seatunnel/api/table/type/SeaTunnelRow.java

org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java

@Override public PreparedStatement toExternal( TableSchema tableSchema, SeaTunnelRow row, PreparedStatement statement) throws SQLException { SeaTunnelRowType rowType = tableSchema.toPhysicalRowDataType(); for (int fieldIndex = 0; fieldIndex < rowType.getTotalFields(); fieldIndex++) { SeaTunnelDataType<?> seaTunnelDataType = rowType.getFieldType(fieldIndex); int statementIndex = fieldIndex + 1; Object fieldValue = row.getField(fieldIndex); if (fieldValue == null) { statement.setObject(statementIndex, null); continue; } switch (seaTunnelDataType.getSqlType()) { case STRING: //TODO wxt //region try{ //直接类型强转会出问题 比如double类型就不能转成String // 可以使用下面的toString解决这种类型问题 statement.setString(statementIndex, (String) row.getField(fieldIndex)); }catch (Exception e){ statement.setString(statementIndex, row.getField(fieldIndex).toString()); } //endregion break; case BOOLEAN: statement.setBoolean(statementIndex, (Boolean) row.getField(fieldIndex)); break; case TINYINT: statement.setByte(statementIndex, (Byte) row.getField(fieldIndex)); break; case SMALLINT: statement.setShort(statementIndex, (Short) row.getField(fieldIndex)); break; case INT: statement.setInt(statementIndex, (Integer) row.getField(fieldIndex)); break; case BIGINT: statement.setLong(statementIndex, (Long) row.getField(fieldIndex)); break; case FLOAT: statement.setFloat(statementIndex, (Float) row.getField(fieldIndex)); break; case DOUBLE: statement.setDouble(statementIndex, (Double) row.getField(fieldIndex)); break; case DECIMAL: statement.setBigDecimal(statementIndex, (BigDecimal) row.getField(fieldIndex)); break; case DATE: LocalDate localDate = (LocalDate) row.getField(fieldIndex); statement.setDate(statementIndex, java.sql.Date.valueOf(localDate)); break; case TIME: writeTime(statement, statementIndex, (LocalTime) row.getField(fieldIndex)); break; case TIMESTAMP: LocalDateTime localDateTime = (LocalDateTime) row.getField(fieldIndex); statement.setTimestamp( statementIndex, java.sql.Timestamp.valueOf(localDateTime)); break; case BYTES: statement.setBytes(statementIndex, (byte[]) row.getField(fieldIndex)); break; case NULL: statement.setNull(statementIndex, java.sql.Types.NULL); break; case ARRAY: Object[] array = (Object[]) row.getField(fieldIndex); if (array == null) { statement.setNull(statementIndex, java.sql.Types.ARRAY); break; } statement.setObject(statementIndex, array); break; case MAP: case ROW: default: throw new JdbcConnectorException( CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE, "Unexpected value: " + seaTunnelDataType); } } return statement; } private int getBytesForValue(Object v, SeaTunnelDataType<?> dataType) { if (v == null) { return 0; } SqlType sqlType = dataType.getSqlType(); switch (sqlType) { case STRING: //region //TODO 避免强转出现问题 try{ return ((String) v).length(); }catch (Exception e){ return ( v.toString()).length(); } //endregion case BOOLEAN: case TINYINT: return 1; case SMALLINT: return 2; case INT: case FLOAT: return 4; case BIGINT: case DOUBLE: return 8; case DECIMAL: return 36; case NULL: return 0; case BYTES: return ((byte[]) v).length; case DATE: return 24; case TIME: return 12; case TIMESTAMP: return 48; case ARRAY: return getBytesForArray(v, ((ArrayType) dataType).getElementType()); case MAP: int size = 0; MapType<?, ?> mapType = ((MapType<?, ?>) dataType); for (Map.Entry<?, ?> entry : ((Map<?, ?>) v).entrySet()) { size += getBytesForValue(entry.getKey(), mapType.getKeyType()) + getBytesForValue(entry.getValue(), mapType.getValueType()); } return size; case ROW: int rowSize = 0; SeaTunnelRowType rowType = ((SeaTunnelRowType) dataType); SeaTunnelDataType<?>[] types = rowType.getFieldTypes(); SeaTunnelRow row = (SeaTunnelRow) v; for (int i = 0; i < types.length; i++) { rowSize += getBytesForValue(row.fields[i], types[i]); } return rowSize; default: throw new UnsupportedOperationException("Unsupported type: " + sqlType); } }用maven插件打包即可,打完包后在dist下面,如截图所示:

原文链接:https://blog.csdn.net/qq_47431361/article/details/143844556
