精进Quartz源码—QuartzSchedulerThread.run() 源码分析(三)

QuartzSchedulerThread.run()是主要处理任务的方法!下面进行分析,方便自己查看!我都是分析的jobStore 方式为jdbc的SimpleTrigger!RAM的方式类似分析方式!

大家好,又见面了,我是全栈君。

欢迎关注我的公众号: Java编程技术乐园。分享技术,一起精进Quartz!

做一个积极的人

编码、改bug、提升自己

我有一个乐园,面向编程,春暖花开!

QuartzSchedulerThread.run()是主要处理任务的方法!下面进行分析,方便自己查看!
我都是分析的jobStore 方式为jdbc的SimpleTrigger!RAM的方式类似分析方式!

如还没有阅读过Quartz学习——scheduler.start()启动源码分析请先阅读这一篇!

QuartzSchedulerThread.run()主要是在有可用线程的时候获取需要执行Trigger并出触发进行任务的调度!


解释:

{0} : 表的前缀 ,如表qrtz_trigger ,{0}== qrtz_

{1} :quartz.properties 中配置的 org.quartz.scheduler.instanceName: myInstanceName ,{1} ==myInstanceName

1 、QuartzSchedulerThread.run()源码讲解

/** * <p> * The main processing loop of the <code>QuartzSchedulerThread</code>. * </p> */
@Override
public void run() { 

boolean lastAcquireFailed = false;
while (!halted.get()) { 

try { 

// check if we're supposed to pause...
//检查我们是否应该暂停...
synchronized (sigLock) { 

while (paused && !halted.get()) { 

try { 

// wait until togglePause(false) is called...
//等待直到togglePause(false)被调用... 
sigLock.wait(1000L);
} catch (InterruptedException ignore) { 

}
}
if (halted.get()) { 

break;
}
}
//2.1获取可用线程的数量
int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
//将永远是true,由于blockForAvailableThreads的语义...
if(availThreadCount > 0) { 
 // will always be true, due to semantics of blockForAvailableThreads...
List<OperableTrigger> triggers = null;//定义触发器集合
long now = System.currentTimeMillis();//获取当前的时间
clearSignaledSchedulingChange();
try { 

//2.2 从jobStore中获取下次要触发的触发器集合
//idleWaitTime == 30L * 1000L; 当调度程序发现没有当前触发器要触发,它应该等待多长时间再检查...
triggers = qsRsrcs.getJobStore().acquireNextTriggers(
now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
lastAcquireFailed = false;
if (log.isDebugEnabled()) 
log.debug("batch acquisition of " + (triggers == null ? 0 : triggers.size()) + " triggers");
} catch (JobPersistenceException jpe) { 

if(!lastAcquireFailed) { 

qs.notifySchedulerListenersError(
"An error occurred while scanning for the next triggers to fire.",
jpe);
}
lastAcquireFailed = true;
continue;
} catch (RuntimeException e) { 

if(!lastAcquireFailed) { 

getLog().error("quartzSchedulerThreadLoop: RuntimeException "
+e.getMessage(), e);
}
lastAcquireFailed = true;
continue;
}
//判断返回的触发器存在
if (triggers != null && !triggers.isEmpty()) { 

now = System.currentTimeMillis();
long triggerTime = triggers.get(0).getNextFireTime().getTime();
//若有没有触发的Trigger,下次触发时间 next_fire_time 这个会在启动的时候有个默认的misfire机制,如上一篇中分析的 。setNextFireTime(); 即start()启动时候的当前时间。
long timeUntilTrigger = triggerTime - now;
while(timeUntilTrigger > 2) { 
//这里为什么是2 ???不懂???
synchronized (sigLock) { 

if (halted.get()) { 

break;
}
if (!isCandidateNewTimeEarlierWithinReason(triggerTime, false)) { 

try { 

// we could have blocked a long while
// on 'synchronize', so we must recompute
now = System.currentTimeMillis();
timeUntilTrigger = triggerTime - now;
if(timeUntilTrigger >= 1)
sigLock.wait(timeUntilTrigger);
} catch (InterruptedException ignore) { 

}
}
}
if(releaseIfScheduleChangedSignificantly(triggers, triggerTime)) { 

break;
}
now = System.currentTimeMillis();
timeUntilTrigger = triggerTime - now;
}
// this happens if releaseIfScheduleChangedSignificantly decided to release triggers
//这种情况发生,如果releaseIfScheduleChangedSignificantly 决定 释放Trigger
if(triggers.isEmpty())
continue;
// set triggers to 'executing'
//将触发器设置为“正在执行”
List<TriggerFiredResult> bndles = new ArrayList<TriggerFiredResult>();
boolean goAhead = true;
synchronized(sigLock) { 

goAhead = !halted.get();
}
if(goAhead) { 

try { 

//2.3 通知JobStore调度程序现在正在触发其先前已获取(保留)的给定触发器(执行其关联的作业)。
List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
if(res != null)
bndles = res;  //下面的2.3方法返回的数据赋值到bndles
} catch (SchedulerException se) { 

qs.notifySchedulerListenersError(
"An error occurred while firing triggers '"
+ triggers + "'", se);
//QTZ-179 : a problem occurred interacting with the triggers from the db
//we release them and loop again
for (int i = 0; i < triggers.size(); i++) { 

qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
}
continue;
}
}
//循环List<TriggerFiredResult> bndles 集合,获取TriggerFiredResult和TriggerFiredBundle等 
for (int i = 0; i < bndles.size(); i++) { 

TriggerFiredResult result =  bndles.get(i);
TriggerFiredBundle bndle =  result.getTriggerFiredBundle();
Exception exception = result.getException();
if (exception instanceof RuntimeException) { 

getLog().error("RuntimeException while firing trigger " + triggers.get(i), exception);
qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
continue;
}
// it's possible to get 'null' if the triggers was paused,
// blocked, or other similar occurrences that prevent it being
// fired at this time... or if the scheduler was shutdown (halted)
//如果触发器被暂停,阻塞或其他类似的事件阻止它在这时被触发,或者如果调度器被关闭(暂停),则可以获得'null'
if (bndle == null) { 

qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
continue;
}
JobRunShell shell = null;
try { 

//创建 JobRunShell ,并初始化
shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
shell.initialize(qs);
} catch (SchedulerException se) { 

qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
continue;
}
if (qsRsrcs.getThreadPool().runInThread(shell) == false) { 

// this case should never happen, as it is indicative of the
// scheduler being shutdown or a bug in the thread pool or
// a thread pool being used concurrently - which the docs
// say not to do...
//这种情况不应该发生,因为它表示调度程序正在关闭或线程池或线程池中并发使用的错误 - 文档说不要这样做...
getLog().error("ThreadPool.runInThread() return false!");
qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
}
}
continue; // while (!halted)
}
} else { 
 // if(availThreadCount > 0)
// should never happen, if threadPool.blockForAvailableThreads() follows contract
应该永远不会发生,如果threadPool.blockForAvailableThreads()遵循约定
continue; // while (!halted)
}
long now = System.currentTimeMillis();
long waitTime = now + getRandomizedIdleWaitTime();
long timeUntilContinue = waitTime - now;
//idleWaitTime == 30L * 1000L; idleWaitVariablness == 7 * 1000; 
//计算getRandomizedIdleWaitTime()的值 : idleWaitTime - random.nextInt(idleWaitVariablness);
synchronized(sigLock) { 

try { 

if(!halted.get()) { 

// QTZ-336 A job might have been completed in the mean time and we might have
// missed the scheduled changed signal by not waiting for the notify() yet
// Check that before waiting for too long in case this very job needs to be
// scheduled very soon
if (!isScheduleChanged()) { 

sigLock.wait(timeUntilContinue);
}
}
} catch (InterruptedException ignore) { 

}
}
} catch(RuntimeException re) { 

getLog().error("Runtime error occurred in main trigger firing loop.", re);
}
} // while (!halted)
// drop references to scheduler stuff to aid garbage collection...
//删除对调度程序内容的引用以帮助垃圾回收...
qs = null;
qsRsrcs = null;
}

2.1获取可用线程的数量

// 参考 quartz2.2源码分析3-线程模型:https://my.oschina.net/chengxiaoyuan/blog/674603
//获取线程池,两个实现 SimpleThreadPool 和 ZeroSizeThreadPool, 一般使用SimpleThreadPool,在quartz.properties 中配置
public int blockForAvailableThreads() { 
//方法具体实现,没有可用线程等待。。。。
synchronized(nextRunnableLock) { 

while((availWorkers.size() < 1 || handoffPending) && !isShutdown) { 

try { 

nextRunnableLock.wait(500);
} catch (InterruptedException ignore) { 

}
}
return availWorkers.size();
}
}

2.2 从jobStore中获取下次要触发的触发器集合

//acquireNextTriggers( now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
//获取要触发的下一个N个触发器的句柄,并由调用调度器将它们标记为“保留”。
//noLaterThan if > 0,那么JobStore应该只返回一个触发器,该触发器不会晚于此值中所表示的时间触发:
public List<OperableTrigger> acquireNextTriggers(final long noLaterThan, final int maxCount, final long timeWindow)
throws JobPersistenceException { 

String lockName;
//isAcquireTriggersWithinLock() false , maxCount == 1
if(isAcquireTriggersWithinLock() || maxCount > 1) { 
 
lockName = LOCK_TRIGGER_ACCESS;
} else { 

lockName = null;
}
return executeInNonManagedTXLock(lockName, 
new TransactionCallback<List<OperableTrigger>>() { 

public List<OperableTrigger> execute(Connection conn) throws JobPersistenceException { 

//2.2.1 到了这里,继续往下,这里返回需要下次执行的Trigger
return acquireNextTrigger(conn, noLaterThan, maxCount, timeWindow);
}
},
new TransactionValidator<List<OperableTrigger>>() { 

public Boolean validate(Connection conn, List<OperableTrigger> result) throws JobPersistenceException { 

try { 

//选择所有fired-trigger记录的状态
List<FiredTriggerRecord> acquired = getDelegate().selectInstancesFiredTriggerRecords(conn, getInstanceId());
Set<String> fireInstanceIds = new HashSet<String>();
for (FiredTriggerRecord ft : acquired) { 

fireInstanceIds.add(ft.getFireInstanceId());
}
for (OperableTrigger tr : result) { 

if (fireInstanceIds.contains(tr.getFireInstanceId())) { 

return true;
}
}
return false;
} catch (SQLException e) { 

throw new JobPersistenceException("error validating trigger acquisition", e);
}
}
});
}

2.2.1看 acquireNextTrigger方法

protected List<OperableTrigger> acquireNextTrigger(Connection conn, long noLaterThan, int maxCount, long timeWindow)
throws JobPersistenceException { 

if (timeWindow < 0) { 

throw new IllegalArgumentException();
}
List<OperableTrigger> acquiredTriggers = new ArrayList<OperableTrigger>();
Set<JobKey> acquiredJobKeysForNoConcurrentExec = new HashSet<JobKey>();
final int MAX_DO_LOOP_RETRY = 3;
int currentLoopCount = 0;
long firstAcquiredTriggerFireTime = 0;
do { 

currentLoopCount ++;
try { 

//2.2.2查询下次触发的触发器 Triggerkey ,在继续看这个方法里面
List<TriggerKey> keys = getDelegate().selectTriggerToAcquire(conn, noLaterThan + timeWindow, getMisfireTime(), maxCount);
// No trigger is ready to fire yet.
//还没有触发器准备好。 keys 为上面查询出来的!!!
if (keys == null || keys.size() == 0)
return acquiredTriggers;
//循环TriggerKey name 和 group
for(TriggerKey triggerKey: keys) { 

//如果我们的触发器不再可用,请尝试一个新的触发器。 (retrieve:检索)
OperableTrigger nextTrigger = retrieveTrigger(conn, triggerKey);
//查询Trigger 并封装!retrieveTrigger 的 sql
"SELECT * FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?"
if(nextTrigger == null) { 

continue; // next trigger
}
//如果触发器的作业设置为@DisallowConcurrentExecution,并且它已经添加到结果中,则将其放回到timeTriggers设置并继续搜索下
//一个触发器。
JobKey jobKey = nextTrigger.getJobKey();
//为给定的作业名称/组名称选择JobDetail对象。 封装成JobDetailImpl 对象
JobDetail job = getDelegate().selectJobDetail(conn, jobKey, getClassLoadHelper());
//selectJobDetail 的sql :"SELECT * FROM {0}JOB_DETAILS WHERE SCHED_NAME = {1} AND JOB_NAME = ? AND JOB_GROUP = ?"
if (job.isConcurrentExectionDisallowed()) { 
//相关联的Job类是否携带DisallowConcurrentExecution注释。
if (acquiredJobKeysForNoConcurrentExec.contains(jobKey)) { 

continue; // next trigger
} else { 

acquiredJobKeysForNoConcurrentExec.add(jobKey);
}
}
// We now have a acquired trigger, let's add to return list.
// If our trigger was no longer in the expected state, try a new one.
//我们现在有一个获取触发器,让我们添加到返回列表。 如果我们的触发器不再处于预期状态,请尝试新的触发器。
int rowsUpdated = getDelegate().updateTriggerStateFromOtherState(conn, triggerKey, STATE_ACQUIRED, STATE_WAITING);
//updateTriggerStateFromOtherState 如果给定触发器处于给定的旧状态,则将其更新为给定的新状态
//第一个? newState ; 第四个 ? oldState updateTriggerStateFromOtherState的sql
"UPDATE {0}TRIGGERS SET TRIGGER_STATE = ? WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ? AND TRIGGER_STATE = ?"
if (rowsUpdated <= 0) { 

continue; // next trigger
}
nextTrigger.setFireInstanceId(getFiredTriggerRecordId());
//插入一个fired Trigger, 更新当前执行的时间(FIRED_TIME)和下次要执行的时间(SCHED_TIME)
//如下看详情请查看源码的这个方法 以及执行的sql,主要代码如下注释:
/** "INSERT INTO {0}FIRED_TRIGGERS (SCHED_NAME, ENTRY_ID, TRIGGER_NAME, TRIGGER_GROUP, INSTANCE_NAME, FIRED_TIME, SCHED_TIME, STATE, JOB_NAME, JOB_GROUP, IS_NONCONCURRENT, REQUESTS_RECOVERY, PRIORITY) VALUES({1}, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)" //FIRED_TIME ps.setBigDecimal(5, new BigDecimal(String.valueOf(System.currentTimeMillis()))); //SCHED_TIME ps.setBigDecimal(6, new BigDecimal(String.valueOf(trigger.getNextFireTime().getTime()))); */
getDelegate().insertFiredTrigger(conn, nextTrigger, STATE_ACQUIRED, null);
//下次执行的Trigger放入acquiredTriggers List中 
acquiredTriggers.add(nextTrigger);
if(firstAcquiredTriggerFireTime == 0)
firstAcquiredTriggerFireTime = nextTrigger.getNextFireTime().getTime();
}
// if we didn't end up with any trigger to fire from that first
// batch, try again for another batch. We allow with a max retry count.
if(acquiredTriggers.size() == 0 && currentLoopCount < MAX_DO_LOOP_RETRY) { 

continue;
}
// We are done with the while loop.
break;
} catch (Exception e) { 

throw new JobPersistenceException(
"Couldn't acquire next trigger: " + e.getMessage(), e);
}
} while (true);
// Return the acquired trigger list
//返回获取的触发器列表
return acquiredTriggers;	
} 
}

