定时任务quartz源码
定时任务配置类
/**
 * Spring configuration that wires a Quartz {@link Scheduler} backed by the application's
 * {@link DataSource} and the settings in {@code quartz.properties}.
 */
@Configuration
public class QuartzConfig {

    @Autowired
    private DataSource dataSource;

    /**
     * Builds the Spring {@link SchedulerFactoryBean} that creates the Quartz scheduler.
     *
     * <p>{@code afterPropertiesSet()} is intentionally NOT called here. The returned object is a
     * container-managed bean implementing {@code InitializingBean}, so Spring calls
     * {@code afterPropertiesSet()} itself after this method returns. Calling it here as well ran
     * the initialization twice: two Quartz schedulers (each with its own scheduler thread and
     * worker pool) were created, and only the last one is managed by Spring.
     *
     * @param quartzJobFactory job factory the scheduler uses to instantiate job classes
     * @return the configured factory bean; Spring completes its initialization
     * @throws Exception retained for signature compatibility
     */
    @Bean
    public SchedulerFactoryBean schedulerFactoryBean(QuartzJobFactory quartzJobFactory) throws Exception {
        SchedulerFactoryBean factoryBean = new SchedulerFactoryBean();
        factoryBean.setJobFactory(quartzJobFactory);
        factoryBean.setConfigLocation(new ClassPathResource("quartz.properties"));
        factoryBean.setDataSource(dataSource);
        return factoryBean;
    }

