天天看點

生産環境中的并行度和資源設定

《2021年最新版大資料面試題全面開啟更新》

    在使用Flink處理生産實際問題時,并行度和資源的配置調優是經常要面對的工作之一,如果有效和正确地配置并行度是任務能夠高效執行的必要條件。

Flink中的計算資源

     首先了解Flink中的計算資源的核心概念,比如Slot、Chain、Task等,這有助于我們快速定位生産中的問題。

Task Slot

     Flink都是以叢集在運作,在運作的過程中包含兩類程序,其中之一就是TaskManager。

     在Flink叢集中,一個TaskManager就是一個JVM程序,并且會用獨立的現場來執行task,為了控制一個TaskManager能接受多少個task,Flink提出了Task Slot的概念。

     可以簡單地把task slot了解為TaskManager的計算資源子集。比如一個TaskManager擁有5個Slot,那麼該TaskManager的計算資源會平均分為5份,不同的task在不同的slot中執行,避免資源競争。需要注意的是,Slot僅僅用來做記憶體的隔離,對CPU不起作用。那麼運作在同一個JVM的task可以共享TCP連接配接,減少網絡傳輸,在一定程對上提高了程式的運作效率,降低了資源消耗。

生産環境中的并行度和資源設定

Slot共享

     預設情況下,Flink還允許同一個Job的子任務共享slot。因為在一個Flink任務中,有很多算子,這些算子的計算壓力各不相同,比如簡單的map和filter算子所需要的資源不多,但是有些算子比如window、group by則需要更多的計算資源才能滿足計算所需。這時那些資源需求大的算子就可以共用其他slot,提高整個叢集的資源使用率。

Operator Chain

     此外Flink自身會把不同的算子的task連接配接在一起組成一個新的task。這麼做時因為Flink本身提供了非常有效的任務優化手段,因為task是在同一個線程中執行,那麼可以有效減少線程間上下文的切換,并且減少序列化/反序列化帶來的資源消耗,進而在整體上提高我們任務的吞吐量。

并行度

     Flink使用并行度來定義某一個算子被切分成多少個子任務。Flink代碼會被轉換成邏輯是圖,在實際運作時根據使用者的并行度設定會被轉換成對應的子任務進行執行。

生産環境中的并行度和資源設定

源碼解析

     Flink Job在執行中會通過SlotProvider向ResourceManager申請資源,RM負責協調TaskManager,滿足JobManager的資源請求。

生産環境中的并行度和資源設定

     整體的類圖如上所述,SlotProvider中的allocateSlot方法負責向SlotPool申請可用的slot資源,通過returnLogicSlot将空閑的slot釋放至SlotPool。

     在整個Flink的資源管理的類中,核心的類包括Scheduler、SlotPool、JobMaster。他們之間的互動流程主要:Scheduler排程器向SlotPool資源池申請和釋放slot;如果SlotPool不能滿足需求,那麼會向ResourceManager發起申請;擷取到的資源通過JobMaster配置設定給SlotPool。

如何設定并行度

     Flink本身支援不同級别來設定我們任務并行度的方法,他們分别是:

  • 算子級别
  • 環境級别
  • 用戶端級别
  • 叢集配置級别

     在編寫Flink程式時,可以在代碼中顯示的制定不同算子的并行度。

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> text = ...
DataStream<Tuple2<String, Integer>> wordCounts = text
.flatMap(new LineSplitter())
.setParallelism(10)
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1).setParallelism(1);

wordCounts.print();
env.execute("word count");
           

     如上,可以通過顯示的調用setParallelism()方法來顯示的指定每個算子的并行度配置。

     在實際生産中,推薦在算子級别顯示指定各自的并行度,友善進行顯示和精确的資源控制。

     環境級别的并行度設定指的是可以通過調用env.setParallelism()方法來這是整個任務的并行度:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(5);
...
           

一旦設定了這個參數,表明任務中的所有算子的并行度都是指定的值,生産環境中不推薦。

     可以在使用指令送出Flink Job的時候指定并行度,當任務執行時發現代碼中沒有設定并行度,便會采用送出指令時的參數。

     通過 -p 指令來指定送出任務時候的并行度:

./bin/flink run -p 5 ../wordCount-java*.jar
           
parallelism.default:1
           

繼續閱讀