2.2.2查询下次触发的触发器 selectTriggerToAcquire

/** 选择下一次需要触发器的Trigger,它将在两个给定时间戳之间按照触发时间的升序触发,然后按优先级降序。 *参数: conn数据库Connection noLaterThan触发器的getNextFireTime()的最大值(独占) noEarlierThan触发器的最大值getMisfireTime() maxCount允许在返回列表中获取的最大触发器数量。 备注: noLaterThan : System.currentTimeMillis() + 30*1000L(30秒) noEarlierThan :System.currentTimeMillis() + 60*1000L(one minute:一分钟) */
selectTriggerToAcquire(Connection conn, long noLaterThan, long noEarlierThan, int maxCount)
最终这个方法执行的sql:
"SELECT TRIGGER_NAME, TRIGGER_GROUP, NEXT_FIRE_TIME, PRIORITY FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND TRIGGER_STATE = ? AND NEXT_FIRE_TIME <= ? AND (MISFIRE_INSTR = -1 OR (MISFIRE_INSTR != -1 AND NEXT_FIRE_TIME >= ?)) ORDER BY NEXT_FIRE_TIME ASC, PRIORITY DESC"
//搜索出下次需要执行的Trigger
//NEXT_FIRE_TIME <= ?(noLaterThan) and NEXT_FIRE_TIME >= ?(noEarlierThan) noEarlierThan <= 下次触发的时间 <= noLaterThan

