
[AirFlow] AirFlow Usage Guide, Part 3: Your First DAG

Copyright notice: this is an original article by the blogger and may not be reproduced without the blogger's permission. https://blog.csdn.net/SunnyYoona/article/details/76615699

In the previous two articles we installed our own AirFlow and took a first look at DAG definition files. Now we will implement a DAG of our own.

1. Start the Web Server

Start it with the following command:

airflow webserver
           

You can now reach the Airflow UI by pointing your browser at port 8080 on the host where Airflow was started, for example: http://localhost:8080/admin/
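If you want to check from a script that the web server is actually answering, a minimal sketch is below. It assumes Python 3 and the default address `http://localhost:8080/admin/` from the text; the function name `is_webserver_up` is hypothetical, not part of Airflow.

```python
import http.client
import urllib.request


def is_webserver_up(url="http://localhost:8080/admin/", timeout=5.0):
    """Return True if the Airflow UI answers the request with a non-error status."""
    try:
        # urlopen follows redirects and raises HTTPError for 4xx/5xx responses
        with urllib.request.urlopen(url, timeout=timeout) as response:
            return response.status < 400
    except (OSError, http.client.HTTPException):
        # URLError and HTTPError subclass OSError; refused connections and
        # timeouts are OSErrors too, so all of them mean "not reachable"
        return False


if __name__ == "__main__":
    print("Airflow UI reachable:", is_webserver_up())
```

Returning a boolean instead of raising keeps the check usable in a shell loop that waits for `airflow webserver` to come up.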

Note

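Airflow ships with a number of example DAGs. Note that these examples may not work until you have at least one DAG definition file in your own `dags_folder`. You can hide the example DAGs by setting `load_examples` to `False` in `airflow.cfg`.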
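To see which values your installation is actually using, you can read `airflow.cfg` with Python's `configparser`. A minimal sketch, assuming Python 3, that both `dags_folder` and `load_examples` live in the `[core]` section, and that `AIRFLOW_HOME` falls back to `~/airflow` when unset; the helper names are hypothetical.

```python
import configparser
import os


def parse_core_settings(cfg_text):
    """Return (dags_folder, load_examples) from the [core] section of airflow.cfg text."""
    # Disable '%' interpolation so values containing '%' are read literally
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(cfg_text)
    dags_folder = parser.get("core", "dags_folder", fallback=None)
    # Example DAGs are loaded unless load_examples is explicitly False
    load_examples = parser.getboolean("core", "load_examples", fallback=True)
    return dags_folder, load_examples


def read_core_settings(airflow_home=None):
    """Read $AIRFLOW_HOME/airflow.cfg (assumed to default to ~/airflow)."""
    home = airflow_home or os.environ.get("AIRFLOW_HOME", "~/airflow")
    cfg_path = os.path.join(os.path.expanduser(home), "airflow.cfg")
    with open(cfg_path, encoding="utf-8") as cfg_file:
        return parse_core_settings(cfg_file.read())
```

For example, `read_core_settings("~/opt/airflow")` would read the configuration of the installation used in this article.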
           

2. Your First AirFlow DAG

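Now that everything is ready, let's write some code for our first DAG. We'll start with a Hello World workflow: apart from printing the date and sleeping for a few seconds, all it does is send "Hello world!" to the log.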

Create your `dags_folder`, the directory where your DAG definition files are stored: `$AIRFLOW_HOME/dags`. In that directory, create a file named `hello_world.py`.
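The same setup step can be done from Python. A minimal sketch, assuming `dags_folder` is left at its default `$AIRFLOW_HOME/dags` and that `AIRFLOW_HOME` falls back to `~/airflow` when unset; `prepare_dag_file` is a hypothetical helper.

```python
import os
from pathlib import Path


def prepare_dag_file(airflow_home=None, filename="hello_world.py"):
    """Create $AIRFLOW_HOME/dags if needed and return the path of the DAG file."""
    if airflow_home is None:
        # Assumed default when AIRFLOW_HOME is not set in the environment
        airflow_home = os.environ.get("AIRFLOW_HOME", "~/airflow")
    dags_dir = Path(airflow_home).expanduser() / "dags"
    # parents=True creates missing parents; exist_ok=True makes reruns harmless
    dags_dir.mkdir(parents=True, exist_ok=True)
    return dags_dir / filename
```

The function only creates the directory and returns the path; writing the DAG code into the file is left to your editor.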

AIRFLOW_HOME
├── airflow.cfg
├── airflow.db
├── airflow-webserver.pid
├── dags
│   ├── hello_world.py
│   └── hello_world.pyc
└── unittests.cfg
           

Add the following code to `dags/hello_world.py`:

# -*- coding: utf-8 -*-

import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from datetime import timedelta

#-------------------------------------------------------------------------------
# these args will get passed on to each operator
# you can override them on a per-task basis during operator initialization

default_args = {
    'owner': 'jifeng.si',
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(2),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'adhoc':False,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'trigger_rule': u'all_success'
}

#-------------------------------------------------------------------------------
# dag

dag = DAG(
    'example_hello_world_dag',
    default_args=default_args,
    description='my first DAG',
    schedule_interval=timedelta(days=1))

#-------------------------------------------------------------------------------
# first operator

date_operator = BashOperator(
    task_id='date_task',
    bash_command='date',
    dag=dag)

#-------------------------------------------------------------------------------
# second operator

sleep_operator = BashOperator(
    task_id='sleep_task',
    depends_on_past=False,
    bash_command='sleep 5',
    dag=dag)

#-------------------------------------------------------------------------------
# third operator

def print_hello():
    return 'Hello world!'

hello_operator = PythonOperator(
    task_id='hello_task',
    python_callable=print_hello,
    dag=dag)

#-------------------------------------------------------------------------------
# dependencies

sleep_operator.set_upstream(date_operator)
hello_operator.set_upstream(date_operator)
           

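This file creates a simple DAG with just three operators: two BashOperators (one prints the date, the other sleeps for 5 seconds) and a PythonOperator that calls the `print_hello` function when its task runs. The two `set_upstream` calls make `date_task` upstream of both `sleep_task` and `hello_task`, so those two tasks run only after `date_task` has succeeded.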
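The ordering those dependencies imply can be checked without Airflow. The sketch below is only a model of the dependency graph using Python's standard `graphlib` (Python 3.9+), not Airflow's scheduler: each task maps to the set of its upstream tasks, mirroring the two `set_upstream` calls.

```python
from graphlib import TopologicalSorter

# Each task maps to its upstream tasks, as declared with set_upstream()
upstream = {
    "date_task": set(),
    "sleep_task": {"date_task"},
    "hello_task": {"date_task"},
}

sorter = TopologicalSorter(upstream)
sorter.prepare()
waves = []
while sorter.is_active():
    # All tasks whose upstream tasks are finished may run in this wave
    ready = sorted(sorter.get_ready())
    waves.append(ready)
    sorter.done(*ready)

print(waves)  # [['date_task'], ['hello_task', 'sleep_task']]
```

`hello_task` and `sleep_task` land in the same wave because neither depends on the other; with the default `SequentialExecutor` they are still executed one after the other.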

3. Test the Code

Use the following command to check that the code we wrote is correct:

python ~/opt/airflow/dags/hello_world.py
           

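If the script runs without raising an exception, your code has no obvious errors (syntax errors, failed imports, invalid operator arguments) and your Airflow environment is sound.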
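Running the file by hand can also be scripted, for example before copying a new DAG file into `dags_folder`. A minimal sketch assuming Python 3.7+ for the checking script; `dag_file_runs_cleanly` is a hypothetical helper. The DAG file itself must be run with the interpreter that has Airflow installed (Python 2.7 in the logs of this article), which is why it can be passed in as `python`.

```python
import subprocess
import sys


def dag_file_runs_cleanly(path, python=sys.executable):
    """Run a DAG file like `python hello_world.py` and report whether it exited cleanly."""
    result = subprocess.run([python, str(path)], capture_output=True, text=True)
    if result.returncode != 0:
        # An uncaught exception gives a non-zero exit code; show the traceback
        print(result.stderr, file=sys.stderr)
    return result.returncode == 0
```

Usage, with the path from this article: `dag_file_runs_cleanly(os.path.expanduser("~/opt/airflow/dags/hello_world.py"), python="python2.7")`.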

Next, let's test the tasks in our DAG. Use the following command to list the tasks in the `example_hello_world_dag` DAG:

xiaosi@yoona:~$ airflow list_tasks example_hello_world_dag
[2017-08-03 11:41:57,097] {__init__.py:57} INFO - Using executor SequentialExecutor
[2017-08-03 11:41:57,220] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/Grammar.txt
[2017-08-03 11:41:57,241] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/PatternGrammar.txt
[2017-08-03 11:41:57,490] {models.py:167} INFO - Filling up the DagBag from /home/xiaosi/opt/airflow/dags
date_task
hello_task
sleep_task
           

We can see that there are three tasks:

date_task
hello_task
sleep_task
           

Now let's test each of these tasks:

(1) Test date_task

xiaosi@yoona:~$ airflow test example_hello_world_dag date_task 20170803
...
--------------------------------------------------------------------------------
Starting attempt 1 of 2
--------------------------------------------------------------------------------

[2017-08-03 11:44:02,248] {models.py:1342} INFO - Executing <Task(BashOperator): date_task> on 2017-08-03 00:00:00
[2017-08-03 11:44:02,258] {bash_operator.py:71} INFO - tmp dir root location:
/tmp
[2017-08-03 11:44:02,259] {bash_operator.py:80} INFO - Temporary script location :/tmp/airflowtmpxh6da9//tmp/airflowtmpxh6da9/date_tasktQQB0V
[2017-08-03 11:44:02,259] {bash_operator.py:81} INFO - Running command: date
[2017-08-03 11:44:02,264] {bash_operator.py:90} INFO - Output:
[2017-08-03 11:44:02,265] {bash_operator.py:94} INFO - 2017年 08月 03日 星期四 11:44:02 CST
[2017-08-03 11:44:02,266] {bash_operator.py:97} INFO - Command exited with return code 0
           

(2) Test hello_task

xiaosi@yoona:~$ airflow test example_hello_world_dag hello_task 20170803
...
--------------------------------------------------------------------------------
Starting attempt 1 of 2
--------------------------------------------------------------------------------

[2017-08-03 11:45:29,546] {models.py:1342} INFO - Executing <Task(PythonOperator): hello_task> on 2017-08-03 00:00:00
[2017-08-03 11:45:29,551] {python_operator.py:81} INFO - Done. Returned value was: Hello world!
           

(3) Test sleep_task

xiaosi@yoona:~$ airflow test example_hello_world_dag sleep_task 20170803
...
--------------------------------------------------------------------------------
Starting attempt 1 of 2
--------------------------------------------------------------------------------

[2017-08-03 11:46:23,970] {models.py:1342} INFO - Executing <Task(BashOperator): sleep_task> on 2017-08-03 00:00:00
[2017-08-03 11:46:23,981] {bash_operator.py:71} INFO - tmp dir root location:
/tmp
[2017-08-03 11:46:23,983] {bash_operator.py:80} INFO - Temporary script location :/tmp/airflowtmpsuamQx//tmp/airflowtmpsuamQx/sleep_taskuKYlrh
[2017-08-03 11:46:23,983] {bash_operator.py:81} INFO - Running command: sleep 5
[2017-08-03 11:46:23,988] {bash_operator.py:90} INFO - Output:
[2017-08-03 11:46:28,990] {bash_operator.py:97} INFO - Command exited with return code 0
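Instead of typing the three `airflow test` commands one by one, you can loop over the task ids. A minimal sketch assuming the Airflow 1.x CLI used in this article (`airflow test <dag_id> <task_id> <execution_date>`; Airflow 2 renamed it to `airflow tasks test`); the helper names are hypothetical.

```python
import subprocess

DAG_ID = "example_hello_world_dag"
TASK_IDS = ["date_task", "hello_task", "sleep_task"]


def airflow_test_commands(dag_id, task_ids, execution_date):
    """Build one `airflow test <dag_id> <task_id> <execution_date>` command per task."""
    return [["airflow", "test", dag_id, task_id, execution_date] for task_id in task_ids]


def run_all(commands):
    """Run the commands in order; check=True raises CalledProcessError on the first failure."""
    for cmd in commands:
        print("$", " ".join(cmd))
        subprocess.run(cmd, check=True)
```

Usage: `run_all(airflow_test_commands(DAG_ID, TASK_IDS, "20170803"))`. Stopping at the first failure keeps the failing task's output at the bottom of the terminal.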
           

If there are no problems, we can run our DAG.

4. Run the DAG

To run your DAG, open another terminal and start the Airflow scheduler with the following command:

airflow scheduler
           
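The scheduler sends tasks off for execution. The default Airflow setup relies on an executor named `SequentialExecutor`, which runs one task instance at a time and is started automatically by the scheduler. In production you can use a more powerful executor such as `CeleryExecutor`.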
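Besides manual triggering, once the DAG is switched on the scheduler also creates runs on its own from `start_date` (`days_ago(2)`) and `schedule_interval` (`timedelta(days=1)`). The sketch below is a simplified model of that rule as understood for Airflow 1.x, not Airflow's code: each run is stamped with the start of its interval and is created only after the interval has ended. It ignores time zones, `end_date`, and the catch-up setting; `scheduled_runs` is a hypothetical name.

```python
from datetime import datetime, timedelta


def scheduled_runs(start_date, interval, now):
    """Execution dates of the runs a catch-up scheduler would have created by `now`."""
    runs = []
    execution_date = start_date
    # A run for [execution_date, execution_date + interval) starts after the interval ends
    while execution_date + interval <= now:
        runs.append(execution_date)
        execution_date += interval
    return runs


# days_ago(2) is midnight two days before "today"; model today as 2017-08-03
start = datetime(2017, 8, 1)
now = datetime(2017, 8, 3, 11, 44)
for run in scheduled_runs(start, timedelta(days=1), now):
    print(run)
```

Under this model the script prints `2017-08-01 00:00:00` and `2017-08-02 00:00:00`: two runs, because the interval starting on 2017-08-03 has not finished yet.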
           

When you reload the Airflow UI in your browser, you should see your `example_hello_world_dag` DAG (defined in `hello_world.py`) listed in it.

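To start a DAG Run, first switch the workflow on (its Off toggle), then click the `Trigger Dag` button (the first button under Links), and finally click the `Graph View` button (the third button under Links) to watch the run's progress.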
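The same two steps can be done from the command line. A minimal sketch assuming the Airflow 1.x CLI, where `airflow unpause <dag_id>` switches a DAG on and `airflow trigger_dag <dag_id>` creates a manual DAG Run (Airflow 2 renamed these to `airflow dags unpause` and `airflow dags trigger`); the helper names are hypothetical. The scheduler must still be running for the run to execute.

```python
import shutil
import subprocess


def trigger_commands(dag_id):
    """Commands that switch a DAG on and start a manual DAG Run (Airflow 1.x CLI)."""
    return [
        ["airflow", "unpause", dag_id],
        ["airflow", "trigger_dag", dag_id],
    ]


def trigger(dag_id):
    """Run the commands in order, failing early if the airflow CLI is not installed."""
    if shutil.which("airflow") is None:
        raise RuntimeError("the airflow command is not on PATH")
    for cmd in trigger_commands(dag_id):
        subprocess.run(cmd, check=True)
```

Usage: `trigger("example_hello_world_dag")`, then follow the run in the Graph View as described above.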

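Keep reloading the Graph View until all three tasks reach the success state. Once they have, click hello_task and then `View Log` to see its log. If everything worked as expected, the log should contain a number of lines, including ones like these: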

...
[2017-08-03 09:46:43,213] {base_task_runner.py:95} INFO - Subtask: --------------------------------------------------------------------------------
[2017-08-03 09:46:43,213] {base_task_runner.py:95} INFO - Subtask: Starting attempt 1 of 2
[2017-08-03 09:46:43,214] {base_task_runner.py:95} INFO - Subtask: --------------------------------------------------------------------------------
[2017-08-03 09:46:43,214] {base_task_runner.py:95} INFO - Subtask:
[2017-08-03 09:46:43,228] {base_task_runner.py:95} INFO - Subtask: [2017-08-03 09:46:43,228] {models.py:1342} INFO - Executing <Task(PythonOperator): hello_task> on 2017-08-03 09:45:49.070859
[2017-08-03 09:46:43,236] {base_task_runner.py:95} INFO - Subtask: [2017-08-03 09:46:43,235] {python_operator.py:81} INFO - Done. Returned value was: Hello world!
[2017-08-03 09:46:47,378] {jobs.py:2083} INFO - Task exited with return code 0
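The line to look for is the PythonOperator's `Done. Returned value was: ...` message, which appears both in `airflow test` output and in the task log above. A minimal sketch that pulls those values out of a saved log; the function name is hypothetical and it assumes the message format shown in this article.

```python
import re

# Message logged by PythonOperator after python_callable returns
RETURNED = re.compile(r"Done\. Returned value was: (.*)$")


def returned_values(log_text):
    """Collect every value reported in 'Done. Returned value was: ...' log lines."""
    values = []
    for line in log_text.splitlines():
        match = RETURNED.search(line)
        if match:
            values.append(match.group(1).strip())
    return values
```

`search` is used instead of `match` because the message can be nested after one or two timestamp prefixes, as in the `Subtask:` lines of the scheduler log.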