天天看點

Flink 原理與實作:如何處理反壓問題

原文連結: http://wuchong.me/blog/2016/04/26/flink-internals-how-to-handle-backpressure/

流處理系統需要能優雅地處理反壓(backpressure)問題。反壓通常産生于這樣的場景:短時負載高峰導緻系統接收資料的速率遠高于它處理資料的速率。許多日常問題都會導緻反壓,例如,垃圾回收停頓可能會導緻流入的資料快速堆積,或者遇到大促或秒殺活動導緻流量陡增。反壓如果不能得到正确的處理,可能會導緻資源耗盡甚至系統崩潰。

目前主流的流處理系統 Storm/JStorm/Spark Streaming/Flink 都已經提供了反壓機制,不過其實作各不相同。

Storm 是通過監控 Bolt 中的接收隊列負載情況,如果超過高水位值就會将反壓資訊寫到 Zookeeper ,Zookeeper 上的 watch 會通知該拓撲的所有 Worker 都進入反壓狀态,最後 Spout 停止發送 tuple。具體實作可以看這個 JIRA STORM-886。

JStorm 認為直接停止 Spout 的發送太過暴力,存在大量問題。當下遊出現阻塞時,上遊停止發送,下遊消除阻塞後,上遊又開閘放水,過了一會兒,下遊又阻塞,上遊又限流,如此反複,整個資料流會一直處在一個颠簸狀态。是以 JStorm 是通過逐級降速來進行反壓的,效果會較 Storm 更為穩定,但算法也更複雜。另外 JStorm 沒有引入 Zookeeper 而是通過 TopologyMaster 來協調拓撲進入反壓狀态,這降低了 Zookeeper 的負載。

Flink 中的反壓

那麼 Flink 是怎麼處理反壓的呢?答案非常簡單:Flink 沒有使用任何複雜的機制來解決反壓問題,因為根本不需要那樣的方案!它利用自身作為純資料流引擎的優勢來優雅地響應反壓問題。下面我們會深入分析 Flink 是如何在 Task 之間傳輸資料的,以及資料流如何實作自然降速的。

Flink 在運作時主要由 operators 和 streams 兩大元件構成。每個 operator 會消費中間态的流,并在流上進行轉換,然後生成新的流。對于 Flink 的網絡機制一種形象的類比是,Flink 使用了高效有界的分布式阻塞隊列,就像 Java 通用的阻塞隊列(BlockingQueue)一樣。還記得經典的線程間通信案例:生産者消費者模型嗎?使用 BlockingQueue 的話,一個較慢的接受者會降低發送者的發送速率,因為一旦隊列滿了(有界隊列)發送者會被阻塞。Flink 解決反壓的方案就是這種感覺。

在 Flink 中,這些分布式阻塞隊列就是這些邏輯流,而隊列容量是通過緩沖池(

LocalBufferPool

)來實作的。每個被生産和被消費的流都會被配置設定一個緩沖池。緩沖池管理着一組緩沖(

Buffer

),緩沖在被消費後可以被回收循環利用。這很好了解:你從池子中拿走一個緩沖,填上資料,在資料消費完之後,又把緩沖還給池子,之後你可以再次使用它。

在解釋 Flink 的反壓原理之前,我們必須先對 Flink 中網絡傳輸的記憶體管理有個了解。

網絡傳輸中的記憶體管理

如下圖所示展示了 Flink 在網絡傳輸場景下的記憶體管理。網絡上傳輸的資料會寫到 Task 的 InputGate(IG) 中,經過 Task 的處理後,再由 Task 寫到 ResultPartition(RS) 中。每個 Task 都包括了輸入和輸入,輸入和輸出的資料存在 

Buffer

 中(都是位元組資料)。Buffer 是 MemorySegment 的包裝類。