注:这里的sql和MISFIRE_INSTR 这个值有关系,默认情况MISFIRE_INSTR = 0;除非你自己设置的MISFIRE处理机制!
这个地方要想弄明白,需要去了解Quartz源码——Quartz调度器的Misfire处理规则(四)——scheduler.start()启动源码分析(二)中2.1 恢复job recoverJobs(); 中讲解代码!

我测试执行的一条sql:

 com.mysql.jdbc.JDBC4PreparedStatement@72163468: 
"SELECT TRIGGER_NAME, TRIGGER_GROUP, NEXT_FIRE_TIME, PRIORITY FROM qrtz_TRIGGERS WHERE SCHED_NAME = 'dufy_test' AND TRIGGER_STATE = 'WAITING' AND NEXT_FIRE_TIME <= 1481126708931 AND (MISFIRE_INSTR = -1 OR (MISFIRE_INSTR != -1 AND NEXT_FIRE_TIME >= 1481126618934)) ORDER BY NEXT_FIRE_TIME ASC, PRIORITY DESC"
重点这里:(MISFIRE_INSTR = -1 OR (MISFIRE_INSTR != -1 AND NEXT_FIRE_TIME >= 1481126618934))

提个小小问题, 什么情况MISFIRE_INSTR == -1 ???

2.3 通知JobStore调度程序现在正在触发其先前已获取(保留)的给定触发器(执行其关联的作业)。

