quartz2.2.1叢集排程機制調研及源碼分析
原文位址:http://demo.netfoucs.com/gklifg/article/details/27090179
引言
quartz叢集架構
排程器執行個體化
排程過程
觸發器的擷取
觸發trigger:
Job執行過程:
總結:
附:
1.單獨啟動一個Job Server來跑job,不部署在web容器中.其他web節點當需要啟動異步任務的時候,可以通過種種方式(DB,
JMS, Web Service, etc)通知Job Server,而Job
Server收到這個通知之後,把異步任務加載到自己的任務隊列中去。
2.獨立出一個job
server,這個server上跑一個spring+quartz的應用,這個應用專門用來啟動任務。在jobserver上加上hessain,得到
業務接口,這樣jobserver就可以調用web
container中的業務操作,也就是正真執行任務的還是在cluster中的tomcat。在jobserver啟動定時任務之後,輪流調用各位址上
的業務操作(類似apache分發tomcat一樣),這樣可以讓不同的定時任務在不同的節點上運作,減低了一台某個node的壓力
3.quartz本身事實上也是支援叢集的。在這種方案下,cluster上的每一個node都在跑quartz,然後也是通過資料中記錄的狀态來
判斷這個操作是否正在執行,這就要求cluster上所有的node的時間應該是一樣的。而且每一個node都跑應用就意味着每一個node都需要有自己
的線程池來跑quartz.
總的來說,第一種方法,在單獨的server上執行任務,對任務的适用範圍有很大的限制,要通路在web環境中的各種資源非常麻煩.但是集中式的管
理容易從架構上規避了分布式環境的種種同步問題.第二種方法在在第一種方法的基礎上減輕了jobserver的重量,隻發送調用請求,不直接執行任務,這
樣解決了獨立server無法通路web環境的問題,而且可以做到節點的輪詢.可以有效地均衡負載.第三種方案是quartz自身支援的叢集方案,在架構
上完全是分布式的,沒有集中的管理,quratz通過資料庫鎖以及辨別字段保證多個節點對任務不重複擷取,并且有負載平衡機制和容錯機制,用少量的備援,
換取了高可用性(high avilable HA)和高可靠性.(個人認為和git的機制有異曲同工之處,分布式的備援設計,換取可靠性和速度).
本文旨在研究quratz為解決分布式任務排程中存在的防止重複執行和負載均衡等問題而建立的機制.以排程流程作為順序,配合源碼了解其中原理.
quartz的分布式架構如上圖,可以看到資料庫是各節點上排程器的樞紐.各個節點并不感覺其他節點的存在,隻是通過資料庫來進行間接的溝通.
實際上,quartz的分布式政策就是一種以資料庫作為邊界資源的并發政策.每個節點都遵守相同的操作規範,使得對資料庫的操作可以串行執行.而不同名稱的排程器又可以互不影響的并行運作.
元件間的通訊圖如下:(*注:主要的sql語句附在文章最後)
quartz運作時由QuartzSchedulerThread類作為主體,循環執行排程流程。JobStore作為中間層,按照quartz的
并發政策執行資料庫操作,完成主要的排程邏輯。JobRunShellFactory負責執行個體化JobDetail對象,将其放入線程池運作。
LockHandler負責擷取LOCKS表中的資料庫鎖。
整個quartz對任務排程的時序大緻如下:
梳理一下其中的流程,可以表示為:
0.排程器線程run()
1.擷取待觸發trigger
1.1資料庫LOCKS表TRIGGER_ACCESS行加鎖
1.2讀取JobDetail資訊
1.3讀取trigger表中觸發器資訊并标記為"已擷取"
1.4commit事務,釋放鎖
2.觸發trigger
2.1資料庫LOCKS表STATE_ACCESS行加鎖
2.2确認trigger的狀态
2.3讀取trigger的JobDetail資訊
2.4讀取trigger的Calendar資訊
2.3更新trigger資訊
2.3commit事務,釋放鎖
3執行個體化并執行Job
3.1從線程池擷取線程執行JobRunShell的run方法
可以看到,這個過程中有兩個相似的過程:同樣是對資料表的更新操作,同樣是在執行操作前擷取鎖 操作完成後釋放鎖.這一規則可以看做是quartz解決叢集問題的核心思想.
規則流程圖:
進一步解釋這條規則就是:一個排程器執行個體在執行涉及到分布式問題的資料庫操作前,首先要擷取QUARTZ2_LOCKS表中對應目前排程器的行級鎖,擷取鎖後即可執行其他表中的資料庫操作,随着操作事務的送出,行級鎖被釋放,供其他排程器執行個體擷取.
叢集中的每一個排程器執行個體都遵循這樣一種嚴格的操作規程,那麼對于同一類排程器來說,每個執行個體對資料庫的操作隻能是串行的.而不同名的排程器之間卻可以并行執行.
下面我們深入源碼,從微觀上觀察quartz叢集排程的細節
一個最簡單的quartz helloworld應用如下:
我們看到初始化一個排程器需要用工廠類擷取執行個體:
<code>SchedulerFactory sf = </code><code>new</code> <code>StdSchedulerFactory();</code>
<code>Scheduler sch = sf.getScheduler(); </code>
然後啟動:
<code>sch.start();</code>
跟進初始化排程器方法sched = instantiate();發現是一個700多行的初始化方法,涉及到
讀取配置資源,
生成QuartzScheduler對象,
建立該對象的運作線程,并啟動線程;
初始化JobStore,QuartzScheduler,DBConnectionManager等重要元件,
至此,排程器的初始化工作已完成,初始化工作中quratz讀取了資料庫中存放的對應目前排程器的鎖資訊,對應CRM中的表QRTZ2_LOCKS,中的STATE_ACCESS,TRIGGER_ACCESS兩個LOCK_NAME.
當調用sch.start();方法時,scheduler做了如下工作:
1.通知listener開始啟動
2.啟動排程器線程
3.啟動plugin
4.通知listener啟動完成
排程器啟動後,排程器的線程就處于運作狀态了,開始執行quartz的主要工作–排程任務.
前面已介紹過,任務的排程過程大緻分為三步:
3.執行個體化并執行Job
下面分别分析三個階段的源碼.
QuartzSchedulerThread是排程器線程類,排程過程的三個步驟就承載在run()方法中,分析見代碼注釋:
排程器每次擷取到的trigger是30s内需要執行的,是以要等待一段時間至trigger執行前2ms.在等待過程中涉及到一個新加進來更緊急的trigger的處理邏輯.分析寫在注釋中,不再贅述.
可以看到排程器的隻要在運作狀态,就會不停地執行排程流程.值得注意的是,在流程的最後線程會等待一個随機的時間.這就是quartz自帶的負載平衡機制.
以下是三個步驟的跟進:
排程器調用:
<code>triggers = qsRsrcs.getJobStore().acquireNextTriggers(</code>
<code>now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());</code>
在資料庫中查找一定時間範圍内将會被觸發的trigger.參數的意義如下:參數1:nolaterthan = now+3000ms,即未來30s内将會被觸發.參數2 最大擷取數量,大小取線程池線程剩餘量與定義值得較小者.參數3 時間視窗 預設為0,程式會在nolaterthan後加上視窗大小來選擇trigger.quratz會在每次觸發trigger後計算出trigger下次要執 行的時間,并在資料庫QRTZ2_TRIGGERS中的NEXT_FIRE_TIME字段中記錄.查找時将目前毫秒數與該字段比較,就能找出下一段時間内 将會觸發的觸發器.查找時,調用在JobStoreSupport類中的方法:
該方法關鍵的一點在于執行了executeInNonManagedTXLock()方法,這一方法指定了一個鎖名,兩個回調函數.在開始執行時獲 得鎖,在方法執行完畢後随着事務的送出鎖被釋放.在該方法的底層,使用 for update語句,在資料庫中加入行級鎖,保證了在該方法執行過程中,其他的排程器對trigger進行擷取時将會等待該排程器釋放該鎖.此方法是前面介 紹的quartz叢集政策的的具體實作,這一模闆方法在後面的trigger觸發過程還會被使用.
<code>public</code> <code>static</code> <code>final</code> <code>String SELECT_FOR_LOCK = </code><code>"SELECT * FROM "</code>
<code> </code><code>+ TABLE_PREFIX_SUBST + TABLE_LOCKS + </code><code>" WHERE "</code> <code>+ COL_SCHEDULER_NAME + </code><code>" = "</code> <code>+ SCHED_NAME_SUBST</code>
<code> </code><code>+ </code><code>" AND "</code> <code>+ COL_LOCK_NAME + </code><code>" = ? FOR UPDATE"</code><code>;</code>
進一步解釋:quratz在擷取資料庫資源之前,先要以for update方式通路LOCKS表中相應LOCK_NAME資料将改行鎖定.如果在此前該行已經被鎖定,那麼等待,如果沒有被鎖定,那麼讀取滿足要求的 trigger,并把它們的status置為STATE_ACQUIRED,如果有tirgger已被置為STATE_ACQUIRED,那麼說明該 trigger已被别的排程器執行個體認領,無需再次認領,排程器會忽略此trigger.排程器執行個體之間的間接通信就展現在這裡.
JobStoreSupport.acquireNextTrigger()方法中:
int rowsUpdated = getDelegate().updateTriggerStateFromOtherState(conn, triggerKey, STATE_ACQUIRED, STATE_WAITING);
最後釋放鎖,這時如果下一個排程器在排隊擷取trigger的話,則仍會執行相同的步驟.這種機制保證了trigger不會被重複擷取.按照這種算法正常運作狀态下排程器每次讀取的trigger中會有相當一部分已被标記為被擷取.
擷取trigger的過程進行完畢.
QuartzSchedulerThread line336:
List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
調用JobStoreSupport類的triggersFired()方法:
此處再次用到了quratz的行為規範:executeInNonManagedTXLock()方法,在擷取鎖的情況下對trigger進行觸發操作.其中的觸發細節如下:
該方法做了以下工作:
1.擷取trigger目前狀态
2.通過trigger中的JobKey讀取trigger包含的Job資訊
3.将trigger更新至觸發狀态
4.結合calendar的資訊觸發trigger,涉及多次狀态更新
5.更新資料庫中trigger的資訊,包括更改狀态至STATE_COMPLETE,及計算下一次觸發時間.
6.傳回trigger觸發結果的資料傳輸類TriggerFiredBundle
從該方法傳回後,trigger的執行過程已基本完畢.回到執行quratz操作規範的executeInNonManagedTXLock方法,将資料庫鎖釋放.
trigger觸發操作完成
再回到線程類QuartzSchedulerThread的 line353這時觸發器都已出發完畢,job的詳細資訊都已就位
QuartzSchedulerThread line:368
<code>qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));</code>
<code>shell.initialize(qs);</code>
為每個Job生成一個可運作的RunShell,并放入線程池運作.
在最後排程線程生成了一個随機的等待時間,進入短暫的等待,這使得其他節點的排程器都有機會擷取資料庫資源.如此就實作了quratz的負載平衡.
這樣一次完整的排程過程就結束了.排程器線程進入下一次循環.
簡單地說,quartz的分布式排程政策是以資料庫為邊界資源的一種異步政策.各個排程器都遵守一個基于資料庫鎖的操作規則保證了操作的唯一性.同時多個節點的異步運作保證了服務的可靠.但這種政策有自己的局限性.摘錄官方文檔中對quratz叢集特性的說明:
Only one node will fire the job for each firing. What I mean by that is, if the job has a repeating trigger that tells it to fire every 10 seconds, then at 12:00:00 exactly one node will run the job, and at 12:00:10 exactly one node will run the job, etc. It won't necessarily be the same node each time - it will more or less be random which node runs it. The load balancing mechanism is near-random for busy schedulers (lots of triggers) but favors the same node for non-busy (e.g. few triggers) schedulers.
The clustering feature works best for scaling out long-running and/or cpu-intensive jobs (distributing the work-load over multiple nodes). If you need to scale out to support thousands of short-running (e.g 1 second) jobs, consider partitioning the set of jobs by using multiple distinct schedulers (including multiple clustered schedulers for HA). The scheduler makes use of a cluster-wide lock, a pattern that degrades performance as you add more nodes (when going beyond about three nodes - depending upon your database's capabilities, etc.).
說明指出,叢集特性對于高cpu使用率的任務效果很好,但是對于大量的短任務,各個節點都會搶占資料庫鎖,這樣就出現大量的線程等待資源.這種情況随着節點的增加會越來越嚴重.
通訊圖中關鍵步驟的主要sql語句: