排程系統作為分布式系統技術中重要的一環,了解其技術原理必不可少,不同系統内部采用的排程系統叫法不一樣,但大緻功能都類似,而Quartz作為經典的開源企業級排程系統,怎麼能不研究一下呢?
為什麼要學習quartz源碼?
- 排程系統很重要而且很常見,quartz又是業内知名産品,在企業中得到了廣泛的應用
- 學習好的系統設計可以提升自己的系統設計能力,後續涉及到任務排程相關功能,做起來更輕松和更穩定
概念
- Job代表一個任務執行個體。 Job由Jobdetail配置的執行個體資訊生成。
- JobDetail代表一個任務配置詳情。
- Trigger代表排程參數的配置,什麼時候發起調用,時間政策的排程。
- Scheduler:排程容器,一個Scheduler可以注冊多個JobDetail和Trigger。隻有JobDetail和Trigger組合到一起,才能被Scheduler排程。
- JobStore:儲存和讀取JobDetail與Trigger的地方,可以存儲在記憶體或者資料庫中。
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLicmbw5iN4QDZ0MmZlZGMkFmMlN2M2IWOxgDZyYDNmFmMwgjZl9CX0JXZ252bj91Ztl2Lc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
Demo
來一段代碼實際感受下Quartz的使用方式,有助于了解其概念:
1 假如mvn依賴,mysql和HikariCP用于持久化任務配置。
org.quartz-scheduler quartz 2.3.0org.quartz-scheduler quartz-jobs 2.2.1mysql mysql-connector-java 5.1.35com.zaxxer HikariCP 2.2.5複制代碼
2 準備Demo代碼
//建立一個簡單的Job接口類public class HelloJob implements Job { @Override public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException { System.out.println("hello quartz!"); }}// 1. 通過工廠的方式建立Scheduler// 2. JobDetail指定Job為HelloJob// 3. Trigger執行政策為每個10s重複執行一次排程作業public class SchedulerTest { private static SchedulerFactory factory = new StdSchedulerFactory(); public static void main(String[] args) throws SchedulerException { Scheduler scheduler = factory.getScheduler(); scheduler.start(); // JobDetail JobDetail job = JobBuilder.newJob(HelloJob.class) .withIdentity("myJob", "group") .build(); // Trigger Trigger trigger = TriggerBuilder.newTrigger() .withIdentity("myTrigger", "group") .startNow() .withSchedule(simpleSchedule() .withIntervalInSeconds(10) .repeatForever()) .build(); // 排程 scheduler.scheduleJob(job,trigger); }}複制代碼
3 預設情況下JobDetail和Trigger是存儲在記憶體中的,如果想要持久化到資料庫中,可以新增quartz.properties,修改配置準備資料庫腳本。
- 資料庫腳本:資料表腳本:raw.githubusercontent.com/quartznet/q…
- Quartz配置:
# quartz資料庫的表字首org.quartz.jobStore.tablePrefix = QRTZ_# 持久化使用的類,JobStoreTX支援事物的送出和復原org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTXorg.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.StdJDBCDelegate# 資料源的标記,配置之後quartz會根據值作為字首擷取資料庫的配置# 在StdSchedulerFactory類中搜尋 String[] dsNames = cfg.getPropertyGroups(PROP_DATASOURCE_PREFIX); 檢視這部分代碼org.quartz.jobStore.dataSource = myDS# 配置資料庫org.quartz.dataSource.myDS.driver = com.mysql.jdbc.Driverorg.quartz.dataSource.myDS.URL = jdbc:mysql://localhost:3306/quartz-test?characterEncoding=utf-8org.quartz.dataSource.myDS.user = rootorg.quartz.dataSource.myDS.password =org.quartz.dataSource.myDS.maxConnections = 5#org.quartz.dataSource.myDS.connectionProvider.class=org.quartz.utils.HikariCpPoolingConnectionProviderorg.quartz.dataSource.myDS.provider=hikaricp# 其餘采用預設的quartz配置org.quartz.scheduler.instanceName: DefaultQuartzSchedulerorg.quartz.scheduler.rmi.export: falseorg.quartz.scheduler.rmi.proxy: falseorg.quartz.scheduler.wrapJobExecutionInUserTransaction: falseorg.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPoolorg.quartz.threadPool.threadCount: 10org.quartz.threadPool.threadPriority: 5org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread: trueorg.quartz.jobStore.misfireThreshold: 60000複制代碼
運作結果:
資料表解釋:
原理設計
UML類圖
- 兩個主要線程:QuartzSchdulerThread與MisfireHandler 排程任務的核心執行邏輯在QuartzSchdulerThread中 MisfireHandler用于解決任務未觸發問題。
- JobStore對Job和Trigger的增删改查,JobRunShell将架構與我們自定義的業務Job進行關聯起來處理
Quartz主要啟動過程
通過時序圖,了解Quartz大部分核心類的建立時機。
1 首先建立排程工廠類,一般使用StdSchedulerFactory,通過工廠類建立Scheduler。Scheduler的屬性可通過quartz.properties配置
2 以Scheduler的标準實作StdScheduler為例,其為QuartzScheduler的代理類,主要行為通過QuartzScheduler來實作。
3 QuartzScheduler執行個體化的時候也是在StdSchedulerFactory中,它主要使用兩個對象。
- QuartzSchedulerResources 執行個體化與StdSchedulerFactory中,包含Scheduler建立和運作過程的主要資源,如JobStore和ThreadExecutor。
- QuartzSchedulerThread 負責觸發Trigger,通過SchedulerSignaler進行互動
Quartz任務排程過程
我們建立的任務是怎麼被排程的?主要在排程線程QuartzSchedulerThread中實作,其大緻邏輯
1 先擷取線程池中可以使用的線程數量,如果沒有可以用的線程會阻塞到有可用的線程。 配置:org.quartz.threadPool.xxx
2 通過JobStore擷取接下來30秒鐘内要執行的trigger。org.quartz.spi.JobStore#acquireNextTriggers
3 循環與waiting到任務配置的觸發時間
4 進行觸發,通過JobStore.triggerFired擷取TriggerFiredResult
5 針對每個要執行的TriggerFiredResult,建立JobRunShell,并放入線程池執行
- JobRunShell調用初始化方法,建立本次要執行Job和JobExecutionContext。 Job = JobDetail.getJobClass().newInstance(), JobExecutionContext包含了本次Job運作的JobDetail和Trigger等資訊。
- 将JobRunnerShell丢到線程池中,從線程池中選一個可用的WorkerThread運作。
- 運作JobRunnerShell的run方法。job.execute(jec); 執行Job執行個體代碼,執行前後可以通過listner做一些監聽。
Quartz任務Misfire過程
Quartz排程器正常情況下擷取将來一段時間内要觸發的任務,然後循環等待到指定時刻進行執行,但是可能在指定的時間點未執行到配置的任務。出現這種情況的原因:
- 系統重新開機,重新開機的這段時間中,一些任務被misfire
- trigger被暫停(suspendXXX)的一段時間中,一些任務被misfire
- 線程池資源不足,任務無法被執行
- 有些任務在觸發時間時,上次正在執行的任務目前還沒有結束。
那麼Misfire機制的處理原理是什麼呢?
- 假設在0時刻有一個任務需要執行,但是到了目前時刻即圖中的80,任務還沒有被執行, 如果目前時刻與0時刻要執行的任務大于misfireThreshold,那麼0時刻的任務被看做是misfire任務。
- 然後0時刻的任務會被MisfireHandler檢測到,再将其next_trigger_time設定為90(設定為目前時刻之後)。
- 由于任務的next_trigger_time設定為了目前時刻之後,排程線程會重新檢測到這個任務,然後進行觸發。
内部run方法的執行流程:
1 掃描在misfireThreshold到此刻時間範圍内沒有被執行的Trigger。首先進行計數:countMisfiredTriggersInState(conn, STATE_WAITING, getMisfireTime())
2 如果count大于0的話,擷取鎖,防止并發通路。然後擷取需要被觸發的Misfire trigger。
3 根據配置的misfireInstruction更新trigger的next_fire_time。主要方法位于:SimpleTriggerImpl#updateAfterMisfire
4 送出connection
5 如果還有更多的misfire任務,休息最短暫的50ms。 如果沒有則sleep時間為misfireThreshold
Trigger狀态
在網上看到一個有關Trigger狀态流轉的圖,參考下:
一些問題
預估在使用Quartz中可能會存在的問題:
1 資料表結構固定,必須要按照官方給的表結構來嗎?
- 可以自己實作JobStore,參考JobStoreSupport類,自定義表結構
2 Quartz預設使用資料庫作為分布式鎖,性能太差,如何優化?
- 自定義LockHandler類,使用Redis實作分布式鎖
- 使用Trigger批處理方式
- 改變任務執行的順序
- 減少上下文的切換
參考:tech.ebayinc.com/engineering…
最後
本人才疏學淺,過程如有不當,希望大佬能指出錯誤,如有想關于其設計原理讨論的,也歡迎來撩。
會持續更新...