本文将介绍在中使用ProcessBuilder
执行Shell命令的方法。默认通过BashShellInterceptorBuilder
封装Shell脚本并生成执行命令,支持普通模式和sudo模式运行。同时,结合Spring Boot应用示例,展示了如何配置工作目录、合并错误流、监控执行状态,并输出日志信息,从而实现对Shell任务的高效管理和调度。
org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilderFactory
public class ShellInterceptorBuilderFactory { private final static String INTERCEPTOR_TYPE = PropertyUtils.getString("shell.interceptor.type", "bash"); @SuppressWarnings("unchecked") public static IShellInterceptorBuilder newBuilder() { // TODO 默认的走的是这个逻辑 if (INTERCEPTOR_TYPE.equalsIgnoreCase("bash")) { return new BashShellInterceptorBuilder(); } if (INTERCEPTOR_TYPE.equalsIgnoreCase("sh")) { return new ShShellInterceptorBuilder(); } if (INTERCEPTOR_TYPE.equalsIgnoreCase("cmd")) { return new CmdShellInterceptorBuilder(); } throw new IllegalArgumentException("not support shell type: " + INTERCEPTOR_TYPE); }}
默认走的是 BashShellInterceptorBuilder。
org.apache.dolphinscheduler.plugin.task.api.shell.bash.BashShellInterceptorBuilder
public class BashShellInterceptorBuilder extends BaseLinuxShellInterceptorBuilder<BashShellInterceptorBuilder, BashShellInterceptor> { @Override public BashShellInterceptorBuilder newBuilder() { return new BashShellInterceptorBuilder(); } @Override public BashShellInterceptor build() throws FileOperateException, IOException { // TODO 这里是生成shell脚本的核心点 generateShellScript(); List<String> bootstrapCommand = generateBootstrapCommand(); // TODO 实例化BashShellInterceptor return new BashShellInterceptor(bootstrapCommand, shellDirectory); } // 这个是如果不是sudo的方式,进行命令执行的前缀 @Override protected String shellInterpreter() { return "bash"; } @Override protected String shellExtension() { return ".sh"; } @Override protected String shellHeader() { return "#!/bin/bash"; }}
org.apache.dolphinscheduler.plugin.task.api.shell.BaseLinuxShellInterceptorBuilder#generateBootstrapCommand
protected List<String> generateBootstrapCommand() { if (sudoEnable) { // TODO 默认是走这里的,其实就是sudo -u 租户 -i /opt/xx.sh return bootstrapCommandInSudoMode(); } // TODO bash /opt/xx.sh return bootstrapCommandInNormalMode(); }
bootstrapCommandInSudoMode():
private List<String> bootstrapCommandInSudoMode() { if (PropertyUtils.getBoolean(AbstractCommandExecutorConstants.TASK_RESOURCE_LIMIT_STATE, false)) { return bootstrapCommandInResourceLimitMode(); } List<String> bootstrapCommand = new ArrayList<>(); bootstrapCommand.add("sudo"); if (StringUtils.isNotBlank(runUser)) { bootstrapCommand.add("-u"); bootstrapCommand.add(runUser); } bootstrapCommand.add("-i"); bootstrapCommand.add(shellAbsolutePath().toString()); return bootstrapCommand; }
bootstrapCommandInNormalMode():
private List<String> bootstrapCommandInNormalMode() { List<String> bootstrapCommand = new ArrayList<>(); bootstrapCommand.add(shellInterpreter()); bootstrapCommand.add(shellAbsolutePath().toString()); return bootstrapCommand; }
org.apache.dolphinscheduler.plugin.task.api.shell.BaseShellInterceptor
public abstract class BaseShellInterceptor implements IShellInterceptor { protected final String workingDirectory; protected final List<String> executeCommands; protected BaseShellInterceptor(List<String> executeCommands, String workingDirectory) { this.executeCommands = executeCommands; this.workingDirectory = workingDirectory; } @Override public Process execute() throws IOException { // init process builder ProcessBuilder processBuilder = new ProcessBuilder(); // setting up a working directory // TODO 设置工作路径,目的其实就是在执行脚本的时候,可以在该目录的位置来加载比如说jar包什么的 processBuilder.directory(new File(workingDirectory)); // merge error information to standard output stream processBuilder.redirectErrorStream(true); processBuilder.command(executeCommands); log.info("Executing shell command : {}", String.join(" ", executeCommands)); return processBuilder.start(); }}
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> <version>2.6.1</version></dependency>
@SpringBootApplicationpublic class Application { public static void main(String[] args) throws Exception { SpringApplication.run(Application.class, args); List<String> executeCommands = new ArrayList<>(); executeCommands.add("sudo"); executeCommands.add("-u"); executeCommands.add("qiaozhanwei"); executeCommands.add("-i"); executeCommands.add("/opt/test/my.sh"); ProcessBuilder processBuilder = new ProcessBuilder(); // setting up a working directory // TODO 设置工作路径,目的其实就是在执行脚本的时候,可以在该目录的位置来加载比如说jar包什么的 processBuilder.directory(new File("/opt/test")); // merge error information to standard output stream processBuilder.redirectErrorStream(true); processBuilder.command(executeCommands); Process process = processBuilder.start(); try (BufferedReader inReader = new BufferedReader(new InputStreamReader(process.getInputStream()))) { String line; while ((line = inReader.readLine()) != null) { // TODO 终端日志输出 System.out.println(line); } } catch (Exception e) { e.printStackTrace(); } // TODO 等10分钟,如果10分钟不结束,返回且status为false boolean status = process.waitFor(10, TimeUnit.MINUTES); System.out.println("status ->" + status); }}
. ____ _ __ _ _ /\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \ \\/ ___)| |_)| | | | | || (_| | ) ) ) ) ' |____| .__|_| |_|_| |_\__, | / / / / =========|_|==============|___/=/_/_/_/ :: Spring Boot :: (v2.6.1)2024-06-15 18:33:16.090 INFO 31834 --- [ main] com.journey.test.Application : Starting Application using Java 1.8.0_401 on 192.168.1.4 with PID 31834 (/Users/qiaozhanwei/IdeaProjects/springboot2/target/classes started by qiaozhanwei in /Users/qiaozhanwei/IdeaProjects/springboot2)2024-06-15 18:33:16.091 INFO 31834 --- [ main] com.journey.test.Application : No active profile set, falling back to default profiles: default2024-06-15 18:33:16.244 INFO 31834 --- [ main] com.journey.test.Application : Started Application in 0.252 seconds (JVM running for 0.42)Number of Maps = 1Samples per Map = 1000002024-06-15 18:33:16,790 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicableWrote input for Map #0Starting Job2024-06-15 18:33:17,329 INFO client.DefaultNoHARMFailoverProxyProvider: Connecting to ResourceManager at kvm-10-253-26-85/10.253.26.85:80322024-06-15 18:33:17,586 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/qiaozhanwei/.staging/job_1694766249884_09312024-06-15 18:33:17,837 INFO input.FileInputFormat: Total input files to process : 12024-06-15 18:33:18,024 INFO mapreduce.JobSubmitter: number of splits:12024-06-15 18:33:18,460 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1694766249884_09312024-06-15 18:33:18,460 INFO mapreduce.JobSubmitter: Executing with tokens: []2024-06-15 18:33:18,648 INFO conf.Configuration: resource-types.xml not found2024-06-15 18:33:18,648 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'.2024-06-15 18:33:18,698 INFO impl.YarnClientImpl: Submitted application application_1694766249884_09312024-06-15 18:33:18,734 INFO mapreduce.Job: The url to track the job: http://kvm-10-253-26-85:8088/proxy/application_1694766249884_0931/2024-06-15 18:33:18,734 INFO mapreduce.Job: Running job: job_1694766249884_09312024-06-15 18:33:24,978 INFO mapreduce.Job: Job job_1694766249884_0931 running in uber mode : false2024-06-15 18:33:24,978 INFO mapreduce.Job: map 0% reduce 0%2024-06-15 18:33:29,153 INFO mapreduce.Job: map 100% reduce 0%2024-06-15 18:33:34,384 INFO mapreduce.Job: map 100% reduce 100%2024-06-15 18:33:34,455 INFO mapreduce.Job: Job job_1694766249884_0931 completed successfully2024-06-15 18:33:34,565 INFO mapreduce.Job: Counters: 54 File System Counters FILE: Number of bytes read=28 FILE: Number of bytes written=548863 FILE: Number of read operations=0 FILE: Number of large read operations=0 FILE: Number of write operations=0 HDFS: Number of bytes read=278 HDFS: Number of bytes written=215 HDFS: Number of read operations=9 HDFS: Number of large read operations=0 HDFS: Number of write operations=3 HDFS: Number of bytes read erasure-coded=0 Job Counters Launched map tasks=1 Launched reduce tasks=1 Data-local map tasks=1 Total time spent by all maps in occupied slots (ms)=37968 Total time spent by all reduces in occupied slots (ms)=79360 Total time spent by all map tasks (ms)=2373 Total time spent by all reduce tasks (ms)=2480 Total vcore-milliseconds taken by all map tasks=2373 Total vcore-milliseconds taken by all reduce tasks=2480 Total megabyte-milliseconds taken by all map tasks=4859904 Total megabyte-milliseconds taken by all reduce tasks=10158080 Map-Reduce Framework Map input records=1 Map output records=2 Map output bytes=18 Map output materialized bytes=28 Input split bytes=160 Combine input records=0 Combine output records=0 Reduce input groups=2 Reduce shuffle bytes=28 Reduce input records=2 Reduce output records=0 Spilled Records=4 Shuffled Maps =1 Failed Shuffles=0 Merged Map outputs=1 GC time elapsed (ms)=87 CPU time spent (ms)=1420 Physical memory (bytes) snapshot=870387712 Virtual memory (bytes) snapshot=9336647680 Total committed heap usage (bytes)=2716860416 Peak Map Physical memory (bytes)=457416704 Peak Map Virtual memory (bytes)=3773362176 Peak Reduce Physical memory (bytes)=412971008 Peak Reduce Virtual memory (bytes)=5563285504 Shuffle Errors BAD_ID=0 CONNECTION=0 IO_ERROR=0 WRONG_LENGTH=0 WRONG_MAP=0 WRONG_REDUCE=0 File Input Format Counters Bytes Read=118 File Output Format Counters Bytes Written=97Job Finished in 17.292 secondsEstimated value of Pi is 3.14120000000000000000status ->trueProcess finished with exit code 0