天天看點

datax 定時執行多個job_談談Spark下并行執行多個Job的問題

架構師(JiaGouX)我們都是架構師!對于Spark這樣的分布式計算系統,任務會分發到多台機器上執行。如何榨幹有限的叢集資源來實作快速并行計算,是需要考慮的重要問題之一。而這個問題又可以拆解為:如何将有限的叢集資源都配置設定給Spark使用;如何将配置設定到的資源都利用起來。本文的話題屬于後者的範疇,将從筆者在實踐中遇到的場景出發,探讨如何在Spark下并行執行多個Job。 背景  在我們的資料系統中,有一些實時流任務與離線任務會将處理結果以Parquet的形式存儲到AWS S3上。為了便于後續處理與快速查詢,需要按照資料類型(目前有四種類型)、時間間隔(按天分隔)來劃分Parquet分區,即一批資料會分散存儲到不同的路徑下,比如s3://data/type=security/interval=1552608000、s3://data/type=policy/interval=1552608000等。目前,我們通過Spark DataFrame的API來寫檔案,有兩種實作方式:

  • 通過for循環依次根據資料類型、時間間隔将資料Filter出來,然後寫入相應的路徑下。
for type in types:    for interval in intervals:        df.filter(df.type==type).filter(df.interval==interval).write.parquet("s3://data/type=%s/interval=%s" % (type, interval))           
  • 通過partitionBy功能讓Spark自動做将資料寫入不同的分區路徑。
df.write.partitionBy("type", "interval").mode("append").parquet("s3://data")           

  這兩種實作方式,前者是顯式的一件一件做,每循環一次就是一個Job,後者是在一個Job中完成。看上去後者的執行效率會更高,代碼也簡潔,但實際效果并非如此。兩個原因:首先,第二種方式在每個Task中還是根據相應的資料類型、時間間隔來串行寫入的,并沒有真正提升寫入速度。其次,第一種方式會更加可控,可以顯式的知道做到了哪裡,如果出錯的話可以知道錯在哪個環節,恢複回來時可以根據checkpoint将已經做過的跳過去,避免重複寫入。目前,我們主要采用第一種方式,這也是我們後面探讨的前提。  對于一個Spark Job,我們總是期望能充分利用所有的cpu-vcore來并行執行,是以通常會将資料repartition成cpu-vcore的個數,即每個cpu-vcore上跑一個Task。而對于寫檔案的Job,每個Task會寫入到自己的一個檔案中,最終生成的檔案數是由Task個數決定。在下圖1中,假設叢集總共有12個cpu-vcore配置設定給Executor使用,那麼就會有12個Task并行執行寫入,最終生成12個檔案。

datax 定時執行多個job_談談Spark下并行執行多個Job的問題

圖1. 充分利用資源來寫檔案  從充分利用資源的角度來看,這樣的設計無疑是最佳的。但是,對于一些實時流處理任務或者周期性的離線任務而言,這樣做會産生大量的小Parquet檔案,會給後續的檔案加載和快速查詢帶來困難。是以,從盡可能産生少量檔案的角度出發,需要采用圖2所示的寫入方式,即在寫入前,将資料配置設定到少量的Partition中,用少量的Task來執行。但是,這樣做就會導緻有部分cpu-vcore在寫入過程中處于閑置狀态,造成了資源浪費。

datax 定時執行多個job_談談Spark下并行執行多個Job的問題

圖2. 産生少量檔案的方案  顯然,在這件事情上,“充分利用資源”和“産生少量檔案”兩個方向發生了沖突。那麼,有沒有一個兩全之策呢?即既保證産生少量檔案,又能把原本閑置的資源利用起來。如下圖3所示,假設我們能同時跑多個寫入檔案的Job,每個Job利用一部分cpu-vcore來執行,似乎就可以達到這個目的了。帶着這樣的思路,筆者做了一番調研與實踐。

datax 定時執行多個job_談談Spark下并行執行多個Job的問題

