本文将详细剖析Quartz的原理机制,以及在Dolphinscheduler中使用Quartz的原理。
QRTZ_JOB_DETAILS
和 QRTZ_TRIGGERS
是中心表,定义了任务与触发器之间的关系;
QRTZ_TRIGGERS
表通过外键关联了多个触发器类型表,如 QRTZ_SIMPLE_TRIGGERS
和 QRTZ_CRON_TRIGGERS
,用于实现不同类型的触发方式;
QRTZ_FIRED_TRIGGERS
用于记录每次任务执行的历史,与任务和触发器表都有关联;
QRTZ_CALENDARS
用于定义触发器的日历排除规则,QRTZ_PAUSED_TRIGGER_GRPS 用于管理触发器组的暂停状态;
QRTZ_SCHEDULER_STATE
和 QRTZ_LOCKS
主要用于集群环境中的任务调度协调,确保高可用性。
org.apache.dolphinscheduler.api.controller.SchedulerController#createSchedule--org.apache.dolphinscheduler.api.service.impl.SchedulerServiceImpl#insertSchedule....Schedule scheduleObj = new Schedule();Date now = new Date();scheduleObj.setTenantCode(tenantCode);scheduleObj.setProjectName(project.getName());scheduleObj.setProcessDefinitionCode(processDefineCode);scheduleObj.setProcessDefinitionName(processDefinition.getName());ScheduleParam scheduleParam = JSONUtils.parseObject(schedule, ScheduleParam.class);scheduleObj.setCrontab(scheduleParam.getCrontab());scheduleObj.setTimezoneId(scheduleParam.getTimezoneId());scheduleObj.setWarningType(warningType);scheduleObj.setWarningGroupId(warningGroupId);scheduleObj.setFailureStrategy(failureStrategy);scheduleObj.setCreateTime(now);scheduleObj.setUpdateTime(now);scheduleObj.setUserId(loginUser.getId());scheduleObj.setUserName(loginUser.getUserName());scheduleObj.setReleaseState(ReleaseState.OFFLINE);scheduleObj.setProcessInstancePriority(processInstancePriority);scheduleObj.setWorkerGroup(workerGroup);scheduleObj.setEnvironmentCode(environmentCode);scheduleMapper.insert(scheduleObj);....
核心其实就是向 schedule 表中插入了一条数据而已,如下 :
org.apache.dolphinscheduler.api.controller.SchedulerController#publishScheduleOnline--org.apache.dolphinscheduler.api.service.impl.SchedulerServiceImpl#onlineScheduler----org.apache.dolphinscheduler.api.service.impl.SchedulerServiceImpl#doOnlineScheduler------org.apache.dolphinscheduler.scheduler.quartz.QuartzScheduler#insertOrUpdateScheduleTask精简代码如下 : // TODO 使用schedule id和projectId封装 JobKey,比如jobName=job_25(schedulerId),jobGroup=jobgroup_1(projectId)JobKey jobKey = QuartzTaskUtils.getJobKey(schedule.getId(), projectId);// TODO 使用projectId和schedule封装jobDataMap,里面封装的是projectId、scheduleId和schedule(JSON存储)Map<String, Object> jobDataMap = QuartzTaskUtils.buildDataMap(projectId, schedule);// TODO 获取cron表达式String cronExpression = schedule.getCrontab();// TODO 获取时区String timezoneId = schedule.getTimezoneId();// TODO 定时调度的开启时间Date startDate = DateUtils.transformTimezoneDate(schedule.getStartTime(), timezoneId);// TODO 定时调度的结束时间Date endDate = DateUtils.transformTimezoneDate(schedule.getEndTime(), timezoneId);jobDetail jobDetail = newJob(ProcessScheduleTask.class).withIdentity(jobKey).build();jobDetail.getJobDataMap().putAll(jobDataMap);// TODO 创建一个Jobscheduler.addJob(jobDetail, false, true);// TODO 封装TriggerTriggerKey triggerKey = new TriggerKey(jobKey.getName(), jobKey.getGroup());CronTrigger cronTrigger = newTrigger() .withIdentity(triggerKey) .startAt(startDate) .endAt(endDate) .withSchedule( cronSchedule(cronExpression) .withMisfireHandlingInstructionIgnoreMisfires() .inTimeZone(DateUtils.getTimezone(timezoneId))) .forJob(jobDetail).build();// TODO 开始调度scheduler.scheduleJob(cronTrigger);
存储每个任务的详细信息
存储触发器的基本信息,是所有触发器类型的父表
存储 Cron 表达式触发器(Cron Trigger)的信息
org.apache.dolphinscheduler.scheduler.quartz.ProcessScheduleTask,这个类是 qrtz_job_details 中的 JOB_CLASS_NAME 字段protected void executeInternal(JobExecutionContext context) { JobDataMap dataMap = context.getJobDetail().getJobDataMap(); int projectId = dataMap.getInt(QuartzTaskUtils.PROJECT_ID); int scheduleId = dataMap.getInt(QuartzTaskUtils.SCHEDULE_ID); Date scheduledFireTime = context.getScheduledFireTime(); Date fireTime = context.getFireTime(); Command command = new Command(); command.setCommandType(CommandType.SCHEDULER); command.setExecutorId(schedule.getUserId()); command.setFailureStrategy(schedule.getFailureStrategy()); command.setProcessDefinitionCode(schedule.getProcessDefinitionCode()); command.setScheduleTime(scheduledFireTime); command.setStartTime(fireTime); command.setWarningGroupId(schedule.getWarningGroupId()); String workerGroup = StringUtils.isEmpty(schedule.getWorkerGroup()) ? Constants.DEFAULT_WORKER_GROUP : schedule.getWorkerGroup(); command.setWorkerGroup(workerGroup); command.setTenantCode(schedule.getTenantCode()); command.setEnvironmentCode(schedule.getEnvironmentCode()); command.setWarningType(schedule.getWarningType()); command.setProcessInstancePriority(schedule.getProcessInstancePriority()); command.setProcessDefinitionVersion(processDefinition.getVersion()); commandService.createCommand(command);}
说白了,这个就是quartz的一个回调函数,最终生成Command。
转载自Journey
原文链接:https://segmentfault.com/a/1190000045471756