DolphinScheduler 3.1.3 跨越升级 3.4.1:基于 API 的自动化迁移方案

为什么要跨越大版本?
177926303212377e66b23c7f6f455


点亮⭐️

https://github.com/apache/DolphinScheduler



点击蓝字 关注我们



作者 | 肖清海

一、背景:

为什么要跨越大版本?


旧环境情况

当前 DolphinScheduler 版本: 3.1.3
当前 SeaTunnel 版本: 2.1.3

部署规模: 1台Master + 2台Worker,工作流定义3700+,日均调度任务量20000+

使用年限: 已经稳定运行3年

升级驱动因素

  1. 功能需求:因业务需求增大,当前版本的架构设计缺陷、元数据库处理限制以及服务器资源不足;

  2. 社区支持:官方推荐使用最新稳定版本,获得更好的技术支持

  3. 性能优化:3.4.1版本在调度性能和稳定性方面有显著提升

为什么不用官方升级方案

  1. 版本跨度太大:3.1.3 → 3.2.0 → 3.3.0 → 3.4.1,需要多次中间升级

  2. 生产环境限制:无法接受多次停机窗口,业务连续性要求高

  3. 架构变更风险:资源中心重构、元数据库表结构变更等存在较高迁移风险

  4. 工作量考量:任务数量多,手动重建工作流工作量巨大,需要自动化方案

二、整体迁移思路:绕开官方

升级路径,采用「重建+API」方案


2.1 核心思想

不追求「逐步升级」,而是「换新环境+数据迁移」:

  • 旧环境(3.1.3)继续稳定运行,不影响业务
  • 新环境(3.4.1)全新部署,保证架构干净
  • 编写脚本代码通过旧版本API读取工作流定义和任务配置
  • 通过新版本的API批量创建工作流
  • 切换业务任务到新环境

2.2 方案优势与风险对比

维度
官方升级方案
本方案(重建+API)
停机时间
多次升级,累计停机可能数小时甚至是数天
切换时停旧任务启动新任务,基本是无缝衔接
回滚难度
困难(数据库已变更)
简单(旧环境完好,随时切回)
数据一致性
需验证所有表结构迁移
只迁移核心业务数据(工作流定义),历史执行记录不迁移
版本适配
需处理中间版本的所有兼容性问题
直接适配3.4.1,只改造必要参数
工作量
需多次验证
主要集中在脚本开发
适用场景
小版本升级


三、详细实施步骤


3.1 环境准备阶段

3.1.1 新环境部署

  • 搭建全新的3.4.1版本环境
  • 配置数据库、Registry等依赖,弃用zk注册中心,改用Jdbc
  • 配置必要的组件:datax、seatunnel等
  • 了解新版本的特性,如个人关注点在:
    • 集成seatunnel 2.1.3,启动方式从引擎:从spark改成start-seatunnel-spark.sh;
    • 默认参数配。如租户、工作组、环境等,在项目管理设置里面配置偏好;
    • 参数传递变化。新版必须在下游节点添加“IN”类型的参数,才可以接收上游变量值
  • 验证新环境基本功能正常,另外选取代表性任务手动创建执行验证

3.1.2 API访问配置

  • 在新环境中配置API访问权限:令牌管理创建新的令牌
  • 获取管理员token用于API调用
  • 验证API接口连通性

3.2 元数据库基础数据配置

  • 表数据复制:按照旧版本元数据表的基本配置,复刻基础数据到新元数据库表(包括id),这样在开发还原任务脚本的时候能减少很多改动。

17792630330922dcf71ea981a7dc5
3.3 数据迁移脚本开发

3.3.1 前期准备测试

  1. 先对自己的任务分类:模板化的任务,非模板化的任务;
  2. 选取代表性的任务,在新版本上创建运行,保证任务的正常运行,同步后的表的数据正常。

3.3.2 代码开发-读取原任务定义

























... //获取整个, 循环处理        String processDefinitionUrl = OLD_URL + "/dolphinscheduler/projects/" + oldProjectCode +                "/process-definition/query-process-definition-list";        Map<String, String> map = new HashMap<>();        map.put("projectCode", oldProjectCode);        String pdRes = httpClientUtilOld.doGetRequest(processDefinitionUrl, map);        ArrayList<JSONObject> dataList = parseResDataToList(pdRes);        for (JSONObject job : dataList) {            String oldWFCode = job.get("code").toString();            Map<String, String> mapPara = new HashMap<>();            String oldurl = OLD_URL + "/dolphinscheduler/projects/" + oldProjectCode                    + "/process-definition/" + oldWFCode;            mapPara.put("code", oldWFCode);            mapPara.put("projectCode", oldProjectCode);            String res = httpClientUtilOld.doGetRequest(oldurl, mapPara);            JSONObject jsonObject = JSON.parseObject(res);            JSONObject data = (JSONObject) jsonObject.get("data");            JSONObject processDefinition = data.getJSONObject("processDefinition");            JSONArray processTaskRelationList = data.getJSONArray("processTaskRelationList");            JSONArray taskDefinitionList = data.getJSONArray("taskDefinitionList");            //todo 获取 n个编码,替换旧任务编码            //填充 信息创建任务            createWF(processDefinition, processTaskRelationList, taskDefinitionList, NEW_IP, newProjectCode);