Flink 原理與實作:如何處理反壓問題
  1. TaskManager(TM)在啟動時,會先初始化

    NetworkEnvironment

    對象,TM 中所有與網絡相關的東西都由該類來管理(如 Netty 連接配接),其中就包括

    NetworkBufferPool

    。根據配置,Flink 會在 NetworkBufferPool 中生成一定數量(預設2048)的記憶體塊 MemorySegment(關于 Flink 的記憶體管理,後續文章會詳細談到),記憶體塊的總數量就代表了網絡傳輸中所有可用的記憶體。NetworkEnvironment 和 NetworkBufferPool 是 Task 之間共享的,每個 TM 隻會執行個體化一個。
  2. Task 線程啟動時,會向 NetworkEnvironment 注冊,NetworkEnvironment 會為 Task 的 InputGate(IG)和 ResultPartition(RP) 分别建立一個 LocalBufferPool(緩沖池)并設定可申請的 MemorySegment(記憶體塊)數量。IG 對應的緩沖池初始的記憶體塊數量與 IG 中 InputChannel 數量一緻,RP 對應的緩沖池初始的記憶體塊數量與 RP 中的 ResultSubpartition 數量一緻。不過,每當建立或銷毀緩沖池時,NetworkBufferPool 會計算剩餘空閑的記憶體塊數量,并平均配置設定給已建立的緩沖池。注意,這個過程隻是指定了緩沖池所能使用的記憶體塊數量,并沒有真正配置設定記憶體塊,隻有當需要時才配置設定。為什麼要動态地為緩沖池擴容呢?因為記憶體越多,意味着系統可以更輕松地應對瞬時壓力(如GC),不會頻繁地進入反壓狀态,是以我們要利用起那部分閑置的記憶體塊。
  3. 在 Task 線程執行過程中,當 Netty 接收端收到資料時,為了将 Netty 中的資料拷貝到 Task 中,InputChannel(實際是 RemoteInputChannel)會向其對應的緩沖池申請記憶體塊(上圖中的①)。如果緩沖池中也沒有可用的記憶體塊且已申請的數量還沒到池子上限,則會向 NetworkBufferPool 申請記憶體塊(上圖中的②)并交給 InputChannel 填上資料(上圖中的③和④)。如果緩沖池已申請的數量達到上限了呢?或者 NetworkBufferPool 也沒有可用記憶體塊了呢?這時候,Task 的 Netty Channel 會暫停讀取,上遊的發送端會立即響應停止發送,拓撲會進入反壓狀态。當 Task 線程寫資料到 ResultPartition 時,也會向緩沖池請求記憶體塊,如果沒有可用記憶體塊時,會阻塞在請求記憶體塊的地方,達到暫停寫入的目的。
  4. 當一個記憶體塊被消費完成之後(在輸入端是指記憶體塊中的位元組被反序列化成對象了,在輸出端是指記憶體塊中的位元組寫入到 Netty Channel 了),會調用 

    Buffer.recycle()

     方法,會将記憶體塊還給 LocalBufferPool (上圖中的⑤)。如果LocalBufferPool中目前申請的數量超過了池子容量(由于上文提到的動态容量,由于新注冊的 Task 導緻該池子容量變小),則LocalBufferPool會将該記憶體塊回收給 NetworkBufferPool(上圖中的⑥)。如果沒超過池子容量,則會繼續留在池子中,減少反複申請的開銷。

反壓的過程

下面這張圖簡單展示了兩個 Task 之間的資料傳輸以及 Flink 如何感覺到反壓的:

Flink 原理與實作:如何處理反壓問題
  1. 記錄“A”進入了 Flink 并且被 Task 1 處理。(這裡省略了 Netty 接收、反序列化等過程)
  2. 記錄被序列化到 buffer 中。
  3. 該 buffer 被發送到 Task 2,然後 Task 2 從這個 buffer 中讀出記錄。

不要忘了:記錄能被 Flink 處理的前提是,必須有空閑可用的 Buffer。

結合上面兩張圖看:Task 1 在輸出端有一個相關聯的 LocalBufferPool(稱緩沖池1),Task 2 在輸入端也有一個相關聯的 LocalBufferPool(稱緩沖池2)。如果緩沖池1中有空閑可用的 buffer 來序列化記錄 “A”,我們就序列化并發送該 buffer。

這裡我們需要注意兩個場景:

  • 本地傳輸:如果 Task 1 和 Task 2 運作在同一個 worker 節點(TaskManager),該 buffer 可以直接交給下一個 Task。一旦 Task 2 消費了該 buffer,則該 buffer 會被緩沖池1回收。如果 Task 2 的速度比 1 慢,那麼 buffer 回收的速度就會趕不上 Task 1 取 buffer 的速度,導緻緩沖池1無可用的 buffer,Task 1 等待在可用的 buffer 上。最終形成 Task 1 的降速。
  • 遠端傳輸:如果 Task 1 和 Task 2 運作在不同的 worker 節點上,那麼 buffer 會在發送到網絡(TCP Channel)後被回收。在接收端,會從 LocalBufferPool 中申請 buffer,然後拷貝網絡中的資料到 buffer 中。如果沒有可用的 buffer,會停止從 TCP 連接配接中讀取資料。在輸出端,通過 Netty 的水位值機制來保證不往網絡中寫入太多資料(後面會說)。如果網絡中的資料(Netty輸出緩沖中的位元組數)超過了高水位值,我們會等到其降到低水位值以下才繼續寫入資料。這保證了網絡中不會有太多的資料。如果接收端停止消費網絡中的資料(由于接收端緩沖池沒有可用 buffer),網絡中的緩沖資料就會堆積,那麼發送端也會暫停發送。另外,這會使得發送端的緩沖池得不到回收,writer 阻塞在向 LocalBufferPool 請求 buffer,阻塞了 writer 往 ResultSubPartition 寫資料。

這種固定大小緩沖池就像阻塞隊列一樣,保證了 Flink 有一套健壯的反壓機制,使得 Task 生産資料的速度不會快于消費的速度。我們上面描述的這個方案可以從兩個 Task 之間的資料傳輸自然地擴充到更複雜的 pipeline 中,保證反壓機制可以擴散到整個 pipeline。

