QuartzSchedulerThread.run() 是處理任務的主要方法!下面進行分析,方便自己日後檢視!
本文分析的都是 jobStore 為 JDBC 方式時的 SimpleTrigger!RAM 方式的分析思路類似!
Quartz學習——scheduler.start()啟動源碼分析:http://blog.csdn.net/u010648555/article/details/53520314
QuartzSchedulerThread.run() 主要是在有可用線程的時候,擷取需要執行的 Trigger 並觸發它,以進行任務的排程!
解釋:
{0} : 表的字首,如表 qrtz_triggers,{0} == qrtz_
{1} :quartz.properties 中配置的 org.quartz.scheduler.instanceName: myInstanceName ,{1} ==myInstanceName
1.QuartzSchedulerThread.run()源碼講解
/**
* <p>
* The main processing loop of the <code>QuartzSchedulerThread</code>.
* </p>
* <p>
* Each pass waits while the thread is paused, blocks until the thread pool
* reports a free worker, acquires the next batch of triggers from the
* JobStore, waits until the first of them is due, tells the JobStore the
* batch is being fired, and hands each resulting job to the thread pool.
* When no trigger was acquired the thread idles on <code>sigLock</code>
* for a randomized wait time. The loop ends once <code>halted</code> is set.
* </p>
*/
@Override
public void run() {
// remembers whether the previous acquisition attempt failed, so that the same
// error is reported only once while the failure persists
boolean lastAcquireFailed = false;
while (!halted.get()) {
try {
// check if we're supposed to pause...
synchronized (sigLock) {
while (paused && !halted.get()) {
try {
// wait until togglePause(false) is called...
sigLock.wait(1000L);
} catch (InterruptedException ignore) {
}
}
if (halted.get()) {
break;
}
}
// 2.1 get the number of available worker threads (blocks until at least one is free)
int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
if(availThreadCount > 0) { // will always be true, due to semantics of blockForAvailableThreads...
List<OperableTrigger> triggers = null;// triggers acquired on this pass
long now = System.currentTimeMillis();// current time
clearSignaledSchedulingChange();
try {
// 2.2 acquire from the JobStore the next batch of triggers to fire
// NOTE(author): idleWaitTime is said to be 30L * 1000L - how long the scheduler waits
// before checking again when no trigger is due; confirm against the field declaration
triggers = qsRsrcs.getJobStore().acquireNextTriggers(
now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
lastAcquireFailed = false;
if (log.isDebugEnabled())
log.debug("batch acquisition of " + (triggers == null ? 0 : triggers.size()) + " triggers");
} catch (JobPersistenceException jpe) {
// notify listeners only on the first failure of a run of failures
if(!lastAcquireFailed) {
qs.notifySchedulerListenersError(
"An error occurred while scanning for the next triggers to fire.",
jpe);
}
lastAcquireFailed = true;
continue;
} catch (RuntimeException e) {
// log only on the first failure of a run of failures
if(!lastAcquireFailed) {
getLog().error("quartzSchedulerThreadLoop: RuntimeException "
+e.getMessage(), e);
}
lastAcquireFailed = true;
continue;
}
// proceed only when the JobStore actually returned triggers
if (triggers != null && !triggers.isEmpty()) {
now = System.currentTimeMillis();
long triggerTime = triggers.get(0).getNextFireTime().getTime();
// NOTE(author): for a trigger that missed its fire time, the default misfire handling at
// startup is said to reset next_fire_time via setNextFireTime() to roughly the time
// start() ran (analysed in the previous article) - not visible in this excerpt
long timeUntilTrigger = triggerTime - now;
while(timeUntilTrigger > 2) {// NOTE(author): open question - why 2? Presumably a small millisecond tolerance; confirm
synchronized (sigLock) {
if (halted.get()) {
break;
}
if (!isCandidateNewTimeEarlierWithinReason(triggerTime, false)) {
try {
// we could have blocked a long while
// on 'synchronize', so we must recompute
now = System.currentTimeMillis();
timeUntilTrigger = triggerTime - now;
if(timeUntilTrigger >= 1)
sigLock.wait(timeUntilTrigger);
} catch (InterruptedException ignore) {
}
}
}
if(releaseIfScheduleChangedSignificantly(triggers, triggerTime)) {
break;
}
now = System.currentTimeMillis();
timeUntilTrigger = triggerTime - now;
}
// this happens if releaseIfScheduleChangedSignificantly decided to release triggers
if(triggers.isEmpty())
continue;
// set triggers to 'executing'
List<TriggerFiredResult> bndles = new ArrayList<TriggerFiredResult>();
boolean goAhead = true;
synchronized(sigLock) {
goAhead = !halted.get();
}
if(goAhead) {
try {
// 2.3 tell the JobStore that the scheduler is now firing the triggers it
// previously acquired (reserved), i.e. executing their associated jobs
List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
if(res != null)
bndles = res; // keep the results returned by triggersFired (section 2.3)
} catch (SchedulerException se) {
qs.notifySchedulerListenersError(
"An error occurred while firing triggers '"
+ triggers + "'", se);
//QTZ-179 : a problem occurred interacting with the triggers from the db
//we release them and loop again
for (int i = 0; i < triggers.size(); i++) {
qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
}
continue;
}
}
// walk the results; bndles.get(i) is paired with triggers.get(i) by index
for (int i = 0; i < bndles.size(); i++) {
TriggerFiredResult result = bndles.get(i);
TriggerFiredBundle bndle = result.getTriggerFiredBundle();
Exception exception = result.getException();
if (exception instanceof RuntimeException) {
getLog().error("RuntimeException while firing trigger " + triggers.get(i), exception);
qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
continue;
}
// it's possible to get 'null' if the triggers was paused,
// blocked, or other similar occurrences that prevent it being
// fired at this time... or if the scheduler was shutdown (halted)
if (bndle == null) {
qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
continue;
}
JobRunShell shell = null;
try {
// create the JobRunShell and initialize it
shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
shell.initialize(qs);
} catch (SchedulerException se) {
qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
continue;
}
if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
// this case should never happen, as it is indicative of the
// scheduler being shutdown or a bug in the thread pool or
// a thread pool being used concurrently - which the docs
// say not to do...
getLog().error("ThreadPool.runInThread() return false!");
qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
}
}
continue; // while (!halted)
}
} else { // if(availThreadCount > 0)
// should never happen, if threadPool.blockForAvailableThreads() follows contract
continue; // while (!halted)
}
// reached only when no triggers were acquired: idle for a randomized period
long now = System.currentTimeMillis();
long waitTime = now + getRandomizedIdleWaitTime();
long timeUntilContinue = waitTime - now;
// NOTE(author): idleWaitTime == 30L * 1000L and idleWaitVariablness == 7 * 1000, with
// getRandomizedIdleWaitTime() computed as idleWaitTime - random.nextInt(idleWaitVariablness);
// none of these are shown in this excerpt - confirm against their declarations
synchronized(sigLock) {
try {
if(!halted.get()) {
// QTZ-336 A job might have been completed in the mean time and we might have
// missed the scheduled changed signal by not waiting for the notify() yet
// Check that before waiting for too long in case this very job needs to be
// scheduled very soon
if (!isScheduleChanged()) {
sigLock.wait(timeUntilContinue);
}
}
} catch (InterruptedException ignore) {
}
}
} catch(RuntimeException re) {
getLog().error("Runtime error occurred in main trigger firing loop.", re);
}
} // while (!halted)
// drop references to scheduler stuff to aid garbage collection...
qs = null;
qsRsrcs = null;
}
2.1擷取可用線程的數量
// 參考 quartz2.2源碼分析3-線程模型:https://my.oschina.net/chengxiaoyuan/blog/674603
//擷取線程池,兩個實作 SimpleThreadPool 和 ZeroSizeThreadPool, 一般使用SimpleThreadPool,在quartz.properties 中配置
/**
 * Blocks the caller until a worker is available, then reports how many are.
 *
 * <p>While no worker is available, or a hand-off is still pending, the caller
 * waits on <code>nextRunnableLock</code> in 500 ms slices. The wait ends as
 * soon as the pool is shut down.</p>
 *
 * @return the size of <code>availWorkers</code> at the moment the wait ends
 */
public int blockForAvailableThreads() {
    synchronized (nextRunnableLock) {
        // keep waiting only while the pool is alive and nothing can be handed out yet
        while (!isShutdown && (handoffPending || availWorkers.size() <= 0)) {
            try {
                nextRunnableLock.wait(500);
            } catch (InterruptedException ignore) {
                // interruption is ignored; the loop condition is simply re-evaluated
            }
        }
        return availWorkers.size();
    }
}
2.2 從jobStore中擷取下次要觸發的觸發器集合
//acquireNextTriggers( now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
//擷取要觸發的下一個N個觸發器的句柄,并由調用排程器将它們标記為“保留”。
//noLaterThan:若 > 0,JobStore 應隻傳回觸發時間不晚于此值的觸發器:
/**
 * Acquires the next batch of triggers to fire by running
 * {@code acquireNextTrigger} through {@code executeInNonManagedTXLock}.
 *
 * <p>The trigger-access lock is requested only when
 * {@code isAcquireTriggersWithinLock()} is true or more than one trigger may
 * be acquired; otherwise a {@code null} lock name is passed.</p>
 *
 * @param noLaterThan upper bound on the fire time of the triggers to acquire
 * @param maxCount    maximum number of triggers to acquire
 * @param timeWindow  extra time window added to {@code noLaterThan} by the query
 * @return the acquired triggers
 * @throws JobPersistenceException if acquisition or validation fails
 */
