利用Spring的ThreadPoolTaskScheduler实现轻量级任务调度


在单体应用中需要一个使用简单性能可靠的调度功能,要求可以通过Cron表达式配置触发时间并且任务执行时间可以修改并且立即生效,可以在运行时动态增加、停止、重启job等。

经过研究org.springframework.scheduling.annotation.SchedulingConfigurer满足通过Cron表达式配置触发时间、任务执行时间可以修改但不满足修改cron表达式后立即生效,也不支持运行时动态增加、停止、重启job。

进一步研究发现可以使用org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler满足全部需要。示例代码如下:

初始化ThreadPoolTaskScheduler:

@Configuration
public class AppConfig {
    @Bean
    @ConditionalOnBean
    public ThreadPoolTaskScheduler getThreadPoolTaskScheduler(){
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        taskScheduler.setPoolSize(10);
        taskScheduler.setThreadNamePrefix("job-schedule-");
        //	 *Set whether to wait for scheduled tasks to complete on shutdown,
        //	  not interrupting running tasks and executing all tasks in the queue.
        taskScheduler.setWaitForTasksToCompleteOnShutdown(true);
        /**
         *  Set the maximum number of seconds that this executor is supposed to block
         *  on shutdown in order to wait for remaining tasks to complete their execution
         *  before the rest of the container continues to shut down. This is particularly
         *  useful if your remaining tasks are likely to need access to other resources
         *  that are also managed by the container.
         */
        taskScheduler.setAwaitTerminationSeconds(60);
        return taskScheduler;
    }
}

业务对象SysJobSchedule:

public class SysJobSchedule {
    private String name;
    private String code;
    private String comment;
    private String cron;
    private String className;
    private Boolean deleteFlag = false;
    private Boolean stopFlag = false;
    .....
 }

核心SmartScheduleJob

@Service
@Slf4j
@EnableScheduling
public class SmartScheduleJob {
    private ConcurrentHashMap futureConcurrentHashMap = new ConcurrentHashMap<>();
    @Autowired
    private ThreadPoolTaskScheduler threadPoolTaskScheduler;
    
      /**
     * 启动调度
     * @param jobSchedule
     */
    public void startSchedule(SysJobSchedule jobSchedule) {
        if (jobSchedule != null) {
            log.info("启动Job {}",jobSchedule.getClassName());
            stopSchedule(jobSchedule.getClassName());
            initSchedule(jobSchedule);
        }
    }

    /**
     * 关闭调度
     * @param className
     */
    public void stopSchedule((SysJobSchedule jobSchedule) {
        String className = jobSchedule.getgClassName();
        ScheduledFuture scheduledFuture = this.futureConcurrentHashMap.get(className);
        if (scheduledFuture != null) {
            log.info("==关闭Job:{}", className);
            futureConcurrentHashMap.remove(className);
            if (!scheduledFuture.isCancelled())
                scheduledFuture.cancel(true);
        }
    }


    private void initSchedule(SysJobSchedule jobSchedule) {
        if(jobSchedule.getStopFlag())
            return;
        ScheduledFuture future = this.threadPoolTaskScheduler.schedule(
                //1.添加任务内容(Runnable)
                () -> {
                    this.doTask(jobSchedule);
                },
                //2.设置执行周期(Trigger)
                triggerContext -> {                              
                    String cron = jobSchedule.getCron();                 
                    return new CronTrigger(cron).nextExecutionTime(triggerContext);
                }
        );
        //将ScheduledFuture添加到ConcurrentHashMap中
        futureConcurrentHashMap.put(jobSchedule.getClassName(), future);
    }

    private void doTask(SysJobSchedule jobSchedule) {
      doSomething。。。。
    }

}

ThreadPoolTaskScheduler为轻量级任务调度器适用于单体应用,不适合分布式集群部署,由于节点之间没有共享信息,因而会出现多次调度的情况.