3.3.3 API创建新工作流

































































//看原工作流的任务有几个 获取任务编码        int taskCnt = taskDefinitionList.size();        List<String> taskCodeList = taskDefinitionList.stream()                .map(obj -> (JSONObject) obj)                .map(obj -> obj.getString("code"))                .collect(Collectors.toList());        try {//        todo 获取 任务编码            String taskCodeUrl = NEW_URL + "/dolphinscheduler/projects/" + newProjectCode + "/task-definition/gen-task-codes";            HashMap<String, String> taskCodeMap = new HashMap<>();            //需要创建n个任务            taskCodeMap.put("genNum", String.valueOf(taskCnt));            String codeData = httpClientUtilNew.doGetRequest(taskCodeUrl, taskCodeMap);            Object codes = JSON.parseObject(codeData).get("data");            JSONArray taskCodeArr = JSON.parseArray(codes.toString());//根据实际任务 添加任务下游接收参数            for (int i = 0; i < taskDefinitionList.size(); i++) {                JSONObject logTask = (JSONObject) taskDefinitionList.get(i);                if (“判断逻辑”)) {                    JSONObject taskParams = logTask.getJSONObject("taskParams");                    JSONArray localParams = taskParams.getJSONArray("localParams");                    // 构造 hiveAmount                    JSONObject hiveParam = new JSONObject();                    hiveParam.put("prop", "hiveAmount");                    hiveParam.put("direct", "IN");                    hiveParam.put("type", "VARCHAR");                    hiveParam.put("value", "");                    localParams.add(hiveParam);                    logTask.put("taskParamList", localParams);                    JSONObject paramMap = new JSONObject();                    for (Object obj : localParams) {                        JSONObject param = (JSONObject) obj;                        paramMap.put(param.getString("prop"), param.getString("value"));                    }                    logTask.put("taskParamMap", paramMap);                    ....// 替换必要参数 code 和seatunnel执行引擎参数 for (int i = 0; i < taskCodeList.size(); i++) {                String oldCode = taskCodeList.get(i);                String newCode = taskCodeArr.getString(i);                //替换编码                // 替换sea引擎:"SPARK" "start-seatunnel-spark.sh"                taskDefinitionListJsonStr = taskDefinitionListJsonStr                        .replace("\"code\":" + oldCode + ",", "\"code\":" + newCode + ",")                        .replace("\"engine\":\"SPARK\",", "\"startupScript\":\"start-seatunnel-spark.sh\",");                taskRelationListJsonStr = taskRelationListJsonStr                        .replace("TaskCode\":" + oldCode + ",", "TaskCode\":" + newCode + ",");                locationsJsonStr = locationsJsonStr                        .replace(oldCode, newCode);...  }            Map<String, String> map = new HashMap<>();            map.put("taskDefinitionJson", taskDefinitionListJsonStr);            map.put("taskRelationJson", taskRelationListJsonStr);            map.put("locations", locationsJsonStr);            map.put("name", processDefinition.getString("name"));            map.put("tenantCode", "omm");//processDefinition.getString("tenantCode")            map.put("executionType", processDefinition.getString("executionType"));            map.put("description",                    processDefinition.getString("description") == null ? "" : processDefinition.getString("description"));            map.put("globalParams", processDefinition.getString("globalParams"));            map.put("timeout", processDefinition.getString("timeout"));            String processDefinitionUrl = NEW_URL + "/dolphinscheduler/projects/" + newProjectCode + "/workflow-definition";            String processDefinitionRes = httpClientUtilNew.doPostRequest(processDefinitionUrl, map);

3.4 迁移执行阶段

3.4.1 流程

  1. 备份旧版本调度表:t_ds_schedules_20260416_10

  2. 选择试点项目:选择一个任务量适中、业务影响小的项目作为试点

  3. 迁移任务定义:迁移200个任务定义(包括定时配置)

  4. 上线任务不上线定时:先上线任务定义,但不启用定时调度

  5. 手动提交验证:分批手动提交运行,注意和原集群的任务是否冲突(因为大部分是天/时/15分钟定时,一般不冲突)

  6. 核查处理失败任务:分析失败原因,修复问题后重新运行

  7. 上线定时配置:所有任务验证无误后,上线这些任务的定时

  8. 下线原集群定时:确认新环境稳定运行后,下线原集群对应任务的定时

  9. 分项目、分批次迁移

