![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIyZuBnLyImNxYWYzYTM1gTY2kTNhlzN3QzNlVWNklDM3IDNjF2Lc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
目标:
第一章:概述
1、了解任務排程的概念
2、了解分布式任務排程的概念
3、能夠說出Elastic-Job是什麼
第二章:Elastic-Job快速入門
1、能夠搭建Elastic-Job快速入門工程環境
2、能夠編寫Elastic-Job快速入門的程式
3、了解Elastic-Job整體架構的組成部分的職責
4、了解ZooKeeper在Elastic-Job中的作用
第三章:Spring Boot開發分布式任務排程
1、能夠采用Spring Boot搭建Elastic-Job程式環境
2、了解作業分片的概念
3、能夠實作Elastic-Job作業分片案例
第四章:Elastic-Job進階
1、能夠使用事件跟蹤
2、能夠使用elastic-job-lite-console
3、能夠使用Dump指令
Elastic-Job分布式任務排程
1.概述
1.1.什麼是任務排程
我們可以先思考一下下面業務場景的解決方案:
- 某電商系統需要在每天上午10點,下午3點,晚上8點發放一批優惠券。
- 某銀行系統需要在信用卡到期還款日的前三天進行短信提醒。
- 某财務系統需要在每天淩晨0:10結算前一天的财務資料,統計彙總。
- 12306會根據車次的不同,而設定某幾個時間點進行分批放票。
- 某網站為了實作天氣實時展示,每隔5分鐘就去天氣伺服器擷取最新的實時天氣資訊。
以上場景就是任務排程所需要解決的問題。
- 任務排程是指系統為了自動完成特定任務,在約定的特定時刻去執行任務的過程。有了任務排程即可解放更多的人力由系統自動去執行任務。
任務排程如何實作? 多線程方式實作: 學過多線程的同學,可能會想到,我們可以開啟一個線程,每sleep一段時間,就去檢查是否已到預期執行時間。 以下代碼簡單實作了任務排程的功能:
public static void main(String[] args) {
//任務執行間隔時間
final long timeInterval=3000;
Runnable runnable=new Runnable() {
@Override
public void run() {
while(true){
System.out.printf("time:%s,to do...\n",LocalDateTime.now().getSecond());
try {
Thread.sleep(timeInterval);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
};
Thread thread=new Thread(runnable);
thread.start();
}
上面的代碼實作了按一定的間隔時間執行任務排程的功能。
Jdk也為我們提供了相關支援,如Timer、ScheduledExecutor,下邊我們了解下。
Timer方式實作:
public static void main(String[] args) {
Timer timer=new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
System.out.printf("time:%s,to do111...\n",LocalDateTime.now().getSecond());
}
},1000,2000);//1秒後開始排程,每2秒執行一次
Timer timer2=new Timer();
timer2.schedule(new TimerTask() {
@Override
public void run() {
System.out.printf("time:%s,to do222...\n",LocalDateTime.now().getSecond());
}
},1000,3000);//1秒後開始排程,每2秒執行一次
}
Timer 的優點在于簡單易用
,每個Timer對應一個線程,是以可以同時啟動多個Timer并行執行多個任務,同一個Timer中的任務是串行執行。
ScheduledExecutor方式實作:
public static void main(String[] args) {
ScheduledExecutorService service= Executors.newScheduledThreadPool(10);
service.scheduleAtFixedRate(
new Runnable() {
@Override
public void run() {
System.out.printf("time:%s,to do...\n",LocalDateTime.now().getSecond());
}
},1,3, TimeUnit.SECONDS);
}
Java 5 推出了基于線程池設計的
ScheduledExecutor
,其設計思想是,每一個被排程的任務都會由線程池中一個線程去執行,是以任務是并發執行的,互相之間不會受到幹擾。
Timer 和 ScheduledExecutor 都僅能提供基于開始時間與重複間隔的任務排程,不能勝任更加複雜的排程需求。
比如,設定每月第一天淩晨1點執行任務、複雜排程任務的管理、任務間傳遞資料等等。
Quartz 是一個功能強大的任務排程架構,它可以滿足更多更複雜的排程需求,Quartz 設計的核心類包括Scheduler, Job 以及 Trigger。其中,Job 負責定義需要執行的任務,Trigger 負責設定排程政策,Scheduler 将二者組裝在一起,并觸發任務開始執行。Quartz支援簡單的按時間間隔排程、還支援按月曆排程方式,通過設定
CronTrigger表達式(包括:
秒、分、時、日、月、周、年
)進行任務排程。
第三方Quartz方式實作:
public static void main(String[] args) throws SchedulerException {
//建立一個Scheduler
SchedulerFactory schedulerFactory = new StdSchedulerFactory();
Scheduler scheduler = schedulerFactory.getScheduler();
//建立JobDetail
JobBuilder jobDetailBuilder = JobBuilder.newJob(MyJob.class);
jobDetailBuilder.withIdentity("jobName","jobGroupName");
JobDetail jobDetail = jobDetailBuilder.build();
//建立觸發的CronTrigger 支援按月曆排程
CronTrigger trigger = TriggerBuilder.newTrigger()
.withIdentity("triggerName", "triggerGroupName")
.startNow().withSchedule(CronScheduleBuilder.cronSchedule("0/2 * * * * ?"))
.build();//每隔兩秒執行一次
//建立觸發的SimpleTrigger 簡單的間隔排程
/*
SimpleTrigger trigger = TriggerBuilder.newTrigger()
.withIdentity("triggerName","triggerGroupName")
.startNow().withSchedule(SimpleScheduleBuilder.simpleSchedule()
.withIntervalInSeconds(2).repeatForever()).build();
*/
scheduler.scheduleJob(jobDetail,trigger); scheduler.start();
}
public class MyJob implements Job {
@Override
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
System.out.printf("time:%s,to do...\n", LocalDateTime.now().getSecond());
}
}
1.2.什麼是分布式任務排程
什麼是分布式?
目前軟體的架構正在逐漸轉變為分布式架構,将單體結構分為若幹服務,服務之間通過網絡互動來完成使用者的業務處理,如下圖,電商系統為分布式架構,由訂單服務、商品服務、使用者服務等組成:
分布式系統具體如下基本特點:
-
:每個部分都可以獨立部署,服務之間互動通過網絡進行通信,比如:訂單服務、商品服務。分布性
-
:每個部分都可以叢集方式部署,并可針對部分結點進行硬體及軟體擴容,具有一定的伸縮能力。伸縮性
-
:每個部分都可以叢集部分,保證高可用。高可用
什麼是分布式排程?
通常任務排程的程式是內建在應用中的,比如:優惠卷服務中包括了定時發放優惠卷的排程程式,結算服務中包括了定期生成報表的任務排程程式,由于采用分布式架構,一個服務往往會部署多個備援執行個體來運作我們的業務,在這種分布式系統環境下運作任務排程,我們稱之為
分布式任務排程
,如下圖:
分布式排程要實作的目标:
不管是任務排程程式內建在應用程式中,還是單獨建構的任務排程系統,如果采用分布式排程任務的方式就相當于将任務排程程式分布式建構,這樣就可以具有分布式系統的特點,并且提高任務的排程處理能力:
1、并行任務排程
并行任務排程實作靠多線程,如果有大量任務需要排程,
此時光靠多線程就會有瓶頸了,因為一台計算機CPU的處理能力是有限的。
如果将任務排程程式分布式部署,每個結點還可以部署為叢集,
這樣就可以讓多台計算機共同去完成任務排程,我們可以将任務分割為若幹個分片,
由不同的執行個體并行執行,來提高任務排程的處理效率。
2、高可用
若某一個執行個體當機,不影響其他執行個體來執行任務。
3、彈性擴容
當叢集中增加執行個體就可以提高并執行任務的處理效率。
4、任務管理與監測
對系統中存在的所有定時任務進行統一的管理及監測。
讓開發人員及運維人員能夠時刻了解任務執行情況,進而做出快速的應急處理響應。
5、避免任務重複執行
當任務排程以叢集方式部署,同一個任務排程可能會執行多次,
比如在上面提到的電商系統中到點發優惠券的例子,就會發放多次優惠券,
對公司造成很多損失,是以我們需要控制相同的任務在多個運作執行個體上隻執行一次,
考慮采用下邊的方法:
分布式鎖,多個執行個體在任務執行前首先需要擷取鎖,如果擷取失敗那麼久證明有其他服務已經再運作,
如果擷取成功那麼證明沒有服務在運作定時任務,那麼就可以執行。
- ZooKeeper選舉,利用ZooKeeper對Leader執行個體執行定時任務,有其他業務已經使用了ZK,那麼執行定時任務的時候判斷自己是否是Leader,如果不是則不執行,如果是則執行業務邏輯,這樣也能達到我們的目的。
2 Elastic-Job介紹
針對分布式任務排程的需求市場上出現了很多的産品:
1)Elastic-job:當當網基于quartz 二次開發的彈性分布式任務排程系統,功能豐富強大,采用zookeeper實作分布式協調,實作任務高可用以及分片。
2)Saturn: 唯品會開源的一個分布式任務排程平台,可以全域統一配置,統一監控,任務高可用以及分片并發處理。它是在elastic-job基礎之上改良出來的。
3)xxl-job:大衆點評的分布式任務排程平台,是一個輕量級分布式任務排程平台,其核心設計目标是開發迅速、學習簡單、輕量級、易擴充。現已開放源代碼并接入多家公司線上産品線,開箱即用。
4)TBSchedule:淘寶的一款非常優秀的高性能分布式排程架構,目前被應用于阿裡、京東、支付寶、國美等很多網際網路企業的流程排程系統中。
Elastic-Job是一個分布式排程的解決方案,由當當網開源,它由兩個互相獨立的子項目Elastic-Job-Lite和ElasticJob-Cloud組成,使用Elastic-Job可以快速實作分布式任務排程。
Elastic-Job的github位址:https://github.com/elasticjob
功能清單:
分布式排程協調在分布式環境中,任務能夠按指定的排程政策執行,并且能夠避免同一任務多執行個體重複執行。
豐富的排程政策:
基于成熟的定時任務作業架構Quartz cron表達式執行定時任務。
彈性擴容縮容
當叢集中增加某一個執行個體,它應當也能夠被選舉并執行任務;當叢集減少一個執行個體時,它所執行的任務能被轉移到别的執行個體來執行。
失效轉移
某執行個體在任務執行失敗後,會被轉移到其他執行個體執行。
錯過執行作業重觸發若因某種原因導緻作業錯過執行,自動記錄錯過執行的作業,并在上次作業完成後自動觸發。
支援并行排程
支援任務分片,任務分片是指将一個任務分為多個小任務項在多個執行個體同時執行。
作業分片一緻性
當任務被分片後,保證同一分片在分布式環境中僅一個執行執行個體。
支援作業生命周期操作
可以動态對任務進行開啟及停止操作。
豐富的作業類型
支援Simple、DataFlow、Script三種作業類型,後續會有詳細介紹。
Spring整合以及命名空間支援
對Spring支援良好的整合方式,支援spring自定義命名空間,支援占位符。
運維平台提供運維界面,可以管理作業和注冊中心。
3 Elastic-Job快速入門
3.1 環境搭建
3.1.1 版本要求
JDK要求1.7及以上版本
Maven要求3.0.4及以上版本
zookeeper要求采用3.4.6及以上版本
3.1.2 Zookeeper安裝&運作
https://archive.apache.org/dist/zookeeper/ 下載下傳某版本Zookeeper,并解壓。
執行解壓目錄下的bin/zkServer.cmd。
關于Zookeeper後續章節會有介紹。
3.1.3 建立maven工程
建立maven工程elastic-job-quickstart,并導入以下依賴:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.itheima.scheduler</groupId>
<artifactId>elastic-job-quickstart</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-core</artifactId>
<version>2.1.5</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.0</version>
</dependency>
</dependencies>
</project>
工程結構如下:
3.2 代碼實作
3.2.1.編寫定時任務類
此任務在每次執行時擷取一定數目的檔案,進行備份處理,由File實體類的backedUp屬性來辨別該檔案是否已備份。
/*
檔案備份任務
*/
public class FileBackupJob implements SimpleJob {
//檔案清單(模拟)
public static List<FileCustom> files=new ArrayList<>();
//每次任務執行要備份檔案的數量
private final int FETCH_SIZE=1;
//任務執行代碼邏輯
@Override
public void execute(ShardingContext shardingContext) {
System.out.println("作業分片資訊:"+shardingContext.getShardingItem());
//1.擷取未備份檔案
List<FileCustom> fileCustoms = fetchUnBackupFiles(FETCH_SIZE);
//進行檔案備份
backupFiles(fileCustoms);
}
//擷取未備份檔案
public List<FileCustom> fetchUnBackupFiles(int count){
//擷取的檔案清單
List<FileCustom> fileCustoms=new ArrayList<>();
int num=0;
for (FileCustom file : files) {
if(count<=num){
break;
}
//未備份
if(!file.getBackedUp()){
fileCustoms.add(file);
num++;
}
}
System.out.printf("time:%s,擷取檔案%d個\n", LocalDateTime.now(),num);
return fileCustoms;
}
/*
檔案備份
*/
public void backupFiles(List<FileCustom> files){
for (FileCustom file : files) {
file.setBackedUp(true);
System.out.printf("time:%s,備份檔案,名稱:%s,類型:%s\n", LocalDateTime.now(),file.getName(),file.getType());
}
}
}
檔案實體類如下:
@Data
public class FileCustom {
private String id;//辨別
private String name;//檔案名
private String type;//檔案類型,如text、image、radio、vedio
private String content;//檔案内容
private Boolean backedUp=false;//是否已備份
public FileCustom(String id, String name, String type, String content) {
this.id = id;
this.name = name;
this.type = type;
this.content = content;
}
}
3.2.2 編寫啟動類
public class JobMain {
//zookeeper端口
private static final int ZOOKEEPER_PORT=2181;
//zookeeper連接配接字元串 localhost:2181
private static final String ZOOKEEPER_CONNECTION_STRING="localhost:"+ZOOKEEPER_PORT;
//定時任務命名空間
private static final String JOB_NAMESPACE="elastic-job-example-java";
//啟動任務
public static void main(String[] args) {
//制造一些測試資料
generateTestFiles();
//配置注冊中心
CoordinatorRegistryCenter registryCenter = setUpRegistryCenter();
//啟動任務
startJob(registryCenter);
}
//zk的配置
//注冊中心配置
private static CoordinatorRegistryCenter setUpRegistryCenter(){
//注冊中心配置
ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration(ZOOKEEPER_CONNECTION_STRING,JOB_NAMESPACE);
//減少zk的逾時時間
zookeeperConfiguration.setSessionTimeoutMilliseconds(100);
//建立注冊中心
CoordinatorRegistryCenter registryCenter = new ZookeeperRegistryCenter(zookeeperConfiguration);
registryCenter.init();
return registryCenter;
}
//配置并啟動任務
private static void startJob(CoordinatorRegistryCenter registryCenter){
//建立JobCoreConfiguration 每3秒鐘啟動一次 分片數量1
JobCoreConfiguration jobCoreConfiguration = JobCoreConfiguration.newBuilder("files‐job", "0/3 * * * * ?", 1) .build();
//建立SimpleJobConfiguration
SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(jobCoreConfiguration, FileBackupJob.class.getCanonicalName());
//啟動任務
new JobScheduler(registryCenter, LiteJobConfiguration.newBuilder(simpleJobConfiguration).overwrite(true).build()).init();
}
//生成測試檔案
private static void generateTestFiles(){
for(int i=1;i<11;i++){
FileBackupJob.files.add(new FileCustom(String.valueOf(i+10),"檔案"+ (i+10),"text","content"+ (i+10)));
FileBackupJob.files.add(new FileCustom(String.valueOf(i+20),"檔案"+ (i+20),"image","content"+ (i+20)));
FileBackupJob.files.add(new FileCustom(String.valueOf(i+30),"檔案"+ (i+30),"radio","content"+ (i+30)));
FileBackupJob.files.add(new FileCustom(String.valueOf(i+40),"檔案"+ (i+40),"video","content"+ (i+40)));
}
System.out.println("生成測試資料完成");
}
}
2.2.3 測試
(1)啟動main方法檢視控制台
定時任務每3秒批量執行一次,符合基礎預期。
(2)測試視窗1不關閉,再次運作main方法觀察控制台日志(視窗2)
會出現以下兩種情況:
- 視窗1繼續執行任務,視窗2不執行任務
- 視窗2接替視窗1執行任務,視窗1停止執行任務
可通過反複啟停視窗2檢視到以上現象。
(3)視窗1、視窗2同時運作的情況下,停止正在執行任務的視窗
未停止的視窗開始執行任務。
分片測試:
目前作業沒有被分片,是以多個執行個體共同執行時隻有一個執行個體在執行,如果我們将作業分片執行,作業将被拆分為多個獨立的任務項,然後由分布式的應用執行個體分别執行某一個或幾個分片項。
修改上邊的代碼,改為作業分3片執行:
//建立JobCoreConfiguration 每3秒鐘啟動一次 分片數量3
JobCoreConfiguration jobCoreConfiguration = JobCoreConfiguration.newBuilder("files‐job", "0/3 * * * * ?", 3) .build();
同時啟動三個JobMain:
每個JobMain視窗分别執行一片作業。
總結
:
通過以上簡單的測試,就可以看出Elastic-Job幫我們解決了分布式排程的以下三個問題:
1)
多執行個體部署時避免任務重複執行
,在任務執行時間到來時,從所有執行個體中
選舉
出來一個,讓它來執行任務,進而避免多個執行個體同時執行任務。
2)
高可用
,若某一個執行個體當機,不影響其他執行個體來執行任務。
3)
彈性擴容
,當叢集中增加某一個執行個體,它應當也能夠被選舉并執行任務,如果作業分片将參與執行某個分片作業。
3.3 Elastic-Job工作原理
3.3.1.Elastic-Job整體架構
- App:應用程式,内部包含任務執行業務邏輯和Elastic-Job-Lite元件,其中執行任務需要實作ElasticJob接口完成與Elastic-Job-Lite元件的內建,并進行任務的相關配置。應用程式可啟動多個執行個體,也就出現了多個任務執行執行個體。
- Elastic-Job-Lite:Elastic-Job-Lite定位為輕量級無中心化解決方案,使用jar包的形式提供分布式任務的協調服務,此元件負責任務的排程,并産生日志及任務排程記錄。無中心化,是指沒有排程中心這一概念,每個運作在叢集中的作業伺服器都是對等的,各個作業節點是自治的、平等的、節點之間通過注冊中心進行分布式協調。
- Registry:以Zookeeper作為Elastic-Job的注冊中心元件,存儲了執行任務的相關資訊。同時,Elastic-Job利用該元件進行執行任務執行個體的選舉。
- Console:Elastic-Job提供了運維平台,它通過讀取Zookeeper資料展現任務執行狀态,或更新Zookeeper資料修改全局配置。通過Elastic-Job-Lite元件産生的資料來檢視任務執行曆史記錄。
- 應用程式在啟動時,在其内嵌的Elastic-Job-Lite元件會向Zookeeper注冊該執行個體的資訊,并觸發選舉(此時可能已經啟動了該應用程式的其他執行個體),從衆多執行個體中選舉出一個Leader,讓其執行任務。當到達任務執行時間時,Elastic-Job-Lite元件會調用由應用程式實作的任務業務邏輯,任務執行後會産生任務執行記錄。當應用程式的某一個執行個體當機時,Zookeeper元件會感覺到并重新觸發leader選舉。
3.3.2 ZooKeeper
在學習Elastic-Job執行原理時,有必要大緻了解一下ZooKeeper是用來做什麼的,因為:
- Elastic-Job依賴ZooKeeper完成對執行任務資訊的存儲(如任務名稱、任務參與執行個體、任務執行政策等);
- Elastic-Job依賴ZooKeeper實作選舉機制,在任務執行執行個體數量變化時(如在快速上手中的啟動新執行個體或停止執行個體),會觸發選舉機制來決定讓哪個執行個體去執行該任務。
ZooKeeper是一個分布式一緻性協調服務,它是Apache Hadoop 的一個子項目,它主要是用來解決分布式應用中經常遇到的一些資料管理問題,如:統一命名服務、狀态同步服務、叢集管理、分布式應用配置項的管理等。
咱們可以把ZooKeeper
想象為一個特殊的資料庫
,它
維護着一個類似檔案系統的樹形資料結構
,ZooKeeper的用戶端(如Elastic-Job任務執行執行個體)可以對資料進行存取:
每個子目錄項如 /app1都被稱作為 znode(目錄節點),和檔案系統一樣,我們能夠自由的增加、删除znode,在一個znode下增加、删除子znode,唯一的不同在于znode是可以存儲資料的。
ZooKeeper為什麼稱之為一緻性協調服務呢?因為ZooKeeper擁有資料監聽通知機制,
用戶端注冊監聽它關心的znode
,當znode發生變化(
資料改變、被删除、子目錄節點增加删除
)時,ZooKeeper會通知所有用戶端。簡單來說就是,當分布式系統的若幹個服務都關心一個資料時,
當這個資料發生改變,這些服務都能夠得知,那麼這些服務就針對此資料達成了一緻。
應用場景思考,使用ZooKeeper管理分布式配置項的機制:
假設我們的程式是分布式部署在多台機器上,如果我們要改變程式的配置檔案,
需要逐台機器去修改,非常麻煩,現在把這些配置全部放到zookeeper上去,
儲存在 zookeeper 的某個目錄節點中,然後所有相關應用程式作為ZooKeeper的用戶端
對這個目錄節點進行監聽,一旦配置資訊發生變化,每個應用程式就會收到 ZooKeeper的通知,
進而擷取新的配置資訊應用到系統中。
3.3.2.1.Elastic-Job任務資訊的儲存
- Elastic-Job使用ZooKeeper完成對任務資訊的存取,任務執行執行個體作為ZooKeeper用戶端對其znode操作,任務資訊儲存在znode中。
使用ZooInspector檢視zookeeper節點
1、zookeeper圖像化用戶端工具的下載下傳位址:
https://issues.apache.org/jira/secure/attachment/12436620/ZooInspector.zip
2、下載下傳完後解壓壓縮包,輕按兩下位址為ZooInspector\build\zookeeper-dev-ZooInspector.jar的jar包;
如果輕按兩下沒有反應?首先電腦要配好java環境,使用java -jar 再加上你的jar檔案的路徑 啟動即可.
java -jar zookeeper-dev-ZooInspector.jar
節點記錄了任務的配置資訊,包含執行類,
cron表達式,分片算法類,分片數量,分片參數
。預設狀态下,如果你修改了Job的配置比如cron表達式,分片數量等是不會更新到zookeeper上去的,需要把
LiteJobConfiguration的參數overwrite修改成true
,或者删除zk的結點再啟動作業重新建立。
2.3.2.2.Elastic-Job任務執行執行個體選舉
Elastic-Job使用ZooKeeper實作任務執行執行個體選舉,若要使用ZooKeeper完成選舉,就需要了解ZooKeeper的znode類型了,ZooKeeper有四種類型的znode,用戶端在建立znode時可以指定:
- PERSISTENT-持久化目錄節點
用戶端建立該類型znode,此用戶端與ZooKeeper斷開連接配接後該節點依舊存在,如果建立了重複的key,比如/data,第二次建立會失敗。
- PERSISTENT_SEQUENTIAL-持久化順序編号目錄節點
用戶端與ZooKeeper斷開連接配接後該節點依舊存在,允許重複建立相同key,Zookeeper給該節點名稱進行順序編号,如zk會在後面加一串數字比如 /data/data0000000001,如果重複建立,會建立一個/data/data0000000002節點(一直往後加1)
- EPHEMERAL-臨時目錄節點
用戶端與ZooKeeper斷開連接配接後,該節點被删除,不允許重複建立相同key。
- EPHEMERAL_SEQUENTIAL-臨時順序編号目錄節點
用戶端與ZooKeeper斷開連接配接後,該節點被删除,允許重複建立相同key,依然采取順序編号機制。
執行個體選舉實作過程分析:
每個Elastic-Job的任務執行執行個體作為ZooKeeper的用戶端來操作ZooKeeper的znode
1)任意一個執行個體啟動時首先建立一個 /server 的PERSISTENT節點
2)多個執行個體同時建立 /server/leader EPHEMERAL子節點
3) /server/leader子節點隻能建立一個,後建立的會失敗。建立成功的執行個體被選為leader節點 , 用來執行任務。
4)所有任務執行個體監聽 /server/leader 的變化,一旦節點被删除,就重新進行選舉,搶占式地建立 /server/leader節點,誰建立成功誰就是leader。
3.4 小結
通過本章,我們完成了對Elastic-Job技術的快速入門程式,并了解了Elastic-Job整體架構
和工作原理。對于應用程式,隻需要将任務執行細節包裝為ElasticJob接口的實作類
并對任務細節進行配置即可完成與ElasticJob的內建,而Elastic-Job需要依賴Zookeeper
進行執行任務資訊的存取,執行任務執行個體的選舉。通過對快速入門程式的測試,
我們可以看到Elastic-Job确實解決了分布式任務排程的核心問題。
4. Spring Boot開發分布式任務
4.1 內建Spring Boot
将Elastic-job快速入門中的例子改造為spring boot內建方式。
4.1.1 導入maven依賴
建立elastic-job-springboot工程,依賴如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.8</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.itheima</groupId>
<artifactId>elastic-job-springboot</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>elastic-job-springboot</name>
<description>elastic-job-springboot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.0</version>
</dependency>
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-spring</artifactId>
<version>2.1.5</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
工程結構圖如下:
4.1.2 編寫spring boot配置檔案及啟動類
spring boot 配置檔案:
server.port=${PORT:56081}
spring.application.name=task‐scheduling‐springboot
logging.level.root=info
spring boot 啟動類:
@SpringBootApplication
public class ElasticJobApp {
public static void main(String[] args) {
SpringApplication.run(ElasticJobApp.class, args);
}
}
4.1.3 編寫Elastic-Job配置類及任務類
Zookeeper配置類:
@Configuration
public class RegistryCenterConfig {
//zookeeper端口
private final int ZOOKEEPER_PORT=2181;
//zookeeper連接配接字元串 localhost:2181
private final String ZOOKEEPER_CONNECTION_STRING="localhost:"+ZOOKEEPER_PORT;
//定時任務命名空間
private final String JOB_NAMESPACE="elastic-job-example-java";
//zk的配置
//注冊中心配置
@Bean(initMethod = "init")
public CoordinatorRegistryCenter setUpRegistryCenter(){
//注冊中心配置
ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration(ZOOKEEPER_CONNECTION_STRING,JOB_NAMESPACE);
//減少zk的逾時時間
zookeeperConfiguration.setSessionTimeoutMilliseconds(100);
//建立注冊中心
CoordinatorRegistryCenter registryCenter = new ZookeeperRegistryCenter(zookeeperConfiguration);
//registryCenter.init();
return registryCenter;
}
}
Elastic-Job配置類:
@Configuration
public class ElasticJobConfig {
@Autowired
FileBackupJob fileBackupJob;
@Autowired
CoordinatorRegistryCenter registryCenter;
@Bean(initMethod="init")
public SpringJobScheduler initSimpleElasticJob(){
LiteJobConfiguration jobConfiguration = createJobConfiguration(fileBackupJob.getClass(), "0/3 * * * * ?", 3, null);
SpringJobScheduler springJobScheduler = new SpringJobScheduler(fileBackupJob, registryCenter,jobConfiguration);
return springJobScheduler;
}
/**
* 配置任務詳細資訊
* @param jobClass 任務執行類
* @param cron 執行政策
* @param shardingTotalCount 分片數量
* @param shardingItemParameters 分片個性化參數
* @return
*/
private LiteJobConfiguration createJobConfiguration(final Class<? extends SimpleJob>
jobClass,
final String cron,
final int shardingTotalCount,
final String shardingItemParameters) {
//建立JobCoreConfiguration 每3秒鐘啟動一次 分片數量3
JobCoreConfiguration.Builder builder = JobCoreConfiguration.newBuilder(jobClass.getName(), cron, shardingTotalCount);
if(StringUtils.isNotEmpty(shardingItemParameters)){
builder.shardingItemParameters(shardingItemParameters);
}
JobCoreConfiguration configuration=builder.build();
//建立SimpleJobConfiguration
SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(configuration,jobClass.getCanonicalName());
//啟動任務
LiteJobConfiguration liteJobConfiguration = LiteJobConfiguration.newBuilder(simpleJobConfiguration).overwrite(true).build();
return liteJobConfiguration;
}
}
Elastic-Job任務類:
/*
檔案備份任務
*/
@Component
public class FileBackupJob implements SimpleJob {
//檔案清單(模拟)
public static List<FileCustom> files=new ArrayList<>();
//每次任務執行要備份檔案的數量
private final int FETCH_SIZE=1;
static {
for(int i=1;i<11;i++){
files.add(new FileCustom(String.valueOf(i+10),"檔案"+ (i+10),"text","content"+ (i+10)));
files.add(new FileCustom(String.valueOf(i+20),"檔案"+ (i+20),"image","content"+ (i+20)));
files.add(new FileCustom(String.valueOf(i+30),"檔案"+ (i+30),"radio","content"+ (i+30)));
files.add(new FileCustom(String.valueOf(i+40),"檔案"+ (i+40),"video","content"+ (i+40)));
}
System.out.println("生成測試資料完成");
}
//任務執行代碼邏輯
@Override
public void execute(ShardingContext shardingContext) {
System.out.println("作業分片資訊:"+shardingContext.getShardingItem());
//1.擷取未備份檔案
List<FileCustom> fileCustoms = fetchUnBackupFiles(FETCH_SIZE);
//進行檔案備份
backupFiles(fileCustoms);
}
//擷取未備份檔案
public List<FileCustom> fetchUnBackupFiles(int count){
//擷取的檔案清單
List<FileCustom> fileCustoms=new ArrayList<>();
int num=0;
for (FileCustom file : files) {
if(count<=num){
break;
}
//未備份
if(!file.getBackedUp()){
fileCustoms.add(file);
num++;
}
}
System.out.printf("time:%s,擷取檔案%d個\n", LocalDateTime.now(),num);
return fileCustoms;
}
/*
檔案備份
*/
public void backupFiles(List<FileCustom> files){
for (FileCustom file : files) {
file.setBackedUp(true);
System.out.printf("time:%s,備份檔案,名稱:%s,類型:%s\n", LocalDateTime.now(),file.getName(),file.getType());
}
}
}
FileCustom
@Data
public class FileCustom {
private String id;//辨別
private String name;//檔案名
private String type;//檔案類型,如text、image、radio、vedio
private String content;//檔案内容
private Boolean backedUp=false;//是否已備份
public FileCustom(String id, String name, String type, String content) {
this.id = id;
this.name = name;
this.type = type;
this.content = content;
}
}
設定三個啟動配置,修改端口,模拟多線程
運作結果,分片處理作業
4.2.作業分片
4.2.1.分片概念
作業分片是指任務的分布式執行,需要将一個任務拆分為多個獨立的任務項,
然後由分布式的應用執行個體分别執行某 一個或幾個分片項。
例如:Elastic-Job快速入門中檔案備份的例子,現有2台伺服器,每台伺服器分别跑一個應用執行個體。為了快速的執 行作業,那麼可以将作業分成4片,每個應用執行個體個執行2片。作業周遊資料的邏輯應為:
執行個體1查找text和image
類型檔案執行備份;實
例2查找radio和video類型檔案執行備份
。 如果由于伺服器擴容應用執行個體數量增加為4,則 作業周遊資料的邏輯應為:4個執行個體分别處理
text、image、radio、video
類型的檔案。
- 可以看到,通過對任務合理的分片化,進而達到任務并行處理的效果,大限度的提高執行作業的吞吐量。
分片項與業務處了解耦
Elastic-Job并不直接提供資料處理的功能,架構隻會将分片項配置設定至各個運作中的作業伺服器,開發者需要自行處 理分片項與真實資料的對應關系。
最大限度利用資源
将分片項設定為大于伺服器的數量,好是大于伺服器倍數的數量,作業将會合理的利用分布式資源,動态的配置設定 分片項。
例如:3台伺服器,分成10片,則分片項配置設定結果為伺服器A=0,1,2;伺服器B=3,4,5;伺服器C=6,7,8,9。 如果伺服器C 崩潰,則分片項配置設定結果為伺服器A=0,1,2,3,4;伺服器B=5,6,7,8,9。在不丢失分片項的情況下,大限度的利用現 有資源提高吞吐量。
4.2.2 作業分片實作
- 基于Spring boot內建方式的而産出的工程代碼,完成對作業分片的實作,檔案資料備份采取更接近真實項目的數 據庫存取方式。
4.2.2.1 建立資料庫
資料庫:mysql-8.0
建立elastic_job_demo資料庫:
CREATE DATABASE `elastic_job_demo` CHARACTER SET 'utf8' COLLATE 'utf8_general_ci';
CREATE TABLE `t_file` (
`id` varchar(11) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
`name` VARCHAR (255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
`type` VARCHAR (255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
`content` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
`backedUp` tinyint(1) DEFAULT NULL,
PRIMARY KEY (`id`) USING BTREE
) ENGINE = INNODB CHARACTER
SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
項目結構
4.2.2.2 maven依賴
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.0</version>
</dependency>
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-spring</artifactId>
<version>2.1.5</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.27</version>
</dependency>
</dependencies>
4.2.2.3 編寫檔案服務類
FileService
@Service
public class FileService {
@Autowired
JdbcTemplate jdbcTemplate;
/**
* @description
* @author ThinkPad
* @param[1] fileType
* @param[2] count
* @throws
* @return List<FileCustom>
* @time 2023/2/2 16:03
*/
public List<FileCustom> fetchUnBackupFiles(String fileType,Integer count){
String sql="select *from t_file where type=? and backedUp=0 limit 0,?";
List<FileCustom> files = jdbcTemplate.query(sql, new Object[]{fileType, count}, new BeanPropertyRowMapper(FileCustom.class));
return files;
}
/**
* @description 備份檔案
* @author ThinkPad
* @param[1] files
* @throws
* @time 2023/2/3 9:32
*/
public void backupFiles(List<FileCustom> files){
for (FileCustom file : files) {
String sql="update t_file set backedUp=1 where id=?";
jdbcTemplate.update(sql,file.getId());
System.out.println(String.format("線程 %d | 已備份檔案:%s 檔案類型:%s"
,Thread.currentThread().getId()
,file.getName()
,file.getType()));
}
}
}
4.2.2.4 Elastic-Job任務類
FileBackupJobDb
/*
檔案備份任務
*/
@Component
public class FileBackupJobDb implements SimpleJob {
//檔案清單(模拟)
public static List<FileCustom> files=new ArrayList<>();
//每次任務執行要備份檔案的數量
private final int FETCH_SIZE=1;
@Autowired
private FileService fileService;
//任務執行代碼邏輯
@Override
public void execute(ShardingContext shardingContext) {
System.out.println("作業分片資訊:"+shardingContext.getShardingItem());
//分片參數 0=text,1=image,2=radio,3=video
//擷取分片參數
String jobParameter = shardingContext.getShardingParameter();
//1.擷取未備份檔案
List<FileCustom> fileCustoms = fetchUnBackupFiles(jobParameter,FETCH_SIZE);
//進行檔案備份
backupFiles(fileCustoms);
}
//擷取未備份檔案
public List<FileCustom> fetchUnBackupFiles(String fileType,int count){
List<FileCustom> fileCustoms = fileService.fetchUnBackupFiles(fileType, count);
System.out.printf("time:%s,擷取檔案%d個\n", LocalDateTime.now(),count);
return fileCustoms;
}
/*
檔案備份
*/
public void backupFiles(List<FileCustom> files){
fileService.backupFiles(files);
}
}
4.2.2.5 編寫Elastic-Job配置類及任務類
Zookeeper配置類
@Configuration
public class RegistryCenterConfig {
//zookeeper連接配接字元串 localhost:2181
private final String ZOOKEEPER_CONNECTION_STRING="localhost:2181";
//定時任務命名空間
private final String JOB_NAMESPACE="elastic-job-example-java";
//zk的配置
//注冊中心配置
@Bean(initMethod = "init")
public CoordinatorRegistryCenter setUpRegistryCenter(){
//注冊中心配置
ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration(ZOOKEEPER_CONNECTION_STRING,JOB_NAMESPACE);
//減少zk的逾時時間
zookeeperConfiguration.setSessionTimeoutMilliseconds(100);
//建立注冊中心
CoordinatorRegistryCenter registryCenter = new ZookeeperRegistryCenter(zookeeperConfiguration);
//registryCenter.init();
return registryCenter;
}
}
Elastic-Job配置類
@Configuration
public class ElasticJobConfig {
@Autowired
FileBackupJobDb fileBackupJob;
@Autowired
CoordinatorRegistryCenter registryCenter;
@Bean(initMethod="init")
public SpringJobScheduler initSimpleElasticJob(){
LiteJobConfiguration jobConfiguration = createJobConfiguration(fileBackupJob.getClass(), "0/3 * * * * ?", 4, "0=text,1=image,2=radio,3=video");
SpringJobScheduler springJobScheduler = new SpringJobScheduler(fileBackupJob, registryCenter,jobConfiguration);
return springJobScheduler;
}
/**
* 配置任務詳細資訊
* @param jobClass 任務執行類
* @param cron 執行政策
* @param shardingTotalCount 分片數量
* @param shardingItemParameters 分片個性化參數
* @return
*/
private LiteJobConfiguration createJobConfiguration(final Class<? extends SimpleJob>
jobClass,
final String cron,
final int shardingTotalCount,
final String shardingItemParameters) {
//建立JobCoreConfiguration 每3秒鐘啟動一次 分片數量3
JobCoreConfiguration.Builder builder = JobCoreConfiguration.newBuilder(jobClass.getName(), cron, shardingTotalCount);
if(StringUtils.isNotEmpty(shardingItemParameters)){
builder.shardingItemParameters(shardingItemParameters);
}
JobCoreConfiguration configuration=builder.build();
//建立SimpleJobConfiguration
SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(configuration,jobClass.getCanonicalName());
//啟動任務
LiteJobConfiguration liteJobConfiguration = LiteJobConfiguration.newBuilder(simpleJobConfiguration).overwrite(true).build();
return liteJobConfiguration;
}
}
4.2.2.6 編寫檔案實體類
FileCustom
@Data
@NoArgsConstructor
public class FileCustom {
private String id;//辨別
private String name;//檔案名
private String type;//檔案類型,如text、image、radio、vedio
private String content;//檔案内容
private Boolean backedUp=false;//是否已備份
public FileCustom(String id, String name, String type, String content) {
this.id = id;
this.name = name;
this.type = type;
this.content = content;
}
}
4.2.2.7 編寫spring boot配置檔案及啟動類
spring boot 配置檔案:
server.port=${PORT:56081}
spring.application.name=task?scheduling?springboot
logging.level.root=info
#資料源定義
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/elastic_job_demo?useUnicode=true
spring.datasource.username=root
spring.datasource.password=root
spring boot 啟動類:
@SpringBootApplication
public class ElasticJobApp {
public static void main(String[] args) {
SpringApplication.run(ElasticJobApp.class, args);
}
}
4.2.2.8 測試
增加測試資料:
通過junit單元測試程式來增加:
@RunWith(SpringRunner.class)
@SpringBootTest
class GenerateTestData {
@Autowired
JdbcTemplate jdbcTemplate;
@Test
public void testGenerateTestData(){
//清除資料
clearTestFiles();
//制造資料
generateTestFiles();
}
//清除資料
private void clearTestFiles() {
jdbcTemplate.update("delete from t_file");
}
/**
* 建立模拟資料
*/
public void generateTestFiles(){
List<FileCustom> files =new ArrayList<>();
for(int i=1;i<11;i++){
files.add(new FileCustom(String.valueOf(i),"檔案"+ i,"text","content"+ i));
files.add(new FileCustom(String.valueOf((i+10)),"檔案"+(i+10),"image","content"+ (i+10)));
files.add(new FileCustom(String.valueOf((i+20)),"檔案"+(i+20),"radio","content"+ (i+20)));
files.add(new FileCustom(String.valueOf((i+30)),"檔案"+(i+30),"video","content"+ (i+30)));
}
for(FileCustom file : files){
jdbcTemplate.update("insert into t_file (id,name,type,content,backedUp) values (?,?,?,?,?)",
new Object[]{file.getId(),file.getName(),file.getType(),file.getContent(),file.getBackedUp()});
}
}
}
啟動Spring boot的main方法
ElasticJobApp
,并檢視控制台:
可以看出,text、image、radio、vedio四個分片被分布到這一個執行個體中執行。
分片彈性擴容縮容機制測試:
elastic-job的分片是通過zookeeper來實作的。分片由主節點配置設定,如下三種情況都會觸發主節點上的分片算法執行:
新的Job執行個體加入叢集 現有的Job執行個體下線(如果下線的是leader節點,那麼先選舉然後觸發分片算法的執行) 主節點選舉
測試1:同時啟動兩個控制台ElasticJobApp 和ElasticJobApp(1)
檢視控制台輸出可以得出如下結論: 1、任務運作期間,如果有新機器加入,則會立刻觸發分片機制,将任務相對 平均的配置設定到每台機器上并行執行排程。 2、如果有機器退出叢集,則經過短暫的一段時間(大約40秒)後又會重 新觸發分片機制
如果在設定zookeeper注冊中心時,設定了session逾時時間100 毫秒,則下次任務前就會觸發分片
如果在sessionTimeoutMs的時間段之内觸發任務,則異常分片的任務會丢失。
舉個例子:假如 sessionTimeoutMs被設定成1分鐘,而本身的任務是30秒執行一次,
有三個任務執行個體在三台機器各自執行分片 1,2,3。當分片3所在的機器出現問題,
和zookeeper斷開了,那麼zookeeper節點失效至少要到1分鐘以後。
期間30 秒執行一次的任務分片3,至少會少執行一次。1分鐘過後,zookeeper節點失效,
觸發 ListenServersChangedJobListener類的dataChanged方法,
在這裡方法中判斷instance節點變化,然後通過方法 shardingService.setReshardingFlag
設定重新分片标志位,下次執行任務的時候,leader節點重新配置設定分片,
分片 3就會轉移到其他好的機器上。
4.2.3 作業配置說明
注冊中心配置
ZookeeperConfiguration屬性詳細說明
作業配置
作業配置分為3級,分别是JobCoreConfiguration,JobTypeConfiguration和LiteJobConfiguration。 LiteJobConfiguration使用JobTypeConfiguration,JobTypeConfiguration使用JobCoreConfiguration,層層嵌 套。 JobTypeConfiguration根據不同實作類型分為SimpleJobConfiguration,DataflowJobConfiguration和 ScriptJobConfiguration。
4.2.4 作業分片政策
AverageAllocationJobShardingStrategy
全路徑:
政策說明:
基于平均配置設定算法的分片政策,也是預設的分片政策。
如果分片不能整除,則不能整除的多餘分片将依次追加到序号小的伺服器。
如:如果有3台伺服器,分成9片,則每台伺服器分到的分片是:
1=[0,1,2], 2=[3,4,5], 3=[6,7,8]
如果有3台伺服器,分成8片,則每台伺服器分到的分片是:
1=[0,1,6], 2=[2,3,7], 3=[4,5] 如果有3台伺服器,分成10片,
則每台伺服器分到的分片是:1=[0,1,2,9], 2=[3,4,5], 3=[6,7,8]
OdevitySortByNameJobShardingStrategy
全路徑:
政策說明:
根據作業名的哈希值奇偶數決定IP升降序算法的分片政策。
作業名的哈希值為奇數則IP升序。
作業名的哈希值為偶數則IP降序。
用于不同的作業平均配置設定負載至不同的伺服器。
AverageAllocationJobShardingStrategy的缺點是,一旦分片數小于作業伺服器數,
作業将永遠配置設定至IP位址靠前 的伺服器,導緻IP位址靠後的伺服器空閑。
而OdevitySortByNameJobShardingStrategy則可以根據作業名稱重新配置設定伺服器負載。
如:如果有3台伺服器,分成2片,作業名稱的哈希值為奇數,
則每台伺服器分到的分片是:1=[0], 2=[1], 3=[] 如果有3台伺服器,分成2片,
作業名稱的哈希值為偶數,則每台伺服器分到的分片是:3=[0], 2=[1], 1=[]
RotateServerByNameJobShardingStrategy
全路徑:
政策說明:
根據作業名的哈希值對伺服器清單進行輪轉的分片政策。
配置分片政策
與配置通常的作業屬性相同,在spring命名空間或者JobConfiguration中配置
jobShardingStrategyClass屬性,屬 性值是作業分片政策類的全路徑。
分片政策配置xml方式:
4.3 Dataflow類型定時任務
- Dataflow類型的定時任務需實作DataflowJob接口,該接口提供2個方法可供覆寫,分别用于抓取(fetchData)和處理(processData)資料。咱們繼續對例子進行改造。
- Dataflow類型用于
,它和SimpleJob不同,它以資料流的方式執行,調用處理資料流
,直到抓取不到資料才停止作業。fetchData抓取資料
新增FileBackupDataFlowJob:
/*
檔案備份任務
*/
@Component
public class FileBackupJobDataFlow implements DataflowJob<FileCustom> {
//檔案清單(模拟)
public static List<FileCustom> files=new ArrayList<>();
//每次任務執行要備份檔案的數量
private final int FETCH_SIZE=1;
@Autowired
private FileService fileService;
//抓取資料
@Override
public List<FileCustom> fetchData(ShardingContext shardingContext) {
System.out.println("作業分片資訊:"+shardingContext.getShardingItem());
//分片參數 0=text,1=image,2=radio,3=video
//擷取分片參數
String jobParameter = shardingContext.getShardingParameter();
//1.擷取未備份檔案
List<FileCustom> fileCustoms = fetchUnBackupFiles(jobParameter,FETCH_SIZE);
return fileCustoms;
}
//處理資料
@Override
public void processData(ShardingContext shardingContext, List<FileCustom> list) {
//進行檔案備份
backupFiles(list);
}
//擷取未備份檔案
public List<FileCustom> fetchUnBackupFiles(String fileType,int count){
List<FileCustom> fileCustoms = fileService.fetchUnBackupFiles(fileType, count);
System.out.printf("time:%s,擷取檔案%d個\n", LocalDateTime.now(),fileCustoms.size());
return fileCustoms;
}
/*
檔案備份
*/
public void backupFiles(List<FileCustom> files){
fileService.backupFiles(files);
}
}
ElasticJobConfig修改配置:
@Configuration
public class ElasticJobConfig {
/* @Autowired
FileBackupJobDb fileBackupJob;*/
@Autowired
FileBackupJobDataFlow fileBackupJob;
@Autowired
CoordinatorRegistryCenter registryCenter;
@Bean(initMethod="init")
public SpringJobScheduler initSimpleElasticJob(){
LiteJobConfiguration jobConfiguration = createFlowJobConfiguration(fileBackupJob.getClass(), "0/10 * * * * ?", 4, "0=text,1=image,2=radio,3=video");
SpringJobScheduler springJobScheduler = new SpringJobScheduler(fileBackupJob, registryCenter,jobConfiguration);
return springJobScheduler;
}
/**
* 配置任務詳細資訊
* @param jobClass 任務執行類
* @param cron 執行政策
* @param shardingTotalCount 分片數量
* @param shardingItemParameters 分片個性化參數
* @return
*/
/* private LiteJobConfiguration createJobConfiguration(final Class<? extends SimpleJob>
jobClass,
final String cron,
final int shardingTotalCount,
final String shardingItemParameters) {
//建立JobCoreConfiguration 每3秒鐘啟動一次 分片數量3
JobCoreConfiguration.Builder builder = JobCoreConfiguration.newBuilder(jobClass.getName(), cron, shardingTotalCount);
if(StringUtils.isNotEmpty(shardingItemParameters)){
builder.shardingItemParameters(shardingItemParameters);
}
JobCoreConfiguration configuration=builder.build();
//建立SimpleJobConfiguration
SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(configuration,jobClass.getCanonicalName());
//啟動任務
LiteJobConfiguration liteJobConfiguration = LiteJobConfiguration.newBuilder(simpleJobConfiguration).overwrite(true).build();
return liteJobConfiguration;
}*/
//建立支援dataflow類型的作業的配置資訊
private LiteJobConfiguration createFlowJobConfiguration(final Class<? extends ElasticJob>
jobClass,
final String cron,
final int shardingTotalCount,
final String shardingItemParameters) {
//建立JobCoreConfiguration 每3秒鐘啟動一次 分片數量3
JobCoreConfiguration.Builder builder = JobCoreConfiguration.newBuilder(jobClass.getName(), cron, shardingTotalCount);
if(StringUtils.isNotEmpty(shardingItemParameters)){
builder.shardingItemParameters(shardingItemParameters);
}
JobCoreConfiguration configuration=builder.build();
//建立DataflowJobConfiguration
DataflowJobConfiguration dataflowJobConfiguration = new DataflowJobConfiguration(configuration,jobClass.getCanonicalName(),true);
//啟動任務
LiteJobConfiguration liteJobConfiguration = LiteJobConfiguration.newBuilder(dataflowJobConfiguration).overwrite(true).build();
return liteJobConfiguration;
}
}
啟動應用後,日志輸出如下:
從輸出日志可以看出,每次運作定時任務都會開啟4個線程執行fetchData抓取資料,
抓取以後調用processData處理資料,如果是流式處理資料
(new DataflowJobConfiguration第三個參數為true)且fetchData方法的傳回值為
null或集合長度為空時,作業才停止處理。
5. Elastic-Job進階
5.1 事件追蹤
Elastic-Job-Lite在配置中提供了JobEventConfiguration,支援資料庫方式配置,會在資料庫中自動建立 JOB_EXECUTION_LOG和JOB_STATUS_TRACE_LOG兩張表以及若幹索引,來記錄作業的相關資訊。
5.1.1 修改Elastic-Job配置類
在ElasticJobConfig中修改:
@Configuration
public class ElasticJobConfig {
/* @Autowired
FileBackupJobDb fileBackupJob;*/
@Autowired
DataSource dataSource;//資料源已存在
@Autowired
FileBackupJobDb fileBackupJob;
@Autowired
CoordinatorRegistryCenter registryCenter;
@Bean(initMethod="init")
public SpringJobScheduler initSimpleElasticJob(){
LiteJobConfiguration jobConfiguration = createJobConfiguration(fileBackupJob.getClass(), "0/10 * * * * ?", 4, "0=text,1=image,2=radio,3=video");
//增加任務事件追蹤配置
JobEventConfiguration jobEventConfig=new JobEventRdbConfiguration(dataSource);
SpringJobScheduler springJobScheduler = new SpringJobScheduler(fileBackupJob, registryCenter,jobConfiguration,jobEventConfig);
return springJobScheduler;
}
/**
* 配置任務詳細資訊
* @param jobClass 任務執行類
* @param cron 執行政策
* @param shardingTotalCount 分片數量
* @param shardingItemParameters 分片個性化參數
* @return
*/
private LiteJobConfiguration createJobConfiguration(final Class<? extends SimpleJob>
jobClass,
final String cron,
final int shardingTotalCount,
final String shardingItemParameters) {
//建立JobCoreConfiguration 每3秒鐘啟動一次 分片數量3
JobCoreConfiguration.Builder builder = JobCoreConfiguration.newBuilder(jobClass.getName(), cron, shardingTotalCount);
if(StringUtils.isNotEmpty(shardingItemParameters)){
builder.shardingItemParameters(shardingItemParameters);
}
JobCoreConfiguration configuration=builder.build();
//建立SimpleJobConfiguration
SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(configuration,jobClass.getCanonicalName());
//啟動任務
LiteJobConfiguration liteJobConfiguration = LiteJobConfiguration.newBuilder(simpleJobConfiguration).overwrite(true)
.monitorPort(9888)//設定dump端口
.build();
return liteJobConfiguration;
}
}
5.1.2 啟動項目
啟動後會發現在elastic_job_demo資料庫中新增以下兩個表。
job_execution_log:
job_status_trace_log:
5.2 運維
elastic-job中提供了一個elastic-job-lite-console控制台
設計理念
- 本控制台和Elastic Job并無直接關系,是通過
展現作業狀态,或更新注冊中心 資料修改全局配置。 2. 控制台隻能控制作業本身是否運作,但不能控制作業程序的啟停,因為控制台和作業本身伺服器是完全分布 式的,讀取Elastic Job的注冊中心資料
。控制台并不能控制作業伺服器
主要功能
- 檢視作業以及伺服器狀态
- 快捷的修改以及删除作業設定
- 啟用和禁用作業
- 跨注冊中心檢視作業
- 檢視作業運作軌迹和運作狀态
不支援項
-
添加作業。
因為作業都是在首次運作時自動添加,使用控制台添加作業并無必要。直接在作業伺服器啟動包 含Elastic Job的作業程序即可
具體搭建步驟如下:
5.2.1 搭建
下載下傳位址:https://raw.githubusercontent.com/miguangying/elastic-job-lite-console/master/elastic-job-liteconsole-2.1.4.tar.gz
解壓縮 elastic-job-lite-console-${version}.tar.gz 。
5.2.2 配置及使用
1、 配置注冊中心位址
先啟動zookeeper 然後在注冊中心配置界面 點添加
點選送出後,然後點連接配接(zookeeper必須處于啟動狀态)
連接配接成功後,在作業次元下可以顯示該命名空間下作業名稱、分片數量及該作業的cron表達式等資訊
在伺服器次元可以檢視伺服器ip、目前運作的執行個體數、作業總數等資訊。
2、配置事件追蹤資料源
在事件追蹤資料源配置頁面點添加按鈕,輸入相關資訊
送出後點選連接配接即可在作業曆史下檢視作業曆史記錄
5.3 dump指令
使用Elastic-Job-Lite過程中可能會碰到一些問題,導緻作業運作不穩定。由于無法在生産環境調試,通過dump命 令可以把作業内部相關資訊dump出來,友善開發者debug分析。
(1)開啟dump監控端口,并運作程式 修改中ElasticJobConfig中的createJobConfiguration方法裡JobRootConfiguration的配置,開啟dump監控端口:
會在目前目錄生成job_debug_dump.txt檔案,打開job_debug_dump.txt後看到:
6. 課程總結
重要知識點回顧:
- 什麼是任務排程?
- 任務排程的應用場景?
- 什麼是分布式任務排程?
- 分布式任務排程需要解決那些問題?這些問題的大緻解決思路?
- Elastic-Job是什麼?
- Zookeeper在Elastic-Job整個架構中起到了什麼作用? Elastic-Job分片的概念?分片是為了解決什麼問題?
- Elastic-Job主要的配置類有哪些?各職責?
- Dataflow任務類型和SimpleJob類型有什麼不同?