圖3. 多Job并行執行 可行性分析  上述思路可以總結為:通過一個SparkContex并行送出多個Job,由Spark自己來排程資源,實作并行執行。針對這個思路,首先要搞清楚Spark是否支援這麼玩,如果支援的話又是怎麼支援的。本節将簡單梳理下Spark的任務排程機制。  圖4是筆者在公司分享時用的一張圖,主要希望表達三個意圖:任務排程的流程、任務排程的對象、任務排程涉及哪些子產品。綜合起來,簡要概括如下:

  • SparkContext向DAGScheduler送出一個Job後,會建立一個JobWaiter對象,用于阻塞目前線程,等待Job的執行結果。

    是以,在一個線程中,Job是順序執行的。

  • DAGScheduler會根據RDD的依賴關系将一個Job劃分為若幹個Stage(以Shuffle為界)。

    因為前後Stage存在資料上的依賴,是以隻有父Stage執行完畢才能送出目前Stage。

  • DAGScheduler在送出Stage時,會根據Partition資訊生成相應的Task,打包成TaskSet,送出給TaskScheduler。

    而TaskScheduler收到後,會将TaskSet封裝成TaskSetManager,丢到任務隊列中等待執行。

  • SchedulerBackend負責Executor狀态與資源的管理,當發現有空閑資源時,就會通過TaskScheduler從任務隊列中取出相應的TaskSetManager去排程執行。
  • TaskSetManager中的Task最終會分發到Executor中的線程裡去執行。
datax 定時執行多個job_談談Spark下并行執行多個Job的問題

圖4. Spark任務排程機制

  從上述分析中不難發現,Spark是以TaskSetManager為單元來排程任務的。通常情況下,任務隊列中隻會有一個TaskSetManager,而通過多線程送出多個Job時,則會有多個TaskSetManager被丢到任務隊列中。在有空閑資源的情況下,誰會從隊列裡被取出來執行就取決于相應的排程政策了。目前,Spark支援FIFO和FAIR兩種排程政策,在實踐中分别有圖5所示的三種模式。

  • 預設是FIFO政策,顧名思義,采用先進先出的方式。隻有TaskSetManager A中沒有Task需要執行并且有多餘的資源時,才會排程TaskSetManager B,即優先做前面的任務。舉例來說,假設有12個cpu-vcore,TaskSetManager A和B分别有18個、10個Task需要執行,那麼隻有當TaskSetManager A已經執行完了前面12個Task,在執行剩下的6個Task時,才會有多餘的6個cpu-vcore配置設定給TaskSetManager B。另一方面,假設TaskSetManager A和B各有6個Task需要執行,則剛好可以并行執行了。
  • 通過參數spark.scheduler.mode可以設定政策為FAIR,該方式讓多個TaskSetManager都有機會執行。但是,需要配合FAIR Pool來使用。預設情況下,TaskSetManager會被全部丢到一個default的Pool裡,此時排程效果與是FIFO一樣的。
  • Spark支援線上程中設定自己的FAIR Pool,進而将該線程中送出的TaskSetManager丢到指定的Pool中。多個FAIR Pool是會被輪詢執行的,執行權重可以預先設定。通過這種方式,可以讓所有TaskSetManager都有機會被排程,而不會被先進隊列的需要長時間運作的其他任務阻塞住。但是,需要注意的是,隻是讓A、B、C都有機會執行了,整體執行的時間(A+B+C)并不會被縮短(相比FIFO而言)。
datax 定時執行多個job_談談Spark下并行執行多個Job的問題

