azkaban 可以作為hadoop 任務的排程工具,也可以作為shell任務和java任務的排程工具。安裝過程有點繁瑣,見安裝文檔。
Job Type
見連結
https://azkaban.github.io/
任務DAG
通過任務之間的依賴關系(dependencies)建構DAG
下面是一個 upload.job 的例子, 依賴 report-en.job
type=java
#指定類的全路徑
job.class=com.example.demo.task.BbUploadJob
#指定執行jar包的路徑
classpath=lib/*
#依賴任務
dependencies=report-en
#jvm 參數
Xmx=M
# 自定義參數
batch.timestamp=${azkaban.flow.start.timestamp}
schedule
azkaban 目前包含Quartz,支援 Cron 表達式
java job
JavaJob 目前需要配置Hadoop,但是可以通過修改源代碼來去掉 Hadoop 的依賴關系,參考安裝過程。
Java Job template
- run 相當于 main 方法,任務代碼寫在這裡
- cancel 在 run 方法出現 Exception 之後調用,任務失敗後處理
- getJobGeneratedProperties 是輸出的參數,用于給下一個任務傳遞參數
import azkaban.utils.Props;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class JavaJob1 {
private static final Logger logger = LoggerFactory.getLogger(JavaJob1.class);
private Props props;
public JavaJob1(String name, Props props) {
this.props = props;
}
public void run() {
String timestamp = props.getString("azkaban.flow.start.timestamp");
logger.info("timestamp value is ==> " + timestamp);
}
public void cancel(){
}
public Props getJobGeneratedProperties(){
Props props = new Props();
return props;
}
}
java job 中使用 Spring
在JavaJob 中可以使用Spring,和普通的Java代碼一模一樣
this.classPathXmlApplicationContext = new ClassPathXmlApplicationContext("applicationContext.xml");
this.reportService = (ReportService) this.classPathXmlApplicationContext.getBean("reportService");
其他問題
郵件通知
azkaban 原生就支援通知功能,在安裝的時候配置smtp伺服器,在job檔案中裡配置failure.emails, success.emails, notify.emails 來通知任務執行情況(多個郵箱位址用逗号分隔)
任務之間參數傳遞
azkaban支援任務之間傳遞參數,A任務可以向依賴A的任務B傳遞參數。實際上是通過讀寫臨時檔案來實作這個功能。
System.getenv("JOB_OUTPUT_PROP_FILE") // 任務輸出的參數檔案
System.getenv("JOB_PROP_FILE") // 任務初始化的參數檔案
B任務初始化的參數檔案中會包含A任務輸出的參數,這是azkaban幫我們做的。
JavaJob 更加友善,隻要在Job中增加一個getJobGeneratedProperties方法,傳回Props對象,然後B任務可以直接在初始化的Props中讀取到這個參數。
public Props getJobGeneratedProperties(){
Props props = new Props();
props.put("demo.test.arg1", "Hello World!");
return props;
}