public List<OperableTrigger> acquireNextTriggers(final long noLaterThan, final int maxCount, final long timeWindow)
        throws JobPersistenceException {

    // NOTE(author): in the analysed setup isAcquireTriggersWithinLock() is false and
    // maxCount is 1, so no lock name is used - confirm for your configuration
    final String lockName =
            (isAcquireTriggersWithinLock() || maxCount > 1) ? LOCK_TRIGGER_ACCESS : null;

    // 2.2.1 the actual work: returns the triggers that should fire next
    TransactionCallback<List<OperableTrigger>> acquisition =
            new TransactionCallback<List<OperableTrigger>>() {
                public List<OperableTrigger> execute(Connection conn) throws JobPersistenceException {
                    return acquireNextTrigger(conn, noLaterThan, maxCount, timeWindow);
                }
            };

    // accepts the result if at least one returned trigger carries a fire-instance id
    // that is present among this instance's fired-trigger records
    TransactionValidator<List<OperableTrigger>> acquisitionCheck =
            new TransactionValidator<List<OperableTrigger>>() {
                public Boolean validate(Connection conn, List<OperableTrigger> result) throws JobPersistenceException {
                    try {
                        Set<String> recordedIds = new HashSet<String>();
                        List<FiredTriggerRecord> records =
                                getDelegate().selectInstancesFiredTriggerRecords(conn, getInstanceId());
                        for (FiredTriggerRecord record : records) {
                            recordedIds.add(record.getFireInstanceId());
                        }

                        boolean anyRecorded = false;
                        for (OperableTrigger candidate : result) {
                            if (recordedIds.contains(candidate.getFireInstanceId())) {
                                anyRecorded = true;
                                break;
                            }
                        }
                        return anyRecorded;
                    } catch (SQLException e) {
                        throw new JobPersistenceException("error validating trigger acquisition", e);
                    }
                }
            };

    return executeInNonManagedTXLock(lockName, acquisition, acquisitionCheck);
}
2.2.1看 acquireNextTrigger方法
protected List<OperableTrigger> acquireNextTrigger(Connection conn, long noLaterThan, int maxCount, long timeWindow)
throws JobPersistenceException {
if (timeWindow < 0) {
throw new IllegalArgumentException();
}
List<OperableTrigger> acquiredTriggers = new ArrayList<OperableTrigger>();
Set<JobKey> acquiredJobKeysForNoConcurrentExec = new HashSet<JobKey>();
final int MAX_DO_LOOP_RETRY = 3;
int currentLoopCount = 0;
long firstAcquiredTriggerFireTime = 0;
do {
currentLoopCount ++;
try {
//2.2.2查詢下次觸發的觸發器 Triggerkey ,在繼續看這個方法裡面
List<TriggerKey> keys = getDelegate().selectTriggerToAcquire(conn, noLaterThan + timeWindow, getMisfireTime(), maxCount);
// No trigger is ready to fire yet.
//還沒有觸發器準備好。 keys 為上面查詢出來的!!!
if (keys == null || keys.size() == 0)
return acquiredTriggers;
//循環TriggerKey name 和 group
for(TriggerKey triggerKey: keys) {
//如果我們的觸發器不再可用,請嘗試一個新的觸發器。 (retrieve:檢索)
OperableTrigger nextTrigger = retrieveTrigger(conn, triggerKey);
//查詢Trigger 并封裝!retrieveTrigger 的 sql
"SELECT * FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?"
if(nextTrigger == null) {
continue; // next trigger
}
//如果觸發器的作業設定為@DisallowConcurrentExecution,并且它已經添加到結果中,則将其放回到timeTriggers設定并繼續搜尋下
//一個觸發器。
JobKey jobKey = nextTrigger.getJobKey();
//為給定的作業名稱/組名稱選擇JobDetail對象。 封裝成JobDetailImpl 對象
JobDetail job = getDelegate().selectJobDetail(conn, jobKey, getClassLoadHelper());
//selectJobDetail 的sql :"SELECT * FROM {0}JOB_DETAILS WHERE SCHED_NAME = {1} AND JOB_NAME = ? AND JOB_GROUP = ?"
if (job.isConcurrentExectionDisallowed()) {//相關聯的Job類是否攜帶DisallowConcurrentExecution注釋。
if (acquiredJobKeysForNoConcurrentExec.contains(jobKey)) {
continue; // next trigger
} else {
acquiredJobKeysForNoConcurrentExec.add(jobKey);
}
}
// We now have a acquired trigger, let's add to return list.
// If our trigger was no longer in the expected state, try a new one.
//我們現在有一個擷取觸發器,讓我們添加到傳回清單。 如果我們的觸發器不再處于預期狀态,請嘗試新的觸發器。
int rowsUpdated = getDelegate().updateTriggerStateFromOtherState(conn, triggerKey, STATE_ACQUIRED, STATE_WAITING);
//updateTriggerStateFromOtherState 如果給定觸發器處于給定的舊狀态,則将其更新為給定的新狀态
//第一個? newState ; 第四個 ? oldState updateTriggerStateFromOtherState的sql
"UPDATE {0}TRIGGERS SET TRIGGER_STATE = ? WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ? AND TRIGGER_STATE = ?"
if (rowsUpdated <= 0) {
continue; // next trigger
}
nextTrigger.setFireInstanceId(getFiredTriggerRecordId());
//插入一個fired Trigger, 更新目前執行的時間(FIRED_TIME)和下次要執行的時間(SCHED_TIME)
//如下看詳情請檢視源碼的這個方法 以及執行的sql,主要代碼如下注釋:
/**
"INSERT INTO {0}FIRED_TRIGGERS (SCHED_NAME, ENTRY_ID, TRIGGER_NAME, TRIGGER_GROUP, INSTANCE_NAME, FIRED_TIME, SCHED_TIME, STATE, JOB_NAME, JOB_GROUP, IS_NONCONCURRENT, REQUESTS_RECOVERY, PRIORITY) VALUES({1}, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
//FIRED_TIME
ps.setBigDecimal(5, new BigDecimal(String.valueOf(System.currentTimeMillis())));
//SCHED_TIME
ps.setBigDecimal(6, new BigDecimal(String.valueOf(trigger.getNextFireTime().getTime())));
*/
getDelegate().insertFiredTrigger(conn, nextTrigger, STATE_ACQUIRED, null);
//下次執行的Trigger放入acquiredTriggers List中
acquiredTriggers.add(nextTrigger);
if(firstAcquiredTriggerFireTime == 0)
firstAcquiredTriggerFireTime = nextTrigger.getNextFireTime().getTime();
}
// if we didn't end up with any trigger to fire from that first
// batch, try again for another batch. We allow with a max retry count.
if(acquiredTriggers.size() == 0 && currentLoopCount < MAX_DO_LOOP_RETRY) {
continue;
}
// We are done with the while loop.
break;
} catch (Exception e) {
throw new JobPersistenceException(
"Couldn't acquire next trigger: " + e.getMessage(), e);
}
} while (true);
// Return the acquired trigger list
//傳回擷取的觸發器清單
return acquiredTriggers;
}
}
2.2.2查詢下次觸發的觸發器 selectTriggerToAcquire
/**
選擇下一批需要觸發的Trigger:其觸發時間介于兩個給定時間戳之間,結果先按觸發時間升序、再按優先級降序排列。
*參數:
conn資料庫Connection
noLaterThan觸發器的getNextFireTime()的最大值(獨占)
noEarlierThan觸發器的最大值getMisfireTime()
maxCount允許在傳回清單中擷取的最大觸發器數量。
備注:
noLaterThan : System.currentTimeMillis() + 30*1000L(30秒)
noEarlierThan :System.currentTimeMillis() - 60*1000L(即 getMisfireTime():目前時間減去一分鐘;下面測試 sql 中兩個時間戳正好相差約 90 秒)
*/
selectTriggerToAcquire(Connection conn, long noLaterThan, long noEarlierThan, int maxCount)
最終這個方法執行的sql:
"SELECT TRIGGER_NAME, TRIGGER_GROUP, NEXT_FIRE_TIME, PRIORITY FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND TRIGGER_STATE = ? AND NEXT_FIRE_TIME <= ? AND (MISFIRE_INSTR = -1 OR (MISFIRE_INSTR != -1 AND NEXT_FIRE_TIME >= ?)) ORDER BY NEXT_FIRE_TIME ASC, PRIORITY DESC"
//搜尋出下次需要執行的Trigger
//NEXT_FIRE_TIME <= ?(noLaterThan) and NEXT_FIRE_TIME >= ?(noEarlierThan) noEarlierThan <= 下次觸發的時間 <= noLaterThan
注:這裡的sql和MISFIRE_INSTR 這個值有關系,預設情況MISFIRE_INSTR = 0;除非你自己設定的MISFIRE處理機制!
這個地方要想弄明白,需要先了解《Quartz源碼——Quartz排程器的Misfire處理規則(四)》,以及《scheduler.start()啟動源碼分析(二)》中「2.1 恢複job recoverJobs();」一節講解的代碼!
我測試執行的一條sql:
com.mysql.jdbc.JDBC4PreparedStatement@72163468:
"SELECT TRIGGER_NAME, TRIGGER_GROUP, NEXT_FIRE_TIME, PRIORITY FROM qrtz_TRIGGERS WHERE SCHED_NAME = 'dufy_test' AND TRIGGER_STATE = 'WAITING' AND NEXT_FIRE_TIME <= 1481126708931 AND (MISFIRE_INSTR = -1 OR (MISFIRE_INSTR != -1 AND NEXT_FIRE_TIME >= 1481126618934)) ORDER BY NEXT_FIRE_TIME ASC, PRIORITY DESC"
重點這裡:(MISFIRE_INSTR = -1 OR (MISFIRE_INSTR != -1 AND NEXT_FIRE_TIME >= 1481126618934))
提個小小問題, 什麼情況MISFIRE_INSTR == -1 ???
2.3 通知JobStore排程程式現在正在觸發其先前已擷取(保留)的給定觸發器(執行其關聯的作業)。
/**
* Tells the JobStore that the scheduler is now firing the given triggers, which it
* previously acquired (reserved). Runs under LOCK_TRIGGER_ACCESS and returns one
* TriggerFiredResult per trigger, in the same order as the input list.
*
* @param triggers the previously acquired triggers that are about to fire
* @return one result per trigger, holding either a bundle or the exception raised for it
* @throws JobPersistenceException declared by the method; per-trigger failures are
*         captured in the corresponding result instead of being thrown from the loop
*/
public List<TriggerFiredResult> triggersFired(final List<OperableTrigger> triggers) throws JobPersistenceException {
return executeInNonManagedTXLock(LOCK_TRIGGER_ACCESS,
new TransactionCallback<List<TriggerFiredResult>>() {
public List<TriggerFiredResult> execute(Connection conn) throws JobPersistenceException {
List<TriggerFiredResult> results = new ArrayList<TriggerFiredResult>();
TriggerFiredResult result;
for (OperableTrigger trigger : triggers) {
try {
TriggerFiredBundle bundle = triggerFired(conn, trigger);// the complex part - analysed separately
// wrap the bundle in a TriggerFiredResult; the list goes back to run() (section 2.3)
result = new TriggerFiredResult(bundle);
} catch (JobPersistenceException jpe) {
// a failure for one trigger is recorded in its result and does not abort the batch
result = new TriggerFiredResult(jpe);
} catch(RuntimeException re) {
result = new TriggerFiredResult(re);
}
results.add(result);
}
return results;
}
},
new TransactionValidator<List<TriggerFiredResult>>() {
// NOTE(review): the validator body is elided in this excerpt
....
});
}
/**
* Fires a single, previously acquired trigger: verifies it is still in state ACQUIRED,
* loads its job and optional calendar, marks its fired-trigger record as EXECUTING,
* advances the trigger's fire times, stores the trigger with its new state and
* returns a TriggerFiredBundle describing the firing.
*
* @param conn the database connection to use
* @param trigger the trigger being fired
* @return the bundle for the firing, or null when the trigger is no longer in state
*         ACQUIRED, its job cannot be found, or its named calendar cannot be found
* @throws JobPersistenceException if reading or updating the stored state fails
*/
protected TriggerFiredBundle triggerFired(Connection conn,
OperableTrigger trigger)
throws JobPersistenceException {
JobDetail job;
Calendar cal = null;
// Make sure trigger wasn't deleted, paused, or completed...
try { // if trigger was deleted, state will be STATE_DELETED
String state = getDelegate().selectTriggerState(conn,
trigger.getKey());
// per the article: returns the stored state when a row is found, otherwise STATE_DELETED
//sql:"SELECT TRIGGER_STATE FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?"
if (!state.equals(STATE_ACQUIRED)) {
return null;
}
} catch (SQLException e) {
throw new JobPersistenceException("Couldn't select trigger state: "
+ e.getMessage(), e);
}
try {
// retrieve the job by the name and group in trigger.getJobKey()
job = retrieveJob(conn, trigger.getJobKey());
//sql:"SELECT * FROM {0}JOB_DETAILS WHERE SCHED_NAME = {1} AND JOB_NAME = ? AND JOB_GROUP = ?"
if (job == null) { return null; }
} catch (JobPersistenceException jpe) {
// best effort: flag the trigger as ERROR, then rethrow the original failure
try {
getLog().error("Error retrieving job, setting trigger state to ERROR.", jpe);
getDelegate().updateTriggerState(conn, trigger.getKey(),
STATE_ERROR);
} catch (SQLException sqle) {
getLog().error("Unable to set trigger state to ERROR.", sqle);
}
throw jpe;
}
// when the trigger names a calendar, load it (the article says from the qrtz_calendar table)
if (trigger.getCalendarName() != null) {
cal = retrieveCalendar(conn, trigger.getCalendarName());
//sql: "SELECT * FROM {0}CALENDARS WHERE SCHED_NAME = {1} AND CALENDAR_NAME = ?"
if (cal == null) { return null; }
}
try {
// update the fired-trigger record and set its state to STATE_EXECUTING
getDelegate().updateFiredTrigger(conn, trigger, STATE_EXECUTING, job);
//sql:"UPDATE {0}FIRED_TRIGGERS SET INSTANCE_NAME = ?, FIRED_TIME = ?, SCHED_TIME = ?, STATE = ?, JOB_NAME = ?, JOB_GROUP = ?, IS_NONCONCURRENT = ?, REQUESTS_RECOVERY = ? WHERE SCHED_NAME = {1} AND ENTRY_ID = ?"
//ps.setBigDecimal(2, new BigDecimal(String.valueOf(System.currentTimeMillis())));
//ps.setBigDecimal(3, new BigDecimal(String.valueOf(trigger.getNextFireTime().getTime())));
} catch (SQLException e) {
throw new JobPersistenceException("Couldn't insert fired trigger: "
+ e.getMessage(), e);
}
// captured before triggered() overwrites the trigger's previous fire time
Date prevFireTime = trigger.getPreviousFireTime();
// call triggered - to update the trigger's next-fire-time state...
// (section 2.3.1 of the article)
trigger.triggered(cal);
String state = STATE_WAITING;
boolean force = true;
// does the job disallow concurrent execution?
if (job.isConcurrentExectionDisallowed()) {
state = STATE_BLOCKED;
force = false;
// block the job's other triggers: WAITING/ACQUIRED -> BLOCKED, PAUSED -> PAUSED_BLOCKED
try {
getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(),
STATE_BLOCKED, STATE_WAITING);
getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(),
STATE_BLOCKED, STATE_ACQUIRED);
getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(),
STATE_PAUSED_BLOCKED, STATE_PAUSED);
} catch (SQLException e) {
throw new JobPersistenceException(
"Couldn't update states of blocked triggers: "
+ e.getMessage(), e);
}
}
// getNextFireTime() == null means the trigger has no further fire time, i.e. it is complete
if (trigger.getNextFireTime() == null) {
state = STATE_COMPLETE;
force = true;
}
// 2.3.2 insert or update the trigger with its new state (replaceExisting = true)
storeTrigger(conn, trigger, job, true, state, force, false);
// clear the 'dirty' flag on the job's data map
job.getJobDataMap().clearDirtyFlag();
// package everything into a TriggerFiredBundle; the fourth argument is true when the
// trigger's group equals Scheduler.DEFAULT_RECOVERY_GROUP
return new TriggerFiredBundle(job, trigger, cal, trigger.getKey().getGroup()
.equals(Scheduler.DEFAULT_RECOVERY_GROUP), new Date(), trigger
.getPreviousFireTime(), prevFireTime, trigger.getNextFireTime());
}
2.3.1 更新觸發器的下一個觸發時間狀态...
//當排程程式決定“觸發”觸發器(執行關聯的作業)時調用,以便為觸發器更新自身以進行下一次觸發(如果有)。
/**
 * Called when the scheduler decides to fire this trigger, so that the trigger
 * can update itself for its next firing (if any).
 *
 * <p>Increments the fire counter, moves the current next-fire time into the
 * previous-fire time and computes a new next-fire time. When a calendar is
 * given, fire times the calendar excludes are skipped; the search gives up
 * (next-fire time becomes {@code null}) once a candidate lies beyond
 * {@code YEAR_TO_GIVEUP_SCHEDULING_AT}.</p>
 *
 * @param calendar the calendar associated with the trigger, or {@code null} for none
 */
