利用Spring的ThreadPoolTaskScheduler实现轻量级任务调度
在单体应用中需要一个使用简单性能可靠的调度功能,要求可以通过Cron表达式配置触发时间并且任务执行时间可以修改并且立即生效,可以在运行时动态增加、停止、重启job等。
经过研究org.springframework.scheduling.annotation.SchedulingConfigurer
满足通过Cron表达式配置触发时间、任务执行时间可以修改但不满足修改cron表达式后立即生效,也不支持运行时动态增加、停止、重启job。
进一步研究发现可以使用org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler
满足全部需要。示例代码如下:
初始化ThreadPoolTaskScheduler
:
@Configuration
public class AppConfig {
@Bean
@ConditionalOnBean
public ThreadPoolTaskScheduler getThreadPoolTaskScheduler(){
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.setPoolSize(10);
taskScheduler.setThreadNamePrefix("job-schedule-");
// *Set whether to wait for scheduled tasks to complete on shutdown,
// not interrupting running tasks and executing all tasks in the queue.
taskScheduler.setWaitForTasksToCompleteOnShutdown(true);
/**
* Set the maximum number of seconds that this executor is supposed to block
* on shutdown in order to wait for remaining tasks to complete their execution
* before the rest of the container continues to shut down. This is particularly
* useful if your remaining tasks are likely to need access to other resources
* that are also managed by the container.
*/
taskScheduler.setAwaitTerminationSeconds(60);
return taskScheduler;
}
}
业务对象SysJobSchedule
:
public class SysJobSchedule {
private String name;
private String code;
private String comment;
private String cron;
private String className;
private Boolean deleteFlag = false;
private Boolean stopFlag = false;
.....
}
核心SmartScheduleJob
@Service
@Slf4j
@EnableScheduling
public class SmartScheduleJob {
private ConcurrentHashMap futureConcurrentHashMap = new ConcurrentHashMap<>();
@Autowired
private ThreadPoolTaskScheduler threadPoolTaskScheduler;
/**
* 启动调度
* @param jobSchedule
*/
public void startSchedule(SysJobSchedule jobSchedule) {
if (jobSchedule != null) {
log.info("启动Job {}",jobSchedule.getClassName());
stopSchedule(jobSchedule.getClassName());
initSchedule(jobSchedule);
}
}
/**
* 关闭调度
* @param className
*/
public void stopSchedule((SysJobSchedule jobSchedule) {
String className = jobSchedule.getgClassName();
ScheduledFuture scheduledFuture = this.futureConcurrentHashMap.get(className);
if (scheduledFuture != null) {
log.info("==关闭Job:{}", className);
futureConcurrentHashMap.remove(className);
if (!scheduledFuture.isCancelled())
scheduledFuture.cancel(true);
}
}
private void initSchedule(SysJobSchedule jobSchedule) {
if(jobSchedule.getStopFlag())
return;
ScheduledFuture future = this.threadPoolTaskScheduler.schedule(
//1.添加任务内容(Runnable)
() -> {
this.doTask(jobSchedule);
},
//2.设置执行周期(Trigger)
triggerContext -> {
String cron = jobSchedule.getCron();
return new CronTrigger(cron).nextExecutionTime(triggerContext);
}
);
//将ScheduledFuture添加到ConcurrentHashMap中
futureConcurrentHashMap.put(jobSchedule.getClassName(), future);
}
private void doTask(SysJobSchedule jobSchedule) {
doSomething。。。。
}
}
ThreadPoolTaskScheduler
为轻量级任务调度器适用于单体应用,不适合分布式集群部署,由于节点之间没有共享信息,因而会出现多次调度的情况.