天天看點

Pulsar雲原生分布式消息和流平台v2.8.0

Pulsar雲原生分布式消息和流平台v2.8.0

Apache Pulsar是一個雲原生的分布式消息和流媒體平台,最初建立于雅虎!現在是Apache軟體基金會的頂級項目。官網首頁列舉一些關鍵特性和目前使用公司包括國内深度合作騰訊,目前最新版本為2.8.0,背後的開源流資料公司 StreamNative,2019年創立一家公司,作為雲原生時代專注技術細分領域的佼佼者。Pulsar即可以支援queue模式的消息中間件比如RabbitMQ和RocketMQ,也可以支援stream流模式的Kafka,幾乎涵蓋消息應用的領域,加上豐富企業特性如多租戶隔離、百萬級Topics、跨地域複制、鑒權認證,是雲原生時代其他消息中間件的演化或者說是替代品也不為過

Pulsar雲原生分布式消息和流平台

**本人部落格網站 **IT小神 www.itxiaoshen.com

Pulsar官方網站

Apache Pulsar是一個雲原生的分布式消息和流媒體平台,最初建立于雅虎!現在是Apache軟體基金會的頂級項目

官網首頁列舉一些關鍵特性和目前使用公司包括國内深度合作騰訊,目前最新版本為2.8.0,背後的開源流資料公司 StreamNative,2019年創立一家公司,作為雲原生時代專注技術細分領域的佼佼者

什麼是Pulsar

Pulsar即可以支援queue模式的消息中間件比如RabbitMQ和RocketMQ,也可以支援stream流模式的Kafka,幾乎涵蓋消息應用的領域,加上豐富企業特性如多租戶隔離、百萬級Topics、跨地域複制、鑒權認證,是雲原生時代其他消息中間件的演化或者說是替代品也不為過

Pulsar雲原生分布式消息和流平台v2.8.0
Pulsar雲原生分布式消息和流平台v2.8.0

部署模式

支援多種部署模式,比如本地開發測試環境下單機運作環境,生産使用叢集部署或多叢集部署,還有基于容器化的Docker和K8s部署等

Pulsar雲原生分布式消息和流平台v2.8.0

概覽

Pulsar 是一個用于伺服器到伺服器的消息系統,具有多租戶、高性能等優勢。 Pulsar 最初由 Yahoo 開發,目前由 Apache 軟體基金會管理。

Pulsar 的關鍵特性如下:

  • Pulsar 的單個執行個體原生支援多個叢集,可跨機房在叢集間無縫地完成消息複制。
  • 極低的釋出延遲和端到端延遲。
  • 可無縫擴充到超過一百萬個 topic。
  • 簡單的用戶端 API,支援 Java、Go、Python 和 C++。
  • 支援多種 topic 訂閱模式(獨占訂閱、共享訂閱、故障轉移訂閱)。
  • 通過 Apache BookKeeper 提供的持久化消息存儲機制保證消息傳遞 。
    • 由輕量級的 serverless 計算架構 Pulsar Functions 實作流原生的資料處理。
  • 基于 Pulsar Functions 的 serverless connector 架構 Pulsar IO 使得資料更易移入、移出 Apache Pulsar。
  • 分層式存儲可在資料陳舊時,将資料從熱存儲解除安裝到冷/長期存儲(如S3、GCS)中。

Pulsar需要解決問題

  • 企業需求和資料規模
    • 多租戶-百萬Topics-低延遲-持久化-跨地域複制
    • 解除存儲計算耦合
      • 運維痛點:替換機器、服務擴容、資料rebalance
    • 減少檔案系統依賴
      • 性能難保障:持久化、一緻性、多Topic
      • IO不隔離:消費者度Backlog的時候會影響其他生産者和消費者,Kakfa采用順序寫的機制提升性能,當Topic和分區數大量增大後便會退化為随機寫而極大減低起IO性能

架構

Pulsar底層最為關鍵技術是采用存儲和計算分離以及分層+分片的架構,節點是對等的可以獨立擴充并支援靈活擴容和快速容錯機制,這也是為什麼說Pulsar是雲原生架構的主要原因;

Pulsar雲原生分布式消息和流平台v2.8.0
Pulsar雲原生分布式消息和流平台v2.8.0