public List<TriggerFiredResult> triggersFired(final List<OperableTrigger> triggers) throws JobPersistenceException { 

return executeInNonManagedTXLock(LOCK_TRIGGER_ACCESS,
new TransactionCallback<List<TriggerFiredResult>>() { 

public List<TriggerFiredResult> execute(Connection conn) throws JobPersistenceException { 

List<TriggerFiredResult> results = new ArrayList<TriggerFiredResult>();
TriggerFiredResult result;
for (OperableTrigger trigger : triggers) { 

try { 

TriggerFiredBundle bundle = triggerFired(conn, trigger);//这个里面比较复杂,分析!!!
//然后封装成TriggerFiredResult 返回, 回到2.3开始的地方run方法中!
result = new TriggerFiredResult(bundle);
} catch (JobPersistenceException jpe) { 

result = new TriggerFiredResult(jpe);
} catch(RuntimeException re) { 

result = new TriggerFiredResult(re);
}
results.add(result);
}
return results;
}
},
new TransactionValidator<List<TriggerFiredResult>>() { 

....
});
}
protected TriggerFiredBundle triggerFired(Connection conn,
OperableTrigger trigger)
throws JobPersistenceException { 

JobDetail job;
Calendar cal = null;
// Make sure trigger wasn't deleted, paused, or completed...
//确保触发器未被删除,暂停或完成...
try { 
 // if trigger was deleted, state will be STATE_DELETED
//如果触发器被删除,状态将是STATE_DELETED
String state = getDelegate().selectTriggerState(conn,
trigger.getKey());
//查询到数据,返回查询到的当前状态,否则返回 STATE_DELETED ,删除状态
//sql:"SELECT TRIGGER_STATE FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?"
if (!state.equals(STATE_ACQUIRED)) { 

return null;
}
} catch (SQLException e) { 

throw new JobPersistenceException("Couldn't select trigger state: "
+ e.getMessage(), e);
}
try { 

//恢复job,根据 trigger.getJobKey() 获取Job的name 和 group
job = retrieveJob(conn, trigger.getJobKey());
//sql:"SELECT * FROM {0}JOB_DETAILS WHERE SCHED_NAME = {1} AND JOB_NAME = ? AND JOB_GROUP = ?"
if (job == null) { 
 return null; }
} catch (JobPersistenceException jpe) { 

try { 

getLog().error("Error retrieving job, setting trigger state to ERROR.", jpe);
getDelegate().updateTriggerState(conn, trigger.getKey(),
STATE_ERROR);
} catch (SQLException sqle) { 

getLog().error("Unable to set trigger state to ERROR.", sqle);
}
throw jpe;
}
// trigger 有CalendarName 去查询qrtz_calendar 表
if (trigger.getCalendarName() != null) { 

cal = retrieveCalendar(conn, trigger.getCalendarName());
//sql: "SELECT * FROM {0}CALENDARS WHERE SCHED_NAME = {1} AND CALENDAR_NAME = ?"
if (cal == null) { 
 return null; }
}
try { 

//更新触发的触发器记录。 将更新字段“触发实例”,“触发时间”和“状态”。STATE_EXECUTING:执行状态
getDelegate().updateFiredTrigger(conn, trigger, STATE_EXECUTING, job);
//sql:"UPDATE {0}FIRED_TRIGGERS SET INSTANCE_NAME = ?, FIRED_TIME = ?, SCHED_TIME = ?, STATE = ?, JOB_NAME = ?, JOB_GROUP = ?, IS_NONCONCURRENT = ?, REQUESTS_RECOVERY = ? WHERE SCHED_NAME = {1} AND ENTRY_ID = ?"
//ps.setBigDecimal(2, new BigDecimal(String.valueOf(System.currentTimeMillis())));
//ps.setBigDecimal(3, new BigDecimal(String.valueOf(trigger.getNextFireTime().getTime())));
} catch (SQLException e) { 

throw new JobPersistenceException("Couldn't insert fired trigger: "
+ e.getMessage(), e);
}
Date prevFireTime = trigger.getPreviousFireTime();
// call triggered - to update the trigger's next-fire-time state...
// 2.3.1 更新触发器的下一个触发时间状态...
trigger.triggered(cal);
String state = STATE_WAITING;
boolean force = true;
//相关联的Job类是否携带DisallowConcurrentExecution注释。
if (job.isConcurrentExectionDisallowed()) { 

state = STATE_BLOCKED;
force = false;
try { 

getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(),
STATE_BLOCKED, STATE_WAITING);
getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(),
STATE_BLOCKED, STATE_ACQUIRED);
getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(),
STATE_PAUSED_BLOCKED, STATE_PAUSED);
} catch (SQLException e) { 

throw new JobPersistenceException(
"Couldn't update states of blocked triggers: "
+ e.getMessage(), e);
}
} 
//getNextFireTime == null 说明trigger执行完成,没有下次触发的时间了
if (trigger.getNextFireTime() == null) { 

state = STATE_COMPLETE;
force = true;
}
//2.3.2插入或者更新一个Trigger ,进入看看
storeTrigger(conn, trigger, job, true, state, force, false);
//清除'dirty'标志(将dirty标志设置为false)。
job.getJobDataMap().clearDirtyFlag();
//创建一个 TriggerFiredBundle的对象,封装数据
return new TriggerFiredBundle(job, trigger, cal, trigger.getKey().getGroup()
.equals(Scheduler.DEFAULT_RECOVERY_GROUP), new Date(), trigger
.getPreviousFireTime(), prevFireTime, trigger.getNextFireTime());
}

