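The Worker module of Apache DolphinScheduler is one of the core components of its distributed scheduling system. It handles task execution, resource management, and dynamic cluster scheduling. This article walks through the source code to show how it is designed and implemented.
The Worker exposes its services over Netty, and the Master calls them through JDK dynamic proxies. For details, see the earlier walkthrough of the DolphinScheduler alert module; they are not repeated here.
In brief: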
org.apache.dolphinscheduler.extract.worker.ITaskInstanceOperator
```java
@RpcService
public interface ITaskInstanceOperator {

    @RpcMethod
    TaskInstanceDispatchResponse dispatchTask(TaskInstanceDispatchRequest taskInstanceDispatchRequest);

    @RpcMethod
    TaskInstanceKillResponse killTask(TaskInstanceKillRequest taskInstanceKillRequest);

    @RpcMethod
    TaskInstancePauseResponse pauseTask(TaskInstancePauseRequest taskPauseRequest);

    @RpcMethod
    UpdateWorkflowHostResponse updateWorkflowInstanceHost(UpdateWorkflowHostRequest updateWorkflowHostRequest);
}
```
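For interfaces annotated with @RpcService and methods annotated with @RpcMethod, the framework registers Netty handlers on the Worker side and generates dynamic-proxy implementations on the Master side.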
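To make the annotation-driven idea concrete, here is a minimal, self-contained sketch of the client side built on plain JDK dynamic proxies. The annotations `SketchRpcService`/`SketchRpcMethod`, the `EchoOperator` interface, and the `transport` function are hypothetical stand-ins; the real framework serializes the call and sends it to the Worker over Netty instead of invoking a local function.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Proxy;
import java.util.function.BiFunction;

// Hypothetical stand-ins for DolphinScheduler's @RpcService / @RpcMethod.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface SketchRpcService {}

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface SketchRpcMethod {}

@SketchRpcService
interface EchoOperator {
    @SketchRpcMethod
    String echo(String message);
}

final class RpcProxySketch {
    private RpcProxySketch() {}

    /**
     * Builds a client-side proxy: every @SketchRpcMethod call is handed to
     * `transport` as (method name, args). In a real RPC client this is where
     * the request would be serialized and sent over the network.
     */
    @SuppressWarnings("unchecked")
    static <T> T createClient(Class<T> iface, BiFunction<String, Object[], Object> transport) {
        if (!iface.isAnnotationPresent(SketchRpcService.class)) {
            throw new IllegalArgumentException(iface.getName() + " is not an RPC service");
        }
        return (T) Proxy.newProxyInstance(
                iface.getClassLoader(),
                new Class<?>[] {iface},
                (proxy, method, args) -> {
                    // Only annotated methods are remote; everything else is rejected in this sketch.
                    if (!method.isAnnotationPresent(SketchRpcMethod.class)) {
                        throw new UnsupportedOperationException(method.getName() + " is not an RPC method");
                    }
                    return transport.apply(method.getName(), args);
                });
    }
}
```

The caller only ever sees the interface; which transport sits behind the proxy is decided when the proxy is created.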
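TaskInstanceDispatchOperationFunction depends on the following components:
WorkerConfig: simply reads the configuration items prefixed with `worker` from the Worker module's application.yaml.
WorkerTaskExecutorFactoryBuilder: a builder for task executor factories. It wraps DefaultWorkerTaskExecutorFactory (the default Worker task executor factory), which in turn wraps the creation of DefaultWorkerTaskExecutor. The parent class of DefaultWorkerTaskExecutor is WorkerTaskExecutor, and WorkerTaskExecutor is itself a thread task (it has a run() method). Fun, right?
WorkerTaskExecutorThreadPool: really just a wrapper around a fixed-size thread pool.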
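A minimal sketch of what a wrapper around a fixed thread pool can look like, assuming the wrapper reports saturation by returning `false` (which `operate()` turns into `"WorkerManagerThread is full"`). `FixedWorkerPoolSketch` and its bounded queue are assumptions for illustration, not DolphinScheduler's actual `WorkerTaskExecutorThreadPool`.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical fixed-size pool that reports saturation as `false` instead of throwing. */
final class FixedWorkerPoolSketch {
    private final ThreadPoolExecutor pool;

    FixedWorkerPoolSketch(int threads, int queueCapacity) {
        // core == max gives a fixed-size pool; the bounded queue makes "full" observable.
        this.pool = new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity), new ThreadPoolExecutor.AbortPolicy());
    }

    /** Returns true if the task was accepted (running or queued), false if the pool is saturated. */
    boolean submit(Runnable taskExecutor) {
        try {
            pool.execute(taskExecutor);
            return true;
        } catch (RejectedExecutionException e) {
            return false;
        }
    }

    void shutdown() {
        pool.shutdownNow();
    }
}
```

With one thread and a queue of one, the first task runs, the second waits in the queue, and the third is refused.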
```java
public TaskInstanceDispatchResponse operate(TaskInstanceDispatchRequest taskInstanceDispatchRequest) {
    log.info("Receive TaskInstanceDispatchRequest: {}", taskInstanceDispatchRequest);
    // TODO task execution context
    TaskExecutionContext taskExecutionContext = taskInstanceDispatchRequest.getTaskExecutionContext();
    try {
        // TODO set the worker address
        taskExecutionContext.setHost(workerConfig.getWorkerAddress());
        // TODO set the task log path
        taskExecutionContext.setLogPath(LogUtils.getTaskInstanceLogFullPath(taskExecutionContext));
        // TODO put the workflow instance id and task instance id into the MDC; they seem to be only put, never read
        LogUtils.setWorkflowAndTaskInstanceIDMDC(
                taskExecutionContext.getProcessInstanceId(),
                taskExecutionContext.getTaskInstanceId());

        // check server status, if server is not running, return failed to reject this task
        if (!ServerLifeCycleManager.isRunning()) {
            log.error("server is not running. reject task: {}", taskExecutionContext.getProcessInstanceId());
            return TaskInstanceDispatchResponse.failed(taskExecutionContext.getTaskInstanceId(),
                    "server is not running");
        }

        TaskMetrics.incrTaskTypeExecuteCount(taskExecutionContext.getTaskType());

        // TODO create a WorkerTaskExecutor through WorkerTaskExecutorFactoryBuilder
        WorkerTaskExecutor workerTaskExecutor = workerTaskExecutorFactoryBuilder
                .createWorkerTaskExecutorFactory(taskExecutionContext)
                .createWorkerTaskExecutor();
        // todo: hold the workerTaskExecutor
        // TODO submit the task directly
        if (!workerTaskExecutorThreadPool.submitWorkerTaskExecutor(workerTaskExecutor)) {
            log.info("Submit task: {} to wait queue failed", taskExecutionContext.getTaskName());
            return TaskInstanceDispatchResponse.failed(taskExecutionContext.getTaskInstanceId(),
                    "WorkerManagerThread is full");
        } else {
            log.info("Submit task: {} to wait queue success", taskExecutionContext.getTaskName());
            return TaskInstanceDispatchResponse.success(taskExecutionContext.getTaskInstanceId());
        }
    } finally {
        LogUtils.removeWorkflowAndTaskInstanceIdMDC();
    }
}
```
Analysis of LogUtils.getTaskInstanceLogFullPath(taskExecutionContext)
org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils#getTaskInstanceLogFullPath: gets the full path of the task log.
```java
/**
 * Get task instance log full path.
 *
 * @param taskExecutionContext task execution context.
 * @return task instance log full path.
 */
public static String getTaskInstanceLogFullPath(TaskExecutionContext taskExecutionContext) {
    return getTaskInstanceLogFullPath(
            DateUtils.timeStampToDate(taskExecutionContext.getFirstSubmitTime()),
            taskExecutionContext.getProcessDefineCode(),
            taskExecutionContext.getProcessDefineVersion(),
            taskExecutionContext.getProcessInstanceId(),
            taskExecutionContext.getTaskInstanceId());
}
```
org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils#getTaskInstanceLogFullPath (overload): assembles the full path of the task log.
```java
/**
 * todo: Remove the submitTime parameter?
 * The task instance log full path, the path is like:{log.base}/{taskSubmitTime}/{workflowDefinitionCode}/{workflowDefinitionVersion}/{workflowInstance}/{taskInstance}.log
 *
 * @param taskFirstSubmitTime task first submit time
 * @param workflowDefinitionCode workflow definition code
 * @param workflowDefinitionVersion workflow definition version
 * @param workflowInstanceId workflow instance id
 * @param taskInstanceId task instance id.
 * @return task instance log full path.
 */
public static String getTaskInstanceLogFullPath(Date taskFirstSubmitTime,
                                                Long workflowDefinitionCode,
                                                int workflowDefinitionVersion,
                                                int workflowInstanceId,
                                                int taskInstanceId) {
    if (TASK_INSTANCE_LOG_BASE_PATH == null) {
        throw new IllegalArgumentException(
                "Cannot find the task instance log base path, please check your logback.xml file");
    }
    final String taskLogFileName = Paths.get(
            String.valueOf(workflowDefinitionCode),
            String.valueOf(workflowDefinitionVersion),
            String.valueOf(workflowInstanceId),
            String.format("%s.log", taskInstanceId)).toString();
    return TASK_INSTANCE_LOG_BASE_PATH
            .resolve(DateUtils.format(taskFirstSubmitTime, DateConstants.YYYYMMDD, null))
            .resolve(taskLogFileName)
            .toString();
}
```
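org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils#getTaskInstanceLogBasePath: reads the configuration in logback-spring.xml to get the base path for task instance logs. In practice this is the `logs` directory, resolved to an absolute path against the process's working directory (typically the worker-server root directory).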
```java
/**
 * Get task instance log base absolute path, this is defined in logback.xml
 *
 * @return
 */
public static Path getTaskInstanceLogBasePath() {
    return Optional.of(LoggerFactory.getILoggerFactory())
            .map(e -> (AppenderAttachable<ILoggingEvent>) (e.getLogger("ROOT")))
            .map(e -> (SiftingAppender) (e.getAppender("TASKLOGFILE")))
            .map(e -> ((TaskLogDiscriminator) (e.getDiscriminator())))
            .map(TaskLogDiscriminator::getLogBase)
            .map(e -> Paths.get(e).toAbsolutePath())
            .orElse(null);
}
```
The Worker's logback-spring.xml:
```xml
<configuration scan="true" scanPeriod="120 seconds">
    <property name="log.base" value="logs"/>
    ...
    <appender name="TASKLOGFILE" class="ch.qos.logback.classic.sift.SiftingAppender">
        <filter class="org.apache.dolphinscheduler.plugin.task.api.log.TaskLogFilter"/>
        <Discriminator class="org.apache.dolphinscheduler.plugin.task.api.log.TaskLogDiscriminator">
            <key>taskInstanceLogFullPath</key>
            <logBase>${log.base}</logBase>
        </Discriminator>
        <sift>
            <appender name="FILE-${taskInstanceLogFullPath}" class="ch.qos.logback.core.FileAppender">
                <file>${taskInstanceLogFullPath}</file>
                <encoder>
                    <pattern>
                        [%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} - %message%n
                    </pattern>
                    <charset>UTF-8</charset>
                </encoder>
                <append>true</append>
            </appender>
        </sift>
    </appender>
    ...
    <root level="INFO">
        <appender-ref ref="STDOUT"/>
        <appender-ref ref="TASKLOGFILE"/>
    </root>
</configuration>
```
The resulting path looks like this:
/opt/dolphinscheduler/worker-server/logs/20240615/13929490938784/1/1815/1202.log
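The path layout can be reproduced with `java.nio.file` alone. This is a hypothetical sketch (`TaskLogPathSketch` is not a DolphinScheduler class), and it takes the time zone explicitly, whereas the original formats the date with `DateUtils.format(..., null)`. It rebuilds the example path above from its five inputs.

```java
import java.nio.file.Path;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

final class TaskLogPathSketch {
    private static final DateTimeFormatter YYYYMMDD = DateTimeFormatter.ofPattern("yyyyMMdd");

    private TaskLogPathSketch() {}

    /** {logBase}/{yyyyMMdd}/{definitionCode}/{definitionVersion}/{workflowInstanceId}/{taskInstanceId}.log */
    static Path taskInstanceLogPath(Path logBase, long firstSubmitTimeMillis, ZoneId zone,
                                    long workflowDefinitionCode, int workflowDefinitionVersion,
                                    int workflowInstanceId, int taskInstanceId) {
        // The day directory comes from the task's first submit time.
        String day = YYYYMMDD.format(Instant.ofEpochMilli(firstSubmitTimeMillis).atZone(zone));
        return logBase
                .resolve(day)
                .resolve(String.valueOf(workflowDefinitionCode))
                .resolve(String.valueOf(workflowDefinitionVersion))
                .resolve(String.valueOf(workflowInstanceId))
                .resolve(taskInstanceId + ".log");
    }
}
```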
org.apache.dolphinscheduler.server.worker.runner.operator.TaskInstanceDispatchOperationFunction#operate
```java
// TODO create a WorkerTaskExecutor through WorkerTaskExecutorFactoryBuilder
WorkerTaskExecutor workerTaskExecutor = workerTaskExecutorFactoryBuilder
        .createWorkerTaskExecutorFactory(taskExecutionContext)
        .createWorkerTaskExecutor();
// todo: hold the workerTaskExecutor
// TODO submit the task directly
if (!workerTaskExecutorThreadPool.submitWorkerTaskExecutor(workerTaskExecutor)) {
    log.info("Submit task: {} to wait queue failed", taskExecutionContext.getTaskName());
    return TaskInstanceDispatchResponse.failed(taskExecutionContext.getTaskInstanceId(),
            "WorkerManagerThread is full");
} else {
    log.info("Submit task: {} to wait queue success", taskExecutionContext.getTaskName());
    return TaskInstanceDispatchResponse.success(taskExecutionContext.getTaskInstanceId());
}
```
The task is submitted directly with workerTaskExecutorThreadPool.submitWorkerTaskExecutor(workerTaskExecutor).
WorkerTaskExecutor is a thread task, so naturally the next thing to look at is its run() method:
```java
public void run() {
    try {
        // TODO put the workflow instance and task instance ids into the MDC; this works much like a ThreadLocal
        LogUtils.setWorkflowAndTaskInstanceIDMDC(
                taskExecutionContext.getProcessInstanceId(),
                taskExecutionContext.getTaskInstanceId());
        // TODO put the task log path into the MDC
        LogUtils.setTaskInstanceLogFullPathMDC(taskExecutionContext.getLogPath());

        // TODO print the task header
        TaskInstanceLogHeader.printInitializeTaskContextHeader();
        // TODO initialize the task: this only sets the start time and the taskAppId (workflow instance id + task instance id)
        initializeTask();

        // TODO DRY_RUN means a dry run: the status is simply set to SUCCESS
        if (DRY_RUN_FLAG_YES == taskExecutionContext.getDryRun()) {
            taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.SUCCESS);
            taskExecutionContext.setEndTime(System.currentTimeMillis());
            WorkerTaskExecutorHolder.remove(taskExecutionContext.getTaskInstanceId());
            // TODO send the result through the worker message sender
            workerMessageSender.sendMessageWithRetry(taskExecutionContext,
                    ITaskInstanceExecutionEvent.TaskInstanceExecutionEventType.FINISH);
            log.info(
                    "The current execute mode is dry run, will stop the subsequent process and set the taskInstance status to success");
            return;
        }

        // TODO print the task plugin header
        TaskInstanceLogHeader.printLoadTaskInstancePluginHeader();
        // TODO before execution
        beforeExecute();

        // TODO callback
        TaskCallBack taskCallBack = TaskCallbackImpl.builder()
                .workerMessageSender(workerMessageSender)
                .taskExecutionContext(taskExecutionContext)
                .build();

        TaskInstanceLogHeader.printExecuteTaskHeader();
        // TODO execute
        executeTask(taskCallBack);

        TaskInstanceLogHeader.printFinalizeTaskHeader();
        // TODO after execution
        afterExecute();
        closeLogAppender();
    } catch (Throwable ex) {
        log.error("Task execute failed, due to meet an exception", ex);
        afterThrowing(ex);
        closeLogAppender();
    } finally {
        LogUtils.removeWorkflowAndTaskInstanceIdMDC();
        LogUtils.removeTaskInstanceLogFullPathMDC();
    }
}
```
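Structurally, run() is a template method: a fixed sequence with overridable hooks, a short-circuit for dry runs, and cleanup in finally. The skeleton below is a hypothetical reduction (`TaskExecutorTemplateSketch` and its `trace` list are for illustration only) that records the order in which the hooks fire.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical skeleton of the run() template: fixed order, overridable hooks. */
abstract class TaskExecutorTemplateSketch implements Runnable {
    protected final List<String> trace = new ArrayList<>();
    private final boolean dryRun;

    TaskExecutorTemplateSketch(boolean dryRun) {
        this.dryRun = dryRun;
    }

    @Override
    public final void run() {
        try {
            trace.add("initialize");
            if (dryRun) {
                // Dry run: report success and skip every remaining step.
                trace.add("dry-run: report SUCCESS and return");
                return;
            }
            beforeExecute();
            executeTask();
            afterExecute();
        } catch (Throwable ex) {
            afterThrowing(ex);
        } finally {
            // Stands in for clearing the MDC entries; runs even after the early return.
            trace.add("clear MDC");
        }
    }

    protected void beforeExecute() { trace.add("beforeExecute"); }

    protected abstract void executeTask() throws Exception;

    protected void afterExecute() { trace.add("afterExecute"); }

    protected void afterThrowing(Throwable ex) { trace.add("afterThrowing: " + ex.getMessage()); }
}
```

Because run() is final, subclasses can change what each step does but not the order of the steps.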
Key points:
2.5.1. Dry run
In dry-run mode the task is marked as successful immediately and is never actually executed.
```java
// TODO DRY_RUN means a dry run: the status is simply set to SUCCESS
if (DRY_RUN_FLAG_YES == taskExecutionContext.getDryRun()) {
    taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.SUCCESS);
    taskExecutionContext.setEndTime(System.currentTimeMillis());
    WorkerTaskExecutorHolder.remove(taskExecutionContext.getTaskInstanceId());
    // TODO send the result through the worker message sender
    workerMessageSender.sendMessageWithRetry(taskExecutionContext,
            ITaskInstanceExecutionEvent.TaskInstanceExecutionEventType.FINISH);
    log.info(
            "The current execute mode is dry run, will stop the subsequent process and set the taskInstance status to success");
    return;
}
```
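2.5.2. beforeExecute()
Preparation before execution: reporting to the Master that the task is running, creating the tenant (a Linux user), creating the working directory, downloading the required resource files, and initializing the task.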
```java
protected void beforeExecute() {
    // TODO first set the status to RUNNING
    taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.RUNNING_EXECUTION);
    // TODO tell the Master that this task is running
    workerMessageSender.sendMessageWithRetry(taskExecutionContext,
            ITaskInstanceExecutionEvent.TaskInstanceExecutionEventType.RUNNING);
    log.info("Send task status {} master: {}", TaskExecutionStatus.RUNNING_EXECUTION.name(),
            taskExecutionContext.getWorkflowInstanceHost());

    // In most of case the origin tenant is the same as the current tenant
    // Except `default` tenant. The originTenant is used to download the resources
    // TODO tenant info
    String originTenant = taskExecutionContext.getTenantCode();
    String tenant = TaskExecutionContextUtils.getOrCreateTenant(workerConfig, taskExecutionContext);
    taskExecutionContext.setTenantCode(tenant);
    log.info("TenantCode: {} check successfully", taskExecutionContext.getTenantCode());

    // TODO create the working directory
    TaskExecutionContextUtils.createTaskInstanceWorkingDirectory(taskExecutionContext);
    log.info("WorkflowInstanceExecDir: {} check successfully", taskExecutionContext.getExecutePath());

    TaskChannel taskChannel =
            Optional.ofNullable(taskPluginManager.getTaskChannelMap().get(taskExecutionContext.getTaskType()))
                    .orElseThrow(() -> new TaskPluginException(taskExecutionContext.getTaskType()
                            + " task plugin not found, please check the task type is correct."));
    log.info("Create TaskChannel: {} successfully", taskChannel.getClass().getName());

    // TODO download resources
    ResourceContext resourceContext = TaskExecutionContextUtils.downloadResourcesIfNeeded(originTenant, taskChannel,
            storageOperate, taskExecutionContext);
    taskExecutionContext.setResourceContext(resourceContext);
    log.info("Download resources successfully: \n{}", taskExecutionContext.getResourceContext());

    TaskFilesTransferUtils.downloadUpstreamFiles(taskExecutionContext, storageOperate);
    log.info("Download upstream files: {} successfully",
            TaskFilesTransferUtils.getFileLocalParams(taskExecutionContext, Direct.IN));

    // TODO create the task
    task = taskChannel.createTask(taskExecutionContext);
    log.info("Task plugin instance: {} create successfully", taskExecutionContext.getTaskType());

    // todo: remove the init method, this should initialize in constructor method
    // TODO initialize the task
    task.init();
    log.info("Success initialized task plugin instance successfully");

    task.getParameters().setVarPool(taskExecutionContext.getVarPool());
    log.info("Set taskVarPool: {} successfully", taskExecutionContext.getVarPool());
}
```
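1. The log statement
In `log.info("Send task status {} master: {}", TaskExecutionStatus.RUNNING_EXECUTION.name(), taskExecutionContext.getWorkflowInstanceHost());`, printing taskExecutionContext.getWorkflowInstanceHost() is the right choice, not taskExecutionContext.getHost(). The Worker is reporting to the Master, so there is no point in logging the Worker's own host (it obviously knows that). What is useful is which Master node this Worker reports its task status to.
2. Creating the tenant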
org.apache.dolphinscheduler.server.worker.utils.TaskExecutionContextUtils#getOrCreateTenant
```java
public static String getOrCreateTenant(WorkerConfig workerConfig, TaskExecutionContext taskExecutionContext) {
    try {
        TenantConfig tenantConfig = workerConfig.getTenantConfig();

        String tenantCode = taskExecutionContext.getTenantCode();
        if (TenantConstants.DEFAULT_TENANT_CODE.equals(tenantCode) && tenantConfig.isDefaultTenantEnabled()) {
            log.info("Current tenant is default tenant, will use bootstrap user: {} to execute the task",
                    TenantConstants.BOOTSTRAPT_SYSTEM_USER);
            return TenantConstants.BOOTSTRAPT_SYSTEM_USER;
        }
        boolean osUserExistFlag;
        // if Using distributed is true and Currently supported systems are linux,Should not let it
        // automatically
        // create tenants,so TenantAutoCreate has no effect
        if (tenantConfig.isDistributedTenantEnabled() && SystemUtils.IS_OS_LINUX) {
            // use the id command to judge in linux
            osUserExistFlag = OSUtils.existTenantCodeInLinux(tenantCode);
        } else if (OSUtils.isSudoEnable() && tenantConfig.isAutoCreateTenantEnabled()) {
            // if not exists this user, then create
            // TODO this branch is the default one; the user is created directly with `sudo useradd -g %s %s`
            OSUtils.createUserIfAbsent(tenantCode);
            osUserExistFlag = OSUtils.getUserList().contains(tenantCode);
        } else {
            osUserExistFlag = OSUtils.getUserList().contains(tenantCode);
        }
        if (!osUserExistFlag) {
            throw new TaskException(String.format("TenantCode: %s doesn't exist", tenantCode));
        }
        return tenantCode;
    } catch (TaskException ex) {
        throw ex;
    } catch (Exception ex) {
        throw new TaskException(
                String.format("TenantCode: %s doesn't exist", taskExecutionContext.getTenantCode()));
    }
}
```
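The branch order in getOrCreateTenant is easy to lose inside the try/catch. Restated as a pure function (a hypothetical `TenantDecisionSketch` whose enum values are illustrative labels, not DolphinScheduler names), it reads:

```java
/** Hypothetical pure-function restatement of the branch order in getOrCreateTenant. */
final class TenantDecisionSketch {
    enum Action { USE_BOOTSTRAP_USER, CHECK_WITH_ID_COMMAND, AUTO_CREATE_THEN_CHECK, CHECK_USER_LIST }

    private TenantDecisionSketch() {}

    static Action decide(boolean isDefaultTenant, boolean defaultTenantEnabled,
                         boolean distributedTenantEnabled, boolean isLinux,
                         boolean sudoEnabled, boolean autoCreateTenantEnabled) {
        if (isDefaultTenant && defaultTenantEnabled) {
            return Action.USE_BOOTSTRAP_USER;      // run as the user that started the Worker
        }
        if (distributedTenantEnabled && isLinux) {
            return Action.CHECK_WITH_ID_COMMAND;   // never auto-create in distributed mode
        }
        if (sudoEnabled && autoCreateTenantEnabled) {
            return Action.AUTO_CREATE_THEN_CHECK;  // create the OS user if absent, then verify
        }
        return Action.CHECK_USER_LIST;             // only verify that the OS user exists
    }
}
```

Note that the distributed-tenant check wins over auto-creation, which is what the upstream comment means by "TenantAutoCreate has no effect".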
3. TaskChannel
TaskPluginManager completes SPI registration through Google's @AutoService when the Master starts.
TaskPluginManager is initialized when the Master starts:
org.apache.dolphinscheduler.server.master.MasterServer#run
```java
@PostConstruct
public void run() throws SchedulerException {
    ......
    // install task plugin
    // TODO SPI registration is done through Google's @AutoService
    this.taskPluginManager.loadPlugin();
    ......
}
```
org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager#loadPlugin
```java
public void loadPlugin() {
    if (!loadedFlag.compareAndSet(false, true)) {
        log.warn("The task plugin has already been loaded");
        return;
    }
    // TODO the plugins are instantiated by loading them through SPI
    PrioritySPIFactory<TaskChannelFactory> prioritySPIFactory = new PrioritySPIFactory<>(TaskChannelFactory.class);
    for (Map.Entry<String, TaskChannelFactory> entry : prioritySPIFactory.getSPIMap().entrySet()) {
        String factoryName = entry.getKey();
        TaskChannelFactory factory = entry.getValue();

        log.info("Registering task plugin: {} - {}", factoryName, factory.getClass().getSimpleName());

        taskChannelFactoryMap.put(factoryName, factory);
        taskChannelMap.put(factoryName, factory.create());

        log.info("Registered task plugin: {} - {}", factoryName, factory.getClass().getSimpleName());
    }
}
```
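Stripped of DolphinScheduler specifics, loadPlugin() is a load-once guard plus a name-keyed registry. The sketch below assumes a simplified, hypothetical factory interface (`SketchChannelFactory`) and uses the JDK's `ServiceLoader`, which reads the `META-INF/services` files that `@AutoService` generates. Whatever extra handling `PrioritySPIFactory` performs is deliberately omitted.

```java
import java.util.Map;
import java.util.ServiceLoader;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for TaskChannelFactory. */
interface SketchChannelFactory {
    String getName();

    Object create();
}

final class PluginRegistrySketch {
    private final AtomicBoolean loaded = new AtomicBoolean(false);
    private final Map<String, Object> channels = new ConcurrentHashMap<>();

    /** Discovers factories from META-INF/services, the files @AutoService generates. */
    boolean loadFromClasspath() {
        return load(ServiceLoader.load(SketchChannelFactory.class));
    }

    /** Registers every factory exactly once; later calls are ignored, like loadedFlag.compareAndSet. */
    boolean load(Iterable<? extends SketchChannelFactory> factories) {
        if (!loaded.compareAndSet(false, true)) {
            return false;
        }
        for (SketchChannelFactory factory : factories) {
            channels.put(factory.getName(), factory.create());
        }
        return true;
    }

    Object channel(String name) {
        return channels.get(name);
    }
}
```

Passing an `Iterable` keeps the registry testable without a classpath scan; `ServiceLoader` is itself an `Iterable`.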
The core logic really boils down to the TaskChannelFactory interface:
```java
public interface TaskChannelFactory extends UiChannelFactory, PrioritySPI {

    TaskChannel create();

    default SPIIdentify getIdentify() {
        return SPIIdentify.builder().name(getName()).build();
    }
}
```
Every task plugin implements the TaskChannelFactory interface and is annotated with @AutoService.
Take ShellTaskChannelFactory as an example:
```java
@AutoService(TaskChannelFactory.class)
public class ShellTaskChannelFactory implements TaskChannelFactory {

    @Override
    public TaskChannel create() {
        return new ShellTaskChannel();
    }

    @Override
    public String getName() {
        return "SHELL";
    }

    @Override
    public List<PluginParams> getParams() {
        List<PluginParams> paramsList = new ArrayList<>();

        InputParam nodeName = InputParam.newBuilder("name", "$t('Node name')")
                .addValidate(Validate.newBuilder()
                        .setRequired(true)
                        .build())
                .build();

        RadioParam runFlag = RadioParam.newBuilder("runFlag", "RUN_FLAG")
                .addParamsOptions(new ParamsOptions("NORMAL", "NORMAL", false))
                .addParamsOptions(new ParamsOptions("FORBIDDEN", "FORBIDDEN", false))
                .build();

        paramsList.add(nodeName);
        paramsList.add(runFlag);
        return paramsList;
    }
}
```
This is where ShellTaskChannel, i.e. the TaskChannel, is created.
4. Downloading the required resources
org.apache.dolphinscheduler.server.worker.utils.TaskExecutionContextUtils#downloadResourcesIfNeeded
```java
public static ResourceContext downloadResourcesIfNeeded(String tenant,
                                                        TaskChannel taskChannel,
                                                        StorageOperate storageOperate,
                                                        TaskExecutionContext taskExecutionContext) {
    AbstractParameters abstractParameters = taskChannel.parseParameters(
            ParametersNode.builder()
                    .taskType(taskExecutionContext.getTaskType())
                    .taskParams(taskExecutionContext.getTaskParams())
                    .build());

    // TODO for a SQL task this is just an empty ArrayList<>(), so nothing below runs
    List<ResourceInfo> resourceFilesList = abstractParameters.getResourceFilesList();
    if (CollectionUtils.isEmpty(resourceFilesList)) {
        log.debug("There is no resource file need to download");
        return new ResourceContext();
    }

    ResourceContext resourceContext = new ResourceContext();
    String taskWorkingDirectory = taskExecutionContext.getExecutePath();

    for (ResourceInfo resourceInfo : resourceFilesList) {
        // TODO path in storage, e.g. the file path on HDFS
        String resourceAbsolutePathInStorage = resourceInfo.getResourceName();
        // TODO file name
        String resourceRelativePath = storageOperate.getResourceFileName(tenant, resourceAbsolutePathInStorage);
        // TODO absolute local path
        String resourceAbsolutePathInLocal = Paths.get(taskWorkingDirectory, resourceRelativePath).toString();
        File file = new File(resourceAbsolutePathInLocal);
        if (!file.exists()) {
            try {
                long resourceDownloadStartTime = System.currentTimeMillis();
                // TODO download the resource
                storageOperate.download(resourceAbsolutePathInStorage, resourceAbsolutePathInLocal, true);
                log.debug("Download resource file {} under: {} successfully", resourceAbsolutePathInStorage,
                        resourceAbsolutePathInLocal);
                FileUtils.setFileTo755(file);
                WorkerServerMetrics
                        .recordWorkerResourceDownloadTime(System.currentTimeMillis() - resourceDownloadStartTime);
                WorkerServerMetrics
                        .recordWorkerResourceDownloadSize(Files.size(Paths.get(resourceAbsolutePathInLocal)));
                WorkerServerMetrics.incWorkerResourceDownloadSuccessCount();
            } catch (Exception ex) {
                WorkerServerMetrics.incWorkerResourceDownloadFailureCount();
                throw new TaskException(
                        String.format("Download resource file: %s error", resourceAbsolutePathInStorage), ex);
            }
        }
        // TODO build the resourceContext
        ResourceContext.ResourceItem resourceItem = ResourceContext.ResourceItem.builder()
                .resourceAbsolutePathInStorage(resourceAbsolutePathInStorage)
                .resourceRelativePath(resourceRelativePath)
                .resourceAbsolutePathInLocal(resourceAbsolutePathInLocal)
                .build();
        resourceContext.addResourceItem(resourceItem);
    }
    return resourceContext;
}
```
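The essential behavior is "download only if the local file is missing". A hedged sketch, with a hypothetical `Downloader` interface in place of `StorageOperate` and a map from storage path to relative path in place of `getResourceFileName`:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class ResourceDownloadSketch {
    /** Hypothetical stand-in for StorageOperate#download. */
    interface Downloader {
        void download(String remotePath, Path localPath) throws IOException;
    }

    private ResourceDownloadSketch() {}

    /** Resolves each resource under the working directory and downloads it only if it is not there yet. */
    static List<Path> downloadIfNeeded(Path workingDirectory, Map<String, String> remoteToRelative,
                                       Downloader downloader) throws IOException {
        List<Path> localPaths = new ArrayList<>();
        for (Map.Entry<String, String> entry : remoteToRelative.entrySet()) {
            Path local = workingDirectory.resolve(entry.getValue());
            if (!Files.exists(local)) {
                // Make sure nested relative paths have a parent directory before downloading.
                Files.createDirectories(local.getParent());
                downloader.download(entry.getKey(), local);
            }
            localPaths.add(local);
        }
        return localPaths;
    }
}
```

Calling it twice for the same working directory downloads each file only once, which is what makes retries on the same Worker cheap.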
5. Downloading upstream files (passing files from upstream to downstream tasks)
Example:
upTask :
downTask :
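Core logic: passing files between upstream and downstream tasks is actually simple. The upstream node generates the files locally and uploads them to the resource center (an HDFS-backed one, for example). The downstream node then downloads the corresponding resource-center file according to the upstream `taskName.outputVariable` reference.
The downloadUpstreamFiles logic as executed by downTask: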
org.apache.dolphinscheduler.server.worker.utils.TaskFilesTransferUtils#downloadUpstreamFiles
```java
public static void downloadUpstreamFiles(TaskExecutionContext taskExecutionContext, StorageOperate storageOperate) {
    // TODO var pool passed down from upstream
    List<Property> varPools = getVarPools(taskExecutionContext);
    // get map of varPools for quick search
    Map<String, Property> varPoolsMap = varPools.stream().collect(Collectors.toMap(Property::getProp, x -> x));

    // get "IN FILE" parameters
    // TODO check whether localParams contains any local parameter with direction IN and type FILE
    List<Property> localParamsProperty = getFileLocalParams(taskExecutionContext, Direct.IN);

    // TODO in the usual case the method ends here
    if (localParamsProperty.isEmpty()) {
        return;
    }

    String executePath = taskExecutionContext.getExecutePath();
    // data path to download packaged data
    // TODO temporary download directory
    String downloadTmpPath = String.format("%s/%s", executePath, DOWNLOAD_TMP);

    log.info("Download upstream files...");
    for (Property property : localParamsProperty) {
        // TODO this is essentially a lookup
        /**
         * varPoolsMap looks like this:
         * {"prop":"upTask.file-data","direct":"IN","type":"FILE","value":"DATA_TRANSFER/20240624/13978983404960/2_1893/upTask_1320_text.txt"}
         * {"prop":"upTask.dir-data","direct":"IN","type":"FILE","value":"DATA_TRANSFER/20240624/13978983404960/2_1893/upTask_1320_data_ds_pack.zip"}
         *
         * localParamsProperty looks like this:
         * {"prop":"input_dir","direct":"IN","type":"FILE","value":"upTask.dir-data"}
         */
        // TODO so this is not null here
        Property inVarPool = varPoolsMap.get(property.getValue());
        if (inVarPool == null) {
            log.error("{} not in {}", property.getValue(), varPoolsMap.keySet());
            throw new TaskException(String.format("Can not find upstream file using %s, please check the key",
                    property.getValue()));
        }

        String resourcePath = inVarPool.getValue();
        // TODO build the local path
        // TODO note: in a script such as `cat input_dir/test1/text.txt`, input_dir is the name appended to the download path
        String targetPath = String.format("%s/%s", executePath, property.getProp());

        String downloadPath;
        // If the data is packaged, download it to a special directory (DOWNLOAD_TMP) and unpack it to the
        // targetPath
        // TODO check whether it is a zip package
        boolean isPack = resourcePath.endsWith(PACK_SUFFIX);
        if (isPack) {
            downloadPath = String.format("%s/%s", downloadTmpPath, new File(resourcePath).getName());
        } else {
            downloadPath = targetPath;
        }

        try {
            // TODO resource center path
            String resourceWholePath =
                    storageOperate.getResourceFullName(taskExecutionContext.getTenantCode(), resourcePath);
            log.info("{} --- Remote:{} to Local:{}", property, resourceWholePath, downloadPath);
            // TODO download to the local machine
            storageOperate.download(resourceWholePath, downloadPath, true);
        } catch (IOException ex) {
            throw new TaskException("Download file from storage error", ex);
        }

        // unpack if the data is packaged
        if (isPack) {
            File downloadFile = new File(downloadPath);
            log.info("Unpack {} to {}", downloadPath, targetPath);
            // TODO for a zip, unpack the archive from the local temp directory into the target path
            ZipUtil.unpack(downloadFile, new File(targetPath));
        }
    }

    // delete DownloadTmp Folder if DownloadTmpPath exists
    try {
        // TODO delete the temp directory
        org.apache.commons.io.FileUtils.deleteDirectory(new File(downloadTmpPath));
    } catch (IOException e) {
        log.error("Delete DownloadTmpPath {} failed, this will not affect the task status", downloadTmpPath, e);
    }
}
```
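The path arithmetic in downloadUpstreamFiles can be isolated as a pure function. In this hypothetical sketch, `PACK_SUFFIX = "_ds_pack.zip"` is inferred from the example var-pool value above, and `DOWNLOAD_TMP` is a placeholder because the real constant's value is not shown in the source. Nothing is actually downloaded; the function only computes where things would go.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;

final class UpstreamFileSketch {
    // Assumption: inferred from "upTask_1320_data_ds_pack.zip" in the example above.
    static final String PACK_SUFFIX = "_ds_pack.zip";
    // Hypothetical placeholder; the real DOWNLOAD_TMP value is not shown in the source.
    static final String DOWNLOAD_TMP = "download_tmp";

    /** Where one IN FILE parameter would be downloaded and whether it must be unpacked. */
    static final class Plan {
        final String resourcePath;
        final Path downloadPath;
        final Path targetPath;
        final boolean unpack;

        Plan(String resourcePath, Path downloadPath, Path targetPath, boolean unpack) {
            this.resourcePath = resourcePath;
            this.downloadPath = downloadPath;
            this.targetPath = targetPath;
            this.unpack = unpack;
        }
    }

    private UpstreamFileSketch() {}

    static Plan plan(Path executePath, String localProp, String localValue, Map<String, String> varPool) {
        // localValue is the upstream reference, e.g. "upTask.dir-data".
        String resourcePath = varPool.get(localValue);
        if (resourcePath == null) {
            throw new IllegalArgumentException("Can not find upstream file using " + localValue);
        }
        Path target = executePath.resolve(localProp);
        boolean isPack = resourcePath.endsWith(PACK_SUFFIX);
        // Packed directories go to the temp dir first and are unpacked into the target afterwards.
        Path download = isPack
                ? executePath.resolve(DOWNLOAD_TMP).resolve(Paths.get(resourcePath).getFileName())
                : target;
        return new Plan(resourcePath, download, target, isPack);
    }
}
```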
6. Creating and initializing the task
Once the TaskChannel from step 3 exists, createTask is called on it, which returns an AbstractTask, and then its init method is called.
```java
......
// TODO create the task
task = taskChannel.createTask(taskExecutionContext);
log.info("Task plugin instance: {} create successfully", taskExecutionContext.getTaskType());

// todo: remove the init method, this should initialize in constructor method
// TODO initialize the task
task.init();
log.info("Success initialized task plugin instance successfully");
......
```
7. Setting the var pool on AbstractParameters
```java
// TODO set the var pool on the task
// TODO normally taskExecutionContext.getVarPool() is null here
task.getParameters().setVarPool(taskExecutionContext.getVarPool());
log.info("Set taskVarPool: {} successfully", taskExecutionContext.getVarPool());
```
Note: by default taskExecutionContext.getVarPool() is empty unless an upstream task has OUT variables.
2.5.3. Task execution
```java
// TODO the callback matters: workerMessageSender and taskExecutionContext are handed to TaskCallBack through its builder
// TODO so taskExecutionContext still carries everything that was set earlier
TaskCallBack taskCallBack = TaskCallbackImpl.builder()
        .workerMessageSender(workerMessageSender)
        .taskExecutionContext(taskExecutionContext)
        .build();
.......
// TODO execute
executeTask(taskCallBack);
```
executeTask(taskCallBack) is the core: it wraps the actual execution logic of the Worker task, and the TaskCallBack argument is used to report task status back to the Master.
Let's look at the logic of executeTask(taskCallBack) in detail:
```java
public void executeTask(TaskCallBack taskCallBack) throws TaskException {
    if (task == null) {
        throw new IllegalArgumentException("The task plugin instance is not initialized");
    }
    // TODO the real task processing happens here
    task.handle(taskCallBack);
}
```
Here task is an AbstractTask, created in beforeExecute by taskChannel.createTask. AbstractTask is the abstract parent class of all tasks; ShellTask is used as the example below, and other task types work similarly.
org.apache.dolphinscheduler.plugin.task.shell.ShellTask#handle
```java
public void handle(TaskCallBack taskCallBack) throws TaskException {
    try {
        IShellInterceptorBuilder<?, ?> shellActuatorBuilder = ShellInterceptorBuilderFactory.newBuilder()
                .properties(ParameterUtils.convert(taskExecutionContext.getPrepareParamsMap()))
                // TODO variable substitution happens here
                .appendScript(shellParameters.getRawScript());

        // TODO run the shell
        TaskResponse commandExecuteResult = shellCommandExecutor.run(shellActuatorBuilder, taskCallBack);
        // TODO execution result: the exit status code
        setExitStatusCode(commandExecuteResult.getExitStatusCode());
        // TODO set the process ID
        setProcessId(commandExecuteResult.getProcessId());
        // TODO shellCommandExecutor.getTaskOutputParams() returns e.g. output -> 123
        shellParameters.dealOutParam(shellCommandExecutor.getTaskOutputParams());
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        log.error("The current Shell task has been interrupted", e);
        setExitStatusCode(EXIT_CODE_FAILURE);
        throw new TaskException("The current Shell task has been interrupted", e);
    } catch (Exception e) {
        log.error("shell task error", e);
        setExitStatusCode(EXIT_CODE_FAILURE);
        throw new TaskException("Execute shell task error", e);
    }
}
```
org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilderFactory
BashShellInterceptorBuilder is used by default:
```java
public class ShellInterceptorBuilderFactory {

    private final static String INTERCEPTOR_TYPE = PropertyUtils.getString("shell.interceptor.type", "bash");

    @SuppressWarnings("unchecked")
    public static IShellInterceptorBuilder newBuilder() {
        // TODO this is the default path
        if (INTERCEPTOR_TYPE.equalsIgnoreCase("bash")) {
            return new BashShellInterceptorBuilder();
        }
        if (INTERCEPTOR_TYPE.equalsIgnoreCase("sh")) {
            return new ShShellInterceptorBuilder();
        }
        if (INTERCEPTOR_TYPE.equalsIgnoreCase("cmd")) {
            return new CmdShellInterceptorBuilder();
        }
        throw new IllegalArgumentException("not support shell type: " + INTERCEPTOR_TYPE);
    }
}
```
.properties(ParameterUtils.convert(taskExecutionContext.getPrepareParamsMap()))
puts the parameters from taskExecutionContext.getPrepareParamsMap() into the propertyMap of BaseShellInterceptorBuilder (note: taskExecutionContext.getPrepareParamsMap() is assembled on the Master).
.appendScript(shellParameters.getRawScript())
sets the value of scripts in BaseShellInterceptorBuilder.
org.apache.dolphinscheduler.plugin.task.api.AbstractCommandExecutor#run
```java
public TaskResponse run(IShellInterceptorBuilder iShellInterceptorBuilder,
                        TaskCallBack taskCallBack) throws Exception {
    TaskResponse result = new TaskResponse();
    // todo: we need to use state like JDK Thread to make sure the killed task should not be executed
    iShellInterceptorBuilder = iShellInterceptorBuilder
            // TODO set the execution path
            .shellDirectory(taskRequest.getExecutePath())
            // TODO set the shell script name
            .shellName(taskRequest.getTaskAppId());
    // Set system env
    // TODO default env files are set here, e.g. /etc/profile
    if (CollectionUtils.isNotEmpty(ShellUtils.ENV_SOURCE_LIST)) {
        // TODO add the env files listed in ENV_SOURCE_LIST to systemEnvs
        ShellUtils.ENV_SOURCE_LIST.forEach(iShellInterceptorBuilder::appendSystemEnv);
    }
    // Set custom env
    // TODO set the custom env
    if (StringUtils.isNotBlank(taskRequest.getEnvironmentConfig())) {
        // TODO add it to customEnvScripts
        iShellInterceptorBuilder.appendCustomEnvScript(taskRequest.getEnvironmentConfig());
    }
    // Set k8s config (This is only work in Linux)
    if (taskRequest.getK8sTaskExecutionContext() != null) {
        iShellInterceptorBuilder.k8sConfigYaml(taskRequest.getK8sTaskExecutionContext().getConfigYaml());
    }
    // Set sudo (This is only work in Linux)
    // TODO enable sudo mode when sudo is available
    iShellInterceptorBuilder.sudoMode(OSUtils.isSudoEnable());
    // Set tenant (This is only work in Linux)
    // TODO set the tenant
    iShellInterceptorBuilder.runUser(taskRequest.getTenantCode());
    // Set CPU Quota (This is only work in Linux)
    if (taskRequest.getCpuQuota() != null) {
        iShellInterceptorBuilder.cpuQuota(taskRequest.getCpuQuota());
    }
    // Set memory Quota (This is only work in Linux)
    if (taskRequest.getMemoryMax() != null) {
        iShellInterceptorBuilder.memoryQuota(taskRequest.getMemoryMax());
    }

    // TODO this is the key part
    IShellInterceptor iShellInterceptor = iShellInterceptorBuilder.build();
    // TODO execute with ProcessBuilder; supports both sudo mode and bash mode
    process = iShellInterceptor.execute();

    // parse process output
    // TODO parse the process output here
    parseProcessOutput(this.process);

    // collect pod log
    collectPodLogIfNeeded();

    int processId = getProcessId(this.process);
    result.setProcessId(processId);
    // cache processId
    taskRequest.setProcessId(processId);
    // print process id
    log.info("process start, process id is: {}", processId);

    // if timeout occurs, exit directly
    long remainTime = getRemainTime();

    // update pid before waiting for the run to finish
    if (null != taskCallBack) {
        // TODO update the task instance info
        taskCallBack.updateTaskInstanceInfo(processId);
    }

    // waiting for the run to finish
    boolean status = this.process.waitFor(remainTime, TimeUnit.SECONDS);

    TaskExecutionStatus kubernetesStatus =
            ProcessUtils.getApplicationStatus(taskRequest.getK8sTaskExecutionContext(), taskRequest.getTaskAppId());

    if (taskOutputFuture != null) {
        try {
            // Wait the task log process finished.
            taskOutputFuture.get();
        } catch (ExecutionException e) {
            log.error("Handle task log error", e);
        }
    }

    if (podLogOutputFuture != null) {
        try {
            // Wait kubernetes pod log collection finished
            podLogOutputFuture.get();
            // delete pod after successful execution and log collection
            ProcessUtils.cancelApplication(taskRequest);
        } catch (ExecutionException e) {
            log.error("Handle pod log error", e);
        }
    }

    // if SHELL task exit
    if (status && kubernetesStatus.isSuccess()) {
        // SHELL task state
        result.setExitStatusCode(this.process.exitValue());
    } else {
        log.error("process has failure, the task timeout configuration value is:{}, ready to kill ...",
                taskRequest.getTaskTimeout());
        result.setExitStatusCode(EXIT_CODE_FAILURE);
        cancelApplication();
    }
    int exitCode = this.process.exitValue();
    String exitLogMessage = EXIT_CODE_KILL == exitCode ? "process has killed." : "process has exited.";
    log.info("{} execute path:{}, processId:{} ,exitStatusCode:{} ,processWaitForStatus:{} ,processExitValue:{}",
            exitLogMessage, taskRequest.getExecutePath(), processId, result.getExitStatusCode(), status, exitCode);
    return result;
}
```
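The timeout handling reduces to `Process.waitFor(timeout, unit)`: if the process has not exited in time, it is killed and a failure code is reported. A hypothetical sketch, assuming a POSIX `sh` is available and taking `EXIT_CODE_FAILURE = -1` as an assumed value; the Kubernetes status check and log futures are omitted.

```java
import java.io.IOException;
import java.util.concurrent.TimeUnit;

final class WaitForSketch {
    // Assumed value, standing in for the EXIT_CODE_FAILURE constant used above.
    static final int EXIT_CODE_FAILURE = -1;

    private WaitForSketch() {}

    /** Waits up to remainSeconds; on timeout the process is killed and EXIT_CODE_FAILURE is returned. */
    static int runWithTimeout(long remainSeconds, String... command) throws IOException, InterruptedException {
        Process process = new ProcessBuilder(command)
                .redirectErrorStream(true)
                .redirectOutput(ProcessBuilder.Redirect.DISCARD) // output is irrelevant here; avoids a full pipe
                .start();
        boolean exited = process.waitFor(remainSeconds, TimeUnit.SECONDS);
        if (!exited) {
            process.destroyForcibly();
            process.waitFor(); // reap the killed process
            return EXIT_CODE_FAILURE;
        }
        return process.exitValue();
    }
}
```

The same shape explains the log line above: `processWaitForStatus` is the boolean from `waitFor`, and only a `true` there lets the real exit value through.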
Setting the default environment variables:
```java
// Set system env
// TODO default env files are set here, e.g. /etc/profile
if (CollectionUtils.isNotEmpty(ShellUtils.ENV_SOURCE_LIST)) {
    // TODO add the env files listed in ENV_SOURCE_LIST to systemEnvs
    ShellUtils.ENV_SOURCE_LIST.forEach(iShellInterceptorBuilder::appendSystemEnv);
}
```
org.apache.dolphinscheduler.plugin.task.api.utils.ShellUtils
```java
public List<String> ENV_SOURCE_LIST = Arrays.stream(
        Optional.ofNullable(PropertyUtils.getString("shell.env_source_list")).map(s -> s.split(","))
                .orElse(new String[0]))
        .map(String::trim)
        .filter(StringUtils::isNotBlank)
        .collect(Collectors.toList());
```
It reads common.properties, where the default environment files can be configured:
```properties
# The default env list will be load by Shell task, e.g. /etc/profile,~/.bash_profile
# Empty by default; it could be set to, for example, shell.env_source_list=/etc/profile
```
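The parsing pipeline can be tried in isolation. This hypothetical `EnvSourceListSketch` applies the same split/trim/filter steps to a raw string instead of reading `common.properties`, so stray spaces and empty entries in the property value are harmless.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

final class EnvSourceListSketch {
    private EnvSourceListSketch() {}

    /** Same split/trim/filter pipeline as ShellUtils.ENV_SOURCE_LIST, applied to a plain string. */
    static List<String> parse(String rawValue) {
        return Arrays.stream(Optional.ofNullable(rawValue).map(s -> s.split(",")).orElse(new String[0]))
                .map(String::trim)
                .filter(s -> !s.isBlank())
                .collect(Collectors.toList());
    }
}
```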
```java
// TODO this is the key part
IShellInterceptor iShellInterceptor = iShellInterceptorBuilder.build();
```
org.apache.dolphinscheduler.plugin.task.api.shell.bash.BashShellInterceptorBuilder#build
```java
public BashShellInterceptor build() throws FileOperateException, IOException {
    // TODO the core of script generation: the shell script is written to the specified directory
    generateShellScript();
    // TODO build the command
    List<String> bootstrapCommand = generateBootstrapCommand();
    // TODO instantiate BashShellInterceptor
    return new BashShellInterceptor(bootstrapCommand, shellDirectory);
}
```
org.apache.dolphinscheduler.plugin.task.api.shell.BaseLinuxShellInterceptorBuilder#bootstrapCommandInSudoMode
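Note: this method has two branches. If resource limiting is enabled, it goes through bootstrapCommandInResourceLimitMode. There is actually a big bug hiding here (I only fixed it for ShellTask): for other task types that are wrapped as shell commands, such as MR, Spark, and Flink, taking the resource-limit path is a problem, because the CPU and memory quotas of these tasks cannot be set in the UI. Otherwise the command is `sudo -u <tenant> -i /opt/xx.sh`.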
private List<String> bootstrapCommandInSudoMode() { // TODO 如果task.resource.limit.state为false,这里的逻辑不会走,也不会走CPU和内存的限制 if (PropertyUtils.getBoolean(AbstractCommandExecutorConstants.TASK_RESOURCE_LIMIT_STATE, false)) { return bootstrapCommandInResourceLimitMode(); } List<String> bootstrapCommand = new ArrayList<>(); bootstrapCommand.add("sudo"); if (StringUtils.isNotBlank(runUser)) { bootstrapCommand.add("-u"); bootstrapCommand.add(runUser); } bootstrapCommand.add("-i"); bootstrapCommand.add(shellAbsolutePath().toString()); return bootstrapCommand;}
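With resource limits off, the resulting command is just a list of arguments. A hypothetical reduction of the sudo branch (the tenant and script path in the test are made up):

```java
import java.util.ArrayList;
import java.util.List;

final class SudoCommandSketch {
    private SudoCommandSketch() {}

    /** Builds ["sudo", ("-u", user,) "-i", script], mirroring the non-resource-limit branch above. */
    static List<String> sudoBootstrapCommand(String runUser, String shellAbsolutePath) {
        List<String> command = new ArrayList<>();
        command.add("sudo");
        if (runUser != null && !runUser.isBlank()) {
            // Run as the tenant's OS user.
            command.add("-u");
            command.add(runUser);
        }
        // -i runs a login shell, so the user's login environment is loaded before the script.
        command.add("-i");
        command.add(shellAbsolutePath);
        return command;
    }
}
```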
// TODO Execute with ProcessBuilder; both sudo mode and plain bash mode are supported
process = iShellInterceptor.execute();
org.apache.dolphinscheduler.plugin.task.api.shell.BaseShellInterceptor#execute
public Process execute() throws IOException {
    // init process builder
    ProcessBuilder processBuilder = new ProcessBuilder();
    // setting up a working directory
    // TODO Set the working directory so that the script can load files relative to it, such as jar packages
    processBuilder.directory(new File(workingDirectory));
    // merge error information to standard output stream
    processBuilder.redirectErrorStream(true);
    processBuilder.command(executeCommands);
    log.info("Executing shell command : {}", String.join(" ", executeCommands));
    return processBuilder.start();
}
In other words, the task is simply launched with ProcessBuilder.
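The following sketch shows what redirectErrorStream(true) buys. It is a hypothetical ProcessBuilderSketch and assumes a Unix-like host with sh. It runs a command in a working directory and reads both stdout and stderr from the single merged stream, which is what parseProcessOutput later relies on.

```java
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper using the same ProcessBuilder settings as BaseShellInterceptor#execute.
final class ProcessBuilderSketch {

    private ProcessBuilderSketch() {
    }

    static List<String> run(File workingDirectory, List<String> command) {
        ProcessBuilder pb = new ProcessBuilder(command)
                .directory(workingDirectory)       // relative paths resolve against this directory
                .redirectErrorStream(true);        // stderr is merged into getInputStream()
        List<String> lines = new ArrayList<>();
        try {
            Process process = pb.start();
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    lines.add(line);
                }
            }
            process.waitFor();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return lines;
    }
}
```

Running sh -c "echo to-stdout; echo to-stderr 1>&2" through it returns both lines, even though the second one was written to stderr.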
org.apache.dolphinscheduler.plugin.task.api.AbstractCommandExecutor#parseProcessOutput
// TODO Parse the output
private void parseProcessOutput(Process process) {
    // todo: remove this this thread pool.
    ExecutorService getOutputLogService = ThreadUtils
            .newSingleDaemonScheduledExecutorService("ResolveOutputLog-thread-" + taskRequest.getTaskName());
    getOutputLogService.execute(() -> {
        TaskOutputParameterParser taskOutputParameterParser = new TaskOutputParameterParser();
        // TODO Read from process.getInputStream(), i.e. the script's stdout (stderr is merged into it)
        try (BufferedReader inReader = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
            // TODO Set the task log path here
            LogUtils.setTaskInstanceLogFullPathMDC(taskRequest.getLogPath());
            String line;
            while ((line = inReader.readLine()) != null) {
                // TODO Log buffer
                logBuffer.add(line);
                // TODO Parse taskOutputParams, e.g. echo '${setValue(output=1)}' prints the literal string ${setValue(output=1)}
                taskOutputParameterParser.appendParseLog(line);
            }
            processLogOutputIsSuccess = true;
        } catch (Exception e) {
            log.error("Parse var pool error", e);
            processLogOutputIsSuccess = true;
        } finally {
            // TODO taskInstanceLogFullPath is removed from the MDC here
            LogUtils.removeTaskInstanceLogFullPathMDC();
        }
        taskOutputParams = taskOutputParameterParser.getTaskOutputParams();
    });

    getOutputLogService.shutdown();

    ExecutorService parseProcessOutputExecutorService = ThreadUtils
            .newSingleDaemonScheduledExecutorService("TaskInstanceLogOutput-thread-" + taskRequest.getTaskName());
    taskOutputFuture = parseProcessOutputExecutorService.submit(() -> {
        try {
            LogUtils.setTaskInstanceLogFullPathMDC(taskRequest.getLogPath());
            // TODO For non-pod (non-k8s) tasks only processLogOutputIsSuccess matters; it is set above once the output has been fully read
            while (logBuffer.size() > 1 || !processLogOutputIsSuccess || !podLogOutputIsFinished) {
                if (logBuffer.size() > 1) {
                    logHandler.accept(logBuffer);
                    logBuffer.clear();
                    logBuffer.add(EMPTY_STRING);
                } else {
                    // TODO With no new log output, wait 1s by default
                    Thread.sleep(TaskConstants.DEFAULT_LOG_FLUSH_INTERVAL);
                }
            }
        } catch (Exception e) {
            log.error("Output task log error", e);
        } finally {
            LogUtils.removeTaskInstanceLogFullPathMDC();
        }
    });
    parseProcessOutputExecutorService.shutdown();
}
Two core pieces of logic are worth explaining here:
Printing the output log
protected LinkedBlockingQueue<String> logBuffer;

public AbstractCommandExecutor(Consumer<LinkedBlockingQueue<String>> logHandler,
                               TaskExecutionContext taskRequest) {
    this.logHandler = logHandler;
    this.taskRequest = taskRequest;
    this.logBuffer = new LinkedBlockingQueue<>();
    this.logBuffer.add(EMPTY_STRING);

    if (this.taskRequest != null) {
        // set logBufferEnable=true if the task uses logHandler and logBuffer to buffer log messages
        this.taskRequest.setLogBufferEnable(true);
    }
}
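logBuffer temporarily holds the log lines until the parseProcessOutputExecutorService thread consumes them. The buffer is seeded with EMPTY_STRING, so the checks below use size() > 1 to mean "real lines are waiting".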
Log producer:

while ((line = inReader.readLine()) != null) {
    // TODO Log buffer
    logBuffer.add(line);
    // TODO Parse taskOutputParams, e.g. echo '${setValue(output=1)}' prints the literal string ${setValue(output=1)}
    taskOutputParameterParser.appendParseLog(line);
}

Log consumer:

this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, taskExecutionContext);

public void logHandle(LinkedBlockingQueue<String> logs) {
    StringJoiner joiner = new StringJoiner("\n\t");
    while (!logs.isEmpty()) {
        joiner.add(logs.poll());
    }
    log.info(" -> {}", joiner);
}

while (logBuffer.size() > 1 || !processLogOutputIsSuccess || !podLogOutputIsFinished) {
    if (logBuffer.size() > 1) {
        logHandler.accept(logBuffer);
        logBuffer.clear();
        logBuffer.add(EMPTY_STRING);
    } else {
        // TODO With no new log output, wait 1s by default
        Thread.sleep(TaskConstants.DEFAULT_LOG_FLUSH_INTERVAL);
    }
}
That is why Shell output in the task log always starts with ->, for example:
[INFO] 2024-06-24 09:35:44.678 +0800 - ->
	.
	├── 1893_1321.sh
	└── input_dir
	    ├── test1
	    │   └── text.txt
	    └── test2
	        └── text.txt
	3 directories, 3 files
	test1 message
	test2 message
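The sentinel-and-batch pattern can be tried on its own. The sketch below is a simplified stand-in, not the upstream code. A producer thread plays the role of the ResolveOutputLog thread, the calling thread plays the TaskInstanceLogOutput thread, and formatBatch mimics logHandle, which is why each flushed batch starts with " -> " followed by "\n\t"-separated lines. It differs from upstream on purpose in two ways: it reads the producer's done flag before checking the buffer size, and it drains with drainTo, so a late line cannot be skipped. The class name and the 10 ms poll interval are assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.StringJoiner;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical producer/consumer sketch of logBuffer + logHandle.
final class LogBufferSketch {

    static final String EMPTY_STRING = "";

    private LogBufferSketch() {
    }

    // Same formatting as logHandle: join the batch with "\n\t" behind a " -> " prefix.
    static String formatBatch(List<String> batch) {
        StringJoiner joiner = new StringJoiner("\n\t");
        batch.forEach(joiner::add);
        return " -> " + joiner;
    }

    // Returns the non-sentinel lines in flush order (this also drops genuinely empty lines).
    static List<String> pump(List<String> lines, Consumer<String> sink) {
        LinkedBlockingQueue<String> logBuffer = new LinkedBlockingQueue<>();
        logBuffer.add(EMPTY_STRING);                       // sentinel: size() > 1 means real lines wait
        AtomicBoolean producerDone = new AtomicBoolean(false);
        Thread producer = new Thread(() -> {
            lines.forEach(logBuffer::add);
            producerDone.set(true);
        });
        producer.start();

        List<String> flushed = new ArrayList<>();
        try {
            while (true) {
                boolean done = producerDone.get();         // read first: all lines are queued if true
                if (logBuffer.size() > 1) {
                    List<String> batch = new ArrayList<>();
                    logBuffer.drainTo(batch);              // take everything, sentinel included
                    logBuffer.add(EMPTY_STRING);           // re-seed the sentinel
                    sink.accept(formatBatch(batch));
                    batch.stream().filter(s -> !s.isEmpty()).forEach(flushed::add);
                } else if (done) {
                    break;
                } else {
                    TimeUnit.MILLISECONDS.sleep(10);       // upstream waits DEFAULT_LOG_FLUSH_INTERVAL
                }
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return flushed;
    }
}
```

For instance, formatBatch(List.of("", "a", "b")) gives " -> \n\ta\n\tb". The leading sentinel is what puts the first real line on its own indented row, as in the log above.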
Parsing the variable pool
while ((line = inReader.readLine()) != null) {
    // TODO Log buffer
    logBuffer.add(line);
    // TODO Parse taskOutputParams, e.g. echo '${setValue(output=1)}' prints the literal string ${setValue(output=1)}
    taskOutputParameterParser.appendParseLog(line);
}
org.apache.dolphinscheduler.plugin.task.api.parser.TaskOutputParameterParser#appendParseLog
public void appendParseLog(String logLine) {
    if (logLine == null) {
        return;
    }
    // TODO Not taken on the first call; only entered while an expression is still open
    if (currentTaskOutputParam != null) {
        if (currentTaskOutputParam.size() > maxOneParameterRows
                || currentTaskOutputParamLength > maxOneParameterLength) {
            // Note: the last two arguments appear swapped relative to the message text
            log.warn(
                    "The output param expression '{}' is too long, the max rows is {}, max length is {}, will skip this param",
                    String.join("\n", currentTaskOutputParam), maxOneParameterLength, maxOneParameterRows);
            currentTaskOutputParam = null;
            currentTaskOutputParamLength = 0;
            return;
        }
        // continue to parse the rest of line
        int i = logLine.indexOf(")}");
        if (i == -1) {
            // the end of var pool not found
            currentTaskOutputParam.add(logLine);
            currentTaskOutputParamLength += logLine.length();
        } else {
            // the end of var pool found
            currentTaskOutputParam.add(logLine.substring(0, i + 2));
            Pair<String, String> keyValue = parseOutputParam(String.join("\n", currentTaskOutputParam));
            if (keyValue.getKey() != null && keyValue.getValue() != null) {
                // TODO Once parsed, put the pair into taskOutputParams
                taskOutputParams.put(keyValue.getKey(), keyValue.getValue());
            }
            currentTaskOutputParam = null;
            currentTaskOutputParamLength = 0;
            // continue to parse the rest of line
            if (i + 2 != logLine.length()) {
                appendParseLog(logLine.substring(i + 2));
            }
        }
        return;
    }

    int indexOfVarPoolBegin = logLine.indexOf("${setValue(");
    if (indexOfVarPoolBegin == -1) {
        indexOfVarPoolBegin = logLine.indexOf("#{setValue(");
    }
    if (indexOfVarPoolBegin == -1) {
        return;
    }
    currentTaskOutputParam = new ArrayList<>();
    appendParseLog(logLine.substring(indexOfVarPoolBegin));
}
Once an expression has been fully parsed, its key/value pair is put into taskOutputParams.
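Below is a stripped-down version of the same scanning logic. It is handy for checking which log lines produce output parameters. It is only a sketch: the row and length limits are omitted, and parseOutputParam here simply splits at the first '=', which is an assumption about the real implementation. OutputParamParserSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simplification of TaskOutputParameterParser#appendParseLog.
final class OutputParamParserSketch {

    private static final String DOLLAR_BEGIN = "${setValue(";   // same length as "#{setValue("
    private static final String HASH_BEGIN = "#{setValue(";
    private static final String END = ")}";

    private final Map<String, String> taskOutputParams = new LinkedHashMap<>();
    private List<String> current;   // non-null while an expression spans several lines

    void appendParseLog(String logLine) {
        if (logLine == null) {
            return;
        }
        if (current != null) {
            int i = logLine.indexOf(END);
            if (i == -1) {                                   // end not found yet: keep collecting
                current.add(logLine);
                return;
            }
            current.add(logLine.substring(0, i + END.length()));
            parseOutputParam(String.join("\n", current));
            current = null;
            if (i + END.length() != logLine.length()) {      // the rest may hold another expression
                appendParseLog(logLine.substring(i + END.length()));
            }
            return;
        }
        int begin = logLine.indexOf(DOLLAR_BEGIN);
        if (begin == -1) {
            begin = logLine.indexOf(HASH_BEGIN);
        }
        if (begin == -1) {
            return;
        }
        current = new ArrayList<>();
        appendParseLog(logLine.substring(begin));
    }

    // Assumption: "key=value", split at the first '='.
    private void parseOutputParam(String expression) {
        String body = expression.substring(DOLLAR_BEGIN.length(), expression.length() - END.length());
        int eq = body.indexOf('=');
        if (eq > 0) {
            taskOutputParams.put(body.substring(0, eq).trim(), body.substring(eq + 1));
        }
    }

    Map<String, String> getTaskOutputParams() {
        return taskOutputParams;
    }
}
```

Feeding it "${setValue(msg=hello" and then "world)} trailing" yields msg -> "hello\nworld", which shows how a multi-line value is stitched together with "\n".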
Updating the PID (reported to the Master)
// update pid before waiting for the run to finish
if (null != taskCallBack) {
    // TODO Update the task instance info
    taskCallBack.updateTaskInstanceInfo(processId);
}
Timeout check
long remainTime = getRemainTime();

private long getRemainTime() {
    long usedTime = (System.currentTimeMillis() - taskRequest.getStartTime()) / 1000;
    long remainTime = taskRequest.getTaskTimeout() - usedTime;

    if (remainTime < 0) {
        throw new RuntimeException("task execution time out");
    }

    return remainTime;
}

......

// waiting for the run to finish
// TODO A timed wait; if no timeout is configured, it effectively waits indefinitely
boolean status = this.process.waitFor(remainTime, TimeUnit.SECONDS);
// TODO Set the exit code
// if SHELL task exit
if (status && kubernetesStatus.isSuccess()) {
    // SHELL task state
    result.setExitStatusCode(this.process.exitValue());
} else {
    log.error("process has failure, the task timeout configuration value is:{}, ready to kill ...",
            taskRequest.getTaskTimeout());
    result.setExitStatusCode(EXIT_CODE_FAILURE);
    cancelApplication();
}
int exitCode = this.process.exitValue();
String exitLogMessage = EXIT_CODE_KILL == exitCode ? "process has killed." : "process has exited.";
log.info("{} execute path:{}, processId:{} ,exitStatusCode:{} ,processWaitForStatus:{} ,processExitValue:{}",
        exitLogMessage, taskRequest.getExecutePath(), processId, result.getExitStatusCode(), status, exitCode);
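The timeout handling has two steps. First, compute the remaining budget in whole seconds. Then call Process.waitFor(long, TimeUnit). The sketch below assumes a Unix-like host. Returning -1 on timeout is a hypothetical stand-in for EXIT_CODE_FAILURE, and the forced kill stands in for cancelApplication().

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of getRemainTime() + waitFor(remainTime, SECONDS).
final class TimeoutSketch {

    private TimeoutSketch() {
    }

    // Same arithmetic as getRemainTime(): timeout budget minus elapsed whole seconds.
    static long remainSeconds(long startMillis, long nowMillis, long timeoutSeconds) {
        long usedSeconds = (nowMillis - startMillis) / 1000;
        long remain = timeoutSeconds - usedSeconds;
        if (remain < 0) {
            throw new IllegalStateException("task execution time out");
        }
        return remain;
    }

    // Waits at most timeoutSeconds; on timeout the process is force-killed and -1 is returned.
    static int runWithTimeout(List<String> command, long timeoutSeconds) {
        try {
            Process process = new ProcessBuilder(command)
                    .redirectErrorStream(true)
                    .redirectOutput(ProcessBuilder.Redirect.DISCARD)   // nobody reads the output here
                    .start();
            boolean finished = process.waitFor(timeoutSeconds, TimeUnit.SECONDS);
            if (!finished) {
                process.destroyForcibly().waitFor();
                return -1;
            }
            return process.exitValue();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

For example, remainSeconds(0, 30_000, 60) returns 30, while runWithTimeout(List.of("sleep", "5"), 1) gives up after about one second.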
// TODO Exit status code of the execution
setExitStatusCode(commandExecuteResult.getExitStatusCode());
// TODO Set the process ID
setProcessId(commandExecuteResult.getProcessId());
// TODO shellCommandExecutor.getTaskOutputParams() returns e.g. output -> 123
shellParameters.dealOutParam(shellCommandExecutor.getTaskOutputParams());
org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters#dealOutParam
public void dealOutParam(Map<String, String> taskOutputParams) {
    // TODO If localParams is empty, any output is ignored even if the script sets it
    if (CollectionUtils.isEmpty(localParams)) {
        return;
    }
    // TODO Keep only the localParams whose direction is OUT
    List<Property> outProperty = getOutProperty(localParams);
    if (CollectionUtils.isEmpty(outProperty)) {
        return;
    }
    // TODO If taskOutputParams is empty, the OUT parameters are put into varPool as they are
    if (MapUtils.isEmpty(taskOutputParams)) {
        outProperty.forEach(this::addPropertyToValPool);
        return;
    }

    // TODO For each key present in both outProperty and taskOutputParams, take the value from taskOutputParams
    // TODO and finally put it into the variable pool
    for (Property info : outProperty) {
        String propValue = taskOutputParams.get(info.getProp());
        if (StringUtils.isNotEmpty(propValue)) {
            info.setValue(propValue);
            addPropertyToValPool(info);
        } else {
            log.warn("Cannot find the output parameter {} in the task output parameters", info.getProp());
        }
    }
}
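In short, for every key that appears in both outProperty and taskOutputParams, the value from taskOutputParams replaces the configured one. The result is stored in the TaskInstance's variable pool and later reported to the Master.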
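Here is a self-contained sketch of the merge rule. It uses a minimal hypothetical Property class instead of DolphinScheduler's own, and it returns what would be added to the variable pool rather than mutating a real varPool.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical sketch of AbstractParameters#dealOutParam.
final class DealOutParamSketch {

    enum Direct { IN, OUT }

    // Minimal stand-in for Property: only the fields used here.
    static final class Property {
        final String prop;
        final Direct direct;
        String value;

        Property(String prop, Direct direct, String value) {
            this.prop = prop;
            this.direct = direct;
            this.value = value;
        }
    }

    private DealOutParamSketch() {
    }

    static List<Property> dealOutParam(List<Property> localParams, Map<String, String> taskOutputParams) {
        List<Property> varPool = new ArrayList<>();
        if (localParams == null || localParams.isEmpty()) {
            return varPool;                                  // no local params: output is ignored
        }
        List<Property> outProperty = localParams.stream()
                .filter(p -> p.direct == Direct.OUT)
                .collect(Collectors.toList());
        if (taskOutputParams == null || taskOutputParams.isEmpty()) {
            varPool.addAll(outProperty);                     // OUT params go in with their configured values
            return varPool;
        }
        for (Property info : outProperty) {
            String value = taskOutputParams.get(info.prop);
            if (value != null && !value.isEmpty()) {         // StringUtils.isNotEmpty
                info.value = value;                          // script output wins
                varPool.add(info);
            }                                                // upstream logs a warning otherwise
        }
        return varPool;
    }
}
```

With OUT params output and missing, and taskOutputParams {output=123}, only output (value 123) reaches the pool. missing is skipped because the script never set it.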
2.5.4 After task execution (cleanup)
protected void afterExecute() throws TaskException {
    if (task == null) {
        throw new TaskException("The current task instance is null");
    }
    // TODO Send an alert if needed, via a JDK dynamic proxy RPC call to AlertBootstrapService in the alert module
    sendAlertIfNeeded();

    // TODO Send the result
    sendTaskResult();

    WorkerTaskExecutorHolder.remove(taskExecutionContext.getTaskInstanceId());
    // TODO common.properties: development.state defaults to false. If set to true,
    // TODO development mode is on and the scripts and jar packages generated by DolphinScheduler are not cleaned up
    log.info("Remove the current task execute context from worker cache");
    clearTaskExecPathIfNeeded();
}
Sending the result
protected void sendTaskResult() {
    taskExecutionContext.setCurrentExecutionStatus(task.getExitStatus());
    taskExecutionContext.setProcessId(task.getProcessId());
    taskExecutionContext.setAppIds(task.getAppIds());
    // TODO This is where the variable pool is sent
    taskExecutionContext.setVarPool(JSONUtils.toJsonString(task.getParameters().getVarPool()));
    taskExecutionContext.setEndTime(System.currentTimeMillis());

    // upload out files and modify the "OUT FILE" property in VarPool
    // TODO Upload the output files and record them in the variable pool
    TaskFilesTransferUtils.uploadOutputFiles(taskExecutionContext, storageOperate);

    log.info("Upload output files: {} successfully",
            TaskFilesTransferUtils.getFileLocalParams(taskExecutionContext, Direct.OUT));

    // TODO Send the task result
    workerMessageSender.sendMessageWithRetry(taskExecutionContext,
            ITaskInstanceExecutionEvent.TaskInstanceExecutionEventType.FINISH);
    log.info("Send task execute status: {} to master : {}",
            taskExecutionContext.getCurrentExecutionStatus().name(), taskExecutionContext.getWorkflowInstanceHost());
}
A short aside on Shell exit codes
[root@node opt]# vim test.sh
[root@node opt]# sh test.sh
me is journey
[root@node opt]# echo $?
0
[root@node opt]# vim test.sh
[root@node opt]# sh test.sh
test.sh: line 2: echo1: command not found
[root@node opt]# echo $?
127
[root@node opt]# vim test.sh
[root@node opt]# sh test.sh
me is 10.253.26.85
Killed
[root@node opt]# echo $?
137
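Summary: a Shell task that exits normally returns 0. One that is killed with kill -9 returns 137 (128 + 9, the SIGKILL signal number). Any other code, such as 127 for "command not found" above, means an error.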
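The same three outcomes can be reproduced from Java, which is how the Worker actually observes them through Process. This sketch assumes a Unix-like host with sh, and ExitCodeSketch is a hypothetical name. When a process dies from a signal, the JDK on Unix reports 128 plus the signal number. So a shell that kills itself with kill -9 shows up as 137, just as echo $? did in the session above.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;

// Hypothetical sketch: observe shell exit codes the way a Worker does, via Process.waitFor().
final class ExitCodeSketch {

    private ExitCodeSketch() {
    }

    static int exitCodeOf(String script) {
        try {
            Process p = new ProcessBuilder(List.of("sh", "-c", script))
                    .redirectOutput(ProcessBuilder.Redirect.DISCARD)
                    .redirectError(ProcessBuilder.Redirect.DISCARD)
                    .start();
            return p.waitFor();                // exit code, or 128 + signal if killed by a signal
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

exitCodeOf("echo me is journey") returns 0, exitCodeOf("echo1") returns 127, and exitCodeOf("kill -9 $$") returns 137. These match the three cases in the terminal session.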
How the task status is derived from the exit code:
taskExecutionContext.setCurrentExecutionStatus(task.getExitStatus());

org.apache.dolphinscheduler.plugin.task.api.AbstractTask#getExitStatus

// Exit code 0 means SUCCESS, 137 means KILL, anything else means FAILURE.
// The exit code behind task.getExitStatus() is set in executeTask.
public TaskExecutionStatus getExitStatus() {
    switch (getExitStatusCode()) {
        case TaskConstants.EXIT_CODE_SUCCESS:
            return TaskExecutionStatus.SUCCESS;
        case TaskConstants.EXIT_CODE_KILL:
            return TaskExecutionStatus.KILL;
        default:
            return TaskExecutionStatus.FAILURE;
    }
}
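The mapping itself is a three-way switch. Here it is as a standalone sketch. The constants 0 and 137 come from the summary above and are assumed to match TaskConstants.EXIT_CODE_SUCCESS and TaskConstants.EXIT_CODE_KILL. ExitStatusSketch and its nested enum are hypothetical.

```java
// Hypothetical standalone version of AbstractTask#getExitStatus.
final class ExitStatusSketch {

    static final int EXIT_CODE_SUCCESS = 0;    // assumed value of TaskConstants.EXIT_CODE_SUCCESS
    static final int EXIT_CODE_KILL = 137;     // assumed value of TaskConstants.EXIT_CODE_KILL

    enum TaskExecutionStatus { SUCCESS, KILL, FAILURE }

    private ExitStatusSketch() {
    }

    static TaskExecutionStatus fromExitCode(int exitStatusCode) {
        switch (exitStatusCode) {
            case EXIT_CODE_SUCCESS:
                return TaskExecutionStatus.SUCCESS;
            case EXIT_CODE_KILL:
                return TaskExecutionStatus.KILL;
            default:
                return TaskExecutionStatus.FAILURE;   // 127, timeouts (-1), etc.
        }
    }
}
```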
Uploading output files to the Resource Center:
org.apache.dolphinscheduler.server.worker.utils.TaskFilesTransferUtils#uploadOutputFiles
public static void uploadOutputFiles(TaskExecutionContext taskExecutionContext,
                                     StorageOperate storageOperate) throws TaskException {
    List<Property> varPools = getVarPools(taskExecutionContext);
    // get map of varPools for quick search
    Map<String, Property> varPoolsMap = varPools.stream().collect(Collectors.toMap(Property::getProp, x -> x));

    // get OUTPUT FILE parameters
    List<Property> localParamsProperty = getFileLocalParams(taskExecutionContext, Direct.OUT);

    if (localParamsProperty.isEmpty()) {
        return;
    }

    log.info("Upload output files ...");
    for (Property property : localParamsProperty) {
        // get local file path
        String path = String.format("%s/%s", taskExecutionContext.getExecutePath(), property.getValue());
        // TODO packIfDir and the CRC step: a directory is zipped and then a CRC is generated; a plain file gets a CRC directly
        String srcPath = packIfDir(path);

        // get crc file path
        String srcCRCPath = srcPath + CRC_SUFFIX;
        try {
            FileUtils.writeContent2File(FileUtils.getFileChecksum(path), srcCRCPath);
        } catch (IOException ex) {
            throw new TaskException(ex.getMessage(), ex);
        }

        // get remote file path
        // TODO DATA_TRANSFER/DATE/ProcessDefineCode/ProcessDefineVersion_ProcessInstanceID/TaskName_TaskInstanceID_FileName
        String resourcePath = getResourcePath(taskExecutionContext, new File(srcPath).getName());
        String resourceCRCPath = resourcePath + CRC_SUFFIX;
        try {
            // upload file to storage
            // TODO Taking HDFS as an example:
            // TODO <HDFS root>/tenantCode/resources/DATA_TRANSFER/DATE/ProcessDefineCode/ProcessDefineVersion_ProcessInstanceID/TaskName_TaskInstanceID_FileName
            String resourceWholePath =
                    storageOperate.getResourceFullName(taskExecutionContext.getTenantCode(), resourcePath);
            String resourceCRCWholePath =
                    storageOperate.getResourceFullName(taskExecutionContext.getTenantCode(), resourceCRCPath);
            log.info("{} --- Local:{} to Remote:{}", property, srcPath, resourceWholePath);
            storageOperate.upload(taskExecutionContext.getTenantCode(), srcPath, resourceWholePath, false, true);
            log.info("{} --- Local:{} to Remote:{}", "CRC file", srcCRCPath, resourceCRCWholePath);
            storageOperate.upload(taskExecutionContext.getTenantCode(), srcCRCPath, resourceCRCWholePath, false, true);
        } catch (IOException ex) {
            throw new TaskException("Upload file to storage error", ex);
        }

        // update varPool
        Property oriProperty;
        // if the property is not in varPool, add it
        if (varPoolsMap.containsKey(property.getProp())) {
            // In theory this branch is never taken
            oriProperty = varPoolsMap.get(property.getProp());
        } else {
            oriProperty = new Property(property.getProp(), Direct.OUT, DataType.FILE, property.getValue());
            // TODO Add it to the variable pool
            varPools.add(oriProperty);
        }
        // TODO The property name becomes <task name>.<property name>
        oriProperty.setProp(String.format("%s.%s", taskExecutionContext.getTaskName(), oriProperty.getProp()));
        // TODO Key point: the relative resource path becomes the value of this variable pool entry
        oriProperty.setValue(resourcePath);
    }
    // TODO Write the FILE entries back into the variable pool
    taskExecutionContext.setVarPool(JSONUtils.toJsonString(varPools));
}
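Two details of uploadOutputFiles are easy to check on their own: the remote path layout from the comment, and the renamed variable pool key. The sketch assumes the DATE segment arrives already formatted, since its format is not shown in the source. It uses java.util.zip.CRC32 rendered in hex as a stand-in for FileUtils.getFileChecksum, whose real algorithm and output format are not shown here. UploadPathSketch is hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

// Hypothetical helpers illustrating the remote layout and varPool key used by uploadOutputFiles.
final class UploadPathSketch {

    private UploadPathSketch() {
    }

    // DATA_TRANSFER/DATE/ProcessDefineCode/ProcessDefineVersion_ProcessInstanceID/TaskName_TaskInstanceID_FileName
    static String resourcePath(String date, long processDefineCode, int processDefineVersion,
                               int processInstanceId, String taskName, int taskInstanceId, String fileName) {
        return String.format("DATA_TRANSFER/%s/%d/%d_%d/%s_%d_%s",
                date, processDefineCode, processDefineVersion, processInstanceId,
                taskName, taskInstanceId, fileName);
    }

    // The varPool key becomes "<task name>.<property name>".
    static String varPoolKey(String taskName, String prop) {
        return String.format("%s.%s", taskName, prop);
    }

    // Assumption: CRC32 in lowercase hex; the real checksum format may differ.
    static String checksum(byte[] content) {
        CRC32 crc = new CRC32();
        crc.update(content);
        return Long.toHexString(crc.getValue());
    }
}
```

With date 20240624, definition code 123, version 1, process instance 10, task t, task instance 20 and file out.zip, the path is DATA_TRANSFER/20240624/123/1_10/t_20_out.zip. A downstream task would then see the entry t.out pointing at that path.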
Sending the task result:
workerMessageSender.sendMessageWithRetry(taskExecutionContext, ITaskInstanceExecutionEvent.TaskInstanceExecutionEventType.FINISH);
org.apache.dolphinscheduler.server.worker.runner.operator.TaskInstanceKillOperationFunction#operate
public TaskInstanceKillResponse operate(TaskInstanceKillRequest taskInstanceKillRequest) {
    log.info("Receive TaskInstanceKillRequest: {}", taskInstanceKillRequest);

    // TODO Task instance ID
    int taskInstanceId = taskInstanceKillRequest.getTaskInstanceId();
    try {
        LogUtils.setTaskInstanceIdMDC(taskInstanceId);
        // TODO The worker task executor
        WorkerTaskExecutor workerTaskExecutor = WorkerTaskExecutorHolder.get(taskInstanceId);
        if (workerTaskExecutor == null) {
            log.error("Cannot find WorkerTaskExecutor for taskInstance: {}", taskInstanceId);
            return TaskInstanceKillResponse.fail("Cannot find WorkerTaskExecutor");
        }
        // TODO Task execution context
        TaskExecutionContext taskExecutionContext = workerTaskExecutor.getTaskExecutionContext();

        LogUtils.setTaskInstanceLogFullPathMDC(taskExecutionContext.getLogPath());

        // TODO The kill happens here
        boolean result = doKill(taskExecutionContext);
        // TODO Process.destroy() is the Java Process method that destroys the subprocess associated with the Process object
        this.cancelApplication(workerTaskExecutor);

        int processId = taskExecutionContext.getProcessId();
        // TODO If processId is 0, the task state is simply set to KILL, and the Worker reports that KILL state when it sends its update
        // TODO Careful: the task is not necessarily really killed in this case; this only keeps the state inside DS correct
        if (processId == 0) {
            workerManager.killTaskBeforeExecuteByInstanceId(taskInstanceId);
            taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.KILL);
            // todo: the task might be executed, but the processId is 0
            WorkerTaskExecutorHolder.remove(taskInstanceId);
            log.info("The task has not been executed and has been cancelled, task id:{}", taskInstanceId);
            return TaskInstanceKillResponse.success(taskExecutionContext);
        }

        // TODO Reaching here means the kill was issued. The Worker then notices that the task was killed
        // TODO and reports it when it sends the FINISH result
        taskExecutionContext
                .setCurrentExecutionStatus(result ? TaskExecutionStatus.SUCCESS : TaskExecutionStatus.FAILURE);

        WorkerTaskExecutorHolder.remove(taskExecutionContext.getTaskInstanceId());
        // TODO Remove pending retry messages
        messageRetryRunner.removeRetryMessages(taskExecutionContext.getTaskInstanceId());
        return TaskInstanceKillResponse.success(taskExecutionContext);
    } finally {
        LogUtils.removeTaskInstanceIdMDC();
        LogUtils.removeTaskInstanceLogFullPathMDC();
    }
}
Killing the process and the YARN application:
// TODO The kill happens here
boolean result = doKill(taskExecutionContext);
org.apache.dolphinscheduler.server.worker.runner.operator.TaskInstanceKillOperationFunction#doKill
private boolean doKill(TaskExecutionContext taskExecutionContext) {
    // kill system process
    // TODO Kill the processes associated with the shell
    boolean processFlag = killProcess(taskExecutionContext.getTenantCode(), taskExecutionContext.getProcessId());

    // TODO kill yarn or k8s application
    try {
        ProcessUtils.cancelApplication(taskExecutionContext);
    } catch (TaskException e) {
        return false;
    }
    return processFlag;
}
org.apache.dolphinscheduler.server.worker.runner.operator.TaskInstanceKillOperationFunction#killProcess
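Killing the process and its child processes. Note: this differs from the official code. I changed it to only log a warning on exceptions, because the Worker sometimes lacks permission to kill every process.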
protected boolean killProcess(String tenantCode, Integer processId) {
    // todo: directly interrupt the process
    if (processId == null || processId.equals(0)) {
        return true;
    }
    try {
        String pidsStr = ProcessUtils.getPidsStr(processId);
        if (!Strings.isNullOrEmpty(pidsStr)) {
            String cmd = String.format("kill -9 %s", pidsStr);
            cmd = OSUtils.getSudoCmd(tenantCode, cmd);
            log.info("process id:{}, cmd:{}", processId, cmd);
            OSUtils.exeCmd(cmd);
        }
    } catch (Exception e) {
        log.warn("kill task error", e);
    }
    return true;
}
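ProcessUtils.getPidsStr is not shown here. As a swapped-in alternative, the JDK's ProcessHandle API (Java 9+) can list a process together with its live descendants. The sketch below collects those pids and builds the same kill -9 string as killProcess, without the sudo wrapper. KillTreeSketch is hypothetical.

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical alternative to ProcessUtils.getPidsStr using the JDK ProcessHandle API.
final class KillTreeSketch {

    private KillTreeSketch() {
    }

    // The pid itself followed by all live descendants (children, grandchildren, ...).
    static List<Long> processTree(long pid) {
        Optional<ProcessHandle> root = ProcessHandle.of(pid);
        if (root.isEmpty()) {
            return List.of();                       // process already gone
        }
        return Stream.concat(Stream.of(root.get()), root.get().descendants())
                .map(ProcessHandle::pid)
                .collect(Collectors.toList());
    }

    // Same shape as the command built in killProcess, minus OSUtils.getSudoCmd.
    static String killCommand(List<Long> pids) {
        return "kill -9 " + pids.stream().map(pid -> Long.toString(pid)).collect(Collectors.joining(" "));
    }
}
```

Putting the parent first in the list is not essential for kill -9, which signals all listed pids. It simply keeps the output easy to read in logs.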
Killing the YARN application
org.apache.dolphinscheduler.plugin.task.api.utils.ProcessUtils#cancelApplication
public static void cancelApplication(TaskExecutionContext taskExecutionContext) {
    try {
        // TODO k8s
        if (Objects.nonNull(taskExecutionContext.getK8sTaskExecutionContext())) {
            if (!TASK_TYPE_SET_K8S.contains(taskExecutionContext.getTaskType())) {
                // Set empty container name for Spark on K8S task
                applicationManagerMap.get(ResourceManagerType.KUBERNETES)
                        .killApplication(new KubernetesApplicationManagerContext(
                                taskExecutionContext.getK8sTaskExecutionContext(),
                                taskExecutionContext.getTaskAppId(), ""));
            }
        } else {
            // TODO YARN
            String host = taskExecutionContext.getHost();
            String executePath = taskExecutionContext.getExecutePath();
            String tenantCode = taskExecutionContext.getTenantCode();
            List<String> appIds;
            // TODO Failover takes this branch
            if (StringUtils.isNotEmpty(taskExecutionContext.getAppIds())) {
                // is failover
                appIds = Arrays.asList(taskExecutionContext.getAppIds().split(COMMA));
            } else {
                String logPath = taskExecutionContext.getLogPath();
                String appInfoPath = taskExecutionContext.getAppInfoPath();
                if (logPath == null || appInfoPath == null || executePath == null || tenantCode == null) {
                    log.error(
                            "Kill yarn job error, the input params is illegal, host: {}, logPath: {}, appInfoPath: {}, executePath: {}, tenantCode: {}",
                            host, logPath, appInfoPath, executePath, tenantCode);
                    throw new TaskException("Cancel application failed!");
                }
                log.info("Get appIds from worker {}, taskLogPath: {}", host, logPath);
                // TODO Extract the appIds from the log with a regular expression
                appIds = LogUtils.getAppIds(logPath, appInfoPath,
                        PropertyUtils.getString(APPID_COLLECT, DEFAULT_COLLECT_WAY));
                taskExecutionContext.setAppIds(String.join(TaskConstants.COMMA, appIds));
            }
            // TODO Reaching here means there are no appIds
            if (CollectionUtils.isEmpty(appIds)) {
                log.info("The appId is empty");
                return;
            }
            ApplicationManager applicationManager = applicationManagerMap.get(ResourceManagerType.YARN);
            applicationManager.killApplication(new YarnApplicationManagerContext(executePath, tenantCode, appIds));
        }
    } catch (Exception e) {
        log.error("Cancel application failed.", e);
    }
}
The appIds are extracted from the task log with a regular expression. By default the log approach is used rather than aop.
appIds = LogUtils.getAppIds(logPath, appInfoPath, PropertyUtils.getString(APPID_COLLECT, DEFAULT_COLLECT_WAY));

public List<String> getAppIds(String logPath, String appInfoPath, String fetchWay) {
    if (!StringUtils.isEmpty(fetchWay) && fetchWay.equals("aop")) {
        log.info("Start finding appId in {}, fetch way: {} ", appInfoPath, fetchWay);
        // TODO With aop, read from the file written by the aop interceptor
        return getAppIdsFromAppInfoFile(appInfoPath);
    } else {
        log.info("Start finding appId in {}, fetch way: {} ", logPath, fetchWay);
        // TODO Otherwise, match against the task log with a regular expression
        return getAppIdsFromLogFile(logPath);
    }
}
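For the log fetch way, the core idea is to scan the task log for YARN application IDs. The sketch assumes IDs of the standard form application_<clusterTimestamp>_<sequence> and a regex of that shape. The actual pattern inside getAppIdsFromLogFile is not shown here. It takes the log as a list of lines; for a real file you would pass Files.readAllLines(path). AppIdSketch is hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch of extracting YARN appIds from task log lines.
final class AppIdSketch {

    // Assumed pattern for YARN application IDs.
    private static final Pattern APPLICATION_REGEX = Pattern.compile("application_\\d+_\\d+");

    private AppIdSketch() {
    }

    // Distinct appIds in first-seen order (a log usually mentions the same id several times).
    static List<String> appIdsFromLog(List<String> logLines) {
        Set<String> ids = new LinkedHashSet<>();
        for (String line : logLines) {
            Matcher m = APPLICATION_REGEX.matcher(line);
            while (m.find()) {
                ids.add(m.group());
            }
        }
        return new ArrayList<>(ids);
    }
}
```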
Actually killing the YARN application
applicationManager.killApplication(new YarnApplicationManagerContext(executePath, tenantCode, appIds));

org.apache.dolphinscheduler.plugin.task.api.am.YarnApplicationManager#killApplication

public boolean killApplication(ApplicationManagerContext applicationManagerContext) throws TaskException {
    YarnApplicationManagerContext yarnApplicationManagerContext =
            (YarnApplicationManagerContext) applicationManagerContext;
    String executePath = yarnApplicationManagerContext.getExecutePath();
    String tenantCode = yarnApplicationManagerContext.getTenantCode();
    List<String> appIds = yarnApplicationManagerContext.getAppIds();
    try {
        String commandFile = String.format("%s/%s.kill", executePath, String.join(Constants.UNDERLINE, appIds));
        String cmd = getKerberosInitCommand() + "yarn application -kill " + String.join(Constants.SPACE, appIds);
        execYarnKillCommand(tenantCode, commandFile, cmd);
    } catch (Exception e) {
        log.warn("Kill yarn application {} failed", appIds, e);
    }
    return true;
}

Be careful with execYarnKillCommand. It runs yarn application -kill, and the yarn command may not be available in that shell, so I added the ENV_SOURCE_LIST files to the generated script:

private void execYarnKillCommand(String tenantCode, String commandFile, String cmd) throws Exception {
    StringBuilder sb = new StringBuilder();
    sb.append("#!/bin/sh\n");
    sb.append("BASEDIR=$(cd `dirname $0`; pwd)\n");
    sb.append("cd $BASEDIR\n");

    // TODO The defaults are set here, e.g. /etc/profile
    if (CollectionUtils.isNotEmpty(ShellUtils.ENV_SOURCE_LIST)) {
        // TODO Add a source line for every environment file configured in ENV_SOURCE_LIST
        ShellUtils.ENV_SOURCE_LIST.forEach(env -> sb.append("source " + env + "\n"));
    }

    sb.append("\n\n");
    sb.append(cmd);

    File f = new File(commandFile);

    if (!f.exists()) {
        org.apache.commons.io.FileUtils.writeStringToFile(new File(commandFile), sb.toString(),
                StandardCharsets.UTF_8);
    }

    String runCmd = String.format("%s %s", Constants.SH, commandFile);
    runCmd = org.apache.dolphinscheduler.common.utils.OSUtils.getSudoCmd(tenantCode, runCmd);
    log.info("kill cmd:{}", runCmd);
    org.apache.dolphinscheduler.common.utils.OSUtils.exeCmd(runCmd);
}
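You can preview the generated kill script without running anything. This sketch rebuilds the same text as execYarnKillCommand. It assumes Constants.UNDERLINE is "_" and Constants.SPACE is " ", and it omits the Kerberos init prefix. YarnKillScriptSketch is hypothetical.

```java
import java.util.List;

// Hypothetical preview of the script written by execYarnKillCommand.
final class YarnKillScriptSketch {

    private YarnKillScriptSketch() {
    }

    static String buildScript(List<String> envSourceList, List<String> appIds) {
        StringBuilder sb = new StringBuilder();
        sb.append("#!/bin/sh\n");
        sb.append("BASEDIR=$(cd `dirname $0`; pwd)\n");
        sb.append("cd $BASEDIR\n");
        // source the configured env files first so that `yarn` can be found
        envSourceList.forEach(env -> sb.append("source ").append(env).append('\n'));
        sb.append("\n\n");
        sb.append("yarn application -kill ").append(String.join(" ", appIds));
        return sb.toString();
    }

    // <executePath>/<appId1>_<appId2>.kill
    static String commandFile(String executePath, List<String> appIds) {
        return String.format("%s/%s.kill", executePath, String.join("_", appIds));
    }
}
```

One portability caveat: source is a bash builtin. If sh on the Worker host is dash (the default /bin/sh on Debian and Ubuntu), the POSIX form . /etc/profile is the safer choice.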
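Summary: if the task is killed successfully, the WorkerTaskExecutor notices it and reports FINISH with the KILL status. If the PID is 0 (per the code's own log message, the task has not been executed yet), three things happen. The task context status is set to KILL, the WorkerTaskExecutor is removed from the thread pool, and its WorkerTaskExecutorHolder cache entry is removed.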
public class TaskInstancePauseOperationFunction
        implements ITaskInstanceOperationFunction<TaskInstancePauseRequest, TaskInstancePauseResponse> {

    @Override
    public TaskInstancePauseResponse operate(TaskInstancePauseRequest taskInstancePauseRequest) {
        try {
            LogUtils.setTaskInstanceIdMDC(taskInstancePauseRequest.getTaskInstanceId());
            log.info("Receive TaskInstancePauseRequest: {}", taskInstancePauseRequest);
            log.warn("TaskInstancePauseOperationFunction is not support for worker task yet!");
            return TaskInstancePauseResponse.success();
        } finally {
            LogUtils.removeTaskInstanceIdMDC();
        }
    }
}
Key point:
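For the Worker, pausing does nothing at all, and there is nothing it could do. Think about it: can every task really be paused? Only if the engine program itself offers such control, and MR, Spark and Flink jobs cannot be paused. Pausing really means sending a notification to the workflow instance that the workflow should pause, so that the task after the currently running one is not started. This has several consequences:
- If the workflow has only one task, it cannot be paused and will simply finish successfully.
- If the running task is the last one, it cannot be paused either.
- If execution is very fast and, at the moment you pause, the program is about to move on but no downstream tasks remain, the pause has no effect.
In all of these cases the pause simply cannot take effect.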
This belongs to fault tolerance and will be covered in detail in the fault-tolerance chapter.