DolphinScheduler心脏:Quartz的定时任务调度框架深度解析

Quartz是一个开源的Java作业调度框架,它提供了强大的定时任务调度功能。在DolphinScheduler中,Quartz用于实现定时任务的调度和管理。DolphinScheduler通过QuartzExecutorImpl类与Quartz集成,将工作流及其定时管理操作与Quartz调度框架相结合,实现任务的调度执行。本文将详细剖析Quartz的原理机制,以及在Dolphinscheduler中使用Quartz的原理。

Quartz是一个开源的Java作业调度框架,它提供了强大的定时任务调度功能。在DolphinScheduler中,Quartz用于实现定时任务的调度和管理。DolphinScheduler通过QuartzExecutorImpl类与Quartz集成,将工作流及其定时管理操作与Quartz调度框架相结合,实现任务的调度执行。

本文将详细剖析Quartz的原理机制,以及在Dolphinscheduler中使用Quartz的原理。

Quartz ER图

17320054223533f2f45f2b02a21bf1f7b4f2c734b04ea

  1. QRTZ_JOB_DETAILS  QRTZ_TRIGGERS 是中心表,定义了任务与触发器之间的关系;

  2. QRTZ_TRIGGERS 表通过外键关联了多个触发器类型表,如 QRTZ_SIMPLE_TRIGGERS  QRTZ_CRON_TRIGGERS,用于实现不同类型的触发方式;

  3. QRTZ_FIRED_TRIGGERS 用于记录每次任务执行的历史,与任务和触发器表都有关联;

  4. QRTZ_CALENDARS 用于定义触发器的日历排除规则,QRTZ_PAUSED_TRIGGER_GRPS 用于管理触发器组的暂停状态;

  5. QRTZ_SCHEDULER_STATE  QRTZ_LOCKS 主要用于集群环境中的任务调度协调,确保高可用性。

Dolphinscheduler Quartz使用

新建SHELL任务

17320055339159bc58ffb47f7286fc629f63285521f61

流程定义上线并配置调度

1732005422399d7e7d2e1d673fdbcb5658716c9d83f51173200542243293e49490f5f5040ef5f5fe20f83e038a

定时上线

17320054223636dde0c7d3b41bfcac0c8cb5fea7a8bcd

流程实例运行结果

1732005422413bf7b87f280064ad7baafa959b3aa5350

原理剖析

创建调度

org.apache.dolphinscheduler.api.controller.SchedulerController#createSchedule--org.apache.dolphinscheduler.api.service.impl.SchedulerServiceImpl#insertSchedule....Schedule scheduleObj = new Schedule();Date now = new Date();scheduleObj.setTenantCode(tenantCode);scheduleObj.setProjectName(project.getName());scheduleObj.setProcessDefinitionCode(processDefineCode);scheduleObj.setProcessDefinitionName(processDefinition.getName());ScheduleParam scheduleParam = JSONUtils.parseObject(schedule, ScheduleParam.class);scheduleObj.setCrontab(scheduleParam.getCrontab());scheduleObj.setTimezoneId(scheduleParam.getTimezoneId());scheduleObj.setWarningType(warningType);scheduleObj.setWarningGroupId(warningGroupId);scheduleObj.setFailureStrategy(failureStrategy);scheduleObj.setCreateTime(now);scheduleObj.setUpdateTime(now);scheduleObj.setUserId(loginUser.getId());scheduleObj.setUserName(loginUser.getUserName());scheduleObj.setReleaseState(ReleaseState.OFFLINE);scheduleObj.setProcessInstancePriority(processInstancePriority);scheduleObj.setWorkerGroup(workerGroup);scheduleObj.setEnvironmentCode(environmentCode);scheduleMapper.insert(scheduleObj);....
Copy

核心其实就是向 schedule 表中插入了一条数据而已,如下 :

17320054232267878df599aef27a4ac1a5ffabae8ee75

调度上线