2.3.1 更新触发器的下一个触发时间状态…

//当调度程序决定“触发”触发器(执行关联的作业)时调用,以便为触发器更新自身以进行下一次触发(如果有)。 
public void triggered(Calendar calendar) { 

timesTriggered++; //默认 timesTriggered==0
previousFireTime = nextFireTime; //下次执行的next_time赋值给pre_time
nextFireTime = getFireTimeAfter(nextFireTime);//获取下次触发的时间
while (nextFireTime != null && calendar != null
&& !calendar.isTimeIncluded(nextFireTime.getTime())) { 

nextFireTime = getFireTimeAfter(nextFireTime);
if(nextFireTime == null)
break;
//avoid infinite loop
java.util.Calendar c = java.util.Calendar.getInstance();
c.setTime(nextFireTime);
if (c.get(java.util.Calendar.YEAR) > YEAR_TO_GIVEUP_SCHEDULING_AT) { 

nextFireTime = null;
}
}
}

2.3.2插入或者更新一个Trigger ,进入看看

protected void storeTrigger(Connection conn,
OperableTrigger newTrigger, JobDetail job, boolean replaceExisting, String state,
boolean forceState, boolean recovering)
throws JobPersistenceException { 

//检查Trigger是否存在 , trigger 的name 和 group
boolean existingTrigger = triggerExists(conn, newTrigger.getKey());
if ((existingTrigger) && (!replaceExisting)) { 
 
throw new ObjectAlreadyExistsException(newTrigger); 
}
try { 

boolean shouldBepaused;
if (!forceState) { 

shouldBepaused = getDelegate().isTriggerGroupPaused(
conn, newTrigger.getKey().getGroup());
if(!shouldBepaused) { 

shouldBepaused = getDelegate().isTriggerGroupPaused(conn,
ALL_GROUPS_PAUSED);
if (shouldBepaused) { 

getDelegate().insertPausedTriggerGroup(conn, newTrigger.getKey().getGroup());
}
}
if (shouldBepaused && (state.equals(STATE_WAITING) || state.equals(STATE_ACQUIRED))) { 

state = STATE_PAUSED;
}
}
//job 为null ,重新去查询一遍 jobDetail 根据 Trigger的name和group
if(job == null) { 

job = getDelegate().selectJobDetail(conn, newTrigger.getJobKey(), getClassLoadHelper());
}
if (job == null) { 

throw new JobPersistenceException("The job ("
+ newTrigger.getJobKey()
+ ") referenced by the trigger does not exist.");
}
//是否有注解 ,上面传过来的recovering参数 == false
if (job.isConcurrentExectionDisallowed() && !recovering) { 
 
state = checkBlockedState(conn, job.getKey(), state);
}
//existingTrigger存在的话,进行更新操作,否则插入操作
if (existingTrigger) { 

//2.3.2.1这个里面还是很复杂,贴代码出来了 getDelegate().updateTrigger(conn, newTrigger, state, job);
getDelegate().updateTrigger(conn, newTrigger, state, job);
} else { 

//插入和更新很多类似地方 具体代码不贴出来了 !
getDelegate().insertTrigger(conn, newTrigger, state, job);
}
} catch (Exception e) { 

throw new JobPersistenceException("Couldn't store trigger '" + newTrigger.getKey() + "' for '" 
+ newTrigger.getJobKey() + "' job:" + e.getMessage(), e);
}
}

