Apache SeaTunnel 2.3.4 新版本已经发布,感兴趣的小伙伴可以查看之前的版本发布文章。
本文主要介绍:为了使用 SeaTunnel 2.3.4 版本的新特性,需要升级 Apache SeaTunnel-Web 所依赖的 SeaTunnel 版本;而 SeaTunnel 2.3.4 的部分 API 与之前版本不兼容,因此需要修改 SeaTunnel-Web 的源码进行适配。
git clone https://github.com/apache/seatunnel-web.git
升级 Pom 中的 SeaTunnel 版本到 2.3.4 并重新导入依赖
<seatunnel-framework.version>2.3.3</seatunnel-framework.version>改为<seatunnel-framework.version>2.3.4</seatunnel-framework.version>
因为大部分用户使用的 SeaTunnel Web 都是基于 SeaTunnel 2.3.3 版本做的适配,而最新发布的 SeaTunnel 2.3.4 有部分 API 发生了改动,直接升级会出现 API 不兼容的问题。所以本篇文章的重点来了:我们需要修改 SeaTunnel Web 源码中调用 SeaTunnel API 的部分,修改完成之后,就能完全适配 2.3.4 最新版本。
社区推出了 2.3.X 及 Web 系列专属的社群,感兴趣的小伙伴可以加社区小助手进群。
org.apache..api.dto.seatunnel.bean.engine.EngineDataType
public static class SeaTunnelDataTypeConvertor implements DataTypeConvertor<SeaTunnelDataType<?>> { @Override public SeaTunnelDataType<?> toSeaTunnelType(String engineDataType) { return DATA_TYPE_MAP.get(engineDataType.toLowerCase(Locale.ROOT)).getRawType(); } @Override public SeaTunnelDataType<?> toSeaTunnelType( SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> map) throws DataTypeConvertException { return seaTunnelDataType; } @Override public SeaTunnelDataType<?> toConnectorType( SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> map) throws DataTypeConvertException { return seaTunnelDataType; } @Override public String getIdentity() { return "EngineDataTypeConvertor"; }}// 改为public static class SeaTunnelDataTypeConvertor implements DataTypeConvertor<SeaTunnelDataType<?>> { @Override public SeaTunnelDataType<?> toSeaTunnelType(String s, String s1) { return DATA_TYPE_MAP.get(s.toLowerCase(Locale.ROOT)).getRawType(); } @Override public SeaTunnelDataType<?> toSeaTunnelType( String s, SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> map) { return seaTunnelDataType; } @Override public SeaTunnelDataType<?> toConnectorType( String s, SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> map) { return seaTunnelDataType; } @Override public String getIdentity() { return "EngineDataTypeConvertor"; } }
org.apache.seatunnel.app.service.impl.TableSchemaServiceImpl
// TableSchemaServiceImpl constructor: no change is required for 2.3.4, so it is shown once for
// reference. The "before" and "after" copies were identical.
// NOTE(review): upstream left commented-out getPluginDir()/searchJarFiles(path) lines here.
// They suggest the 2.3.3 version scanned the sink plugin directory instead, as
// PluginDiscoveryUtil#getConnectorFeatures did. Confirm against the 2.3.3 source.

/**
 * Creates the service and its {@code DataTypeConvertorFactory}.
 *
 * <p>Resolves the jar paths of every supported sink plugin.
 * <ul>
 *   <li>If any are found, adds the jars under {@code Common.pluginRootDir()} and creates the
 *       factory over a {@link URLClassLoader} spanning all of those jars.</li>
 *   <li>Otherwise, creates the factory with its no-argument constructor.</li>
 * </ul>
 *
 * @throws IOException if plugin jar discovery fails with an I/O error
 */
