天天看點

工作流排程器Azkaban學習

1、Azkaban是什麼

        我們在工作中應該都遇到過這樣的場景:有一個任務,這個任務可以劃分成多個較小的任務完成,之是以進行劃分是因為小任務之間可以并發的進行,例如是一個shell腳本執行的指令吧,大任務A可以劃分成B、C、D、E四個子任務(腳本)完成,而B和C是可以同時進行的,D依賴B和C的輸出,E又依賴D的輸出,于是我們一般的做法可能就是開兩個終端同時執行B和C,等兩個都執行完成之後再執行D,接着在執行E。整個執行的過程都需要我們參與,但是整個的執行過程類似一個有向無環圖,每一個子任務的執行可以看作整個任務的一個流,我們可以同時從沒有入度的節點開始執行,任何沒有流向(兩個節點之間沒有通路)關系節點都可以并行得執行,人為的控制難免就有點力不從心了(因為很多任務都需要在深夜執行,一般我們都是寫腳本并設定cron),這時候我們需要的就是一個工作流排程器。         Azkaban就是完成這種任務的(其實主要還是用于對hadoop生态圈的任務的支援),它是由Linkedin實作并開源的,主要用于在一個工作流内以一個特定的順序運作一組工作和流程,它的配置是通過簡單的key:value對的方式,通過配置中的dependencies來設定依賴關系,這個依賴關系必須是無環的,否則會被視為無效的工作流。 Azkaban有如下功能特點:

  • Web使用者界面
  • 友善上傳工作流
  • 友善設定任務之間的關系
  • 排程工作流
  • 認證/授權(權限的工作)
  • 能夠殺死并重新啟動工作流
  • 子產品化和可插拔的插件機制
  • 項目工作區
  • 工作流和任務的日志記錄和審計

      我覺得這些都是一些主流的工作流排程器應該支援的功能,我覺得azkaban的web頁面做得比較好,這樣可以大大降低管理成本,它支援的任務排程類型是基于插件的,這也就使得我們可以實作自己的插件來完成特定的需求。另外,它還能夠在任務完成、失敗、成功的時候發送email,支援SLA設定等功能,總體來說,功能還是很強大的。

2、安裝部署

      azkaban分為三個組建:mysql伺服器、web伺服器和executor伺服器,其中mysql用于存儲一些項目以及執行計劃(所有任務的屬性資訊、執行計劃、執行的結果以及輸出),每次執行情況等資訊;web伺服器使用Jetty對外提供web服務,是使用者可以通過web頁面友善的管理;執行伺服器是負責具體的工作流的送出,執行,可以啟動多個執行伺服器,它們通過mysql資料庫來協調任務的執行。

工作流排程器Azkaban學習

      首先需要從官網上下載下傳各個子產品,都是二進制的安裝包格式,當然也可以使用源碼編譯,下載下傳位址:http://azkaban.github.io/downloads.html 接下來的安裝過程可以參考:http://blog.javachen.com/2014/08/25/install-azkaban/ 因為web用戶端是通過https的方式進行通路的,是以這裡需要建立一個keystore證書檔案,使用指令:keytool -keystore keystore -alias jetty -genkey -keyalg RSA,按照提示輸入需要的資訊,最後輸入的”輸入 <jetty> 的密鑰密碼“可以和密鑰庫密碼一樣,并且需要在web伺服器的配置檔案azkaban.properties中修改Jetty伺服器的屬性,其中 jetty.keystore=keystore jetty.password=redhat jetty.keypassword=redhat jetty.truststore=keystore jetty.trustpassword=redhat       設定為你生成的證書檔案的資訊。接着就可以在浏覽器中輸入https://ip:8443通路azkaban了(登入的使用者名和密碼是在web伺服器的user配置檔案中設定的,這裡我們使用的是admin)。

3、測試

      這裡我們進行簡單的測試,由于azkaban原生是支援shell指令(是以也就可以支援shell腳本以及python等其他腳本程式)的,是以可以使用簡單的shell指令進行測試,我們建立4個子任務,每一個子任務的配置都是任務名.job檔案。它們的配置如下: test.job

type=command

command=sleep 3

command.1=echo "Hello World"

