使用 API 轻松清理工作流历史版本记录,一键减少关系日志和任务定义日志表的数据量

一键减少关系日志和任务定义日志表的数据量!
1764128087055f6f45544e1f15485



点击蓝字 关注我们



本系列文章是 DolphinScheduler 由浅入深的教程,涵盖搭建、二开迭代、核心原理解读、运维和管理等一系列内容。适用于想对 DolphinScheduler了解或想要加深理解的读者。

推荐阅读:

祝开卷有益 :)


转载自大数据学习指南
作者 | 陶世磊

今天是清理调度数据的第二篇文章,之前分享过如何使用API清理工作流实例和任务实例,可以看这篇文章:海豚调度清理:使用 API 轻松清理历史工作流实例以及日志文件

我们知道 DolphinScheduler 的工作流是有版本控制的,每一次更新任务、添加任务、修改任务等等操作,都会生成一个新的版本号,同时 process_definition_log 和 process_task_relation_log 的数据也会增加,久而久之,会积累大量的"无用数据",MySQL 的记录越来越多,会影响调度的服务,进而影响用户使用体验和 MySQL 服务。


来看一个例子,往下看。

如下图所示,该工作流随着迭代,已经积累了 600 多个版本,我们用了这么长时间的调度,没有发生过需要切换历史版本的情况,历史的版本数据基本都算做“无用”数据了,同时为了保持稳定性,和数仓同学协商,只保留最近 20 个版本。


1764128087745e5c0c98fc9313e82

所以,需要清理以上历史版本记录,保证页面影响速度和 MySQL 服务。

清理调度任务历史版本记录,依然是使用API的方式,直接操作数据库风险比较高。

本文的内容也比较简单,先是说明 API 的逻辑,最后再介绍如何使用一个 Python 脚本来调用 API 删除历史版本记录。

1.API 逻辑介绍

DolphinScheduler 本身提供了删除版本记录的接口,请求类型:DELETE,接口地址:process-definition/{dag_code}/versions/{version} ,接口逻辑比较简单,这里就不赘述了。

2.使用 Python 脚本调用API

Python脚本的逻辑比较简单,使用了4个API,按照顺序是:

1.获取项目列表
2.获取工作流列表
3.获取当前工作流版本信息列表
4.删除历史版本

第三步,需要注意的是,获取版本信息列表的时候,指定了分页大小是 20 ,从第二页开始。因为我们要保留最近的 20 个版本记录。

入参:无

Python 环境 2.7

具体的代码如下:

#!/usr/bin/python
# -*- coding: utf8 -*-
## 清理调度任务历史版本记录,依然是使用API的方式,直接操作数据库风险比较高。
## 会减少 process_definition_log 和 process_task_relation_log 的数据。

import io
import subprocess
import requests
import json
import time
import datetime

# 配置信息: ip 端口 token自行修改
base_url = 'http://xxxx:xxxx'
token = 'xxxxx'
# 获取项目列表
def get_project_list():
url = "{base_url}/dolphinscheduler/projects?pageSize=100&pageNo=1&searchVal=&_t=0.3741042528841678".format(base_url=base_url)
payload={}
headers = {
'Connection': 'keep-alive',
'Accept': 'application/json, text/plain, */*',
'language': 'zh_CN',
'sessionId': '680b2a0e-624c-4804-9e9e-58c7d4a0b44c',
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.51 Safari/537.36',
'Referer': "{base_url}/dolphinscheduler/ui/".format(base_url=base_url),
'Accept-Language': 'zh-CN,zh;q=0.9,pt;q=0.8,en;q=0.7',
'token':token
}
response = requests.request("GET", url, headers=headers, data=payload)
response_data = json.loads(response.text)
totalList = response_data['data']['totalList']
return totalList

# 获取工作定义列表
def get_definition_detail(project_code):
payload={}
headers = {
'Connection': 'keep-alive',
'Accept': 'application/json, text/plain, */
*',
'language': 'zh_CN',
'sessionId': '680b2a0e-624c-4804-9e9e-58c7d4a0b44c',
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.51 Safari/537.36',
'Referer': "{base_url}/dolphinscheduler/ui/".format(base_url=base_url),
'Accept-Language': 'zh-CN,zh;q=0.9,pt;q=0.8,en;q=0.7',
'token':token
}
all_data = []
pageNo = 1
while True:
url = "{base_url}/dolphinscheduler/projects/{project_code}/process-definition?searchVal=&pageSize=50&pageNo={pageNo}".format(project_code=project_code,pageNo=pageNo,base_url=base_url)
response = requests.request("GET", url, headers=headers, data=payload)
response_data = json.loads(response.text)
page_data = response_data['data']['totalList']
totalPage = response_data['data']['totalPage']