3.4.2 迁移实施

按照流程,先找个任务少并且有代表性的项目(199个任务),迁移后试运行一段时间,测试稳定性运行一周没有问题。后续迁移完成50个项目,总共约3700个任务,耗时10天左右

  • 表格记录
17792630336074b8ea7352568f45e

3.4.3 运行情况

目前已经运行了近1个月时间。以前调度延迟的问题:几乎是10秒到1分钟甚至更严重,现在几乎没有调度延迟;调度缺失实例的问题,目前也未发生。

目前一切运行良好,暂未发现问题,后续持续观察。

四、风险控制与应急预案


4.1 主要风险点

数据丢失风险:迁移过程中可能遗漏部分配置

兼容性问题:旧版本特有配置在新版本不支持

业务中断风险:切换期间可能出现调度延迟

4.2 应急预案

4.2.1 回滚方案

  • 立即停止新环境调度
  • 恢复旧环境调度服务
  • 分析问题原因,修复后重新尝试

4.2.2 数据备份

  • 完整备份旧环境数据库
  • 备份新环境初始配置

五、总结与经验分享


5.1 项目成果

  • 成功实现跨版本升级,零业务中断
  • 自动化迁移脚本提高效率,减少人工错误
  • 新版本性能提升大,稳定性显著改善

5.2 经验教训

  • 大版本升级需要充分评估架构变化
  • API方式适合配置迁移,但需处理参数兼容性
  • 充分的测试验证是成功的关键
  • 建立完善的监控体系保障稳定性
  • 文档记录对后续维护至关重要

5.3 后续规划

  • 目前是升级的ds版本,解决当前的调度瓶颈,因需适应项目spark集群的升级,下一步要测试实施seatunnel从2.1.3升级到 2.3.12版本,大概率也是沿用这套方案。
  • 探索自动化测试方案
  • 分享经验帮助其他团队升级




END




1779263034098a826c2f1562ac7e8



用户案例


DolphinScheduler Agent开源上线Cisco Webex天翼云Zoom网易邮箱 每日互动 惠生工程作业帮 博世智驾蔚来汽车 长城汽车集度长安汽车思科网讯食行生鲜联通医疗联想新网银行兴业证券唯品富邦消费金融 自如有赞伊利当贝大数据珍岛集团传智教育BigoYY直播 拈花云科太美医疗深圳某智能制造企业
1779263034098a826c2f1562ac7e8



迁移实战


Azkaban Ooize(当贝迁移案例)airflow (有赞迁移案例)Air2phin(迁移工具)Airflow
1779263034098a826c2f1562ac7e8



最新发版消息



Apache DolphinScheduler 3.4.1 发布,新增任务分发超时检测
1779263034098a826c2f1562ac7e8



加入社区


关注社区的方式有很多:

  • GitHub: https://github.com/apache/dolphinscheduler
  • 官网:https://dolphinscheduler.apache.org/en-us
  • 订阅开发者邮件:dev@dolphinscheduler@apache.org(向邮箱发送任意内容,收到邮件后回复同意订阅即可)
  • X.com:@DolphinSchedule
  • YouTube:https://www.youtube.com/@apachedolphinscheduler
  • Slack:https://join.slack.com/t/asf-dolphinscheduler/shared_invite/zt-1cmrxsio1-nJHxRJa44jfkrNL_Nsy9Qg

同样地,参与Apache DolphinScheduler 有非常多的参与贡献的方式,主要分为代码方式和非代码方式两种。

非代码方式包括:

完善文档、翻译文档;翻译技术性、实践性文章;投稿实践性、原理性文章;成为布道师;社区管理、答疑;会议分享;测试反馈;用户反馈等。

‍代码方式包括:

查找Bug;编写修复代码;开发新功能;提交代码贡献;参与代码审查等。

贡献第一个PR(文档、代码) 我们也希望是简单的,第一个PR用于熟悉提交的流程和社区协作以及感受社区的友好度。

社区汇总了以下适合新手的问题列表https://github.com/apache/dolphinscheduler/pulls?q=is%3Apr+is%3Aopen+label%3A%22first+time+contributor%22

优先级问题列表https://github.com/apache/dolphinscheduler/pulls?q=is%3Apr+is%3Aopen+label%3Apriority%3Ahigh

如何参与贡献链接https://dolphinscheduler.apache.org/zh-cn/docs/3.2.2/%E8%B4%A1%E7%8C%AE%E6%8C%87%E5%8D%97_menu/%E5%A6%82%E4%BD%95%E5%8F%82%E4%B8%8E_menu

如果你❤️小海豚,就来为我点亮Star吧!

https://github.com/apache/dolphinscheduler


177926303612491bb9eaa4af79005


1779263036662b6464bafe7d1de40

你的好友小海豚拍了拍你

并请你帮她点一下“分享”