天天看點

Flink 生态:Pulsar Connector 機制剖析

整理:呂宴全(Flink 社群志願者)

作者:申毅傑(StreamNative)

摘要:本文由 Apache Pulsar Committer,StreamNative 進階工程師申毅傑老師分享,社群志願者呂宴全整理。本次分享将會分成以下四個内容展開:
  1. Pulsar 簡介
  2. Pulsar 架構
  3. Pulsar Connector 内部機制
  4. 未來規劃
Tips:點選「 閱讀原文 」,可檢視更多 Flink 生态直播~

Apache Pulsar 是 Yahoo 開源的下一代分布式消息系統,在2018年9月從 Apache 軟體基金會畢業成為頂級項目。Pulsar 特有的分層分片的架構,在保證大資料消息流系統的性能和吞吐量的同時,也提供了高可用性、高可擴充性和易維護性。

分片架構将消息流資料的存儲粒度從分區拉低到了分片,以及相應的層級化存儲,使 Pulsar 成為 unbounded streaming data storage 的不二之選。這使得 Pulsar 可以更完美地比對和适配 Flink 的批流一體的計算模式。

1. Pulsar 簡介

1.1 特點

随着開源後,各行業企業可以根據不同需求,為 Pulsar 賦予更豐富的功能,是以目前它也不再隻是中間件的功能,而是慢慢發展成為一個 Event Streaming Platform(事件流處理平台),具有 Connect(連接配接)、Store(存儲)和 Process(處理)功能。

■ Connect

在連接配接方面,Pulsar 具有自己單獨的 Pub/Sub 模型,可以同時滿足 Kafka 和 RocketMQ 的應用場景。同時 Pulsar IO 的功能,其實就是 Connector,可以非常友善地将資料源導入到 Pulsar 或從 Pulsar 導出等。

另外,在Pulsar 2.5.0 中,我們新增了一個重要機制:Protocol handler。這個機制支援在 broker 自定義添加額外的協定支援,可以保證在不更改原資料庫的基礎上,也能享用 Pulsar 的一些進階功能。是以 Pulsar 也延展出比如:KoP、ActiveMQ、Rest 等。

■ Store

Pulsar 提供了可以讓使用者導入的途徑後就必然需要考慮在 Pulsar 上進行存儲。Pulsar 采用的是分布式存儲,最開始是在 Apache BookKeeper 上進行。後來添加了更多的層級存儲,通過 JCloud 和 HDFS 等多種模式進行存儲的選擇。當然,層級存儲也受限于存儲容量。

■ Process

Pulsar 提供了一個無限存儲的抽象,友善第三方平台進行更好的批流融合的計算。即 Pulsar 的資料處理能力。Pulsar 的資料處理能力實際上是按照你資料計算的難易程度、實效性等進行了切分。

目前 Pulsar 包含以下幾類內建融合處理方式:

  • Pulsar Function:Pulsar 自帶的函數處理,通過不同系統端的函數編寫,即可完成計算并運用到 Pulsar 中。
  • Pulsar-Flink connector 和 Pulsar-Spark connector:作為批流融合計算引擎,Flink 和 Spark 都提供流計算的機制。如果你已經在使用他們了,那恭喜你。因為 Pulsar 也全部支援這兩種計算,無需你再進行多餘的操作了。
  • Presto (Pulsar SQL):有的朋友會在應用場景中更多的使用 SQL,進行互動式查詢等。Pulsar 與 Presto 有很好的內建處理,可以用 SQL 在 Pulsar 進行處理。
Flink 生态:Pulsar Connector 機制剖析

1.2 訂閱模型

從使用來看,Pulsar 的用法與傳統的消息系統類似,是基于釋出-訂閱模型的。使用者被分為生産者(Producer)和消費者(Consumer)兩個角色,對于更具體的需求,還可以以 Reader 的角色來消費資料。使用者可以以生産者的身份将資料釋出在特定的主題之下,也可以以消費者的身份訂閱(Subscription)特定的主題,進而擷取資料。在這個過程中,Pulsar 實作了資料的持久化與資料分發,Pulsar 還提供了Schema 功能,能夠對資料進行驗證。