if len(page_data) == 0:
print('工作定义列表为空,退出循环...')
break
all_data.extend(page_data)

if pageNo >= totalPage:
print('工作定义列表到头了,退出循环...')
break
pageNo += 1
# 返回全部数据
return all_data

# 获取工作定义的版本信息列表,注意,这里从第二页开始!!!size是 20
def get_version_detail(project_code,dag_code,current_version):
payload={}
headers = {
'Connection': 'keep-alive',
'Accept': 'application/json, text/plain, */*',
'language': 'zh_CN',
'sessionId': '680b2a0e-624c-4804-9e9e-58c7d4a0b44c',
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.51 Safari/537.36',
'Referer': "{base_url}/dolphinscheduler/ui/".format(base_url=base_url),
'Accept-Language': 'zh-CN,zh;q=0.9,pt;q=0.8,en;q=0.7',
'token':token
}

all_version = []
pageNo = 2

while True:
if pageNo <= 1:
print('获取工作定义的版本信息列表,pageNo 必须大于1!!!')
break

url = "{base_url}/dolphinscheduler/projects/{project_code}/process-definition/{dag_code}/versions?searchVal=&pageSize=20&pageNo={pageNo}".format(project_code=project_code,dag_code=dag_code,pageNo=pageNo,base_url=base_url)
response = requests.request("GET", url, headers=headers, data=payload)
response_data = json.loads(response.text)
page_data = response_data['data']['totalList']
totalPage = response_data['data']['totalPage']

if len(page_data) == 0:
print('version列表为空,退出循环...')
break

for page in page_data:
version = int(page['version'])
# 保留近20个版本
if version + 20 <= current_version:
all_version.append(version)

if pageNo >= totalPage:
print('version列表到头了,退出循环...')
break

pageNo += 1

# TODO 分析all_data里面是否包含 current_version

# 返回正常的数据
return all_version

def delete(project_code,dag_code,version):
print('即将删除的项目,工作流以及版本')
print(project_code)
print(dag_code)
print(version)
url = "{base_url}/dolphinscheduler/projects/{project_code}/process-definition/{dag_code}/versions/{version}".format(project_code=project_code,dag_code=dag_code,version=version,base_url=base_url)
# 'processInstanceIds=89767'
payload={}
headers = {
'Connection': 'keep-alive',
'Accept': 'application/json, text/plain, */
*',
'language': 'zh_CN',
'sessionId': '680b2a0e-624c-4804-9e9e-58c7d4a0b44c',
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.51 Safari/537.36',
'Content-Type': 'application/x-www-form-urlencoded',
'Origin': 'http://10.1.19.150:7080',
'Referer': 'http://10.1.19.150:7080/dolphinscheduler/ui/',
'Accept-Language': 'zh-CN,zh;q=0.9,pt;q=0.8,en;q=0.7',
'token':token,
'Cookie': 'sessionId=680b2a0e-624c-4804-9e9e-58c7d4a0b44c; language=zh_CN; userName=admin; HERA_Token=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzc29JZCI6Ii0xIiwic3NvX25hbWUiOiJhZG1pbiIsImF1ZCI6IjJkZmlyZSIsImlzcyI6ImhlcmEiLCJleHAiOjE2NDYwMjk3MDYsInVzZXJJZCI6IjEiLCJpYXQiOjE2NDU3NzA1MDYsInVzZXJuYW1lIjoiYWRtaW4ifQ.YEhr9Mi7FDsQIAn5GJorB0U3lL92KQA8YvP26QMhh9g; sessionId=680b2a0e-624c-4804-9e9e-58c7d4a0b44c'
}
response = requests.request("DELETE", url, headers=headers, data=payload)
print('执行结果如下:')
print(response.text)