    /**
     * Exposes the scheduler built by the (already initialized) factory bean and starts it.
     *
     * @param schedulerFactoryBean the initialized factory bean
     * @return the started scheduler
     * @throws Exception if the scheduler cannot be obtained or started
     */
    @Bean
    public Scheduler scheduler(SchedulerFactoryBean schedulerFactoryBean) throws Exception {
        Scheduler scheduler = schedulerFactoryBean.getScheduler();
        scheduler.start();
        return scheduler;
    }
}
设置属性
创建调度器
获取调度器:如果尚未初始化过(即还没有 PropertiesParser),会先调用初始化方法加载 quartz.properties,然后在 instantiate() 方法中对 Quartz 框架的相关类进行初始化。
JobStoreSupport初始化,QuartzScheduler、ThreadExecutor也是这种方式创建的
QuartzSchedulerResources初始化
在这里创建QuartzScheduler
创建调度器时会通过 execute() 开启调度器线程QuartzSchedulerThread
QuartzSchedulerThread 首次运行时 paused 默认为 true,因此线程会在循环中反复调用 sigLock.wait(1000L)(每次最多等待 1 秒),直到 togglePause(false) 被调用为止;在此期间其他线程照常执行。
接下来再看第一张图中 afterPropertiesSet 方法里调用的 registerJobsAndTriggers 方法。
调度器开始执行
开启调度器的监听(调用 started 方法),再调用 togglePause(false) 将 paused 设为 false。此后 QuartzSchedulerThread 的 run 方法中 while (paused && !halted.get()) 的判断条件为 false,线程退出等待循环,继续执行后续逻辑。
判断是否开启集群的配置
cluster初始化
先调用 manage() 方法,再获取线程执行器(ThreadExecutor)并启动集群管理线程。
判断是否是第一次检入:如果不是第一次,先查询失败实例记录;如果是第一次检入,或者失败记录数大于 0,则加锁。
获取集群中所有可能失败的调度器实例的列表
protected void clusterRecover(Connection conn, ListfailedInstances) throws JobPersistenceException { if (failedInstances.size() > 0) { long recoverIds = System.currentTimeMillis(); logWarnIfNonZero(failedInstances.size(), "ClusterManager: detected " + failedInstances.size() + " failed or restarted instances."); try { for (SchedulerStateRecord rec : failedInstances) { getLog().info( "ClusterManager: Scanning for instance \"" + rec.getSchedulerInstanceId() + "\"'s failed in-progress jobs."); 根据调度器实例id获取触发器记录 List firedTriggerRecs = getDelegate() .selectInstancesFiredTriggerRecords(conn, rec.getSchedulerInstanceId()); int acquiredCount = 0; int recoveredCount = 0; int otherCount = 0; Set triggerKeys = new HashSet (); for (FiredTriggerRecord ftRec : firedTriggerRecs) { 可以获取名字和分组 TriggerKey tKey = ftRec.getTriggerKey(); JobKey jKey = ftRec.getJobKey(); triggerKeys.add(tKey); // release blocked triggers.. if (ftRec.getFireInstanceState().equals(STATE_BLOCKED)) { 如果是阻塞状态,改为等待状态,等待状态的触发器才可以被触发 getDelegate() .updateTriggerStatesForJobFromOtherState( conn, jKey, STATE_WAITING, STATE_BLOCKED); } else if (ftRec.getFireInstanceState().equals(STATE_PAUSED_BLOCKED)) { 将暂停阻塞状态的触发器改为暂停状态 getDelegate() .updateTriggerStatesForJobFromOtherState( conn, jKey, STATE_PAUSED, STATE_PAUSED_BLOCKED); } // release acquired triggers.. if (ftRec.getFireInstanceState().equals(STATE_ACQUIRED)) { 将已确认状态改为等待状态 getDelegate().updateTriggerStateFromOtherState( conn, tKey, STATE_WAITING, STATE_ACQUIRED); acquiredCount++; } else if (ftRec.isJobRequestsRecovery()) { // handle jobs marked for recovery that were not fully // executed.. 
if (jobExists(conn, jKey)) { 处理尚未完全恢复的工作 @SuppressWarnings("deprecation") SimpleTriggerImpl rcvryTrig = new SimpleTriggerImpl( "recover_" + rec.getSchedulerInstanceId() + "_" + String.valueOf(recoverIds++), Scheduler.DEFAULT_RECOVERY_GROUP, new Date(ftRec.getScheduleTimestamp())); rcvryTrig.setJobName(jKey.getName()); rcvryTrig.setJobGroup(jKey.getGroup()); rcvryTrig.setMisfireInstruction(SimpleTrigger.MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY); rcvryTrig.setPriority(ftRec.getPriority()); JobDataMap jd = getDelegate().selectTriggerJobDataMap(conn, tKey.getName(), tKey.getGroup()); jd.put(Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_NAME, tKey.getName()); jd.put(Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_GROUP, tKey.getGroup()); jd.put(Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_FIRETIME_IN_MILLISECONDS, String.valueOf(ftRec.getFireTimestamp())); jd.put(Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_SCHEDULED_FIRETIME_IN_MILLISECONDS, String.valueOf(ftRec.getScheduleTimestamp())); rcvryTrig.setJobDataMap(jd); rcvryTrig.computeFirstFireTime(null); storeTrigger(conn, rcvryTrig, null, false, STATE_WAITING, false, true); recoveredCount++; } else { getLog() .warn( "ClusterManager: failed job '" + jKey + "' no longer exists, cannot schedule recovery."); otherCount++; } } else { otherCount++; } // free up stateful job's triggers if (ftRec.isJobDisallowsConcurrentExecution()) { getDelegate() .updateTriggerStatesForJobFromOtherState( conn, jKey, STATE_WAITING, STATE_BLOCKED); getDelegate() .updateTriggerStatesForJobFromOtherState( conn, jKey, STATE_PAUSED, STATE_PAUSED_BLOCKED); } } getDelegate().deleteFiredTriggers(conn, rec.getSchedulerInstanceId()); // Check if any of the fired triggers we just deleted were the last fired trigger // records of a COMPLETE trigger. int completeCount = 0; for (TriggerKey triggerKey : triggerKeys) { if (getDelegate().selectTriggerState(conn, triggerKey). 
equals(STATE_COMPLETE)) { List firedTriggers = getDelegate().selectFiredTriggerRecords(conn, triggerKey.getName(), triggerKey.getGroup()); if (firedTriggers.isEmpty()) { if (removeTrigger(conn, triggerKey)) { completeCount++; } } } } logWarnIfNonZero(acquiredCount, "ClusterManager: ......Freed " + acquiredCount + " acquired trigger(s)."); logWarnIfNonZero(completeCount, "ClusterManager: ......Deleted " + completeCount + " complete triggers(s)."); logWarnIfNonZero(recoveredCount, "ClusterManager: ......Scheduled " + recoveredCount + " recoverable job(s) for recovery."); logWarnIfNonZero(otherCount, "ClusterManager: ......Cleaned-up " + otherCount + " other failed job(s)."); if (!rec.getSchedulerInstanceId().equals(getInstanceId())) { getDelegate().deleteSchedulerState(conn, rec.getSchedulerInstanceId()); } } } catch (Throwable e) { throw new JobPersistenceException("Failure recovering jobs: " + e.getMessage(), e); } } }
/**
 * Inserts {@code newTrigger} into the job store, or updates the stored row when the trigger
 * already exists and {@code replaceExisting} is set.
 *
 * <p>Unless {@code forceState} is set, a requested WAITING/ACQUIRED state becomes PAUSED when the
 * trigger's group is paused or the {@code ALL_GROUPS_PAUSED} marker is present (in the latter
 * case the trigger's group is also inserted as a paused group). When the job disallows
 * concurrent execution and {@code recovering} is false, the state is passed through
 * {@code checkBlockedState}.
 *
 * @param conn            connection used for all delegate calls
 * @param newTrigger      trigger to persist
 * @param job             job fired by the trigger; loaded via the trigger's job key when null
 * @param replaceExisting whether an existing trigger with the same key may be overwritten
 * @param state           requested trigger state
 * @param forceState      when true, the paused-group rules are not applied
 * @param recovering      when true, the blocked-state check is skipped
 * @throws ObjectAlreadyExistsException if the trigger exists and {@code replaceExisting} is false
 * @throws JobPersistenceException      if the job cannot be found or storing fails
 */