public TableSchemaServiceImpl() throws IOException {
    Common.setStarter(true);
    Set<PluginIdentifier> pluginIdentifiers =
            SeaTunnelSinkPluginDiscovery.getAllSupportedPlugins(PluginType.SINK).keySet();
    List<URL> pluginJarPaths =
            new SeaTunnelSinkPluginDiscovery()
                    .getPluginJarPaths(new ArrayList<>(pluginIdentifiers));
    if (!pluginJarPaths.isEmpty()) {
        pluginJarPaths.addAll(FileUtils.searchJarFiles(Common.pluginRootDir()));
        factory =
                new DataTypeConvertorFactory(
                        new URLClassLoader(pluginJarPaths.toArray(new URL[0])));
    } else {
        factory = new DataTypeConvertorFactory();
    }
}

// The actual 2.3.4 change in this class is the DataTypeConvertor#toSeaTunnelType call site.
// The field name is now passed first and the type second.
// Before (2.3.3):
//   SeaTunnelDataType<?> dataType = convertor.toSeaTunnelType(field.getType());
// After (2.3.4):
//   SeaTunnelDataType<?> dataType = convertor.toSeaTunnelType(field.getName(), field.getType());
org.apache.seatunnel.app.service.impl.JobExecutorServiceImpl.executeJobBySeaTunnel()
/**
 * Submits the job defined in {@code filePath} to the SeaTunnel engine in CLIENT deploy mode.
 *
 * <p>Steps:
 * <ol>
 *   <li>Names the job {@code <jobInstanceId>_job}.</li>
 *   <li>Creates the execution context from the YAML-built {@code SeaTunnelConfig} and executes
 *       it.</li>
 *   <li>Stores the engine-assigned job id on the job instance.</li>
 *   <li>Hands the client proxy to {@code waitJobFinish} on a {@link CompletableFuture#runAsync}
 *       task.</li>
 * </ol>
 * The method returns without waiting for that task.
 *
 * @param userId id of the user triggering the execution, forwarded to {@code waitJobFinish}
 * @param filePath path of the job configuration file to submit
 * @param jobInstanceId id of the web-side job instance to update
 * @return {@code jobInstanceId}, unchanged
 * @throws RuntimeException wrapping an {@link ExecutionException} or
 *     {@link InterruptedException} raised while submitting the job; on interruption the
 *     thread's interrupt flag is restored first
 */
