Apache DolphinScheduler 限制秒级别的定时调度

但历史上出现过因配置的疏忽大意而产生故障时间,如应该配置每分钟执行的工作流被配置长了每秒执行,造成短时间内产生大量工作流实例,对ApacheDolphinScheduler服务可用性和提交任务的Hadoop集群造成影响

背景

Apache DolphinScheduler 定时任务配置采用的 7 位 Crontab 表达式,分别对应秒、分、时、月天、月、周天、年

在团队日常开发工作中,工作流的定时调度一般不会细化到秒级别。但历史上出现过因配置的疏忽大意而产生故障时间,如应该配置每分钟执行的工作流被配置长了每秒执行,造成短时间内产生大量工作流实例,对 Apache DolphinScheduler 服务可用性和提交任务的 Hadoop 集群造成影响。

v2-39879a104e68d51b5604b9371d0cf201_1440w

基于此,团队决定将 DolphinScheduler 中定时任务配置模块的 Crontab 表达式做限制,从平台侧杜绝此类事件发生

方案

我们的方案是从前后端双方面限制 Crontab 表达式的第一位:

  • 前端配置选择不提供“每一秒钟”选项

  • 服务端接口判断第一位为 * 时,返回错误

前端修改

在前端项目中,秒、分、时 均为统一模版(CrontabTime),因此新增 dolphinscheduler-ui/src/components/crontab/modules/second.tsx

只保留两种模式:intervalTime 和 specificTime


/*

 * Licensed to the Apache Software Foundation (ASF) under one or more

 * contributor license agreements.  See the NOTICE file distributed with

 * this work for additional information regarding copyright ownership.

 * The ASF licenses this file to You under the Apache License, Version 2.0

 * (the "License"); you may not use this file except in compliance with

 * the License.  You may obtain a copy of the License at

 *

 *    http://www.apache.org/licenses/LICENSE-2.0

 *

 * Unless required by applicable law or agreed to in writing, software

 * distributed under the License is distributed on an "AS IS" BASIS,

 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

 * See the License for the specific language governing permissions and

 * limitations under the License.

 */


import _ from 'lodash'

import { defineComponent, onMounted, PropType, ref, toRefs, watch } from 'vue'

import { NInputNumber, NRadio, NRadioGroup, NSelect } from 'naive-ui'

import { useI18n } from 'vue-i18n'

import { ICrontabI18n } from '../types'

import { isStr, specificList } from '../common'

import styles from '../index.module.scss'


const props = {

    timeMin: {

        type: Number as PropType<number>,

        default: 0

    },

    timeMax: {

        type: Number as PropType<number>,

        default: 60

    },

    intervalPerform: {

        type: Number as PropType<number>,

        default: 5

    },

    intervalStart: {

        type: Number as PropType<number>,

        default: 3

    },

    timeSpecial: {

        type: Number as PropType<number | string>,

        default: 60

    },

    timeValue: {

        type: String as PropType<string>,

        default: '*'

    },

    timeI18n: {

        type: Object as PropType<ICrontabI18n>,

        require: true

    }

}