protected void storeTrigger(Connection conn,
                            OperableTrigger newTrigger,
                            JobDetail job,
                            boolean replaceExisting,
                            String state,
                            boolean forceState,
                            boolean recovering) throws JobPersistenceException {

    final boolean alreadyStored = triggerExists(conn, newTrigger.getKey());
    if (alreadyStored && !replaceExisting) {
        throw new ObjectAlreadyExistsException(newTrigger);
    }

    try {
        String effectiveState = state;

        if (!forceState) {
            boolean groupPaused = getDelegate().isTriggerGroupPaused(conn, newTrigger.getKey().getGroup());
            if (!groupPaused) {
                // ALL_GROUPS_PAUSED marker present: record this trigger's group as paused too.
                groupPaused = getDelegate().isTriggerGroupPaused(conn, ALL_GROUPS_PAUSED);
                if (groupPaused) {
                    getDelegate().insertPausedTriggerGroup(conn, newTrigger.getKey().getGroup());
                }
            }

            if (groupPaused
                    && (effectiveState.equals(STATE_WAITING) || effectiveState.equals(STATE_ACQUIRED))) {
                effectiveState = STATE_PAUSED;
            }
        }

        final JobDetail owningJob = (job != null) ? job : retrieveJob(conn, newTrigger.getJobKey());
        if (owningJob == null) {
            throw new JobPersistenceException("The job (" + newTrigger.getJobKey()
                    + ") referenced by the trigger does not exist.");
        }

        if (owningJob.isConcurrentExectionDisallowed() && !recovering) {
            effectiveState = checkBlockedState(conn, owningJob.getKey(), effectiveState);
        }

        if (alreadyStored) {
            getDelegate().updateTrigger(conn, newTrigger, effectiveState, owningJob);
        } else {
            getDelegate().insertTrigger(conn, newTrigger, effectiveState, owningJob);
        }
    } catch (Exception e) {
        throw new JobPersistenceException("Couldn't store trigger '" + newTrigger.getKey() + "' for '"
                + newTrigger.getJobKey() + "' job:" + e.getMessage(), e);
    }
}
注意,这里比较关键!
设置 paused 为 false
QuartzSchedulerThread run()代码
@Override public void run() { int acquiresFailed = 0; while (!halted.get()) { try { // check if we're supposed to pause... synchronized (sigLock) { while (paused && !halted.get()) { try { // wait until togglePause(false) is called... sigLock.wait(1000L); } catch (InterruptedException ignore) { } // reset failure counter when paused, so that we don't // wait again after unpausing acquiresFailed = 0; } if (halted.get()) { break; } } // wait a bit, if reading from job store is consistently // failing (e.g. DB is down or restarting).. if (acquiresFailed > 1) { try { long delay = computeDelayForRepeatedErrors(qsRsrcs.getJobStore(), acquiresFailed); Thread.sleep(delay); } catch (Exception ignore) { } } int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads(); if(availThreadCount > 0) { // will always be true, due to semantics of blockForAvailableThreads... Listtriggers; long now = System.currentTimeMillis(); clearSignaledSchedulingChange(); try { triggers = qsRsrcs.getJobStore().acquireNextTriggers( now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow()); acquiresFailed = 0; if (log.isDebugEnabled()) log.debug("batch acquisition of " + (triggers == null ? 
0 : triggers.size()) + " triggers"); } catch (JobPersistenceException jpe) { if (acquiresFailed == 0) { qs.notifySchedulerListenersError( "An error occurred while scanning for the next triggers to fire.", jpe); } if (acquiresFailed < Integer.MAX_VALUE) acquiresFailed++; continue; } catch (RuntimeException e) { if (acquiresFailed == 0) { getLog().error("quartzSchedulerThreadLoop: RuntimeException " +e.getMessage(), e); } if (acquiresFailed < Integer.MAX_VALUE) acquiresFailed++; continue; } if (triggers != null && !triggers.isEmpty()) { now = System.currentTimeMillis(); long triggerTime = triggers.get(0).getNextFireTime().getTime(); long timeUntilTrigger = triggerTime - now; while(timeUntilTrigger > 2) { synchronized (sigLock) { if (halted.get()) { break; } if (!isCandidateNewTimeEarlierWithinReason(triggerTime, false)) { try { // we could have blocked a long while // on 'synchronize', so we must recompute now = System.currentTimeMillis(); timeUntilTrigger = triggerTime - now; if(timeUntilTrigger >= 1) sigLock.wait(timeUntilTrigger); } catch (InterruptedException ignore) { } } } if(releaseIfScheduleChangedSignificantly(triggers, triggerTime)) { break; } now = System.currentTimeMillis(); timeUntilTrigger = triggerTime - now; } // this happens if releaseIfScheduleChangedSignificantly decided to release triggers if(triggers.isEmpty()) continue; // set triggers to 'executing' List bndles = new ArrayList (); boolean goAhead = true; synchronized(sigLock) { goAhead = !halted.get(); } if(goAhead) { try { List res = qsRsrcs.getJobStore().triggersFired(triggers); if(res != null) bndles = res; } catch (SchedulerException se) { qs.notifySchedulerListenersError( "An error occurred while firing triggers '" + triggers + "'", se); //QTZ-179 : a problem occurred interacting with the triggers from the db //we release them and loop again for (int i = 0; i < triggers.size(); i++) { qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i)); } continue; } } for (int i = 0; i < 
bndles.size(); i++) { TriggerFiredResult result = bndles.get(i); TriggerFiredBundle bndle = result.getTriggerFiredBundle(); Exception exception = result.getException(); if (exception instanceof RuntimeException) { getLog().error("RuntimeException while firing trigger " + triggers.get(i), exception); qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i)); continue; } // it's possible to get 'null' if the triggers was paused, // blocked, or other similar occurrences that prevent it being // fired at this time... or if the scheduler was shutdown (halted) if (bndle == null) { qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i)); continue; } JobRunShell shell = null; try { shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle); shell.initialize(qs); } catch (SchedulerException se) { qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR); continue; } if (qsRsrcs.getThreadPool().runInThread(shell) == false) { // this case should never happen, as it is indicative of the // scheduler being shutdown or a bug in the thread pool or // a thread pool being used concurrently - which the docs // say not to do... 
getLog().error("ThreadPool.runInThread() return false!"); qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR); } } continue; // while (!halted) } } else { // if(availThreadCount > 0) // should never happen, if threadPool.blockForAvailableThreads() follows contract continue; // while (!halted) } long now = System.currentTimeMillis(); long waitTime = now + getRandomizedIdleWaitTime(); long timeUntilContinue = waitTime - now; synchronized(sigLock) { try { if(!halted.get()) { // QTZ-336 A job might have been completed in the mean time and we might have // missed the scheduled changed signal by not waiting for the notify() yet // Check that before waiting for too long in case this very job needs to be // scheduled very soon if (!isScheduleChanged()) { sigLock.wait(timeUntilContinue); } } } catch (InterruptedException ignore) { } } } catch(RuntimeException re) { getLog().error("Runtime error occurred in main trigger firing loop.", re); } } // while (!halted) // drop references to scheduler stuff to aid garbage collection... qs = null; qsRsrcs = null; }
JobRunShell的run方法
/**
 * Executes one firing of a job (JobRunShell).
 *
 * <p>Registers this shell as an internal scheduler listener for the duration of the call, then
 * loops:
 * <ol>
 *   <li>{@code begin()};</li>
 *   <li>notify job/trigger listeners, handling a veto;</li>
 *   <li>execute the job and record its run time;</li>
 *   <li>notify job listeners;</li>
 *   <li>ask the trigger for a {@code CompletedExecutionInstruction};</li>
 *   <li>notify trigger listeners.</li>
 * </ol>
 * On {@code RE_EXECUTE_JOB} the refire count is incremented and the loop runs again. Otherwise
 * the shell calls {@code complete(true)} and reports completion to the job store.
 */
