天天看點

datax 定時執行多個job_從DataX學插件式架構設計

datax 定時執行多個job_從DataX學插件式架構設計

前言

DataX是阿裡巴巴開源的離線多資料源同步工具,被應用到阿裡内部多個資料産品如Dataworks中。開源版本落後内部版本年餘的時間,但是整體架構和思想沒有變化。内部版本改進整體上屬于錦上添花的功能,如修複了一些bug,提升了部分reader/writer的性能,支援流式資料統計的能力等。熟悉整套架構的思路和方法後,自己動手,豐衣足食,也按需增添自己的能力。

完整分析DataX代碼實作不是一篇文章可以承載的,是以本文的思路在于學習方法論的東西。想象DataX面臨的場景:實作從一個資料源導入到另一個資料源的流程。在不同環境裡,這個流程在業務上是相似的(資料的抽取->處理->寫入),但是細節上有差異。按照低内聚高耦合原則,設計者就要考慮将未來的變化點封裝在獨立的子產品裡,新需求不會對原來的代碼有傷筋動骨的改動。于是就引出了“插件式”架構的概念,這個思想在浏覽器如Chrome和IDE如Eclipse裡廣泛應用,是以得名。“插件”就是變化點,當使用者需要新功能的時候,隻要下載下傳并安裝相應的插件,而不用下載下傳一個新版本的浏覽器或者IDE,應用本身就可以做的比較小,隻包含必要的功能(Office系列就是個反例)。作為架構開發者,設計好插件格式,開發完幾個常用的插件之後,就可以把精力放在架構性能和fix bug上,而不用被那些重複性質的“髒活累活”牽扯精力。

插件式架構設計,需要考慮的東西有

  • 能力如何封裝和暴露:對使用者,如何讓他們簡單的就可以使用datax的能力;對開發者,如何友善的接入架構,擴充能力。
  • 邊界在哪裡:哪些能力由架構實作,哪些能力歸插件實作的。
  • 隔離性怎麼做:如何将外部的能力接入到架構内部,同時不破壞架構的穩定性。

回到DataX,其設計思路是非常清晰的:首先是配置和邏輯分離,配置放在json檔案裡,啟動的時候傳給程序。配置分系統參數(core.json,plugin.json)和任務參數(job.json),系統參數可以被覆寫。程序啟動式掃描配置和插件目錄,加載相應的插件。其次,将邏輯分成reader/writer和架構兩部分,read/writer是可以交出去的部分,也是變化點(異構資料源通路和資料處理),架構負責排程處理和流控。

開始分析

啟動腳本分析

DataX以python腳本啟動java程序的方式運作,相比shell方式,增大了運維負擔,畢竟多數Linux主機需要額外安裝python。

python bin/datax.py example.json
           

example.json裡包含任務的配置,參見

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "streamreader", 
                    "parameter": {
                        "column": [], 
                        "sliceRecordCount": ""
                    }
                }, 
                "writer": {
                    "name": "streamwriter", 
                    "parameter": {
                        "encoding": "", 
                        "print": true
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": ""
            }
        }
    }
}
           

啟動腳本datax.py分析來看,其主要功能是:

  • 準備配置參數
  • 将配置參數作為啟動參數傳給java程序

前者在generateJobConfigTemplate裡

def 
           

解析json檔案,并将其轉成業務參數,傳給java程序。

後者在buildStartCommand裡,原始模版是

ENGINE_COMMAND = "java -server ${jvm} %s -classpath %s  ${params} com.alibaba.datax.core.Engine -mode ${mode} -jobid ${jobid} -job ${job}" % (DEFAULT_PROPERTY_CONF, CLASS_PATH)
           

可以看到是個經典的啟動配置,入口主函數是com.alibaba.datax.core.Engine,對JVM預設參數不滿意可以在啟動的時候調整。

調試

datax的編譯和調試十分友好:下載下傳源碼之後,本地編譯,通過maven-assembly-plugin插件将結果輸出到target/datax目錄下。将datax.py裡221行注釋去掉,變成

print startCommand
           

即可列印啟動參數,配置檔案是現成的。執行

python ./bin/datax.py ./job/job.json
           

即可輸出。本機輸出如下:

'java -server -Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/Users//code/DataX/target/datax/datax/log
 -Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/Users//code/DataX/target/datax/datax/log
 -Dloglevel=info -Dfile.encoding=UTF-8 -Dlogback.statusListenerClass=ch.qos.logback.core.status.NopStatusListener
 -Djava.security.egd=file:///dev/urandom -Ddatax.home=/Users//code/DataX/target/datax/datax
 -Dlogback.configurationFile=/Users//code/DataX/target/datax/datax/conf/logback.xml -classpath
 /Users//code/DataX/target/datax/datax/lib/*:.  -Dlog.file.name=x_datax_job_job_json com.alibaba.datax.core.Engine
 -mode standalone -jobid -1 -job /Users//code/DataX/target/datax/datax/job/job.json'
           

上面幾行是jvm啟動參數(VM options),最後一行是程式啟動參數(program arguments),将其配置在Intellij IDE的EditConfiguration裡,再啟動程式即可debug。

運作時分析

DataX的的概念和運作邏輯在 插件開發寶典 中有詳細的描述,建議先讀完文檔再接着看。通過代碼閱讀,對其内容有更加深刻的認識,先看下幾個概念:

  • Job: Job是DataX用以描述從一個源頭到一個目的端的同步作業,是DataX資料同步的最小業務單元。比如:從一張mysql的表同步到odps的一個表的特定分區。
  • Task: Task是為最大化而把Job拆分得到的最小執行單元。比如:讀一張有1024個分表的mysql分庫分表的Job,拆分成1024個讀Task,用若幹個并發執行。
  • TaskGroup: 描述的是一組Task集合。在同一個TaskGroupContainer執行下的Task集合稱之為TaskGroup
  • JobContainer: Job執行器,負責Job全局拆分、排程、前置語句和後置語句等工作的工作單元。類似Yarn中的JobTracker
  • TaskGroupContainer: TaskGroup執行器,負責執行一組Task的工作單元。

程式入口是Engine類的main函數,接收的參數有任務配置,任務id和執行模式,entry函數裡将系統參數與任務參數組合在一起。start函數進入JobContainer類,主要功能都在這個類的start函數裡實作,看源碼的時候我主要想了解它是如何做到這幾件事情的:

  1. 架構與插件之間邏輯的切換和連接配接
  2. 如何加載插件資源到架構的
  3. 任務是如何執行的
  4. 資料從Reader同步到Writer的

分别解答如下:

  1. 架構與插件之間邏輯的切換和連接配接:模版(Template)模式

通過模版模式在主類裡定義好流程順序,然後由派生類加載自定義邏輯。DataX裡有兩個流程:Job負責資源的準備,任務資訊的配置。Task則負責任務邏輯的執行。兩個流程均由模版模式定義,在JobContainer類的start函數中定義的流程為:init() -> prepare() -> split() -> schedule() -> post(),task的流程定義在ReaderRuner和WriterRuner兩個類中:init() -> prepare() -> startRead/startWrite()-> post(),均表示 “初始化 -> 事前 -> 事中 -> 事後” 的流程。每種資料源都需要重載Reader/Writer裡裡面包含裡Job和Task的定義。不是所有流程都要重載,最主要的是init,startRead/startWrtier幾個函數。

datax 定時執行多個job_從DataX學插件式架構設計

類繼承圖

2. 如何加載插件資源到架構的

插件配置檔案裡有類名和名稱資訊

{
    "name": "mysqlwriter",
    "class": "com.alibaba.datax.plugin.writer.mysqlwriter.MysqlWriter",
    "description": "Use Jdbc connect to database, execute insert sql.",
    "developer": "alibaba"
}
           

在JobContainer的init函數裡,通過JarLoader從路徑加載類定義并示例化,為了防止多個類加載沖突,每次加載都要使用自定義的classLoader。

3. 任務是如何執行的

datax 定時執行多個job_從DataX學插件式架構設計

執行類圖

在schedule流程裡會建立一個固定線程池Executors.newFixedThreadPool,将拆分好任務轉成TaskGroupContainerRunner送出到線程池中執行。每個TaskGroupContainerRunner裡有一個TaskGroupContainer,這是任務運作時最重要的類:任務執行對主流程在這個裡面。這個線程裡會為每個任務new一個TaskExecutor對象,其中包含了一個ReaderRunner和WriterRunner(因為是1:1關系),看名稱就知道會啟動兩個線程,分别讀取和寫入資料,兩個線程之間的通訊方式見下。

4. 資料從Reader同步到Writer的:通過線程之間的共享記憶體