public Long executeJobBySeaTunnel(Integer userId, String filePath, Long jobInstanceId) {
    Common.setDeployMode(DeployMode.CLIENT);
    JobConfig jobConfig = new JobConfig();
    jobConfig.setName(jobInstanceId + "_job");
    try {
        SeaTunnelConfig seaTunnelConfig = new YamlSeaTunnelConfigBuilder().build();
        // NOTE(review): if execute() fails below, this client is never passed to
        // waitJobFinish and is not closed here. Confirm whether the failure path should
        // close it.
        SeaTunnelClient seaTunnelClient = createSeaTunnelClient();
        ClientJobExecutionEnvironment jobExecutionEnv =
                seaTunnelClient.createExecutionContext(filePath, jobConfig, seaTunnelConfig);
        final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
        final String jobEngineId = Long.toString(clientJobProxy.getJobId());

        JobInstance jobInstance = jobInstanceDao.getJobInstance(jobInstanceId);
        jobInstance.setJobEngineId(jobEngineId);
        jobInstanceDao.update(jobInstance);

        CompletableFuture.runAsync(
                () ->
                        waitJobFinish(
                                clientJobProxy,
                                userId,
                                jobInstanceId,
                                jobEngineId,
                                seaTunnelClient));
    } catch (ExecutionException e) {
        throw new RuntimeException(e);
    } catch (InterruptedException e) {
        // Restore the interrupt flag so code further up the stack can still observe it.
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
    }
    return jobInstanceId;
}
org.apache.seatunnel.app.service.impl.JobInstanceServiceImpl
// JobInstanceServiceImpl status mapping. In 2.3.4 the JobStatus constant is spelled CANCELING
// (one "L"), so both the matched status string and the enum constant change.
// Before (2.3.3):
else if (statusList.contains("CANCELLING")) {
    jobStatus = JobStatus.CANCELLING.name();
// Change to (2.3.4):
// else if (statusList.contains("CANCELING")) {
//     jobStatus = JobStatus.CANCELING.name();
org.apache.seatunnel.app.service.impl.SchemaDerivationServiceImpl
// SchemaDerivationServiceImpl. In 2.3.4 this code constructs a TableTransformFactoryContext
// instead of a TableFactoryContext. The constructor arguments (table list, ReadonlyConfig, class
// loader) stay the same.
// Before (2.3.3):
TableFactoryContext context =
        new TableFactoryContext(
                Collections.singletonList(table),
                ReadonlyConfig.fromMap(config),
                Thread.currentThread().getContextClassLoader());
// Change to (2.3.4):
// TableTransformFactoryContext context =
//         new TableTransformFactoryContext(
//                 Collections.singletonList(table),
//                 ReadonlyConfig.fromMap(config),
//                 Thread.currentThread().getContextClassLoader());
org.apache.seatunnel.app.thirdparty.engine.SeaTunnelEngineProxy
// Before (2.3.3):
// public void restoreJob(
//         @NonNull String filePath, @NonNull Long jobInstanceId, @NonNull Long jobEngineId) {
//     SeaTunnelClient seaTunnelClient = new SeaTunnelClient(clientConfig);
//     JobConfig jobConfig = new JobConfig();
//     jobConfig.setName(jobInstanceId + "_job");
//     try {
//         seaTunnelClient.restoreExecutionContext(filePath, jobConfig, jobEngineId).execute();
//     } catch (ExecutionException e) {
//         throw new RuntimeException(e);
//     } catch (InterruptedException e) {
//         throw new RuntimeException(e);
//     }
// }
//
// After (2.3.4): restoreExecutionContext additionally takes a SeaTunnelConfig.

/**
 * Restores the engine job {@code jobEngineId} and executes it.
 *
 * <p>Restoration goes through {@code SeaTunnelClient#restoreExecutionContext}. The client is
 * built from {@code clientConfig}, the job is named {@code <jobInstanceId>_job`}, and the
 * {@code SeaTunnelConfig} is loaded with {@code YamlSeaTunnelConfigBuilder}.
 *
 * @param filePath job configuration file
 * @param jobInstanceId web-side job instance id, used to name the job
 * @param jobEngineId engine-side id of the job to restore
 * @throws RuntimeException wrapping an {@link ExecutionException} or
 *     {@link InterruptedException} from {@code execute()}; on interruption the thread's
 *     interrupt flag is restored first
 */
public void restoreJob(
        @NonNull String filePath, @NonNull Long jobInstanceId, @NonNull Long jobEngineId) {
    // NOTE(review): this client is never closed in this method. Confirm whether it should be
    // closed once the restored job has been submitted.
    SeaTunnelClient seaTunnelClient = new SeaTunnelClient(clientConfig);
    JobConfig jobConfig = new JobConfig();
    jobConfig.setName(jobInstanceId + "_job");
    SeaTunnelConfig seaTunnelConfig = new YamlSeaTunnelConfigBuilder().build();
    try {
        seaTunnelClient
                .restoreExecutionContext(filePath, jobConfig, seaTunnelConfig, jobEngineId)
                .execute();
    } catch (ExecutionException e) {
        throw new RuntimeException(e);
    } catch (InterruptedException e) {
        // Restore the interrupt flag so code further up the stack can still observe it.
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
    }
}
org.apache.seatunnel.app.thirdparty.framework.PluginDiscoveryUtil
public static Map<PluginIdentifier, ConnectorFeature> getConnectorFeatures( PluginType pluginType) throws IOException { Common.setStarter(true); if (!pluginType.equals(PluginType.SOURCE)) { throw new UnsupportedOperationException("ONLY support plugin type source"); } Path path = new SeaTunnelSinkPluginDiscovery().getPluginDir(); List<Factory> factories; if (path.toFile().exists()) { List<URL> files = FileUtils.searchJarFiles(path); factories = FactoryUtil.discoverFactories(new URLClassLoader(files.toArray(new URL[0]))); } else { factories = FactoryUtil.discoverFactories(Thread.currentThread().getContextClassLoader()); } Map<PluginIdentifier, ConnectorFeature> featureMap = new ConcurrentHashMap<>(); factories.forEach( plugin -> { if (TableSourceFactory.class.isAssignableFrom(plugin.getClass())) { TableSourceFactory tableSourceFactory = (TableSourceFactory) plugin; PluginIdentifier info = PluginIdentifier.of( "seatunnel", PluginType.SOURCE.getType(), plugin.factoryIdentifier()); featureMap.put( info, new ConnectorFeature( SupportColumnProjection.class.isAssignableFrom( tableSourceFactory.getSourceClass()))); } }); return featureMap;}// 改为 public static Map<PluginIdentifier, ConnectorFeature> getConnectorFeatures( PluginType pluginType) { Common.setStarter(true); if (!pluginType.equals(PluginType.SOURCE)) { throw new UnsupportedOperationException("ONLY support plugin type source"); } ArrayList<PluginIdentifier> pluginIdentifiers = new ArrayList<>(); pluginIdentifiers.addAll( SeaTunnelSinkPluginDiscovery.getAllSupportedPlugins(PluginType.SOURCE).keySet()); List<URL> pluginJarPaths = new SeaTunnelSinkPluginDiscovery().getPluginJarPaths(pluginIdentifiers); List<Factory> factories; if (!pluginJarPaths.isEmpty()) { factories = FactoryUtil.discoverFactories( new URLClassLoader(pluginJarPaths.toArray(new URL[0]))); } else { factories = FactoryUtil.discoverFactories(Thread.currentThread().getContextClassLoader()); } Map<PluginIdentifier, ConnectorFeature> featureMap = new 
ConcurrentHashMap<>(); factories.forEach( plugin -> { if (TableSourceFactory.class.isAssignableFrom(plugin.getClass())) { TableSourceFactory tableSourceFactory = (TableSourceFactory) plugin; PluginIdentifier info = PluginIdentifier.of( "seatunnel", PluginType.SOURCE.getType(), plugin.factoryIdentifier()); featureMap.put( info, new ConnectorFeature( SupportColumnProjection.class.isAssignableFrom( tableSourceFactory.getSourceClass()))); } }); return featureMap;
mvn spotless:apply
mvn clean package -DskipTests
至此,SeaTunnel Web 对 SeaTunnel 2.3.4 版本的适配已完成,对应的安装包会生成在 seatunnel-web-dist/target 目录下。
部署的具体步骤请参考社区其他老师之前发布的文章《Apache SeaTunnel Web 部署指南》。
1、seatunnel-web 数据库相关配置(application.yml):用于 web 服务中的数据持久化。
2、SEATUNNEL_HOME(环境变量):seatunnel-web 调用 seatunnel 获取插件的 API,扫描 connector 相关的连接器。
3、ST_WEB_HOME(环境变量):seatunnel-web 会加载 seatunnel-web/datasource 下的插件包,这决定了 seatunnel-web 支持哪些数据源的任务定义。
4、重要的配置文件:
connector-datasource-mapper.yaml:该配置文件配置了支持的数据源类型,以及该数据源支持的数据同步方式等信息(比如是否支持多表同步、是否支持 CDC 等);
hazelcast-client.yaml:seatunnel-web 服务通过 seatunnel-api 的方式与 seatunnel 集群进行交互,该配置文件配置了集群节点等相关信息。
感谢大家的阅读,希望对各位兄弟有所帮助,如果有任何疑问,欢迎来社区找我交流!