纯干货 | Dolphinscheduler Master模块源码剖析

此前我们曾用万字长文解释了Apache DolphinScheduler的Worker模块源码,今天,我们再来一起看看Master模块源码的原理。

此前我们曾用万字长文解释了Apache DolphinScheduler的Worker模块源码,今天,我们再来一起看看Master模块源码的原理。

Master Slot计算

1743575838083c4ca4238a0b923820dcc509a6f75849b

核心代码逻辑
org.apache.dolphinscheduler.server.master.registry.MasterSlotManager.SlotChangeListener#notify

public void notify(Map<String, MasterHeartBeat> masterNodeInfo) {    List<Server> serverList = masterNodeInfo.values().stream()            // TODO 这里其实就是过滤掉buzy的master节点            .filter(heartBeat -> !heartBeat.getServerStatus().equals(ServerStatus.BUSY))            .map(this::convertHeartBeatToServer).collect(Collectors.toList());    // TODO 同步master节点    syncMasterNodes(serverList);}

计算 totalSlot和currentSlot

private void syncMasterNodes(List<Server> masterNodes) {    slotLock.lock();    try {        this.masterPriorityQueue.clear();        // TODO 这里会把所有的master节点都放入到masterPriorityQueue中,比如说 [192.168.220.1:12345,192.168.220.2:12345]        this.masterPriorityQueue.putAll(masterNodes);        // TODO 就是获取到本地ip的在队列中的位置        int tempCurrentSlot = masterPriorityQueue.getIndex(masterConfig.getMasterAddress());        // TODO 所有节点数量        int tempTotalSlot = masterNodes.size();        // TODO 正常情况下不会小于0        if (tempCurrentSlot < 0) {            totalSlot = 0;            currentSlot = 0;            log.warn("Current master is not in active master list");        } else if (tempCurrentSlot != currentSlot || tempTotalSlot != totalSlot) {            // TODO 这里其实就是记录的是比如说一共有两个slot,我的slot是0或者1            totalSlot = tempTotalSlot;            currentSlot = tempCurrentSlot;            log.info("Update master nodes, total master size: {}, current slot: {}", totalSlot, currentSlot);        }    } finally {        slotLock.unlock();    }}

this.masterPriorityQueue.putAll(masterNodes); 会计算索引

public void putAll(Collection<Server> serverList) {    for (Server server : serverList) {        this.queue.put(server);    }    // TODO 这里更新了hostIndexMap,存放的是 <host:port> -> 索引    refreshMasterList();}private void refreshMasterList() {    hostIndexMap.clear();    Iterator<Server> iterator = queue.iterator();    int index = 0;    while (iterator.hasNext()) {        Server server = iterator.next();        String addr = NetUtils.getAddr(server.getHost(), server.getPort());        hostIndexMap.put(addr, index);        index += 1;    }}

Master消费Command生成流程实例

1743575838086c81e728d9d4c2f636f067f89cc14862c

command最终的获取逻辑:

比如说两个Master节点 : masterCount=2 thisMasterSlot=0  master1masterCount=2 thisMasterSlot=1  master2command中的数据如下 :1 master22 master13 master24 master1select *        from t_ds_command        where id % #{masterCount} = #{thisMasterSlot}        order by process_instance_priority, id asc            limit #{limit}

有没有感到疑惑,就是如果一个master更新到的最新的,一个没有更新到,怎么办?

比如说,master1节点是这样的1  master22  master13  master24  master1比如说,master2节点是这样的,是不是发现master2节点都是他的,都可以拉取消费?那就导致重复消费,比如说1这个command1 master12 master13 master14 master1

org.apache.dolphinscheduler.service.process.ProcessServiceImpl#handleCommand

@Transactionalpublic @Nullable ProcessInstance handleCommand(String host,                                                   Command command) throws CronParseException, CodeGenerateException {    // TODO 创建流程实例    ProcessInstance processInstance = constructProcessInstance(command, host);    // cannot construct process instance, return null    if (processInstance == null) {        log.error("scan command, command parameter is error: {}", command);        commandService.moveToErrorCommand(command, "process instance is null");        return null;    }    processInstance.setCommandType(command.getCommandType());    processInstance.addHistoryCmd(command.getCommandType());    processInstance.setTestFlag(command.getTestFlag());    // if the processDefinition is serial    ProcessDefinition processDefinition = this.findProcessDefinition(processInstance.getProcessDefinitionCode(),            processInstance.getProcessDefinitionVersion());    // TODO 是否是串行执行    if (processDefinition.getExecutionType().typeIsSerial()) {        saveSerialProcess(processInstance, processDefinition);        if (processInstance.getState() != WorkflowExecutionStatus.RUNNING_EXECUTION) {            setSubProcessParam(processInstance);            triggerRelationService.saveProcessInstanceTrigger(command.getId(), processInstance.getId());            deleteCommandWithCheck(command.getId());            // todo: this is a bad design to return null here, whether trigger the task            return null;        }    } else {        // TODO 并行执行        processInstanceDao.upsertProcessInstance(processInstance);    }    // TODO 这里其实还会向triggerRelation表中插入一条数据,是流程实例和triggerCode的关系    triggerRelationService.saveProcessInstanceTrigger(command.getId(), processInstance.getId());    // TODO 设置子流程参数    setSubProcessParam(processInstance);    // TODO 删除command    deleteCommandWithCheck(command.getId());    return processInstance;}

注意:这个方法是加@Transactional的,所以说创建流程实例和删除Command是在一个事物里面的,如果不同的Master消费到同一个Command。肯定会有一个删除Command失败,这时会抛出一个异常,这样就会让数据库进行回滚。

工作流启动流程

1743575838087eccbc87e4b5ce2fe28308fd9f2a7baf3

DAG切分 & 任务提交

1743575838087a87ff679a2f3e71d9181a67b7542122c

Master事件状态流转

1743575838086e4da3b7fbbce2345d7772b0674a318d5图连接 : Master事件状态流转

TaskEventService组件中的TaskEventDispatchThread(线程)和TaskEventHandlerThread(线程)解析

17435758384801679091c5a880faf6fb5e6087eb1b2dc


其实就是Master自己状态(DISPATCH)和Worker汇报上来的状态(RUNNING、UPDATE_PID、RESULT)都会放入到eventQueue,TaskEventDispatchThread(线程)会阻塞的方式进行获取,然后放入到对应的TaskExecuteRunnable中(注意 : 不执行的),只有通过TaskEventHandlerThread(线程)才会使用TaskExecuteThreadPool线程进行TaskExecuteRunnable的提交。

转载自Journey
原文链接:https://segmentfault.com/a/1190000044992842