Pulsar企業級存儲層采用的是Apache BookKeeper持久化存儲。 BookKeeper是一個分布式的預寫日志(WAL)系統,滿足低延遲、高吞吐、持久化、強一緻性、高可用、I/O隔離,中繼資料服務基于rocksdb legder存儲,而基于New Sql新一代分布式關系資料庫TiDb的k-v存儲節點底層也是采用 性能非常強大單機存儲引擎rocksdb,關于 BookKeeper我們本篇不做延展介紹,後續有時間再單獨闡述。

Broker作為計算層依靠Zookeeper作為生産者和消費者的橋梁,天然屬于無狀态的服務,擴容通過服務發現自動動态感覺;另一方面底層是分布式存儲,是以擴容直接添加存儲節點即可,原來在Kafka擴容節點後,如果沒有屬于該節點的分區資料則擴容節點是無法起作用的,需要做分區管理或rebalance,而在Pulsar中新增加節點則會實時增加資料進來,這個得益于Pulsar的架構設計,采用分層+分片的邏輯存儲概念,每一塊存儲是可以存儲不同Topic不同分區的資料,然後依賴于索引系統原理實作檢索;存儲節點出現故障後由于是對等架構,分布式存儲有多副本機制,是以可繼續提供正常服務且也不需要立即進行故障轉移,可以在合适時機再做副本遷移,是以對于應用來說是無感覺的

Pulsar穩定的IO品質底層機制

Pulsar雲原生分布式消息和流平台v2.8.0
Ledger是一個隻追加的資料結構,并且隻有一個寫入器,這個寫入器負責多個BookKeeper存儲節點(就是Bookies)的寫入。 Ledger的條目會被複制到多個bookies。 Ledgers本身有着非常簡單的語義:
  • Pulsar Broker可以建立ledeger,添加内容到ledger和關閉ledger。
  • 當一個ledger被關閉後,除非明确的要寫資料或者是因為寫入器挂掉導緻ledger關閉,這個ledger隻會以隻讀模式打開。
  • 最後,當ledger中的條目不再有用的時候,整個legder可以被删除(ledger分布是跨Bookies的)。

Ledger讀一緻性

BookKeeper的主要優勢在于他能在有系統故障時保證讀的一緻性。 由于Ledger隻能被一個程序寫入(之前提的寫入器程序),這樣這個程序在寫入時不會有沖突,進而寫入會非常高效。 在一次故障之後,ledger會啟動一個恢複程序來确定ledger的最終狀态并确認最後送出到日志的是哪一個條目。 在這之後,能保證所有的ledger讀程序讀取到相同的内容。

Managed ledgers

Given that Bookkeeper ledgers provide a single log abstraction, a library was developed on top of the ledger called the managed ledger that represents the storage layer for a single topic. managed ledger即消息流的抽象,有一個寫入器程序不斷在流結尾添加消息,并且有多個cursors 消費這個流,每個cursor有自己的消費位置。

Internally, a single managed ledger uses multiple BookKeeper ledgers to store the data. There are two reasons to have multiple ledgers:

  1. 在故障之後,原有的某個ledger不能再寫了,需要建立一個新的。
  2. A ledger can be deleted when all cursors have consumed the messages it contains. This allows for periodic rollover of ledgers.

日志存儲

In BookKeeper, journal files contain BookKeeper transaction logs. 在更新到 ledger之前,bookie需要確定描述這個更新的事務被寫到持久(非易失)存儲上面。 在bookie啟動和舊的日志檔案大小達到上限(由

journalMaxSizeMB

參數配置)的時候,新的日志檔案會被建立。

Palsar Schema