export default defineComponent({

    name: 'CrontabSecond',

    props,

    emits: ['update:timeValue'],

    setup(props, ctx) {

        const options = Array.from({ length: 60 }, (x, i) => ({

            label: i.toString(),

            value: i

        }))


        const timeRef = ref()

        const radioRef = ref()

        const intervalStartRef = ref(props.intervalStart)

        const intervalPerformRef = ref(props.intervalPerform)

        const specificTimesRef = ref<Array<number>>([])


        /**

         * Parse parameter value

         */

        const analyticalValue = () => {

            const $timeVal = props.timeValue

            // Interval time

            const $interval = isStr($timeVal, '/')

            // Specific time

            const $specific = isStr($timeVal, ',')


            // Positive integer (times)

            if (

                ($timeVal.length === 1 ||

                    $timeVal.length === 2 ||

                    $timeVal.length === 4) &&

                _.isInteger(parseInt($timeVal))

            ) {

                radioRef.value = 'specificTime'

                specificTimesRef.value = [parseInt($timeVal)]

                return

            }


            // Interval times

            if ($interval) {

                radioRef.value = 'intervalTime'

                intervalStartRef.value = parseInt($interval[0])

                intervalPerformRef.value = parseInt($interval[1])

                timeRef.value = `${intervalStartRef.value}/${intervalPerformRef.value}`

                return

            }


            // Specific times

            if ($specific) {

                radioRef.value = 'specificTime'

                specificTimesRef.value = $specific.map((item) => parseInt(item))

                return

            }

        }


        // Interval start time(1)

        const onIntervalStart = (value: number | null) => {

            intervalStartRef.value = value || 0

            if (radioRef.value === 'intervalTime') {

                timeRef.value = `${intervalStartRef.value}/${intervalPerformRef.value}`

            }

        }


        // Interval execution time(2)

        const onIntervalPerform = (value: number | null) => {

            intervalPerformRef.value = value || 0

            if (radioRef.value === 'intervalTime') {

                timeRef.value = `${intervalStartRef.value}/${intervalPerformRef.value}`

            }

        }


        // Specific time

        const onSpecificTimes = (arr: Array<number>) => {

            specificTimesRef.value = arr

            if (radioRef.value === 'specificTime') {

                specificReset()

            }

        }


        // Reset interval time

        const intervalReset = () => {

            timeRef.value = `${intervalStartRef.value}/${intervalPerformRef.value}`

        }


        // Reset specific time

        const specificReset = () => {

            let timeValue = '0'

            if (specificTimesRef.value.length) {

                timeValue = specificTimesRef.value.join(',')

            }

            timeRef.value = timeValue

        }


        const updateRadioTime = (value: string) => {

            switch (value) {

                case 'intervalTime':

                    intervalReset()

                    break

                case 'specificTime':

                    specificReset()

                    break

            }

        }


        watch(

            () => timeRef.value,

            () => ctx.emit('update:timeValue', timeRef.value.toString())

        )


        onMounted(() => analyticalValue())


        return {

            options,

            radioRef,

            intervalStartRef,

            intervalPerformRef,

            specificTimesRef,

            updateRadioTime,

            onIntervalStart,

            onIntervalPerform,

            onSpecificTimes,

            ...toRefs(props)

        }

    },

    render() {

        const { t } = useI18n()


        return (

            <NRadioGroup

                v-model:value={this.radioRef}

                onUpdateValue={this.updateRadioTime}

            >

                <div class={styles['crontab-list']}>

                    <NRadio value={'intervalTime'} />

                    <div class={styles['crontab-list-item']}>

                        <div class={styles['item-text']}>{t(this.timeI18n!.every)}</div>

                        <div class={styles['number-input']}>

                            <NInputNumber

                                defaultValue={5}

                                min={this.timeMin}

                                max={this.timeMax}

                                v-model:value={this.intervalPerformRef}

                                onUpdateValue={this.onIntervalPerform}

                            />

                        </div>

                        <div class={styles['item-text']}>

                                {t(this.timeI18n!.timeCarriedOut)}

                        </div>

                        <div class={styles['number-input']}>

                            <NInputNumber

                                defaultValue={3}

                                min={this.timeMin}

                                max={this.timeMax}

                                v-model:value={this.intervalStartRef}

                                onUpdateValue={this.onIntervalStart}

                            />

                        </div>

                        <div class={styles['item-text']}>{t(this.timeI18n!.timeStart)}</div>

                    </div>

                </div>

                <div class={styles['crontab-list']}>

                    <NRadio value={'specificTime'} />

                    <div class={styles['crontab-list-item']}>

                        <div>{t(this.timeI18n!.specificTime)}</div>

                        <div class={styles['select-input']}>

                            <NSelect

                                multiple

                                options={specificList[this.timeSpecial]}

                                placeholder={t(this.timeI18n!.specificTimeTip)}

                                v-model:value={this.specificTimesRef}

                                onUpdateValue={this.onSpecificTimes}

                            />

                        </div>

                    </div>

                </div>

            </NRadioGroup>

        )

    }

})

服务端

添加Crontab表达式检验(有两处:一处是新增Post接口、另一处是修改PUT接口),直接添加个检测方法供这两处调用:

        if (scheduleParam.getCrontab().startsWith("*")) {            logger.error("The crontab must not start with *");            putMsg(result, Status.CRONTAB_EVERY_SECOND_ERROR);            return result;        }

本文完!