如下圖所示,Pulsar 裡面有幾種訂閱模式:

  1. 獨占訂閱(Exclusive)
  2. 故障轉移訂閱(Failover)
  3. 共享訂閱(Shared)
  4. Key保序共享訂閱(Key_shared)
Flink 生态:Pulsar Connector 機制剖析
Flink 生态:Pulsar Connector 機制剖析

Pulsar 裡的主題分成兩類,一類是分區主題(Partitioned Topic),一類是非分區主題(Not Partitioned Topic)。

分區主題實際上是由多個非分區主題組成的。主題和分區都是邏輯上的概念,我們可以把主題看作是一個大的無限的事件流,被分區切分成幾條小的無限事件流。

而對應的,在實體上,Pulsar 采用分層結構。每一條事件流存儲在一個 Segment 中,每個Segment 包括了許多個Entry,Entry 裡面存放的才是使用者發送過來的一條或多條消息實體。

Message 是 Entry 中存放的資料,也是 Pulsar 中消費者消費一次獲得的資料。Message 中除了包括位元組流資料,還有 Key 屬性,兩種時間屬性和 MessageId 以及其他資訊。MessageId 是消息的唯一辨別,包括了ledger-id、entry-id、 batch-index、 partition-index 的資訊,如下圖,分别記錄了消息在Pulsar 中的Segment、Entry、Message、Partition 存儲位置, 是以也可以據此從實體上找到Message的資訊内容。

Flink 生态:Pulsar Connector 機制剖析

2. Pulsar 架構

一個 Pulsar 叢集由 Brokers 叢集和 Bookies 叢集組成。Brokers 之間是互相獨立的,負責向生産者和消費者提供關于某個主題的服務。Bookies 之間也是互相獨立的,負責存儲 Segment 的資料,是消息持久化的地方。為了管理配置資訊和代理資訊,Pulsar 還借助了 Zookeeper 這個元件,Brokers 和 Bookies 都會在 zookeeper 上注冊,下面從消息的具體讀寫路徑(見下圖)來介紹 Pulsar 的結構。

Flink 生态:Pulsar Connector 機制剖析
Flink 生态:Pulsar Connector 機制剖析

在寫路徑中,生産者建立并發送一條消息到主題中,該消息可能會以某種算法(比如Round robin)被路由到一個具體的分區上,Pulsar 會選擇一個Broker 為這個分區服務,該分區的消息實際會被發送到這個 Broker上。當Broker 拿到一條消息,它會以 Write Quorum (Qw)的方式将消息寫入到 Bookies 中。當成功寫入到 Bookies 的數量達到設定時,Broker 會收到完成通知,并且 Broker 也會傳回通知生産者寫入成功。

在讀路徑中,消費者首先要發起一次訂閱,之後才能與主題對應的 Broker 進行連接配接,Broker 從 Bookies 請求資料并發送給消費者。當資料接受成功,消費者可以選擇向 Broker 發送确認資訊,使得 Broker 能夠更新消費者的通路位置資訊。前面也提到,對于剛寫入的資料,Pulsar 會存儲在緩存中,那麼就可以直接從 Brokers 的緩存中讀取了,縮短了讀取路徑。

Pulsar 将存儲與服務相分離,實作了很好的可拓展性,在平台層面,能夠通過調整Bookies 的數量來滿足不同的需求。在使用者層面,隻需要跟 Brokers 通信,而Brokers 本身被設計成沒有狀态的,當某個 Broker 因故障無法使用時,可以動态的生成一個新的 Broker 來替換。

3. Pulsar Connector 内部機制

首先,Pulsar Connector 在使用上是比較簡單的,由一個 Source 和一個 Sink 組成,source 的功能就是将一個或多個主題下的消息傳入到 Flink 的Source中,Sink的功能就是從 Flink 的 Sink 中擷取資料并放入到某些主題下,在使用方式上,如下所示,與 Kafa Connector 很相似,使用時需要設定一些參數。

StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment(); Properties props = new Properties(); 
props.setProperty("topic", "test-source-topic") FlinkPulsarSource<String> source = new FlinkPulsarSource<>(
              serviceUrl, 
              adminUrl, 
              new SimpleStringSchema(), 
              props); 
DataStream<String> stream = see.addSource(source);

FlinkPulsarSink<Person> sink = 
      new FlinkPulsarSink(
              serviceUrl, 
              adminUrl, 
              Optional.of(topic), // mandatory target topic 
              props, 
              TopicKeyExtractor.NULL, // replace this to extract key or topic for each record
              Person.class); 
stream.addSink(sink);           

現在介紹 Kulsar Connector 一些特性的實作機制。

3.1 精确一次

因為 Pulsar 中的 MessageId 是全局唯一且有序的,與消息在 Pulsar 中的實體存儲也對應,是以為了實作 Exactly Once,Pulsar Connector 借助 Flink 的 Checkpoint 機制,将 MessageId 存儲到 Checkpoint。

對于連接配接器的 Source 任務,在每次觸發 Checkpoint 的時候,會将各個分區目前處理的 MessageId 儲存到狀态存儲裡面,這樣在任務重新開機的時候,每個分區都可以通過 Pulsar 提供的 Reader seek 接口找到 MessageId 對應的消息位置,然後從這個位置之後讀取消息資料。

通過 Checkpoint 機制,還能夠向存儲資料的節點發送資料使用完畢的通知,進而能準确删除過期的資料,做到存儲的合理利用。

3.2 動态發現

考慮到Flink中的任務都是長時間運作的,在運作任務的過程中,使用者也許會需要動态的增加部分主題或者分區,Pulsar Connector 提供了自動發現的解決方案。

Pulsar 的政策是另外啟動一個線程,定期的去查詢設定的主題是否改變,分區有沒有增删,如果發生了新增分區的情況,那麼就額外建立新的Reader 任務去完成主題下的資料的反序列化,當然如果是删除分區,也會相應的減少讀取任務。

3.3 結構化資料

在讀取主題下的資料的過程中,我們可以将資料轉化成一條條結構化的記錄來處理。Pulsar 支援 Avro schema and avro/json/protobuf Message 格式類型的資料轉化成 Flink 中的 Row格式資料。對于使用者關心的中繼資料,Pulsar 也在 Row 中提供了對應的中繼資料域。

另外,Pulsar 基于 Flink 1.9 版本進行了新的開發,支援 Table API 和 Catalog,Pulsar 做了一個簡單的映射,如下圖所示,将 Pulsar 的租戶/命名空間對應到 Catalog 的資料庫,将主題對應為庫中的具體表。

Flink 生态:Pulsar Connector 機制剖析

4. 未來規劃

首先,之前提到 Pulsar 将資料存儲在 Bookeeper 中,還可以導入到 Hdfs 或者 S3 這樣的檔案系統中,但對于分析型應用來說,我們往往隻關心所有資料中每條資料的部分屬性,是以采用列存儲的方式對 IO 和網絡都會有性能提升,Pulsar 也在嘗試在Segment 中以列的方式存儲。

其次,在原來的讀路徑中,不管是 Reader 還是Comsumer,都需要通過 Brokers 來傳遞資料。如果采用新的 Bypass Broker方式,通過查詢中繼資料,就能直接找到每條 Message 存儲的 Bookie 位置,這樣可以直接從 Bookie 讀取資料,縮短讀取路徑,進而提升效率。

最後,Pulsar 相對 Kafka 來說,由于資料在實體上是存放在一個個 Segment 中的,那麼在讀取的過程中,通過提高并行化的方式,建立多線程同時讀取多個 Segment,就能夠提升整個作業的完成效率,不過這也需要你的任務自身對每個Topic 分區的通路順序沒有嚴格要求,并且對于新産生的資料,是不儲存在 Segement 的,還是需要做緩存的通路來擷取資料,是以,并行讀取将成為一個可選項,為使用者提供更多的選擇方案。