2.3.2.1这个里面还是很复杂,贴代码出来了

//更新基本触发器数据。
public int updateTrigger(Connection conn, OperableTrigger trigger, String state,
JobDetail jobDetail) throws SQLException, IOException { 

// save some clock cycles by unnecessarily writing job data blob ...
//通过不必要地写入作业数据blob来保存一些时钟周期...
boolean updateJobData = trigger.getJobDataMap().isDirty();//确定Map是否标记为dirty。
ByteArrayOutputStream baos = null;
if(updateJobData && trigger.getJobDataMap().size() > 0) { 

baos = serializeJobData(trigger.getJobDataMap());
}
PreparedStatement ps = null;
int insertResult = 0;
try { 

if(updateJobData) { 

ps = conn.prepareStatement(rtp(UPDATE_TRIGGER));
---------------------------------------------------------------------
"UPDATE {0}TRIGGERS SET JOB_NAME = ?, JOB_GROUP = ?, DESCRIPTION = ?, NEXT_FIRE_TIME = ?, PREV_FIRE_TIME = ?, TRIGGER_STATE = ?, TRIGGER_TYPE = ?, START_TIME = ?, END_TIME = ?, CALENDAR_NAME = ?, MISFIRE_INSTR = ?, PRIORITY = ?, JOB_DATA = ? WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?"
---------------------------------------------------------------------
} else { 

ps = conn.prepareStatement(rtp(UPDATE_TRIGGER_SKIP_DATA));
---------------------------------------------------------------------
"UPDATE {0}TRIGGERS SET JOB_NAME = ?, JOB_GROUP = ?, DESCRIPTION = ?, NEXT_FIRE_TIME = ?, PREV_FIRE_TIME = ?, TRIGGER_STATE = ?, TRIGGER_TYPE = ?, START_TIME = ?, END_TIME = ?, CALENDAR_NAME = ?, MISFIRE_INSTR = ?, PRIORITY = ? WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?"
---------------------------------------------------------------------
}
ps.setString(1, trigger.getJobKey().getName());
ps.setString(2, trigger.getJobKey().getGroup());
ps.setString(3, trigger.getDescription());
long nextFireTime = -1;
if (trigger.getNextFireTime() != null) { 
//触发器下次执行的时间不为null
//备注:一般执行完成,即是complete 状态,getNextFireTime == null
nextFireTime = trigger.getNextFireTime().getTime();
}
ps.setBigDecimal(4, new BigDecimal(String.valueOf(nextFireTime)));
long prevFireTime = -1;
if (trigger.getPreviousFireTime() != null) { 
//触发器之前执行的时间不为 null
//备注:触发器第一次执行的时候 getNextFireTime 为 null
prevFireTime = trigger.getPreviousFireTime().getTime();
}
ps.setBigDecimal(5, new BigDecimal(String.valueOf(prevFireTime)));
ps.setString(6, state);
//获取对于代理 ;如JDBC 或者RAM
TriggerPersistenceDelegate tDel = findTriggerPersistenceDelegate(trigger);
String type = TTYPE_BLOB;
if(tDel != null)
type = tDel.getHandledTriggerTypeDiscriminator();
ps.setString(7, type);
ps.setBigDecimal(8, new BigDecimal(String.valueOf(trigger
.getStartTime().getTime())));
long endTime = 0;
if (trigger.getEndTime() != null) { 

endTime = trigger.getEndTime().getTime();
}
ps.setBigDecimal(9, new BigDecimal(String.valueOf(endTime)));
ps.setString(10, trigger.getCalendarName());
ps.setInt(11, trigger.getMisfireInstruction());
ps.setInt(12, trigger.getPriority());
if(updateJobData) { 

setBytes(ps, 13, baos);
ps.setString(14, trigger.getKey().getName());
ps.setString(15, trigger.getKey().getGroup());
} else { 

ps.setString(13, trigger.getKey().getName());
ps.setString(14, trigger.getKey().getGroup());
}
insertResult = ps.executeUpdate();
if(tDel == null)
updateBlobTrigger(conn, trigger);
else
//更新扩展触发器属性
tDel.updateExtendedTriggerProperties(conn, trigger, state, jobDetail);
---------------------------------------------------------------
"UPDATE {0}SIMPLE_TRIGGERS SET REPEAT_COUNT = ?, REPEAT_INTERVAL = ?, TIMES_TRIGGERED = ? WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?"
---------------------------------------------------------------------
} finally { 

closeStatement(ps);
}
return insertResult;//返回执行的结果 为int 
}

