【源码编译】Apache SeaTunnel-Web 适配最新 2.3.4 版本教程

1、seatunnel-web 数据库相关配置(application.yml):用于 web 服务中的数据持久化。2、SEATUNNEL_HOME(环境变量):seatunnel-web 调用 seatunnel 的插件获取 API,扫描 connector 相关的连接器。3、ST_WEB_HOME(环境变量):seatunnel-web 会加载 seatunnel-web/datasource 下的插件包,这里决定了 seatunnel-web 支持哪些数据源的任务定义。4、重要的配置文件:connector-datasource-mapper.yaml 配置了支持的数据源类型以及该数据源支持的数据同步方式等信息(比如是否支持多表同步、是否支持 cdc 等);hazelcast-client.yaml:seatunnel-web 服务通过 seatunnel-api 的方式与 seatunnel 集群进行交互,该配置文件配置了集群节点等相关信息。感谢大家的阅读,希望对各位兄弟有所帮助,如果有任何疑问,欢迎来社区找我交流!

Apache SeaTunnel 新版本已经发布,感兴趣的小伙伴可以看之前版本发布的文章

171023198478680c47a0e7383802b557ceb6cb70a74a0

本文主要给大家介绍:为了使用 2.3.4 版本的新特性,需要升级 Apache SeaTunnel-Web 所依赖的 SeaTunnel 版本;而 SeaTunnel 2.3.4 版本的部分 API 与之前版本不兼容,所以需要对 SeaTunnel-Web 的源码进行修改适配。

源码修改编译

克隆 SeaTunnel-Web 源码到本地

git  clone https://github.com/apache/seatunnel-web.git

在 idea 中打开项目

升级 Pom 中的 SeaTunnel 版本到 2.3.4 并重新导入依赖

<seatunnel-framework.version>2.3.3</seatunnel-framework.version>改为<seatunnel-framework.version>2.3.4</seatunnel-framework.version>

因为大部分用户使用 SeaTunnel Web 都是基于 SeaTunnel-2.3.3 版本做的适配,而最新发布的 SeaTunnel2.3.4 部分 API 发生了改动导致直接升级的过程中会出现 API 不兼容的问题,所以本篇文章重点来了:我们需要对调用 SeaTunnel API 的 SeaTunnel Web 源码部分进行修改,修改完之后,我们就能完全适配 2.3.4 最新版本。

社区推出了 2.3.X 及 Web 系列专属的社群,感兴趣的小伙伴可以加社区小助手进群。

org.apache.seatunnel.app.bean.engine.EngineDataType

// Before/after pair for the nested SeaTunnelDataTypeConvertor (old version first, new version after "Change to:").
// In the new version each overridden conversion method takes one extra leading String argument
// and no longer declares `throws DataTypeConvertException`. The lookup still uses only the first
// String (lower-cased with Locale.ROOT); the added `s1` argument is not read. getIdentity() is unchanged.
public static class SeaTunnelDataTypeConvertor        implements DataTypeConvertor<SeaTunnelDataType<?>> {    @Override    public SeaTunnelDataType<?> toSeaTunnelType(String engineDataType) {        return DATA_TYPE_MAP.get(engineDataType.toLowerCase(Locale.ROOT)).getRawType();    }    @Override    public SeaTunnelDataType<?> toSeaTunnelType(            SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> map)            throws DataTypeConvertException {        return seaTunnelDataType;    }    @Override    public SeaTunnelDataType<?> toConnectorType(            SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> map)            throws DataTypeConvertException {        return seaTunnelDataType;    }    @Override    public String getIdentity() {        return "EngineDataTypeConvertor";    }}// Change to: public static class SeaTunnelDataTypeConvertor            implements DataTypeConvertor<SeaTunnelDataType<?>> {        @Override        public SeaTunnelDataType<?> toSeaTunnelType(String s, String s1) {            return DATA_TYPE_MAP.get(s.toLowerCase(Locale.ROOT)).getRawType();        }        @Override        public SeaTunnelDataType<?> toSeaTunnelType(                String s, SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> map) {            return seaTunnelDataType;        }        @Override        public SeaTunnelDataType<?> toConnectorType(                String s, SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> map) {            return seaTunnelDataType;        }        @Override        public String getIdentity() {            return "EngineDataTypeConvertor";        }    }