start.job

type=command

command=sleep 5

command.1=echo "start execute"

sleep.job

type=command

dependencies=test, start

command=sleep 10

finish.job

type=command

dependencies=sleep

command=echo "finish"       這裡通過dependencies屬性來辨別該任務依賴的任務,可以有一個或者多個,通過","分割,這些任務的type都是command,azkaban也支援其它類型的指令類型,有些需要安裝插件才能支援。       然後我們将這四個job檔案放在一個目錄下壓縮成一個zip檔案,在Azkaban的web界面的首頁可以通過”Create Project“按鈕來建立新的一個工作流,輸入必要的資訊之後會進入到project界面,我們可以通過upload上傳我們要執行的任務流,可以重複upload進行覆寫。但是之前的任務流的執行結果不會被覆寫。如果工作流的配置有問題(例如出現互相依賴),上傳會不成功,但是沒有看到提示。等待壓縮檔案上傳成功之後,我們可以通過界面檢視各個任務的依賴關系圖:

工作流排程器Azkaban學習

      可以通過”Execute Flow“按鈕來啟動一個工作流的一次執行,點選之後會進入配置界面,包括其中包括”Flow View“、”Notification“、”Failure Options“、”Concurrent“、”Flow Parameters“,另外還需要注意的是左下角的Schedule按鈕,這裡可以設定工作流的定時執行。注意,這裡是每一次工作流執行的時候都需要設定的,目前沒有看到儲存曆史設定的情況,當然如果希望重複之前執行的一次設定的話可以找到之前的那次執行,然後再次運作(這時候還是需要進入配置頁面,但是會儲存那次運作的配置)。其中需要注意的是在”Failure Options“和”Concurrent“中的配置,他們分别配置了在工作流中一個任務執行失敗之後的處理和這個project的多次執行流(多次Execute)如果存在并行時的處理。我們在這裡不進行配置,直接執行指令: 送出之後會提示本次執行的id(我覺得這裡通過一個可識别的字元串進行标示會更好一些),這個id是全局唯一的,也就是說多個project的每一次執行都會遞增得到新的exec id。

這些配置項的作用如下: Flow View:可以激活或者取消其中的某一個job,這裡應該是在執行flow的時候執行或者不需要執行某個job再執行下面的jobs

Notification:設定執行完成(失敗或者成功執行)之後設定通知的email,這裡可以通過修改代碼添加其他的通知方式,例如短信等

Failure Options:這裡可以設定某一個job失敗之後的動作,目前有三個選項:1、等待着所有正在執行的job的完成(可能在一個flow中有多個job可以并行執行);2、全部取消,立即終止全部的job,flow執行失敗;3、盡可能的繼續執行,隻要它的依賴jobs能夠執行完成。 Concurrent:因為一個flow的執行時間可能比較長,這裡設定多個flow并發執行時的政策,有三個選項:1、順序執行,如果該flow正在執行則不再執行;2、并發執行,不管并發執行的flow;3、pipeline,有兩種政策,第一種是等到jobA需要等到前一個flow的jobA執行完成之後才能執行,第二種是jobA需要等到前一個flow中所有依賴A的job都執行完成之後才能執行。

Flow Parameters:設定flow執行的私有配置項

執行完成之後,可以通過web界面檢視每一個任務流的執行結果以及每一個子任務的執行結果。

工作流排程器Azkaban學習

      在Graph标簽下可以檢視每一個任務執行的情況、目前執行到哪一個任務了,Flow Log中會實時得輸出工作流的運作日志,點選每一個子任務可以檢視子任務的運作狀态以及實時輸出的日志資訊,總體來說還是非常友善的。

     這裡涉及的幾個概念:project、flow和job,首先一個project是一個要執行任務的整體,它可以包含多個flow,每一個project可以上傳一個.zip的檔案,flow之間是互相獨立的,但是有一個總的flow,這個flow可能引用其他的flow作為它執行的一部分(相當于總flow的一個子job,但是這個job是一個flow)。每一個flow包含多個job,這些job是互相獨立的,通過job檔案中dependencies設定依賴關系,每一個flow的結束job可以作為這個flow的辨別(flow名),我們可以通過這樣的方式将一個flow作為一個job加入到另外的flow中: jobGroup.job  type=flow

