[Hands-On] Shell Scheduling Made Simple: A Detailed Guide to DolphinScheduler + ProcessBuilder


This article walks through how DolphinScheduler uses ProcessBuilder to execute Shell commands. By default, a BashShellInterceptorBuilder wraps the Shell script and generates the execution command, supporting both normal mode and sudo mode. A Spring Boot example then shows how to set the working directory, merge the error stream into standard output, monitor execution status, and print the log output, which is what effective management and scheduling of Shell tasks comes down to.

1. How ProcessBuilder Is Used in DolphinScheduler

1.1 Wrapping the Command

org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilderFactory

public class ShellInterceptorBuilderFactory {

    private final static String INTERCEPTOR_TYPE = PropertyUtils.getString("shell.interceptor.type", "bash");

    @SuppressWarnings("unchecked")
    public static IShellInterceptorBuilder newBuilder() {
        // TODO By default this is the branch that gets taken
        if (INTERCEPTOR_TYPE.equalsIgnoreCase("bash")) {
            return new BashShellInterceptorBuilder();
        }
        if (INTERCEPTOR_TYPE.equalsIgnoreCase("sh")) {
            return new ShShellInterceptorBuilder();
        }
        if (INTERCEPTOR_TYPE.equalsIgnoreCase("cmd")) {
            return new CmdShellInterceptorBuilder();
        }
        throw new IllegalArgumentException("not support shell type: " + INTERCEPTOR_TYPE);
    }
}

By default, the BashShellInterceptorBuilder is used.
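
For orientation, the sketch below shows the call chain a worker-side caller goes through: get a builder from the factory, build() it into an interceptor, then execute() the resulting process. This is a minimal sketch, assuming build() and execute() are exposed on the interfaces as shown in the snippets in this article; in reality the builder must first be configured with the shell directory, script content, run user and so on via its setters, which are omitted here because their exact names depend on the DolphinScheduler version.

import org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptor;
import org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptorBuilder;
import org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilderFactory;

public class InterceptorUsageSketch {

    // Minimal sketch of the factory -> build -> execute chain described above.
    // The builder's configuration setters (shell directory, script content, run user, ...)
    // are intentionally left out; see the DolphinScheduler source for the exact API.
    public static Process launch() throws Exception {
        IShellInterceptorBuilder builder = ShellInterceptorBuilderFactory.newBuilder(); // "bash" by default
        IShellInterceptor interceptor = builder.build();  // writes the wrapper .sh and the bootstrap command
        return interceptor.execute();                     // delegates to ProcessBuilder.start()
    }
}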

org.apache.dolphinscheduler.plugin.task.api.shell.bash.BashShellInterceptorBuilder

public class BashShellInterceptorBuilder
        extends
            BaseLinuxShellInterceptorBuilder<BashShellInterceptorBuilder, BashShellInterceptor> {

    @Override
    public BashShellInterceptorBuilder newBuilder() {
        return new BashShellInterceptorBuilder();
    }

    @Override
    public BashShellInterceptor build() throws FileOperateException, IOException {
        // TODO This is where the shell script is generated
        generateShellScript();
        List<String> bootstrapCommand = generateBootstrapCommand();
        // TODO Instantiate the BashShellInterceptor
        return new BashShellInterceptor(bootstrapCommand, shellDirectory);
    }

    // Used as the command prefix when the script is not executed via sudo
    @Override
    protected String shellInterpreter() {
        return "bash";
    }

    @Override
    protected String shellExtension() {
        return ".sh";
    }

    @Override
    protected String shellHeader() {
        return "#!/bin/bash";
    }
}
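
To make the build() flow more concrete, here is a plain-JDK sketch of what "generating the shell script" essentially amounts to: write the script body into <shellDirectory>/<name>.sh with the "#!/bin/bash" header and mark it executable. The real generateShellScript() in BaseLinuxShellInterceptorBuilder does more (environment export, parameter handling), so treat this only as an approximation; the method and class names below are made up for illustration.

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.PosixFilePermissions;

public class GenerateScriptSketch {

    // Approximation of generateShellScript(): header + script body written to <dir>/<name>.sh,
    // then the file is made executable so that the bootstrap command can run it.
    public static Path writeScript(String shellDirectory, String name, String scriptBody) throws IOException {
        Path scriptPath = Paths.get(shellDirectory, name + ".sh");        // shellExtension() -> ".sh"
        String content = "#!/bin/bash\n" + scriptBody + "\n";             // shellHeader()    -> "#!/bin/bash"
        Files.createDirectories(scriptPath.getParent());
        Files.write(scriptPath, content.getBytes(StandardCharsets.UTF_8));
        Files.setPosixFilePermissions(scriptPath, PosixFilePermissions.fromString("rwxr-xr-x"));
        return scriptPath;
    }
}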

org.apache.dolphinscheduler.plugin.task.api.shell.BaseLinuxShellInterceptorBuilder#generateBootstrapCommand

protected List<String> generateBootstrapCommand() {
    if (sudoEnable) {
        // TODO This is the default path; it boils down to: sudo -u <tenant> -i /opt/xx.sh
        return bootstrapCommandInSudoMode();
    }
    // TODO bash /opt/xx.sh
    return bootstrapCommandInNormalMode();
}

bootstrapCommandInSudoMode():

private List<String> bootstrapCommandInSudoMode() {
    if (PropertyUtils.getBoolean(AbstractCommandExecutorConstants.TASK_RESOURCE_LIMIT_STATE, false)) {
        return bootstrapCommandInResourceLimitMode();
    }
    List<String> bootstrapCommand = new ArrayList<>();
    bootstrapCommand.add("sudo");
    if (StringUtils.isNotBlank(runUser)) {
        bootstrapCommand.add("-u");
        bootstrapCommand.add(runUser);
    }
    bootstrapCommand.add("-i");
    bootstrapCommand.add(shellAbsolutePath().toString());
    return bootstrapCommand;
}

bootstrapCommandInNormalMode():

private List<String> bootstrapCommandInNormalMode() {
    List<String> bootstrapCommand = new ArrayList<>();
    bootstrapCommand.add(shellInterpreter());
    bootstrapCommand.add(shellAbsolutePath().toString());
    return bootstrapCommand;
}
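
Putting the two branches side by side, the small standalone example below assembles the same command lists and prints the resulting command lines; the tenant name and script path are example values, not anything DolphinScheduler mandates.

import java.util.ArrayList;
import java.util.List;

public class BootstrapCommandSketch {

    public static void main(String[] args) {
        String runUser = "qiaozhanwei";      // example tenant
        String script  = "/opt/test/my.sh";  // example script path

        // sudo mode: sudo -u <tenant> -i /opt/test/my.sh
        List<String> sudoCommand = new ArrayList<>();
        sudoCommand.add("sudo");
        sudoCommand.add("-u");
        sudoCommand.add(runUser);
        sudoCommand.add("-i");
        sudoCommand.add(script);

        // normal mode: bash /opt/test/my.sh
        List<String> normalCommand = new ArrayList<>();
        normalCommand.add("bash");
        normalCommand.add(script);

        System.out.println(String.join(" ", sudoCommand));    // sudo -u qiaozhanwei -i /opt/test/my.sh
        System.out.println(String.join(" ", normalCommand));  // bash /opt/test/my.sh
    }
}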

1.2 Executing the Command

org.apache.dolphinscheduler.plugin.task.api.shell.BaseShellInterceptor

public abstract class BaseShellInterceptor implements IShellInterceptor {

    protected final String workingDirectory;
    protected final List<String> executeCommands;

    protected BaseShellInterceptor(List<String> executeCommands, String workingDirectory) {
        this.executeCommands = executeCommands;
        this.workingDirectory = workingDirectory;
    }

    @Override
    public Process execute() throws IOException {
        // init process builder
        ProcessBuilder processBuilder = new ProcessBuilder();
        // setting up a working directory
        // TODO Set the working directory so the script can load resources (e.g. jar files) relative to it
        processBuilder.directory(new File(workingDirectory));
        // merge error information to standard output stream
        processBuilder.redirectErrorStream(true);
        processBuilder.command(executeCommands);
        log.info("Executing shell command : {}", String.join(" ", executeCommands));
        return processBuilder.start();
    }
}
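
The two ProcessBuilder settings above are worth calling out: directory(...) decides where the command runs, and redirectErrorStream(true) folds stderr into stdout so the caller only has to read a single stream. A tiny standalone demo, where the command and directory are example values:

import java.io.BufferedReader;
import java.io.File;
import java.io.InputStreamReader;

public class RedirectErrorStreamDemo {

    // Demonstrates the two settings used by BaseShellInterceptor.execute():
    // directory(...) sets the working directory of the child process, and
    // redirectErrorStream(true) merges stderr into stdout so one reader sees both.
    public static void main(String[] args) throws Exception {
        ProcessBuilder pb = new ProcessBuilder("bash", "-c", "pwd; echo oops >&2"); // example command
        pb.directory(new File("/tmp"));       // the command runs with /tmp as its working directory
        pb.redirectErrorStream(true);         // "oops" (written to stderr) arrives on getInputStream() too
        Process p = pb.start();
        try (BufferedReader r = new BufferedReader(new InputStreamReader(p.getInputStream()))) {
            String line;
            while ((line = r.readLine()) != null) {
                System.out.println(line);
            }
        }
        System.out.println("exit code: " + p.waitFor());
    }
}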

2. A Best-Practice Example

2.1 pom.xml Configuration

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
    <version>2.6.1</version>
</dependency>

2.2 Spring Boot Application Code

import java.io.BufferedReader;
import java.io.File;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Application {

    public static void main(String[] args) throws Exception {
        SpringApplication.run(Application.class, args);

        List<String> executeCommands = new ArrayList<>();
        executeCommands.add("sudo");
        executeCommands.add("-u");
        executeCommands.add("qiaozhanwei");
        executeCommands.add("-i");
        executeCommands.add("/opt/test/my.sh");

        ProcessBuilder processBuilder = new ProcessBuilder();
        // setting up a working directory
        // TODO Set the working directory so the script can load resources (e.g. jar files) relative to it
        processBuilder.directory(new File("/opt/test"));
        // merge error information to standard output stream
        processBuilder.redirectErrorStream(true);
        processBuilder.command(executeCommands);

        Process process = processBuilder.start();
        try (BufferedReader inReader = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
            String line;
            while ((line = inReader.readLine()) != null) {
                // TODO Print the task output to the console
                System.out.println(line);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

        // TODO Wait up to 10 minutes; if the process has not finished by then, waitFor returns false
        boolean status = process.waitFor(10, TimeUnit.MINUTES);
        System.out.println("status ->" + status);
    }
}
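
One caveat of the example above: the read loop blocks until the script closes its output stream, so the 10-minute waitFor only starts counting after all output has been consumed. The sketch below (class and method names are made up for illustration) reads the log on a background thread instead, so the timeout bounds the whole run and the process is killed if it overruns.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.concurrent.TimeUnit;

public class AsyncLogExample {

    // Reads the merged stdout/stderr on a daemon thread while the caller waits with a timeout;
    // the process is forcibly destroyed if it does not finish within the example 10-minute window.
    public static boolean runWithTimeout(Process process) throws InterruptedException {
        Thread logReader = new Thread(() -> {
            try (BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    System.out.println(line);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, "shell-log-reader");
        logReader.setDaemon(true);
        logReader.start();

        boolean finished = process.waitFor(10, TimeUnit.MINUTES); // example timeout
        if (!finished) {
            process.destroyForcibly();   // make sure the shell does not linger after the timeout
        }
        return finished;
    }
}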

2.3 Log Output

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v2.6.1)

2024-06-15 18:33:16.090  INFO 31834 --- [           main] com.journey.test.Application             : Starting Application using Java 1.8.0_401 on 192.168.1.4 with PID 31834 (/Users/qiaozhanwei/IdeaProjects/springboot2/target/classes started by qiaozhanwei in /Users/qiaozhanwei/IdeaProjects/springboot2)
2024-06-15 18:33:16.091  INFO 31834 --- [           main] com.journey.test.Application             : No active profile set, falling back to default profiles: default
2024-06-15 18:33:16.244  INFO 31834 --- [           main] com.journey.test.Application             : Started Application in 0.252 seconds (JVM running for 0.42)
Number of Maps  = 1
Samples per Map = 100000
2024-06-15 18:33:16,790 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Wrote input for Map #0
Starting Job
2024-06-15 18:33:17,329 INFO client.DefaultNoHARMFailoverProxyProvider: Connecting to ResourceManager at kvm-10-253-26-85/10.253.26.85:8032
2024-06-15 18:33:17,586 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/qiaozhanwei/.staging/job_1694766249884_0931
2024-06-15 18:33:17,837 INFO input.FileInputFormat: Total input files to process : 1
2024-06-15 18:33:18,024 INFO mapreduce.JobSubmitter: number of splits:1
2024-06-15 18:33:18,460 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1694766249884_0931
2024-06-15 18:33:18,460 INFO mapreduce.JobSubmitter: Executing with tokens: []
2024-06-15 18:33:18,648 INFO conf.Configuration: resource-types.xml not found
2024-06-15 18:33:18,648 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'.
2024-06-15 18:33:18,698 INFO impl.YarnClientImpl: Submitted application application_1694766249884_0931
2024-06-15 18:33:18,734 INFO mapreduce.Job: The url to track the job: http://kvm-10-253-26-85:8088/proxy/application_1694766249884_0931/
2024-06-15 18:33:18,734 INFO mapreduce.Job: Running job: job_1694766249884_0931
2024-06-15 18:33:24,978 INFO mapreduce.Job: Job job_1694766249884_0931 running in uber mode : false
2024-06-15 18:33:24,978 INFO mapreduce.Job:  map 0% reduce 0%
2024-06-15 18:33:29,153 INFO mapreduce.Job:  map 100% reduce 0%
2024-06-15 18:33:34,384 INFO mapreduce.Job:  map 100% reduce 100%
2024-06-15 18:33:34,455 INFO mapreduce.Job: Job job_1694766249884_0931 completed successfully
2024-06-15 18:33:34,565 INFO mapreduce.Job: Counters: 54
    File System Counters
        FILE: Number of bytes read=28
        FILE: Number of bytes written=548863
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=278
        HDFS: Number of bytes written=215
        HDFS: Number of read operations=9
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=3
        HDFS: Number of bytes read erasure-coded=0
    Job Counters
        Launched map tasks=1
        Launched reduce tasks=1
        Data-local map tasks=1
        Total time spent by all maps in occupied slots (ms)=37968
        Total time spent by all reduces in occupied slots (ms)=79360
        Total time spent by all map tasks (ms)=2373
        Total time spent by all reduce tasks (ms)=2480
        Total vcore-milliseconds taken by all map tasks=2373
        Total vcore-milliseconds taken by all reduce tasks=2480
        Total megabyte-milliseconds taken by all map tasks=4859904
        Total megabyte-milliseconds taken by all reduce tasks=10158080
    Map-Reduce Framework
        Map input records=1
        Map output records=2
        Map output bytes=18
        Map output materialized bytes=28
        Input split bytes=160
        Combine input records=0
        Combine output records=0
        Reduce input groups=2
        Reduce shuffle bytes=28
        Reduce input records=2
        Reduce output records=0
        Spilled Records=4
        Shuffled Maps =1
        Failed Shuffles=0
        Merged Map outputs=1
        GC time elapsed (ms)=87
        CPU time spent (ms)=1420
        Physical memory (bytes) snapshot=870387712
        Virtual memory (bytes) snapshot=9336647680
        Total committed heap usage (bytes)=2716860416
        Peak Map Physical memory (bytes)=457416704
        Peak Map Virtual memory (bytes)=3773362176
        Peak Reduce Physical memory (bytes)=412971008
        Peak Reduce Virtual memory (bytes)=5563285504
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters
        Bytes Read=118
    File Output Format Counters
        Bytes Written=97
Job Finished in 17.292 seconds
Estimated value of Pi is 3.14120000000000000000
status ->true

Process finished with exit code 0