public void run() {
    qs.addInternalSchedulerListener(this);

    try {
        OperableTrigger trigger = (OperableTrigger) jec.getTrigger();
        JobDetail jobDetail = jec.getJobDetail();

        do {

            JobExecutionException jobExEx = null;
            Job job = jec.getJobInstance();

            try {
                begin();
            } catch (SchedulerException se) {
                qs.notifySchedulerListenersError("Error executing Job ("
                        + jec.getJobDetail().getKey()
                        + ": couldn't begin execution.", se);
                break;
            }

            // notify job & trigger listeners...
            try {
                if (!notifyListenersBeginning(jec)) {
                    break;
                }
            } catch (VetoedException ve) {
                try {
                    CompletedExecutionInstruction instCode = trigger.executionComplete(jec, null);
                    qs.notifyJobStoreJobVetoed(trigger, jobDetail, instCode);

                    // QTZ-205
                    // Even if trigger got vetoed, we still needs to check to see if it's the trigger's finalized run or not.
                    if (jec.getTrigger().getNextFireTime() == null) {
                        qs.notifySchedulerListenersFinalized(jec.getTrigger());
                    }

                    complete(true);
                } catch (SchedulerException se) {
                    qs.notifySchedulerListenersError("Error during veto of Job ("
                            + jec.getJobDetail().getKey()
                            + ": couldn't finalize execution.", se);
                }
                break;
            }

            long startTime = System.currentTimeMillis();
            long endTime = startTime;

            // execute the job
            try {
                log.debug("Calling execute on job " + jobDetail.getKey());
                job.execute(jec);
                endTime = System.currentTimeMillis();
            } catch (JobExecutionException jee) {
                endTime = System.currentTimeMillis();
                jobExEx = jee;
                getLog().info("Job " + jobDetail.getKey() +
                        " threw a JobExecutionException: ", jobExEx);
            } catch (Throwable e) {
                // Any other failure is wrapped so listeners and the trigger see a JobExecutionException.
                endTime = System.currentTimeMillis();
                getLog().error("Job " + jobDetail.getKey() +
                        " threw an unhandled Exception: ", e);
                SchedulerException se = new SchedulerException(
                        "Job threw an unhandled exception.", e);
                qs.notifySchedulerListenersError("Job ("
                        + jec.getJobDetail().getKey()
                        + " threw an exception.", se);
                jobExEx = new JobExecutionException(se, false);
            }

            jec.setJobRunTime(endTime - startTime);

            // notify all job listeners
            if (!notifyJobListenersComplete(jec, jobExEx)) {
                break;
            }

            CompletedExecutionInstruction instCode = CompletedExecutionInstruction.NOOP;

            // update the trigger
            try {
                instCode = trigger.executionComplete(jec, jobExEx);
            } catch (Exception e) {
                // If this happens, there's a bug in the trigger...
                SchedulerException se = new SchedulerException(
                        "Trigger threw an unhandled exception.", e);
                qs.notifySchedulerListenersError(
                        "Please report this error to the Quartz developers.",
                        se);
            }

            // notify all trigger listeners
            if (!notifyTriggerListenersComplete(jec, instCode)) {
                break;
            }

            // update job/trigger or re-execute job
            if (instCode == CompletedExecutionInstruction.RE_EXECUTE_JOB) {
                jec.incrementRefireCount();
                try {
                    complete(false);
                } catch (SchedulerException se) {
                    qs.notifySchedulerListenersError("Error executing Job ("
                            + jec.getJobDetail().getKey()
                            + ": couldn't finalize execution.", se);
                }
                continue;
            }

            try {
                complete(true);
            } catch (SchedulerException se) {
                qs.notifySchedulerListenersError("Error executing Job ("
                        + jec.getJobDetail().getKey()
                        + ": couldn't finalize execution.", se);
                // NOTE(review): continuing the do/while(true) here runs the job again.
                continue;
            }

            qs.notifyJobStoreJobComplete(trigger, jobDetail, instCode);
            break;
        } while (true);

    } finally {
        qs.removeInternalSchedulerListener(this);
    }
}
// NOTE(review): the text below is a second, truncated paste of QuartzSchedulerThread.run() that continues past this section; left as-is.
@Override public void run() { int acquiresFailed = 0; while (!halted.get()) { try { // check if we're supposed to pause... synchronized (sigLock) { while (paused && !halted.get()) { try { // wait until togglePause(false) is called... sigLock.wait(1000L); } catch (InterruptedException ignore) { } // reset failure counter when paused, so that we don't // wait again after unpausing acquiresFailed = 0; } if (halted.get()) { break; } } // wait a bit, if reading from job store is consistently // failing (e.g. DB is down or restarting)..
if (acquiresFailed > 1) { try { long delay = computeDelayForRepeatedErrors(qsRsrcs.getJobStore(), acquiresFailed); Thread.sleep(delay); } catch (Exception ignore) { } } int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads(); if(availThreadCount > 0) { // will always be true, due to semantics of blockForAvailableThreads... List