public void triggered(Calendar calendar) {
    timesTriggered++;
    previousFireTime = nextFireTime; // the time that just fired becomes the previous one
    nextFireTime = getFireTimeAfter(nextFireTime);

    // without a calendar nothing can be excluded, so the computed time stands
    if (calendar == null) {
        return;
    }

    while (nextFireTime != null && !calendar.isTimeIncluded(nextFireTime.getTime())) {
        nextFireTime = getFireTimeAfter(nextFireTime);
        if (nextFireTime == null) {
            break;
        }

        // avoid infinite loop
        java.util.Calendar candidate = java.util.Calendar.getInstance();
        candidate.setTime(nextFireTime);
        int candidateYear = candidate.get(java.util.Calendar.YEAR);
        if (candidateYear > YEAR_TO_GIVEUP_SCHEDULING_AT) {
            nextFireTime = null;
        }
    }
}
2.3.2插入或者更新一個Trigger ,進入看看
/**
 * Inserts the given trigger, or updates it when a trigger with the same key
 * already exists.
 *
 * <p>Unless {@code forceState} is set, a WAITING or ACQUIRED state is turned
 * into PAUSED when the trigger's group (or the all-groups marker) is paused.
 * When the job disallows concurrent execution and this is not a recovery,
 * the state is additionally passed through {@code checkBlockedState}.</p>
 *
 * @param conn            the database connection to use
 * @param newTrigger      the trigger to store
 * @param job             the trigger's job, or {@code null} to have it looked up
 * @param replaceExisting whether an existing trigger with the same key may be replaced
 * @param state           the state to store the trigger in
 * @param forceState      when true, the paused-group adjustment is skipped
 * @param recovering      when true, the blocked-state check is skipped
 * @throws ObjectAlreadyExistsException if the trigger exists and must not be replaced
 * @throws JobPersistenceException      if the job does not exist or storing fails
 */