org.apache.seatunnel.app.service.impl.TableSchemaServiceImpl

// Two before/after pairs for TableSchemaServiceImpl (old version first, new version after "Change to:").
// 1) Constructor: calls Common.setStarter(true), collects the identifiers of all supported SINK plugins,
//    resolves their jar URLs, and, when that list is non-empty, adds the jars found under
//    Common.pluginRootDir() and builds the DataTypeConvertorFactory on a URLClassLoader over them;
//    otherwise it uses the no-arg DataTypeConvertorFactory.
//    NOTE(review): the "before" and "after" constructor texts below are statement-for-statement identical
//    (only the indentation differs) - confirm what the original pre-2.3.4 constructor looked like.
// 2) Call site: convertor.toSeaTunnelType(...) now receives field.getName() as an additional first argument.
public TableSchemaServiceImpl() throws IOException {    Common.setStarter(true);    Set<PluginIdentifier> pluginIdentifiers =            SeaTunnelSinkPluginDiscovery.getAllSupportedPlugins(PluginType.SINK).keySet();    ArrayList<PluginIdentifier> pluginIdentifiersList = new ArrayList<>();    pluginIdentifiersList.addAll(pluginIdentifiers);    List<URL> pluginJarPaths =            new SeaTunnelSinkPluginDiscovery().getPluginJarPaths(pluginIdentifiersList);    //        Path path = new SeaTunnelSinkPluginDiscovery().getPluginDir();    if (!pluginJarPaths.isEmpty()) {        //            List<URL> files = FileUtils.searchJarFiles(path);        pluginJarPaths.addAll(FileUtils.searchJarFiles(Common.pluginRootDir()));        factory =                new DataTypeConvertorFactory(                        new URLClassLoader(pluginJarPaths.toArray(new URL[0])));    } else {        factory = new DataTypeConvertorFactory();    }}// Change to:     public TableSchemaServiceImpl() throws IOException {        Common.setStarter(true);        Set<PluginIdentifier> pluginIdentifiers =                SeaTunnelSinkPluginDiscovery.getAllSupportedPlugins(PluginType.SINK).keySet();        ArrayList<PluginIdentifier> pluginIdentifiersList = new ArrayList<>();        pluginIdentifiersList.addAll(pluginIdentifiers);        List<URL> pluginJarPaths =                new SeaTunnelSinkPluginDiscovery().getPluginJarPaths(pluginIdentifiersList);        //        Path path = new SeaTunnelSinkPluginDiscovery().getPluginDir();        if (!pluginJarPaths.isEmpty()) {            //            List<URL> files = FileUtils.searchJarFiles(path);            pluginJarPaths.addAll(FileUtils.searchJarFiles(Common.pluginRootDir()));            factory =                    new DataTypeConvertorFactory(                            new URLClassLoader(pluginJarPaths.toArray(new URL[0])));        } else {            factory = new DataTypeConvertorFactory();        }    }SeaTunnelDataType<?> dataType = 
convertor.toSeaTunnelType(field.getType());// Change to: SeaTunnelDataType<?> dataType =                    convertor.toSeaTunnelType(field.getName(), field.getType());

org.apache.seatunnel.app.service.impl.JobExecutorServiceImpl.executeJobBySeaTunnel()

 public Long executeJobBySeaTunnel(Integer userId, String filePath, Long jobInstanceId) {        Common.setDeployMode(DeployMode.CLIENT);        JobConfig jobConfig = new JobConfig();        jobConfig.setName(jobInstanceId + "_job");        try {            SeaTunnelConfig seaTunnelConfig = new YamlSeaTunnelConfigBuilder().build();            SeaTunnelClient seaTunnelClient = createSeaTunnelClient();            ClientJobExecutionEnvironment jobExecutionEnv =                    seaTunnelClient.createExecutionContext(filePath, jobConfig, seaTunnelConfig);                final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();            JobInstance jobInstance = jobInstanceDao.getJobInstance(jobInstanceId);            jobInstance.setJobEngineId(Long.toString(clientJobProxy.getJobId()));            jobInstanceDao.update(jobInstance);            CompletableFuture.runAsync(                    () -> {                        waitJobFinish(                                clientJobProxy,                                userId,                                jobInstanceId,                                Long.toString(clientJobProxy.getJobId()),                                seaTunnelClient);                    });        } catch (ExecutionException | InterruptedException e) {            ExceptionUtils.getMessage(e);            throw new RuntimeException(e);        }        return jobInstanceId;    }

org.apache.seatunnel.app.service.impl.JobInstanceServiceImpl

// Fragment of an if/else-if chain (old version first, new version after "Change to:").
// The status string and the JobStatus constant are respelled from CANCELLING to CANCELING.
else if (statusList.contains("CANCELLING")) {            jobStatus = JobStatus.CANCELLING.name();// Change to: else if (statusList.contains("CANCELING")) {            jobStatus = JobStatus.CANCELING.name();

org.apache.seatunnel.app.service.impl.SchemaDerivationServiceImpl

// Statement pair (old version first, new version after "Change to:").
// The context type changes from TableFactoryContext to TableTransformFactoryContext;
// the three constructor arguments (table list, read-only config, context class loader) stay the same.
TableFactoryContext context =        new TableFactoryContext(                Collections.singletonList(table),                ReadonlyConfig.fromMap(config),                Thread.currentThread().getContextClassLoader());// Change to: TableTransformFactoryContext context =                new TableTransformFactoryContext(                        Collections.singletonList(table),                        ReadonlyConfig.fromMap(config),                        Thread.currentThread().getContextClassLoader());

org.apache.seatunnel.app.thirdparty.engine.SeaTunnelEngineProxy

// Before/after pair for SeaTunnelEngineProxy.restoreJob (old version first, new version after "Change to:").
// Both versions create a SeaTunnelClient from clientConfig, name the job "<jobInstanceId>_job",
// and rethrow ExecutionException / InterruptedException wrapped in RuntimeException.
// The new version additionally builds a SeaTunnelConfig with YamlSeaTunnelConfigBuilder and passes it
// as the third argument of restoreExecutionContext(filePath, jobConfig, seaTunnelConfig, jobEngineId).
public void restoreJob(            @NonNull String filePath, @NonNull Long jobInstanceId, @NonNull Long jobEngineId) {        SeaTunnelClient seaTunnelClient = new SeaTunnelClient(clientConfig);        JobConfig jobConfig = new JobConfig();        jobConfig.setName(jobInstanceId + "_job");        try {            seaTunnelClient.restoreExecutionContext(filePath, jobConfig, jobEngineId).execute();        } catch (ExecutionException e) {            throw new RuntimeException(e);        } catch (InterruptedException e) {            throw new RuntimeException(e);        }}// Change to: public void restoreJob(        @NonNull String filePath, @NonNull Long jobInstanceId, @NonNull Long jobEngineId) {        SeaTunnelClient seaTunnelClient = new SeaTunnelClient(clientConfig);        JobConfig jobConfig = new JobConfig();        jobConfig.setName(jobInstanceId + "_job");        SeaTunnelConfig seaTunnelConfig = new YamlSeaTunnelConfigBuilder().build();        try {            seaTunnelClient                .restoreExecutionContext(filePath, jobConfig, seaTunnelConfig, jobEngineId)                .execute();        } catch (ExecutionException e) {            throw new RuntimeException(e);        } catch (InterruptedException e) {            throw new RuntimeException(e);        }    }

org.apache.seatunnel.app.thirdparty.framework.PluginDiscoveryUtil

// Before/after pair for PluginDiscoveryUtil.getConnectorFeatures (old version first, new version after "Change to:").
// Both versions accept only PluginType.SOURCE (anything else throws UnsupportedOperationException),
// discover factories, and map each TableSourceFactory to a ConnectorFeature built from whether its
// source class implements SupportColumnProjection.
// Old: scans the jars under SeaTunnelSinkPluginDiscovery#getPluginDir() and declares `throws IOException`.
// New: no `throws IOException`; takes the key set of getAllSupportedPlugins(PluginType.SOURCE), resolves
// jar URLs with getPluginJarPaths(...), and uses a URLClassLoader over them when the list is non-empty,
// otherwise the thread context class loader.
// NOTE(review): the new version's text ends at `return featureMap;` without the method's closing brace.
public static Map<PluginIdentifier, ConnectorFeature> getConnectorFeatures(        PluginType pluginType) throws IOException {    Common.setStarter(true);    if (!pluginType.equals(PluginType.SOURCE)) {        throw new UnsupportedOperationException("ONLY support plugin type source");    }    Path path = new SeaTunnelSinkPluginDiscovery().getPluginDir();    List<Factory> factories;    if (path.toFile().exists()) {        List<URL> files = FileUtils.searchJarFiles(path);        factories =                FactoryUtil.discoverFactories(new URLClassLoader(files.toArray(new URL[0])));    } else {        factories =                FactoryUtil.discoverFactories(Thread.currentThread().getContextClassLoader());    }    Map<PluginIdentifier, ConnectorFeature> featureMap = new ConcurrentHashMap<>();    factories.forEach(            plugin -> {                if (TableSourceFactory.class.isAssignableFrom(plugin.getClass())) {                    TableSourceFactory tableSourceFactory = (TableSourceFactory) plugin;                    PluginIdentifier info =                            PluginIdentifier.of(                                    "seatunnel",                                    PluginType.SOURCE.getType(),                                    plugin.factoryIdentifier());                    featureMap.put(                            info,                            new ConnectorFeature(                                    SupportColumnProjection.class.isAssignableFrom(                                            tableSourceFactory.getSourceClass())));                }            });    return featureMap;}// Change to:     public static Map<PluginIdentifier, ConnectorFeature> getConnectorFeatures(            PluginType pluginType) {        Common.setStarter(true);        if (!pluginType.equals(PluginType.SOURCE)) {            throw new UnsupportedOperationException("ONLY support plugin type source");        }        ArrayList<PluginIdentifier> pluginIdentifiers = new ArrayList<>();      
  pluginIdentifiers.addAll(                SeaTunnelSinkPluginDiscovery.getAllSupportedPlugins(PluginType.SOURCE).keySet());        List<URL> pluginJarPaths =                new SeaTunnelSinkPluginDiscovery().getPluginJarPaths(pluginIdentifiers);        List<Factory> factories;        if (!pluginJarPaths.isEmpty()) {            factories =                    FactoryUtil.discoverFactories(                            new URLClassLoader(pluginJarPaths.toArray(new URL[0])));        } else {            factories =                    FactoryUtil.discoverFactories(Thread.currentThread().getContextClassLoader());        }        Map<PluginIdentifier, ConnectorFeature> featureMap = new ConcurrentHashMap<>();        factories.forEach(                plugin -> {                    if (TableSourceFactory.class.isAssignableFrom(plugin.getClass())) {                        TableSourceFactory tableSourceFactory = (TableSourceFactory) plugin;                        PluginIdentifier info =                                PluginIdentifier.of(                                        "seatunnel",                                        PluginType.SOURCE.getType(),                                        plugin.factoryIdentifier());                        featureMap.put(                                info,                                new ConnectorFeature(                                        SupportColumnProjection.class.isAssignableFrom(                                                tableSourceFactory.getSourceClass())));                    }                });        return featureMap;

代码格式化

mvn spotless:apply

编译打包

mvn clean package -DskipTests

至此,seatunnel web 适配 seatunnel2.3.4 版本完成,对应的安装包会在 seatunnel-web-dist/target 目录下生成

Linux 部署测试

这里具体请参考之前社区其他老师发布的文章 Apache SeaTunnel Web 部署指南

重要的配置项

1、seatunnel-web 数据库相关配置(application.yml):用于 web 服务中的数据持久化。
2、SEATUNNEL_HOME(环境变量):seatunnel-web 调用 seatunnel 的插件获取 API,扫描 connector 相关的连接器。
3、ST_WEB_HOME(环境变量):seatunnel-web 会加载 seatunnel-web/datasource 下的插件包,这里决定了 seatunnel-web 支持哪些数据源的任务定义。
4、重要的配置文件:
connector-datasource-mapper.yaml:该配置文件配置了支持的数据源类型以及该数据源支持的数据同步方式等信息(比如是否支持多表同步、是否支持 cdc 等)。
hazelcast-client.yaml:seatunnel-web 服务通过 seatunnel-api 的方式与 seatunnel 集群进行交互,该配置文件配置了集群节点等相关信息。

感谢大家的阅读,希望对各位兄弟有所帮助,如果有任何疑问,欢迎来社区找我交流!