在插件的Reader裡将資料寫入到RecordSender中,而在Writer中則從RecordReceiver中讀取資料,這兩個都是接口,實際實作BufferedRecordExchanger同時繼承自這兩個接口,然後有自己的實作,調用Channel接口。Channel接口的實作管理資料是緩沖到記憶體還是持久化。在生成ReaderRunner和WriterRunner的時候,傳入同一個channel的實作MemoryChannel,Reader将資料寫入記憶體,Writer從記憶體中讀取資料。

datax 定時執行多個job_從DataX學插件式架構設計

總結看來,DataX的架構設計非常直接,沒有用到什麼複雜高深的技巧,可以稱為大巧不工。

未解決的問題

DataX采用的是單機單程序運作的方式,同步完畢則程序結束,整體更偏工具屬性。分布式精細化的東西做的不多,比如整個架構如果是一直在跑,持續提供服務,使用者送出的代碼,執行完畢結果輸出之後果把資源回收,多租戶可以向一個中心服務送出任務,這就有了serverless的味道了。需要考慮資源隔離共享,啟動速度等一系列問題。

DataX在國外用的不是很普遍,原因不細表。類似的資料同步工具有Sqoop和Kettle,有時間研究下。

後記

使用datax已有一段時間,寫作本文卻遷延多日,一些概念以為了解清楚了,表述的時候發現細節還是不清楚。從會用到能講之間,還有一段需要努力的距離,對此要做到心中有數。把知識按照按照學習長短分類,可以大緻分為三類:

一天的知識:學習時間在一天内的知識,通常就是一篇文章的長度,講解一些概念或者算法。來自搜尋引擎,微信,公衆号等,采集的過程是用眼睛讀,讀完通常的結論是:我懂了。但是如果要跟别人解釋,或者實際用的時候,發現還是有不得勁的地方。

一月的知識:學習時間需要若幹周的時間,需要精心準備,反複記憶的東西。常見于學習一本書,一門課程和一門語言。整個過程是個水磨功夫,用眼睛看已經不夠了這樣學到的知識通常是比較牢靠的,但如果沒有抓到其中的精髓,或者後面的鞏固提高,很多會随着時間淡忘。比如考完試之後一兩個月就忘差不多了,一門語言學完之後不用很快也就忘了。

一年的知識,這些知識往往就不是“學”出來的了,而是經年累月的實踐,踩坑獲得教訓,或者是某天頓悟獲得。通過反複的錘煉,知識之間建立了牢固的連結,形成了“元知識”,“方法論”。遇到問題的時候可以從現有知識出發,通過推導得到新知識。

這裡有個誤區,就是花一個月掌握了幾十個一天的知識,并不等于掌握了月級别的知識。以面試找工作為例,有大量的文章和公衆号,處于吸引流量的考慮,往往大量推薦各種“一天的知識”,如果花費了大量時間記憶這些“知識”,面試的時候發現很難兜住面試官的問題。要麼是有印象,記不住,要麼是對來龍去脈不清楚。究其根由,在于現代軟體業的發展已經十分發達,通過“封裝”,将複雜的實作細節隐藏在接口後面,目的是降低使用的難度。而對于那些公号作者來說,候選者這麼多,随便一個應用選一個點戳下去,就能炮制好幾篇文章。這些知識并非不重要,但選擇的時候要有判斷力,世上知識那麼多,全部學是學不過來的。擇吸收進自己的知識體系裡,思考新場景裡的應用可能。如果吸收不了,放棄也好過死記硬背。退一步說,也要考慮稀缺性問題:公衆号流量那麼大,看的人多,大家都背下來也展現不出你的優勢。如果工作中真要用到這方面的知識,留個心眼到時候翻出這篇公号文章再學一遍也行是不是?

一天的知識淺薄,一年的知識可遇而不可求,可以從一月的知識開始,比如學習一個完整的課程。學習的時候不能隻靠眼睛,而要動手記錄甚至實際操練。對學習時間要有産出的評估,比如花了一周時間學習這個東西,産出是什麼?是一篇文章,還是一次分享,還是應用到了産品中?學習目的是将新知識與已有的知識體系連接配接起來,拓展知識的邊界。這樣學起來雖然慢,但是知識掌握的牢靠,從長時間尺度看有事半功倍之效果。一月的知識學起來是比較痛苦和費事兒的,一方面要選好精力的投入點,與長久目标吻合,勞逸結合。另一方面,要一魚多吃,與過去掌握的知識進行比較,将其提升為元知識,對知識破碎重組,觸類旁通,這樣才對得起付出的時間。平常對于刷公衆号不要有太多期望,看成對自己已有知識的一個比對校驗,或者學習新知識的線索,是個比較平和的心态。

繼續閱讀