flow.name=finish

dependencies=realStart finish是之前定義的flow的辨別(因為它是終止job),這個flow作為一個job可以設定其他的依賴關系,下面是一個包含子flow的任務依賴圖:

工作流排程器Azkaban學習

我覺得之是以要設計成這樣是為了将每個flow獨立出來,友善flow的重用。

4、使用者管理

      azkaban中有使用者和使用者組的概念,使用者和使用者組以及權限的配置資訊儲存在配置檔案azkaban-users.xml中,認證的方式是由azkaban.user.XmlUserManager來實作的,具體的配置可以在azkaban.properties(web伺服器的conf下)進行配置:

Parameter Default
user.manager.class azkaban.user.XmlUserManager
user.manager.xml.file azkaban-users.xml

      我們在azkaban-users.xml可以配置三類内容:user、group和role,user項可以配置username、password、roles、group資訊,分别配置使用者名、密碼、使用者的權限以及所屬的組;group項可以配置name和roles,分别用于配置組名和這個組使用的權限;role定義了權限資訊,可以配置name和permissions,分别表示規則名和賦予的權限資訊。azkaban支援的權限包括:

Permissions Values
ADMIN 可以做任務事情,包括給其他使用者添加、修改權限
READ 隻能通路每一個project的内容和日志資訊
WRITE 可以在已建立的project上傳、修改任務的屬性,可以删除任何的project
EXECUTE 允許使用者執行任何的任務流
SCHEDULE 允許使用者添加、删除任何任務流的排程資訊
CREATEPROJECTS 在禁止建立project的情況下仍允許建立新的project

      這裡的權限設定沒有細化到每一個user在每一個project中,每一個使用者所擁有的權限可以在每一個project下面執行相同的操作,另外使用者和使用者組之間的權限資訊還不是很明确,如果使用使用者組作為權限的配置設定機關(即一個使用者組下的所有使用者擁有相同的權限),每個使用者再次指定權限就有點多餘了。

5、API

     azkaban也提供了API接口來使用,這樣可以基于azkaban實作自己的管理方式,這些接口是通過HTTPS的方式與web伺服器進行通信的,因為在azkaban中有使用者和權限的概念,是以在調用API之前需要登入,登入成功之後會傳回使用者一個session id,之後所有的操作都需要攜帶這個id以判斷使用者是否有權限。如果session id無效,那麼調用API會傳回"error" : "session"的資訊,如果不攜帶session.id參數,會傳回登陸界面的html檔案内容(有些session id的通路也會傳回這樣的内容)。azkaban提供的API包括:具體請參照官方文檔: http://azkaban.github.io/azkaban/docs/2.5/#ajax-api