Quartz专栏系列

1.精进Quartz——Quartz大致介绍(一)
2.精进Quartz——Quartz简单入门Demo(二)
3.精进Quartz——Spring和Quartz集成详解
4.精进Quartz——SSMM(Spring+SpringMVC+Mybatis+Mysql)和Quartz集成详解(四)
5.精进Quartz源码——JobStore保存JonDetail和Trigger源码分析(一)
6.精进Quartz源码——scheduler.start()启动源码分析(二)
7.精进Quartz源码——QuartzSchedulerThread.run() 源码分析(三)
8.精进Quartz源码——Quartz调度器的Misfire处理规则(四)http://blog.csdn.net/u010648555/article/details/53672738)


谢谢你的阅读,如果您觉得这篇博文对你有帮助,请点赞或者喜欢,让更多的人看到!祝你每天开心愉快!


不管做什么,只要坚持下去就会看到不一样!在路上,不卑不亢!

博客首页 : http://blog.csdn.net/u010648555

愿你我在人生的路上能都变成最好的自己,能够成为一个独挡一面的人
精进Quartz源码—QuartzSchedulerThread.run() 源码分析(三)

© 每天都在变得更好的阿飞云

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/121203.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)
blank

相关推荐

  • java webservice 实例

    java webservice 实例javawebservice实例 byhgwayen实验目的1.实现一个具有WebService功能的分布式对象类,能够实现求两个整数的最大值的功能。2.在另一台计算机(虚拟机)上,编写客户端程序,通过WebService技术访问远程的基于WebService的分布式对象Max,达到求两个整数的最大值的功能。一、创建并运行HelloWorldWebService.java。1.在classpath路径下新建/rs_midtest、/rs_

  • iOS常见算法(二分法 冒泡 选择 快排)[通俗易懂]

    iOS常见算法(二分法 冒泡 选择 快排)

  • 2021年 全网最细大数据学习笔记(一):初识 Hadoop

    2021年 全网最细大数据学习笔记(一):初识 Hadoop文章目录一、大数据1、大数据的定义2、大数据的特点3、大数据行业应用4、Hadoop与大数据5、其他大数据处理平台6、大数据人才三、一、大数据随着近来计算机技术和互联网的发展,大数据这个名词越来越多地进入到我们的视野中,大数据的快速发展也无时刻影响着我们的生活。1、大数据的定义大数据从字面来理解就是大量的数据。日常生活离不开数据,可以说每时每刻都在产生着数据。例如,一分钟可以做些什么事呢?在日常生活中,一分钟可能连一页书都看不完。但是一分钟内产生的数据却是庞大的。据统计,在一分钟内,YouTu

  • 4个线程池_vc2010线程win32线程已退出

    4个线程池_vc2010线程win32线程已退出在windows中,系统提供了QueueUserWorkItem函数实现异步调用,这个函数相当于在线程池中建立多个用户工作项目,跟普通线程机制一样,线程池也有线程的同步等机制。 【函数原型】BOOLWINAPIQueueUserWorkItem(__inLPTHREAD_START_ROUTINEFunction,__inP…

  • C++ int转char_c语言中int转char

    C++ int转char_c语言中int转char文章目录1.通过ascii码:2.直接转换(更简单,推荐)1.通过ascii码:chara=’0′;intia=(int)a;/*notethattheintcastisnotnecessary–intia=awouldsuffice*/cout<<ia<<endl;结果如下:可以看出这种方法得到的其实是ch…

  • JAVA 正则表达式_正则表达式文档

    JAVA 正则表达式_正则表达式文档一、校验数字的表达式1数字:^[0-9]*$2n位的数字:^\d{n}$3至少n位的数字:^\d{n,}$4m-n位的数字:^\d{m,n}$5零和非零开头的数字:^(0|[1-9][0-9]*)$6非零开头的最多带两位小数的数字:^([1-9][0-9]*)+(.[0-9]{1,2})?$7带1-2位小数的正数或负数:^(\-)?\d+(\.\d{1,2})?…

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号