天天看點

排程系統之Airflow

一、Airflow簡介

Airflow 是一個使用 Python 語言編寫的 Data Pipeline 排程和監控工作流的平台。

Airflow 是通過 DAG(Directed acyclic graph 有向無環圖)來管理任務流程的任務排程工具,不需要知道業務資料的具體内容,設定任務的依賴關系即可實作任務排程。

這個平台擁有和 Hive、Presto、MySQL、HDFS、Postgres 等資料源之間互動的能力,并且提供了鈎子(hook)使其擁有很好地擴充性。除了使用指令行,該工具還提供了一個 WebUI 可以可視化的檢視依賴關系、監控進度、觸發任務等。

排程系統之Airflow
排程系統之Airflow

Airflow 的架構

在一個可擴充的生産環境中,Airflow 含有以下元件:

中繼資料庫:這個資料庫存儲有關任務狀态的資訊。

排程器:Scheduler 是一種使用 DAG 定義結合中繼資料中的任務狀态來決定哪些任務需要被執行以及任務執行優先級的過程。排程器通常作為服務運作。

執行器:Executor 是一個消息隊列程序,它被綁定到排程器中,用于确定實際執行每個任務計劃的工作程序。有不同類型的執行器,每個執行器都使用一個指定工作程序的類來執行任務。例如,LocalExecutor 使用與排程器程序在同一台機器上運作的并行程序執行任務。其他像 CeleryExecutor 的執行器使用存在于獨立的工作機器叢集中的工作程序執行任務。

Workers:這些是實際執行任務邏輯的程序,由正在使用的執行器确定。

排程系統之Airflow
排程系統之Airflow
排程系統之Airflow

Airflow 解決哪些問題

通常,在一個運維系統,資料分析系統,或測試系統等大型系統中,我們會有各種各樣的依賴需求。包括但不限于:時間依賴:任務需要等待某一個時間點觸發。外部系統依賴:任務依賴外部系統需要調用接口去通路。任務間依賴:任務 A 需要在任務 B 完成後啟動,兩個任務互相間會産生影響。資源環境依賴:任務消耗資源非常多, 或者隻能在特定的機器上執行。crontab 可以很好地處理定時執行任務的需求,但僅能管理時間上的依賴。Airflow 是一種 WMS,即:它将任務以及它們的依賴看作代碼,按照那些計劃規範任務執行,并在實際工作程序之間分發需執行的任務。Airflow 提供了一個用于顯示目前活動任務和過去任務狀态的優秀 UI,并允許使用者手動管理任務的執行和狀态。Airflow 中的工作流是具有方向性依賴的任務集合。具體說就是 Airflow 的核心概念 DAG(有向無環圖)—— 來表現工作流。DAG 中的每個節點都是一個任務,DAG 中的邊表示的是任務之間的依賴(強制為有向無環,是以不會出現循環依賴,進而導緻無限執行循環)。Airflow 在 ETL 上的實踐ETL,是英文 Extract,Transform,Load 的縮寫,用來描述将資料從來源端經過抽取(extract)、轉換(transform)、加載(load)至目的端的過程。ETL 一詞較常用在資料倉庫,Airflow 在解決 ETL 任務各種依賴問題上的能力恰恰是我們所需要的。在現階段的實踐中,我們使用 Airflow 來同步各個資料源資料到數倉,同時定時執行一些批處理任務及帶有資料依賴、資源依賴關系的計算腳本。本文立意于科普介紹,故在後面的用例中隻介紹了 BashOperator,PythonOperator這倆個最為易用且在我們日常使用中最為常見的 Operator。Airflow 同時也具有不錯的叢集擴充能力,可使用 CeleryExecuter 以及多個 Pool 來提高任務并發度。Airflow在 CeleryExecuter 下可以使用不同的使用者啟動 Worker,不同的 Worker 監聽不同的 Queue,這樣可以解決使用者權限依賴問題。Worker 也可以啟動在多個不同的機器上,解決機器依賴的問題。Airflow 可以為任意一個 Task 指定一個抽象的 Pool,每個 Pool 可以指定一個 Slot 數。每當一個 Task 啟動時,就占用一個 Slot,當 Slot 數占滿時,其餘的任務就處于等待狀态。這樣就解決了資源依賴問題。

排程系統之Airflow
排程系統之Airflow

二、安裝及使用

假設:你已經安裝好了 Python 及配置好了其包管理工具 pip。

1、安裝airflow

pip install apache-airflow      

在安裝airflow的時候可能會報錯:

Cannot uninstall 'PyYAML'. It is a distutils installed project and thus we cannot      

忽略掉 ​

​PyYAML​

# 親測可用
pip install apache-airflow --ignore-installed PyYAML      

安裝成功後檢視指令:

[root@quant ~]# airflow -h
usage: airflow [-h] GROUP_OR_COMMAND ...

positional arguments:
  GROUP_OR_COMMAND

    Groups:
      celery         Celery components
      config         View configuration
      connections    Manage connections
      dags           Manage DAGs
      db             Database operations
      kubernetes     Tools to help run the KubernetesExecutor
      pools          Manage pools
      providers      Display providers
      roles          Manage roles
      tasks          Manage tasks
      users          Manage users
      variables      Manage variables

    Commands:
      cheat-sheet    Display cheat sheet
      info           Show information about current Airflow and environment
      kerberos       Start a kerberos ticket renewer
      plugins        Dump information about loaded plugins
      rotate-fernet-key
                     Rotate encrypted connection credentials and variables
      scheduler      Start a scheduler instance
      sync-perm      Update permissions for existing roles and DAGs
      version        Show the version
      webserver      Start a Airflow webserver instance

optional arguments:
  -h, --help         show this help message and exit
[root@quant ~]#      

2、初始化資料庫

# initialize the database
airflow db init      

報這樣的錯誤:

ImportError: Something is wrong with the numpy installation. While importing we detected an older version of numpy      

解決方案:

如報錯資訊所說

先解除安裝numpy:pip uninstall numpy

再解除安裝numpy,直到解除安裝到提示資訊顯示,此時完全已經沒有numpy了為止

下載下傳numpy:pip install numpy

此時應該可用;

若不可用,檢視python安裝目錄下的libs檔案夾,删除掉其中的另一個dll檔案,應該可用。

3、添加使用者

airflow users create \
    --username admin \
    --firstname Corwien \
    --lastname Wong \
    --role Admin \
    --email [email protected]      
排程系統之Airflow

建立的使用者密碼為:​

​quant​

4、啟動web服務

# start the web server, default port is 8080
airflow webserver --port 8080a      
排程系統之Airflow

5、啟動定時任務

# start the scheduler
# open a new terminal or else run webserver with ``-D`` option to run it as a daemon
airflow scheduler

# visit localhost:8080 in the browser and use the admin account you just
# created to login. Enable the example_bash_operator dag in the home page      
排程系統之Airflow