1、Authenticate:使用者登入操作,需要攜帶使用者名和密碼,如果成功登入則傳回一個session id用于之後的請求。 2、Create a Project:建立一個新的project,這需要在任何關于這個project操作之前進行,需要輸入project的name作為這個project的唯一标示,還需要包含這個project的描述資訊,其實和在web頁面上建立project的輸入一樣。 3、Delete a Project:删除一個已經存在的project,該請求沒有回複資訊,需要輸入project的辨別。 4、Upload a Project Zip:上傳一個zip檔案到一個project,一般在建立一個project完成之後,之後的上傳将覆寫以前上傳的内容。 5、Fetch Flows of a Project:擷取一個project下的所有flow資訊,輸入需要指定project的辨別,一個project下面可能存在多個flow,輸出的flow隻包含flowId辨別每一個flow。 6、Fetch Jobs of a Flow:擷取一個flow下所有job的資訊,因為在API端每個指令都是獨立的,是以這裡需要輸入project的辨別和flow的辨別,輸出包含每一個job的資訊,包括job的辨別(id)、job類型以及這個job直接以來的job。 7、Fetch Executions of a Flow:擷取flow的執行情況,需要制定特定的project和flow,這個接口可以分頁傳回,是以需要制定start指定開始的index和length指定傳回的個數,因為每一個flow都可以單獨的或者作為其他flow的子flow執行,這裡傳回該flow指定區間内的每一次執行的資訊。每一個執行資訊包括起始時間、送出執行的使用者、執行的狀态、送出時間、這次執行在全局的id(遞增的execid),projectid、結束時間和flowId。 8、Fetch Running Executions of a Flow:擷取目前正在執行的flow的執行資訊,輸入包括project和flow的辨別,傳回的是該flow正在執行的所有執行id(全局的exec id)。 9、Execute a Flow:啟動一個flow的執行,這個輸入比較多,因為在web界面上每次啟動flow的執行都需要設定幾項配置,可以在該接口設定出了排程之外的乞讨配置資訊,輸入還需要包括project和flow的辨別,輸出為這個flow的id和本次執行的exec id 10、Cancel a Flow Execution:取消一次flow的執行,需要輸入的是全局的exec id,因為這個id是全局唯一的,那麼可以通過它來進行辨別,不需要再輸入project和flow的辨別了,如果這個執行已經結束,會傳回錯誤資訊。 11、Pause a Flow Execution:暫停一次執行,輸入為exec id。如果這個執行不是處于running狀态,會傳回錯誤資訊。 12、Resume a Flow Execution:重新啟動一次執行,輸入為exec id,如果這次執行已經在進行,不傳回任何錯誤,如果它不再運作則傳回錯誤資訊。 13、Fetch a Flow Execution:擷取一次執行的所有資訊,輸入為exec id,輸出包括這次執行的屬性(參見7),還包括這次執行的所有的job的執行情況。 14、Fetch Execution Job Logs:擷取一次執行中的一個job的執行日志,可以将job的執行日志作為一個檔案,這裡需要制定exec id、job的辨別以及讀取這個檔案内容的傳回(offset+length),傳回的為指定範圍的日志内容。 15、Fetch Flow Execution Updates:這個是傳回上次檢視之後每個任務的執行情況?這個有點疑惑。應該是在flow執行的時候執行進度的資訊擷取。

      從這裡的接口可以看出,azkaban提供的API隻能用于簡單建立project、flow,檢視project、flow、execute等操作,而web界面的操作要比這豐富得多,如果我們希望基于azkaban進行開發的話,在這些接口的基礎上,我覺得還可以對azkaban的資料庫進行分析,從資料庫中得到我們想要的資訊(基本的寫操作都能夠通過這些API實作,是以我們隻需要從資料庫中讀取)。但是這樣相對于使用API還是有個弊端,畢竟随着版本的更新資料庫的結構可能會發生變化,但是這也不失為一種方式。

6、總結

      好了,本文主要介紹了azkaban的安裝以及使用情況,但是它主要還是用來執行hadoop生态圈裡面的各種操作以及java程式的,但是簡單的使用還是讓我認識到這個工具的強大,但是我還是有一個疑問,azkaban的三個子產品的主要功能分别是:mysql用于資料的存儲,web伺服器用來更方面的使用和圖形化展示,executor才是真正的執行任務的伺服器,是以所有job的執行都需要在executor所在的機器上進行,job的執行時啟動一個子程序的方式(可以通過在job執行是檢視正在執行的job指令判斷),那麼這個executor需要安裝所有的支援的任務的工具、jar包等。如果是對于那種占用資源比較多的job(例如一個java程式CPU使用率達到100%),那麼就會對其他的job的執行有影響,是以這種架構的可擴充性是否有點欠缺?或者是由于這個工具主要是執行一些hadoop任務,用戶端的壓力并不大,是以沒有考慮這方面。             不過總體來說這是一個比較好的工具,至少web界面可以很友善和直覺的檢視任務的執行以及運作結果(P.S.azkaban對任務執行結是否成功是怎麼判斷的?),雖然文檔上說它可以支援多個executor,但是實際上并沒有發現這麼用的,我覺得可以繼續改進它來實作多個機器之間任意的程式之間的并行,例如有多個job可以并行執行的,我有多台executor伺服器,我可以将任何一個job部署到任何一個executor上執行,充分利用所有的硬體資源。我靠,這不就成了hadoop中jobTracker和TaskTracker的結果了?算了。這個就純屬個人的瞎扯了。

繼續閱讀