啟用 schema 後,Pulsar 會解析資料,即接收位元組作為輸入并發送位元組作為輸出。 雖然資料不僅是位元組,但的确需要解析這些資料,解析時還可能發生解析異常,解析異常主要出現在以下幾種情況中:
  • 字段不存在
  • 字段類型已更改(例如,将

    string

    更改為

    int

簡單來說,當我們使用 schema 去建立 producer 生産者則不再需要将消息序列化為位元組,因為Pulsar schema 會在背景幫我們執行序列化操作。

Producer<User> producer = client.newProducer(JSONSchema.of(User.class))
        .topic(topic)
        .create();
User user = new User("Tom", 28);
producer.send(user);
           

Parsar Functions

Pulsar Functions 是輕量級計算流程,具有以下特點:
  • 從一個或多個 Pulsar topic 中消費消息;
  • 将使用者提供的處理邏輯應用于每條消息;
  • 将運作結果釋出到另一個 topic。
  • Pulsar Functions可以看做是一種程式設計模型,背後的核心目标是使您能夠輕松建立各種級别的複雜的的處理邏輯,而無需部署單獨的類似系統(例如 Apache Storm, Apache Heron, Apache Flink, 等等) Pulsar Functions are computing infrastructure of Pulsar messaging system. The core goal is tied to a series of other goals:
    • 提高開發者的生産力(用開發者熟悉的語言和Pulsar Function 的函數SDK)
    • 簡單的故障排查
    • 操作簡單(不需要外部處理系統)

Function 抽象,計算對象是消息,Function 将收到消息進行計算執行業務邏輯并寫進 Output topic,Function 為開發者提供了很多便利,簡單的計算都可以通過 Function 完成。可以将 Function 結合起來,由此提出一個新概念 Function Mesh,其主要基于 K8s 開發。

Pulsar雲原生分布式消息和流平台v2.8.0
package org.example.functions;

import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

import java.util.Arrays;

public class WordCountFunction implements Function<String, Void> {
    // This function is invoked every time a message is published to the input topic
    @Override
    public Void process(String input, Context context) throws Exception {
        Arrays.asList(input.split(" ")).forEach(word -> {
            String counterKey = word.toLowerCase();
            context.incrCounter(counterKey, 1);
        });
        return null;
    }
}
           

将上面的代碼編譯成可部署的 JAR 檔案,可以使用如下指令行将 JAR 包部署到 Pulsar 叢集中。

$ bin/pulsar-admin functions create \
  --jar target/my-jar-with-dependencies.jar \
  --classname org.example.functions.WordCountFunction \
  --tenant public \
  --namespace default \
  --name word-count \
  --inputs persistent://public/default/sentences \
  --output persistent://public/default/count
           

Pulsar IO

Pulsar IO連接配接器使您能夠輕松建立、部署和管理與外部系統互動的連接配接器,如Apache Cassandra、Aerospike等。

可以通過 Connector Admin CLI并結合 sources 和 sinks 子指令來管理 Pulsar 連接配接器(例如,建立、更新、啟動、停止、重新開機、重載、删除以及其他操作)。

連接配接器(sources 和 sinks)和 Functions 是執行個體的組成部分,都在 Functions workers 上運作。 通過 Connector Admin CLI 或 Functions Admin CLI 管理 source、sink 或者 function 時,在 worker 上就啟動了一個執行個體。 了解更多資訊,參閱 Functions worker。

Pulsar雲原生分布式消息和流平台v2.8.0
Pulsar雲原生分布式消息和流平台v2.8.0

Pulsar SQL

Apache Pulsar 用于存儲事件資料流,事件資料結構由預定義字段組成。 借助 Schema Registry 的實作,你可以在 Pulsar 中存儲結構化資料,并通過使用Trino(原先叫 Presto SQL)查詢這些資料。

作為 Pulsar SQL 的核心,Presto Pulsar 連接配接器支援 Presto 叢集中的 Presto worker 查詢 Pulsar 資料。

Pulsar雲原生分布式消息和流平台v2.8.0

查詢性能高效且高度可擴充,這得益于 Pulsar 的 分層分片架構。

Pulsar 中的主題以分片形式存儲在 Apache BookKeeper 中。 每個主題分片會被複制到多個 BookKeeper 節點,可以支撐并發讀和高吞吐。 你可以配置 BookKeeper 節點的數量,預設節點數是

3

。 在 Presto Pulsar 連接配接器中,資料直接從 BookKeeper 讀取,是以 Presto worker 能從水準擴充的 BookKeeper 節點中并發讀取資料。

Pulsar雲原生分布式消息和流平台v2.8.0

Tiered Storage

Pulsar 的分層存儲 功能允許将曆史 backlog 資料從 BookKeeper 中轉移到更加低廉的存儲媒體中并且允許用戶端通路無變化的 backlog 資料。

  • 分層存儲通過 Apache jclouds 來實作在 Amazon S3 和 GCS (Google Cloud Storage) 進行歸檔存儲。

    通過 jclouds,可以在未來便捷地去擴充對其他雲存儲的支援。

  • 分層存儲通過 Apache Hadoop 來實作在檔案系統中進行歸檔存儲。

    通過 Hadoop,可以在未來便捷地去擴充對其他檔案系統的支援。

Transactions

Pulsar事務 (txn) 使事件流應用程式能夠在一個原子操作中消費、處理和生成消息。

Pulsar雲原生分布式消息和流平台v2.8.0

Pulsar事務支援端到端的恰好一次流處理,這意味着消息不會從源算子(source operator)丢失,并且消息不會重複發給接收算子(sink operator)。

随着Pulsar 2.8.0中引入的事務,Pulsar Flink接收器連接配接器可以通過實作指定的

TwoPhaseCommitSinkFunction

并使用Pulsar事務 API 連接配接 Flink 接收器消息生命周期來支援exactly-once語義

Pulsar 周邊和生态

Pulsar 作為一個流原生消息平台,主要包括存儲(Stream Storage)、消息(Messaging)、計算(Processing)三個方面的工作。

Messaging 是 Pulsar 誕生之初的一個主要方向。通過 Pulsar IO 和外部系統打。

下圖藍色 Processing 方面 Queries 的引擎比如 Presto 和 HIVE 進行深度整合,讓 Presto 和 HIVE 能夠直接讀取 Pulsar 的 topic ,再結合 Pulsar 本身自帶的 Schema,将 Pulsar topic 作為的一個表直接查找 Pulsar topic 中的資料。

在 Streaming & Batch Processing 方面,與大資料處理引擎包括 Storm、Flink、Spark 進行深度整合。

在 Processing 方向的思路是與現有的大資料生态做深度的融合,讓大資料生态能夠更好地通路 Pulsar,把 Pulsar 當作資料的存儲引擎。

除此之外,Pulsar 推出了 Pulsar Function — 一個輕量級的計算架構。Pulsar Function 可以減輕很多資料的傳輸,可以靠近資料端完成計算,目前很多 IoT 場景的使用者如塗鴉智能、EMQ、中國電信及一些車聯網公司都在使用 Pulsar Function。

除了 Messaging 和 Processing ,Pulsar 擁有一個很堅實的基礎,就是擁有專門為消息、流存儲而設計的存儲引擎 Apache BookKeeper。結合 Pulsar 對分區再分片的存儲特性,我們很自然地把老的分片遷移到二級存儲中,是以 Pulsar 的架構很容支援二級存儲。二級存儲的媒體包括雲上的各種資源:S3、HDFS

Pulsar雲原生分布式消息和流平台v2.8.0

消息領域

Pulsar雲原生分布式消息和流平台v2.8.0

很多使用者在使用 Pulsar 的過程中,會發現用戶端應用的改造和遷移會很難落地。比如 Kafka 往 Pulsar 遷移過程中,客戶很可能也會有大量基于 Kafka Clients 的應用需要更改,由于需要更改協定導緻遷移很困難。

由于短時間不可能完全從 Kafka 遷移到 Pulsar,導緻對背景的運維甚至整個業務的切換帶來很大的不便捷性。

Pulsar 和 Kafka 一樣是以 topic 作為基礎,以 log 作為抽象,Pulsar 的一緻性、延遲、吞吐會更優,在這個基礎上要複用 Pulsar 的存儲層,在 Broker 端實作協定的解析,使用者的切換成本更低。Pulsar Broker 端提供 Protocol Handler 插件(現在已經實作 Kafka、AMQP、MQTT 協定的支援)的方式來支援多種協定。這種在 Broker 端做協定解析的方法,可以更友善地支援多種協定。其次還利用 Pulsar 在存儲層擁有存儲、計算分離的優勢,服務上層多種協定。

KoP(Kafka on Pulsar)

目前 StreamNative 聯合合作夥伴已推出了 KoP 項目,主要滿足想要從 Kafka 應用程式切換到 Pulsar 的使用者的強烈需求。

KoP 将 Kafka 協定處理插件引入 Pulsar broker,進而實作 Apache Pulsar 對原生 Apache Kafka 協定的支援。将 KoP 協定處理插件添加到現有 Pulsar 叢集後,使用者不用修改代碼就可以将現有的 Kafka 應用程式和服務遷移到 Pulsar,進而使用 Pulsar 的強大功能。

[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-9i0cziCO-1629624254691)(http://www.itxiaoshen.com:3001/assets/1629618989303RWFe8zct.png)]

KoP 相關特性:

  • Broker 的插件,Client 不需要做任何的改動;
  • 共享通路;
  • 支援 Kafka 0.10-2.x 版本;
  • 連續 Offset:增加對連續 ID 的支援。
  • 性能改進:實作與 Kafka broker 類似的機制,無需 KoP 針對 Kafka 發送的 batch 消息進行拆包解包,将 Kafka 發送過來的消息直接以 Kafka 格式進行存儲,并在 Pulsar Client 增加對 kafka 協定的解析器。
  • 支援 Envoy,并實作 Pulsar Schema 與 Kafka Schema 的相容。

AoP(AMQP on Pulsar)

AoP(AMQP on Pulsar)是 StreamNative 聯合中國移動共同開發推進的項目,類似 KoP,主要解決 AMQP 應用程式遷移到 Pulsar 的需求。目前 AoP 實作了對 AMQP 協定 0.9.1 版本的支援,2021 年計劃對 AMQP 1.0 協定進行整合支援。目前除了中國移動正在大規模應用 AoP 外,國外也有越來越多的使用者正在使用 AoP,希望更多小夥伴加入到 AoP 使用中來,共同豐富 AoP 場景,協作增強 AoP 功能。

MoP(MQTT on Pulsar)

MQTT 協定在物聯網應用十分廣泛,類似 KoP、AoP,目前 Pulsar 也通過 MoP 項目提供了對 MQTT 協定的支援。目前 MoP 支援 QoS level 0、QoS level 1 協定,2021 年計劃實作對 QoS level 2 協定的支援。

Apache Pulsar在日志系統中的應用

常見日志架構ELK

Pulsar雲原生分布式消息和流平台v2.8.0

消息隊列在日志場景中主要作用:削峰解耦、資料分發

日志系統常見挑戰:

Pulsar雲原生分布式消息和流平台v2.8.0

更多功能性要求:

Pulsar雲原生分布式消息和流平台v2.8.0

Kafka和Pulsar對比

Pulsar雲原生分布式消息和流平台v2.8.0
  • Kafka僅支援user/client-id級别、broker設定;而Pulsar則支援namespace/topic級别,粒度較小
  • Kafka增加新節點需要reassign partition才能使用;而Pulsar存儲和計算分離,可以按需增加計算或存儲節點,增加即生效,不需要reassign
  • Kafka消費能力受限Topic設定的partition數量;而Pulsar消費能力不受限Topic設定的partition數量,可以通過增加消費者數量增大消費能力
  • Kafka随着partition增多,請求下降嚴重,追加寫模式退化為随機些;而Pulsar topic/partition僅是邏輯概念,保證追加寫模式

Pulsar引入架構V1

Pulsar雲原生分布式消息和流平台v2.8.0

Pulsar引入架構V2

Pulsar雲原生分布式消息和流平台v2.8.0

Pulsar引入架構V3

Pulsar雲原生分布式消息和流平台v2.8.0

将ETL邏輯從LogStash遷移到Apache Pulsar Functions/IO中,進而起到降本提效,将資料offload到二級緩存中,滿足等保要求

Pulsar引入架構V4

Pulsar雲原生分布式消息和流平台v2.8.0
Pulsar雲原生分布式消息和流平台v2.8.0

可以通過Pulsar SQL實作千萬級别的精确資料查詢,注意是不支援模拟查詢,模拟還是需要在ES中進行

應用

  • Apache Pulsar在騰訊大資料場景落地實踐如TDBank-大資料實時接入平台騰訊慧聚
  • Apache Pulsar在華為物聯網(AMQP)之旅
  • Apache Pulsar在電信計費系統的應用
  • Apache Pulsar 在拉卡拉的技術實踐
  • KoP(Kafka on Pulsar)在 BIGO 的性能優化實踐