圖5. Spark任務排程政策 實踐探索  通過上節的分析,基本可以明确以下兩點:

  • Spark支援通過多線程在一個SparkContext上送出多個Job,每個線程裡面的Job是順序執行的,但是不同線程的Job是可以并行執行的,取決當時Executor中是否有充足的cpu-vcore。
  • 任務隊列中的TaskSetManager是有序執行,還是輪詢執行(可配置設定權重)取決于采用哪種排程政策,即上述的模型1或3(不推薦使用2)。

    不管是哪種執行方式,整體執行時間基本是一緻的,隻是模型3更适用于多使用者的場景,讓大家都有機會幹活。

  在這樣的思路下,我們做了一些實踐探索,部分代碼如下所示。該代碼從固定路徑下讀取檔案資料,然後啟動5個線程分别送出5個寫檔案的Job。實驗中,叢集共有55個cpu-vcore配置設定給Executor,每個寫檔案Job使用11個cpu-vcore來寫入,剛好可以配置設定5個Job并行執行。另外,該實驗采用預設的FIFO排程政策。

var df = spark.read.parquet("s3://data/type=access/interval=1551484800").repartition(55)// df.cache()// val c = df.count()// println(s"${c}")val jobExecutor = Executors.newFixedThreadPool(5)for( _ 0,   jobExecutor.execute(new Runnable {    override def run(): Unit = {      val id = UUID.randomUUID().toString()      df.coalesce(11).write.parquet(s"s3://data/test/${id}")    }  })}           

  看上去上述的配置設定沒有問題,但是第一次實驗時就失敗了。這裡的"失敗",指的是Spark沒有按照我們預期的那樣去并行執行寫入。如下圖6所示,有5個Active Stage,但是隻有一個處于running狀态,其有467個Task需要執行。造成這個問題的原因是,Spark是Lazy執行的,5個寫入Job都需要先去讀取原始檔案資料,再執行自己的寫入,而讀取檔案這個動作需要467個Task去執行,導緻阻塞。

datax 定時執行多個job_談談Spark下并行執行多個Job的問題

圖6. 實驗一  知道了問題所在,就可以出招應對了。我們期望的是讀取檔案和相關處理隻做一次,而寫檔案分别由不同的線程并行執行。是以,需要将前者預先執行掉,并将資料緩存在記憶體中,即使用上述注釋掉的代碼。之後,就可以達到預期的并行執行的效果了,如下圖7所示。但是,我們又發現另一個問題:不斷有Executor因為記憶體不足被替換。原因是,相比原先一個一個寫檔案的情形,5個Job并行做事時,對記憶體的消耗也接近5倍了,需要配置設定更多的記憶體給Executor。

datax 定時執行多個job_談談Spark下并行執行多個Job的問題

圖7. 實驗二  綜合來看,要達到并行執行的效果,并不是通過多線程送出多個Job就好了,需要通盤考慮多個事情。

  • 并行執行時,資源怎麼配置設定給不同的Job
  • 哪些動作是需要統一做的,哪些是需要并行執行的,在哪裡做Cache來應對Lazy執行帶來的問題
  • 合理提升Executor的記憶體來保證并行執行過程中記憶體是夠用的
  • 其他未知事項

  下圖8所示,是實驗結果的一個對比。可以看到,執行1個寫入Job需要14秒,那麼順序執行5個就需要70秒了;而并行執行5個Job時,最長的一個需要30秒,整體節省了57%的時間。另一方面,為何并行執行時最長的需要花費30秒,相比14秒,增長了1倍多?目前分析來看,跟資料傳輸和GC時間有關,會進一步研究。

datax 定時執行多個job_談談Spark下并行執行多個Job的問題

圖8. 整體并行執行時間對比 總結  目前,多Job并行執行的方案已在筆者項目中使用,也達到了預期的效果。期望本文的分享能對有同樣需求的讀者有所幫助。

作者:Mr-Bruce

原文:

https://blog.csdn.net/zwgdft/article/details/88349295

如有侵權或不周之處,敬請勞煩聯系若飛(微信:1321113940)馬上删除,謝謝!

·END·

架構師

我們都是架構師!

datax 定時執行多個job_談談Spark下并行執行多個Job的問題

關注架構師(JiaGouX),添加“星标”

擷取每天技術幹貨,讓我們一起成為牛逼的架構師

微信技術群請加若飛微信:1321113940 進社群

投稿、合作郵箱:[email protected]

繼續閱讀