azkaban 可以作为hadoop 任务的调度工具,也可以作为shell任务和java任务的调度工具。安装过程有点繁琐,见安装文档。
Job Type
见链接
https://azkaban.github.io/
任务DAG
通过任务之间的依赖关系(dependencies)构建DAG
下面是一个 upload.job 的例子, 依赖 report-en.job
type=java
#指定类的全路径
job.class=com.example.demo.task.BbUploadJob
#指定执行jar包的路径
classpath=lib/*
#依赖任务
dependencies=report-en
#jvm 参数
Xmx=M
# 自定义参数
batch.timestamp=${azkaban.flow.start.timestamp}
schedule
azkaban 目前包含Quartz,支持 Cron 表达式
java job
JavaJob 目前需要配置Hadoop,但是可以通过修改源代码来去掉 Hadoop 的依赖关系,参考安装过程。
Java Job template
- run 相当于 main 方法,任务代码写在这里
- cancel 在 run 方法出现 Exception 之后调用,任务失败后处理
- getJobGeneratedProperties 是输出的参数,用于给下一个任务传递参数
import azkaban.utils.Props;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class JavaJob1 {
private static final Logger logger = LoggerFactory.getLogger(JavaJob1.class);
private Props props;
public JavaJob1(String name, Props props) {
this.props = props;
}
public void run() {
String timestamp = props.getString("azkaban.flow.start.timestamp");
logger.info("timestamp value is ==> " + timestamp);
}
public void cancel(){
}
public Props getJobGeneratedProperties(){
Props props = new Props();
return props;
}
}
java job 中使用 Spring
在JavaJob 中可以使用Spring,和普通的Java代码一模一样
this.classPathXmlApplicationContext = new ClassPathXmlApplicationContext("applicationContext.xml");
this.reportService = (ReportService) this.classPathXmlApplicationContext.getBean("reportService");
其他问题
邮件通知
azkaban 原生就支持通知功能,在安装的时候配置smtp服务器,在job文件中里配置failure.emails, success.emails, notify.emails 来通知任务执行情况(多个邮箱地址用逗号分隔)
任务之间参数传递
azkaban支持任务之间传递参数,A任务可以向依赖A的任务B传递参数。实际上是通过读写临时文件来实现这个功能。
System.getenv("JOB_OUTPUT_PROP_FILE") // 任务输出的参数文件
System.getenv("JOB_PROP_FILE") // 任务初始化的参数文件
B任务初始化的参数文件中会包含A任务输出的参数,这是azkaban帮我们做的。
JavaJob 更加方便,只要在Job中增加一个getJobGeneratedProperties方法,返回Props对象,然后B任务可以直接在初始化的Props中读取到这个参数。
public Props getJobGeneratedProperties(){
Props props = new Props();
props.put("demo.test.arg1", "Hello World!");
return props;
}