org.apache.dolphinscheduler.api.controller.SchedulerController#publishScheduleOnline--org.apache.dolphinscheduler.api.service.impl.SchedulerServiceImpl#onlineScheduler----org.apache.dolphinscheduler.api.service.impl.SchedulerServiceImpl#doOnlineScheduler------org.apache.dolphinscheduler.scheduler.quartz.QuartzScheduler#insertOrUpdateScheduleTask精简代码如下 : // TODO 使用schedule id和projectId封装 JobKey,比如jobName=job_25(schedulerId),jobGroup=jobgroup_1(projectId)JobKey jobKey = QuartzTaskUtils.getJobKey(schedule.getId(), projectId);// TODO 使用projectId和schedule封装jobDataMap,里面封装的是projectId、scheduleId和schedule(JSON存储)Map<String, Object> jobDataMap = QuartzTaskUtils.buildDataMap(projectId, schedule);// TODO 获取cron表达式String cronExpression = schedule.getCrontab();// TODO 获取时区String timezoneId = schedule.getTimezoneId();// TODO 定时调度的开启时间Date startDate = DateUtils.transformTimezoneDate(schedule.getStartTime(), timezoneId);// TODO 定时调度的结束时间Date endDate = DateUtils.transformTimezoneDate(schedule.getEndTime(), timezoneId);jobDetail jobDetail = newJob(ProcessScheduleTask.class).withIdentity(jobKey).build();jobDetail.getJobDataMap().putAll(jobDataMap);// TODO 创建一个Jobscheduler.addJob(jobDetail, false, true);// TODO 封装TriggerTriggerKey triggerKey = new TriggerKey(jobKey.getName(), jobKey.getGroup());CronTrigger cronTrigger = newTrigger()                    .withIdentity(triggerKey)                    .startAt(startDate)                    .endAt(endDate)                    .withSchedule(                            cronSchedule(cronExpression)                                    .withMisfireHandlingInstructionIgnoreMisfires()                                    .inTimeZone(DateUtils.getTimezone(timezoneId)))                    .forJob(jobDetail).build();// TODO 开始调度scheduler.scheduleJob(cronTrigger);
Copy

对应的表

存储每个任务的详细信息

17320054231980f0eb38feb9eb44e63db212d0775a7c7

存储触发器的基本信息,是所有触发器类型的父表

1732005423216ec08a41368cd720f6ba97322d159849a

存储 Cron 表达式触发器(Cron Trigger)的信息

17320054232310a6fa9448e6e138f183a6ce88cf9472f

调度执行

org.apache.dolphinscheduler.scheduler.quartz.ProcessScheduleTask,这个类是 qrtz_job_details 中的 JOB_CLASS_NAME 字段protected void executeInternal(JobExecutionContext context) {    JobDataMap dataMap = context.getJobDetail().getJobDataMap();    int projectId = dataMap.getInt(QuartzTaskUtils.PROJECT_ID);    int scheduleId = dataMap.getInt(QuartzTaskUtils.SCHEDULE_ID);    Date scheduledFireTime = context.getScheduledFireTime();    Date fireTime = context.getFireTime();    Command command = new Command();    command.setCommandType(CommandType.SCHEDULER);    command.setExecutorId(schedule.getUserId());    command.setFailureStrategy(schedule.getFailureStrategy());    command.setProcessDefinitionCode(schedule.getProcessDefinitionCode());    command.setScheduleTime(scheduledFireTime);    command.setStartTime(fireTime);    command.setWarningGroupId(schedule.getWarningGroupId());    String workerGroup = StringUtils.isEmpty(schedule.getWorkerGroup()) ? Constants.DEFAULT_WORKER_GROUP            : schedule.getWorkerGroup();    command.setWorkerGroup(workerGroup);    command.setTenantCode(schedule.getTenantCode());    command.setEnvironmentCode(schedule.getEnvironmentCode());    command.setWarningType(schedule.getWarningType());    command.setProcessInstancePriority(schedule.getProcessInstancePriority());    command.setProcessDefinitionVersion(processDefinition.getVersion());    commandService.createCommand(command);}
Copy

说白了,这个就是quartz的一个回调函数,最终生成Command。

转载自Journey
原文链接:https://segmentfault.com/a/1190000045471756