if __name__ == '__main__':
# # 需要处理的项目
projects = get_project_list()
# 依次处理项目
for project in projects:
project_code = project['code']
print('正在处理项目:'+ str(project_code))
all_dags = get_definition_detail(project_code)
for dag in all_dags:
# 工作流code和当前版本
dag_code = dag['code']
current_version = dag['version']
print(dag_code)
print(current_version)
# 获取该工作流历史版本记录...
all_data = get_version_detail(project_code,dag_code,current_version)
# TODO 删除
print(all_data)
for v in all_data:
delete(project_code,dag_code,v)

使用示例:dolphin_clean_version.py 是上面的脚本。

python  dolphin_clean_version.py

脚本在 GitHub 也维护了一份,欢迎 star 176412808828805e14a8872f9eb5c

https://github.com/aikuyun/dolphin_practices/blob/main/dolphin_clean_version.py

3.注意事项

1.token 获取的方式
176412808874221ef59e3c504ce3b


以上就使用 API 一键减少关系日志表和任务定义日志表的数据量的过程,如果有任何疑问,都可以与我交流,希望可以帮到你,下次见。



1764128089338b7b8f9061239481d
17641280897607a930f8814bdeb3f
1764128089338b7b8f9061239481d

1764128090833b0af496dea826c2f



用户案例



天翼云Zoom网易邮箱
每日互动 惠生工程 作业帮
博世智驾 蔚来汽车 长城汽车
集度长安汽车思科网讯
食行生鲜联通医疗联想
新网银行唯品富邦消费金融
自如有赞伊利当贝大数据
珍岛集团传智教育Bigo
YY直播 拈花云科太美医疗
Cisco Webex兴业证券


1764128090833b0af496dea826c2f



迁移实战



Azkaban Ooize(当贝迁移案例)
airflow (有赞迁移案例)
Air2phin(迁移工具)
Airflow迁移实践

1764128090833b0af496dea826c2f



发版消息




Apache DolphinScheduler 3.2.2版本正式发布!
Apache DolphinScheduler 3.2.1 版本发布:增强功能与安全性的全面升级
Apache DolphinScheduler 3.3.0 Alpha发布,功能增强与性能优化大升级!


1764128090833b0af496dea826c2f



加入社区



关注社区的方式有很多:

  • GitHub: https://github.com/apache/dolphinscheduler
  • 官网:https://dolphinscheduler.apache.org/en-us
  • 订阅开发者邮件:dev@dolphinscheduler@apache.org(向邮箱发送任意内容,收到邮件后回复同意订阅即可)
  • X.com:@DolphinSchedule
  • YouTube:https://www.youtube.com/@apachedolphinscheduler
  • Slack:https://join.slack.com/t/asf-dolphinscheduler/shared_invite/zt-1cmrxsio1-nJHxRJa44jfkrNL_Nsy9Qg

同样地,参与Apache DolphinScheduler 有非常多的参与贡献的方式,主要分为代码方式和非代码方式两种。

非代码方式包括:

完善文档、翻译文档;翻译技术性、实践性文章;投稿实践性、原理性文章;成为布道师;社区管理、答疑;会议分享;测试反馈;用户反馈等。

‍代码方式包括:

查找Bug;编写修复代码;开发新功能;提交代码贡献;参与代码审查等。

贡献第一个PR(文档、代码) 我们也希望是简单的,第一个PR用于熟悉提交的流程和社区协作以及感受社区的友好度。

社区汇总了以下适合新手的问题列表https://github.com/apache/dolphinscheduler/pulls?q=is%3Apr+is%3Aopen+label%3A%22first+time+contributor%22

优先级问题列表https://github.com/apache/dolphinscheduler/pulls?q=is%3Apr+is%3Aopen+label%3Apriority%3Ahigh

如何参与贡献链接https://dolphinscheduler.apache.org/zh-cn/docs/3.2.2/%E8%B4%A1%E7%8C%AE%E6%8C%87%E5%8D%97_menu/%E5%A6%82%E4%BD%95%E5%8F%82%E4%B8%8E_menu

如果你❤️小海豚,就来为我点亮Star吧!

https://github.com/apache/dolphinscheduler

17641280925434f33ce91bb9eaa4a


17641280931684bafe7d1de408052

你的好友秀秀子拍了拍你

并请你帮她点一下“分享”