公司的数据开发平台需要用到做任务调度,其中一个场景是:上游任务执行结束后,需要将任务执行结果传递给下游任务。
DolphinScheduler肯定是能实现任务之间的传参的,具体的可以看:DolphinScheduler | 文档中心 (https://dolphinscheduler.apache.org/zh-cn/docs/3.2.2/guide/parameter/context)。
但是官方案例中介绍的任务之间传参是提前在管理台上配置好的,OK,那么问题来了,如何实现任务之间的动态传参呢?比如说我们自定义Task,然后在Task执行结束后将执行结果封装,传递给DAG中的下一个Task。
如果DolphinScheduler官方的案例没有演示如何动态传,我们开发者应该如何去处理这种需求?
我是这么做的:分析DolphinScheduler内置的Task,总有一个Task是需要传递参数给下游的。我这里盲猜两个,一个是__SqlTask
,一个是HttpTask
。我的观点是:总不能做完SQL查询,或者做完HTTP请求后就不管结果吧?
分析HttpTask源码,直接找到HttpTask的handle方法,DolphinScheduler中,任何Task的具体执行逻辑都在这个handle方法中。
handle
方法分析
@Overridepublic void handle(TaskCallBack taskCallBack) throws TaskException { long startTime = System.currentTimeMillis(); String formatTimeStamp = DateUtils.formatTimeStamp(startTime); String statusCode = null; String body = null; try ( CloseableHttpClient client = createHttpClient(); CloseableHttpResponse response = sendRequest(client)) { statusCode = String.valueOf(getStatusCode(response)); body = getResponseBody(response); exitStatusCode = validResponse(body, statusCode); // 看名字应该就能猜到是处理请求结果的 addDefaultOutput(body); long costTime = System.currentTimeMillis() - startTime; log.info( "startTime: {}, httpUrl: {}, httpMethod: {}, costTime : {} milliseconds, statusCode : {}, body : {}, log : {}", formatTimeStamp, httpParameters.getUrl(), httpParameters.getHttpMethod(), costTime, statusCode, body, output); } catch (Exception e) { appendMessage(e.toString()); exitStatusCode = -1; log.error("httpUrl[" + httpParameters.getUrl() + "] connection failed:" + output, e); throw new TaskException("Execute http task failed", e); }}
继续看addDefaultOutput
方法
public void addDefaultOutput(String response) { // put response in output // 创建Property对象 Property outputProperty = new Property(); // 设置Prop,也就是设置Key outputProperty.setProp(String.format("%s.%s", taskExecutionContext.getTaskName(), "response")); // 设置是入参还是出参,这里是出参,因为是将结果给下游任务 outputProperty.setDirect(Direct.OUT); // 设置参数类型,VARCHAR表示就是字符串 outputProperty.setType(DataType.VARCHAR); // 设置Value,就是http请求结果 outputProperty.setValue(response); // 重点:将Property添加到varPool中 httpParameters.addPropertyToValPool(outputProperty);}
handler
方法分析
@Overridepublic void handle(TaskCallBack taskCallBack) throws TaskException { log.info("Full sql parameters: {}", sqlParameters); log.info( "sql type : {}, datasource : {}, sql : {} , localParams : {},udfs : {},showType : {},connParams : {},varPool : {} ,query max result limit {}", sqlParameters.getType(), sqlParameters.getDatasource(), sqlParameters.getSql(), sqlParameters.getLocalParams(), sqlParameters.getUdfs(), sqlParameters.getShowType(), sqlParameters.getConnParams(), sqlParameters.getVarPool(), sqlParameters.getLimit()); try { // get datasource baseConnectionParam = (BaseConnectionParam) DataSourceUtils.buildConnectionParams(dbType, sqlTaskExecutionContext.getConnectionParams()); List<String> subSqls = DataSourceProcessorProvider.getDataSourceProcessor(dbType) .splitAndRemoveComment(sqlParameters.getSql()); // ready to execute SQL and parameter entity Map List<SqlBinds> mainStatementSqlBinds = subSqls .stream() .map(this::getSqlAndSqlParamsMap) .collect(Collectors.toList()); List<SqlBinds> preStatementSqlBinds = Optional.ofNullable(sqlParameters.getPreStatements()) .orElse(new ArrayList<>()) .stream() .map(this::getSqlAndSqlParamsMap) .collect(Collectors.toList()); List<SqlBinds> postStatementSqlBinds = Optional.ofNullable(sqlParameters.getPostStatements()) .orElse(new ArrayList<>()) .stream() .map(this::getSqlAndSqlParamsMap) .collect(Collectors.toList()); List<String> createFuncs = createFuncs(sqlTaskExecutionContext.getUdfFuncParametersList()); // execute sql task // 这个方法就是处理sql结果的 executeFuncAndSql(mainStatementSqlBinds, preStatementSqlBinds, postStatementSqlBinds, createFuncs); setExitStatusCode(TaskConstants.EXIT_CODE_SUCCESS); } catch (Exception e) { setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE); log.error("sql task error", e); throw new TaskException("Execute sql task failed", e); }}
所以我们在看下executeFuncAndSql
方法内部实现
public void executeFuncAndSql(List<SqlBinds> mainStatementsBinds, List<SqlBinds> preStatementsBinds, List<SqlBinds> postStatementsBinds, List<String> createFuncs) throws Exception { try ( Connection connection = DataSourceClientProvider.getAdHocConnection(DbType.valueOf(sqlParameters.getType()), baseConnectionParam)) { // create temp function if (CollectionUtils.isNotEmpty(createFuncs)) { createTempFunction(connection, createFuncs); } // pre execute executeUpdate(connection, preStatementsBinds, "pre"); // main execute String result = null; // decide whether to executeQuery or executeUpdate based on sqlType if (sqlParameters.getSqlType() == SqlType.QUERY.ordinal()) { // query statements need to be convert to JsonArray and inserted into Alert to send result = executeQuery(connection, mainStatementsBinds.get(0), "main"); } else if (sqlParameters.getSqlType() == SqlType.NON_QUERY.ordinal()) { // non query statement String updateResult = executeUpdate(connection, mainStatementsBinds, "main"); result = setNonQuerySqlReturn(updateResult, sqlParameters.getLocalParams()); } // deal out params // 这个方法就是来处理结果的 sqlParameters.dealOutParam(result); // post execute executeUpdate(connection, postStatementsBinds, "post"); } catch (Exception e) { log.error("execute sql error: {}", e.getMessage()); throw e; }}
通过dealOutParam
看具体处理细节
public void dealOutParam(String result) { if (CollectionUtils.isEmpty(localParams)) { return; } List<Property> outProperty = getOutProperty(localParams); if (CollectionUtils.isEmpty(outProperty)) { return; } if (StringUtils.isEmpty(result)) { varPool = VarPoolUtils.mergeVarPool(Lists.newArrayList(varPool, outProperty)); return; } List<Map<String, String>> sqlResult = getListMapByString(result); if (CollectionUtils.isEmpty(sqlResult)) { return; } // if sql return more than one line if (sqlResult.size() > 1) { Map<String, List<String>> sqlResultFormat = new HashMap<>(); // init sqlResultFormat Set<String> keySet = sqlResult.get(0).keySet(); for (String key : keySet) { sqlResultFormat.put(key, new ArrayList<>()); } for (Map<String, String> info : sqlResult) { for (String key : info.keySet()) { sqlResultFormat.get(key).add(String.valueOf(info.get(key))); } } for (Property info : outProperty) { if (info.getType() == DataType.LIST) { info.setValue(JSONUtils.toJsonString(sqlResultFormat.get(info.getProp()))); } } } else { // result only one line Map<String, String> firstRow = sqlResult.get(0); for (Property info : outProperty) { info.setValue(String.valueOf(firstRow.get(info.getProp()))); } } // 本质还是将sql结果处理后保存在varPool中,varPool才是关键所在 varPool = VarPoolUtils.mergeVarPool(Lists.newArrayList(varPool, outProperty));}
所以,源代码分析到这,我们就知道了:如果想实现动态传参,那么我们需要将传递的数据封装成__org.apache.dolphinscheduler.plugin.task.api.model.Property
,然后添加到内置集合变量org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters#varPool
中
这里我们不去讨论自定义Task的具体实现步骤,这不是本文的重点。
当我们实现自定义Task后,可以这样编码实现动态传参:
Property outputProperty = new Property();// 添加我们要传递的数据KeyoutputProperty.setProp("xxxxKey"));// OUToutputProperty.setDirect(Direct.OUT);// 这里传递的数据是什么类型就写什么类型,建议通过json字符串处理数据outputProperty.setType(DataType.VARCHAR);// 添加我们要传递的数据KeyoutputProperty.setValue("xxxxValue");// 这里的xxxxParameters是我们自己自定义的,一般情况下,一个Task对应一个ParametersxxxxParameters.addPropertyToValPool(outputProperty);
DolphinScheduler内部有将__List<Property> varPool
转换成Map<String, Property> varParams
的逻辑,然后会将varParams
与其他的参数合并,最后通过taskExecutionContext.setPrepareParamsMap(propertyMap)
将数据设置给Map<String, Property> prepareParamsMap
。
关于DolphinScheduler(海豚调度器)是什么,能做什么,怎么使用等等,这里我就不再赘述,大家感兴趣的可以去看看官方文档:DolphinScheduler | 文档中心 (https://dolphinscheduler.apache.org/zh-cn/docs/3.2.2)
希望通过本篇文章能让各位读者掌握Task之间的动态传参,然后应用在实际工作中。如果本篇文章能给屏幕前的你们或多或少的一些帮助,也是我喜闻乐见的。
本文由 科技 提供发布支持!