Netty 水位值機制

下方的代碼是初始化 NettyServer 時配置的水位值參數。

// 預設高水位值為2個buffer大小, 當接收端消費速度跟不上,發送端會立即感覺到
bootstrap.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, config.getMemorySegmentSize() + 1);
bootstrap.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 2 * config.getMemorySegmentSize());
      

當輸出緩沖中的位元組數超過了高水位值, 則 Channel.isWritable() 會傳回false。當輸出緩存中的位元組數又掉到了低水位值以下, 則 Channel.isWritable() 會重新傳回true。Flink 中發送資料的核心代碼在 

PartitionRequestQueue

 中,該類是 server channel pipeline 的最後一層。發送資料關鍵代碼如下所示。

private void writeAndFlushNextMessageIfPossible(final Channel channel) throws IOException {
  if (fatalError) {
    return;
  }

  Buffer buffer = null;

  try {
    // channel.isWritable() 配合 WRITE_BUFFER_LOW_WATER_MARK 
    // 和 WRITE_BUFFER_HIGH_WATER_MARK 實作發送端的流量控制
    if (channel.isWritable()) {
      // 注意: 一個while循環也就最多隻發送一個BufferResponse, 連續發送BufferResponse是通過writeListener回調實作的
      while (true) {
        if (currentPartitionQueue == null && (currentPartitionQueue = queue.poll()) == null) {
          return;
        }

        buffer = currentPartitionQueue.getNextBuffer();

        if (buffer == null) {
          // 跳過這部分代碼
          ...
        }
        else {
          // 構造一個response傳回給用戶端
          BufferResponse resp = new BufferResponse(buffer, currentPartitionQueue.getSequenceNumber(), currentPartitionQueue.getReceiverId());

          if (!buffer.isBuffer() &&
              EventSerializer.fromBuffer(buffer, getClass().getClassLoader()).getClass() == EndOfPartitionEvent.class) {
            // 跳過這部分代碼。batch 模式中 subpartition 的資料準備就緒,通知下遊消費者。
            ...
          }

          // 将該response發到netty channel, 當寫成功後, 
          // 通過注冊的writeListener又會回調進來, 進而不斷地消費 queue 中的請求
          channel.writeAndFlush(resp).addListener(writeListener);

          return;
        }
      }
    }
  }
  catch (Throwable t) {
    if (buffer != null) {
      buffer.recycle();
    }

    throw new IOException(t.getMessage(), t);
  }
}

// 當水位值降下來後(channel 再次可寫),會重新觸發發送函數
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
  writeAndFlushNextMessageIfPossible(ctx.channel());
}
      

核心發送方法中如果channel不可寫,則會跳過發送。當channel再次可寫後,Netty 會調用該Handle的 

channelWritabilityChanged

 方法,進而重新觸發發送函數。

反壓實驗

另外,官方部落格中為了展示反壓的效果,給出了一個簡單的實驗。下面這張圖顯示了:随着時間的改變,生産者(×××線)和消費者(綠色線)每5秒的平均吞吐與最大吞吐(在單一JVM中每秒達到8百萬條記錄)的百分比。我們通過衡量task每5秒鐘處理的記錄數來衡量平均吞吐。該實驗運作在單 JVM 中,不過使用了完整的 Flink 功能棧。

Flink 原理與實作:如何處理反壓問題

首先,我們運作生産task到它最大生産速度的60%(我們通過Thread.sleep()來模拟降速)。消費者以同樣的速度處理資料。然後,我們将消費task的速度降至其最高速度的30%。你就會看到背壓問題産生了,正如我們所見,生産者的速度也自然降至其最高速度的30%。接着,停止消費task的人為降速,之後生産者和消費者task都達到了其最大的吞吐。接下來,我們再次将消費者的速度降至30%,pipeline給出了立即響應:生産者的速度也被自動降至30%。最後,我們再次停止限速,兩個task也再次恢複100%的速度。總而言之,我們可以看到:生産者和消費者在 pipeline 中的處理都在跟随彼此的吞吐而進行适當的調整,這就是我們希望看到的反壓的效果。

反壓監控

在 Storm/JStorm 中,隻要監控到隊列滿了,就可以記錄下拓撲進入反壓了。但是 Flink 的反壓太過于天然了,導緻我們無法簡單地通過監控隊列來監控反壓狀态。Flink 在這裡使用了一個 trick 來實作對反壓的監控。如果一個 Task 因為反壓而降速了,那麼它會卡在向 

LocalBufferPool

 申請記憶體塊上。那麼這時候,該 Task 的 stack trace 就會長下面這樣:

java.lang.Object.wait(Native Method)
o.a.f.[...].LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
o.a.f.[...].LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133) <--- BLOCKING request
[...]
      

那麼事情就簡單了。通過不斷地采樣每個 task 的 stack trace 就可以實作反壓監控。

Flink 原理與實作:如何處理反壓問題

總結

繼續閱讀