protected void storeTrigger(Connection conn,
        OperableTrigger newTrigger, JobDetail job, boolean replaceExisting, String state,
        boolean forceState, boolean recovering)
        throws JobPersistenceException {

    // look the trigger up by its key (name and group)
    final boolean alreadyStored = triggerExists(conn, newTrigger.getKey());
    if (alreadyStored && !replaceExisting) {
        throw new ObjectAlreadyExistsException(newTrigger);
    }

    try {
        if (!forceState) {
            boolean pauseRequired = getDelegate().isTriggerGroupPaused(
                    conn, newTrigger.getKey().getGroup());

            // the group itself is not paused: fall back to the all-groups marker and,
            // if that is set, record this group as paused too
            if (!pauseRequired && getDelegate().isTriggerGroupPaused(conn, ALL_GROUPS_PAUSED)) {
                pauseRequired = true;
                getDelegate().insertPausedTriggerGroup(conn, newTrigger.getKey().getGroup());
            }

            if (pauseRequired) {
                if (state.equals(STATE_WAITING) || state.equals(STATE_ACQUIRED)) {
                    state = STATE_PAUSED;
                }
            }
        }

        // no job supplied: look it up by the trigger's job key, and fail if it is missing
        if (job == null) {
            job = getDelegate().selectJobDetail(conn, newTrigger.getJobKey(), getClassLoadHelper());
            if (job == null) {
                throw new JobPersistenceException("The job ("
                        + newTrigger.getJobKey()
                        + ") referenced by the trigger does not exist.");
            }
        }

        // the caller in triggerFired passes recovering == false
        if (job.isConcurrentExectionDisallowed() && !recovering) {
            state = checkBlockedState(conn, job.getKey(), state);
        }

        if (!alreadyStored) {
            // insert is largely analogous to update and is not walked through by the article
            getDelegate().insertTrigger(conn, newTrigger, state, job);
        } else {
            // 2.3.2.1 update the existing row
            getDelegate().updateTrigger(conn, newTrigger, state, job);
        }
    } catch (Exception e) {
        throw new JobPersistenceException("Couldn't store trigger '" + newTrigger.getKey() + "' for '"
                + newTrigger.getJobKey() + "' job:" + e.getMessage(), e);
    }
}
2.3.2.1這個裡面還是很複雜,貼代碼出來了
//更新基本觸發器資料。
public int updateTrigger(Connection conn, OperableTrigger trigger, String state,
JobDetail jobDetail) throws SQLException, IOException {
// save some clock cycles by unnecessarily writing job data blob ...
//通過不必要地寫入作業資料blob來儲存一些時鐘周期...
boolean updateJobData = trigger.getJobDataMap().isDirty();//确定Map是否标記為dirty。
ByteArrayOutputStream baos = null;
if(updateJobData && trigger.getJobDataMap().size() > 0) {
baos = serializeJobData(trigger.getJobDataMap());
}
PreparedStatement ps = null;
int insertResult = 0;
try {
if(updateJobData) {
ps = conn.prepareStatement(rtp(UPDATE_TRIGGER));
---------------------------------------------------------------------
"UPDATE {0}TRIGGERS SET JOB_NAME = ?, JOB_GROUP = ?, DESCRIPTION = ?, NEXT_FIRE_TIME = ?, PREV_FIRE_TIME = ?, TRIGGER_STATE = ?, TRIGGER_TYPE = ?, START_TIME = ?, END_TIME = ?, CALENDAR_NAME = ?, MISFIRE_INSTR = ?, PRIORITY = ?, JOB_DATA = ? WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?"
---------------------------------------------------------------------
} else {
ps = conn.prepareStatement(rtp(UPDATE_TRIGGER_SKIP_DATA));
---------------------------------------------------------------------
"UPDATE {0}TRIGGERS SET JOB_NAME = ?, JOB_GROUP = ?, DESCRIPTION = ?, NEXT_FIRE_TIME = ?, PREV_FIRE_TIME = ?, TRIGGER_STATE = ?, TRIGGER_TYPE = ?, START_TIME = ?, END_TIME = ?, CALENDAR_NAME = ?, MISFIRE_INSTR = ?, PRIORITY = ? WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?"
---------------------------------------------------------------------
}
ps.setString(1, trigger.getJobKey().getName());
ps.setString(2, trigger.getJobKey().getGroup());
ps.setString(3, trigger.getDescription());
long nextFireTime = -1;
if (trigger.getNextFireTime() != null) {//觸發器下次執行的時間不為null
//備注:一般執行完成,即是complete 狀态,getNextFireTime == null
nextFireTime = trigger.getNextFireTime().getTime();
}
ps.setBigDecimal(4, new BigDecimal(String.valueOf(nextFireTime)));
long prevFireTime = -1;
if (trigger.getPreviousFireTime() != null) {//觸發器之前執行的時間不為 null
//備注:觸發器第一次執行的時候 getNextFireTime 為 null
prevFireTime = trigger.getPreviousFireTime().getTime();
}
ps.setBigDecimal(5, new BigDecimal(String.valueOf(prevFireTime)));
ps.setString(6, state);
//擷取對于代理 ;如JDBC 或者RAM
TriggerPersistenceDelegate tDel = findTriggerPersistenceDelegate(trigger);
String type = TTYPE_BLOB;
if(tDel != null)
type = tDel.getHandledTriggerTypeDiscriminator();
ps.setString(7, type);
ps.setBigDecimal(8, new BigDecimal(String.valueOf(trigger
.getStartTime().getTime())));
long endTime = 0;
if (trigger.getEndTime() != null) {
endTime = trigger.getEndTime().getTime();
}
ps.setBigDecimal(9, new BigDecimal(String.valueOf(endTime)));
ps.setString(10, trigger.getCalendarName());
ps.setInt(11, trigger.getMisfireInstruction());
ps.setInt(12, trigger.getPriority());
if(updateJobData) {
setBytes(ps, 13, baos);
ps.setString(14, trigger.getKey().getName());
ps.setString(15, trigger.getKey().getGroup());
} else {
ps.setString(13, trigger.getKey().getName());
ps.setString(14, trigger.getKey().getGroup());
}
insertResult = ps.executeUpdate();
if(tDel == null)
updateBlobTrigger(conn, trigger);
else
//更新擴充觸發器屬性
tDel.updateExtendedTriggerProperties(conn, trigger, state, jobDetail);
---------------------------------------------------------------
"UPDATE {0}SIMPLE_TRIGGERS SET REPEAT_COUNT = ?, REPEAT_INTERVAL = ?, TIMES_TRIGGERED = ? WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?"
---------------------------------------------------------------------
} finally {
closeStatement(ps);
}
return insertResult;//傳回執行的結果 為int
}
歡迎通路我的csdn部落格,我們一同成長!
"不管做什麼,隻要堅持下去就會看到不一樣!在路上,不卑不亢!"
部落格首頁:http://blog.csdn.net/u010648555
版權聲明
作者:阿飛雲
出處:部落格園阿飛雲的技術部落格--http://www.cnblogs.com/aflyun
您的支援是對部落客最大的鼓勵,感謝您的認真閱讀。
本文版權歸作者所有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文連結,否則保留追究法律責任的權利。