天天看點

Rabbitmq叢集,鏡像隊列和分布式原理

前言

基于前兩次的分享會,結合rabbitmq相關知識,做一個小結。說明一緻性的設計思想,在此說明相關的基礎理論。

CAP定理:

在計算機科學裡,CAP定理又被稱作布魯爾定理(Brewer theorem)。他認為對于一個分布式計算機系統來說,不可能同時滿足以下三點。

  1. 一緻性(Consistence)
  2. 可用性(Availability):關于系統可以被使用的時間的描述,以丢失的時間為驅動(Be Driven by Lost Time)

可用性計算公式:A = Uptime/(Uptime+ Downtime)。Uptime是可用時間,Downtime是不可用時間

     3.分區容錯性(Partition tolerance)

在大型分布式系統實踐中,分布式意味着必須滿足分區容錯性,也是P。為了追求更高的可用性,在一緻性上會做一定的妥協,通常會選擇AP。CAP定理如下圖1所示:

Rabbitmq叢集,鏡像隊列和分布式原理

                                                       圖檔1

BASE理論

      基于對大規模分布式系統的實踐總結,eBay的架構師Dan Pritchett在ACM(國際計算機學會)上發表文章提出了BASE理論,BASE理論是對CAP定理的延伸。

  1. BA: Basically available,基本可用。
  2. S: Soft state,軟狀态。
  3. E: Eventually consistent,最終一緻

       BASE理論的核心思想是:如果無法做到強一緻性,或者做到強一緻性要付出很大的代價,那麼應用可以根據自身業務特點,采用适當的方式來使系統達到最終一緻性,隻要對最終的使用者沒有影響,或者影響是可接受的即可。

Quorum機制(NWR 模型)

       在分布式場景中:如果多個服務分别向三個節點寫資料,為保持強一緻,就必須要求三個節點全部成功才傳回。這樣在讀的時候可以讀任意節點,就不會有不一緻的情況了。但是,同步寫三個節點的性能較低,如果換一個思路,一緻性并不一定要在寫資料的時候完成,可以在讀的階段決策,隻要每次讀到最新的版本就可以了。這就是Quorum機制的核心思想。

      簡單來說,Quorum機制就是要滿足公式:W+R>N,式中N代表備份個數,W代表要寫入至少W份才認為成功,R表示至少讀取R個備份。這個公式把選擇權交給了業務使用者,讓使用者來做出最終決策。

      NWR原理,如下圖2所示,兩個程序同時往3個節點寫資料,v1表示版本1,v2表示版本2,如何保證每次讀取都至少讀到一個最新的版本呢?

Rabbitmq叢集,鏡像隊列和分布式原理

                                                     圖檔2

      假設N=3, W=1, R=1,W+R<N,在節點1寫入,節點2讀取,無法讀取到最新的資料

      假設N=3, W=1,R=2,W+R=N,在節點1寫入,節點2、3讀取,無法讀到最新的資料

      假設N=3,W=2,R=2,W+R>N,寫入任意兩個節點,讀取任意兩個節點,一定會讀取到最新版本的資料。

      假設N=3,W=3,R=1,W+R>N,同時寫入所有節點,則讀取任意節點就可以得到最新的資料。

      當R=1且W=N時,适合讀多寫少的場景,讀操作是最優的。當W=1且R=N,适合寫多讀少的場景,可以得到非常快的寫操作。

      但是還存在一個寫入的沖突問題:假設N=3,W=1,一共有三個節點,隻要寫入一個就認為成功。如果第一次寫入A節點,對變量a進行減1操作,變量a在A節點上由10變成了9,記錄版本為v2,變量a還沒來得及同步到另外兩個節點上,a在B、C節點上的值還是10;第二次寫入到B節點,同樣對B節點進行減1操作,變量a在B上的值變為9,版本為v3,根據W+R>N規則,要讀取三個節點,同時得到了v2,v3版本的資料,這時候就需要合并資料,處理沖突。由于v3版本更新,就回覆寫v2版本,結果a=9,但是實際上執行了兩次減1操作。

       如何要解決這個問題,,就要同時滿足公式W>N/2。這樣能保證每次寫入都能和上次寫入有交集,變成一個樂觀鎖,隻有超過半數寫成功才算成功。

       Aurora是AWS的分布式關系型資料庫,它的存儲層就是基于Quorum協定的,除滿足W+R>N外,還有一個要求是W>N/2。Aurora會部署在三個AZ(AvailablityZone)上,每個AZ包含了2個副本,總共6個副本,至少4個執行個體寫成功才算成功,至少讀三份資料。這樣即使任意一個AZ不可使用,還有4個副本,不會丢失資料,不影響讀寫。此外,當任意一個AZ不可用的同時,另外兩個AZ中的一個副本節點也不可用了,這樣隻會影響寫,不會影響讀。由于在同一個機房通過備份恢複一個副本節點會很快,Aurora采用萬兆網卡,可以在10s恢複一個10GB副本。

租約機制(Lease)

    一種是采用投票機制(Paxos算法);另一種則是采用租約機制-Lease(參考etcd)

狀态機(Replicated State Machine)

       在分布式環境下,如果要保證更高的可用性,更好的容錯性,就必須建立多個副本。一個簡單的辦法是執行相同的指令,也就是說在每個資料中心都有相同的初始狀态,執行相同指令,并且指令的順序保持一緻,這樣就可以保證最終資料也相同了。

       Replicated State Machine的一個典型應用案例就是MySQL同步,因為MySQL本身隻支援Master-Slave結構。假設我們想同時寫入三個節點,我們就可以在寫入MySQL之前先通過Proxy寫log,三個節點的Proxy之間通過Raft選取Leader,Leader接受請求,然後通過log實作各個replica(副本)的同步。這樣可以保證讀取到最新的資料

1 基礎概念

       AMQP,即Advanced Message Queuing Protocol,進階消息隊列協定,是應用層協定的一個開放标準,為面向消息的中間件設計。消息中間件主要用于元件之間的解耦,消息的發送者無需知道消息使用者的存在,反之亦然。

       AMQP的主要特征是面向消息、隊列、路由(包括點對點和釋出/訂閱)、可靠性、安全。

       RabbitMQ是一個開源的AMQP實作,伺服器端用Erlang語言編寫,支援多種用戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支援AJAX。用于在分布式系統中存儲轉發消息,在易用性、擴充性、高可用性等方面表現不俗。

Erlang分布式并發程式設計:

       Erlang 并發程式設計是在同一 Erlang 虛拟機上建立多個程序來實作的。而Erlang 的分布式程式設計可以通過在不同主機上的Erlang 虛拟機(可以運作在同一主機中或可以互相通過網絡主機上)實作,其中運作Erlang 虛拟機的主機也稱作Erlang 的節點。在Erlang 中,可以實作在一個Erlang 虛拟機上遠端在另一個Erlang 虛拟機上建立新的工作的程序,然後利用其消息傳遞機制,将要計算的資料發送到另一虛拟機上的工作程序,在另一虛拟機中計算後,再使用消息傳遞機制将計算結果發送回來;也可以實作在一個Erlang虛拟機上以遠端調用的方式在遠端虛拟機上進行資料處理,并直接傳回處理結果。

 Erlang語言中與分布式程式設計有關的幾個重要函數文法如下:

    1.{pname,nodename}!Message

    實作不同Erlang節點間的消息發送,其中pname是遠端節點程序的注冊名,nodename是遠端節點名。

    2.spawn(nodename,modulename,fun,[arg1,arg2..])

    實作在遠端節點上啟動一個程序,與spawn/3相比隻是多了個節點名……

    3.rpc:call(nodename,modulename,fun,[arg1,arg2..])

    實作遠端調用的函數,它是Erlang标準庫中的子產品rpc中的函數,其參數與2中相同。

    4.node()

    傳回本地節點名稱

    Erlang節點的啟動方法:

    1.在同一主機上啟動多個Erlang節點指令行格式如下:

    erl -sname nodename

    使用短名稱的形式啟動一個名稱為nodename的節點,其節點名稱為啟動後Erlang指令提示符所示,比如Erlang指令提示符為:

    ([email protected])2>

    則其節點名稱為:[email protected]。

    2.在可以互相通信的主機上分别啟動Erlang節點指令行格式如下:

    erl -name nodename -setcookie abc

    與1大緻相同,差別是使用參數設定了節點的cookie,如果要實作節點間的通信,就需要多個節點的cookie的值完全相同。而1中在同一主機中,會自動設定為相同的cookie,是以沒有進行顯式的設定。

    以下舉一簡單執行個體:

-module(nodetest).

-compile(export_all).

remotes() ->

    receive

        {From,Data} ->

            From!"Remote recved:" ++ Data,

            io:format("~s~n",[Data])

    end,

    remotes().

recv() ->

    receive

        Data -> io:format("~s~n",[Data])

    end,

    recv().

    其中定義了一個remotes/0函數,可以将收到的消息輸出後發回對方;recv/0函數隻是簡單的接收消息并輸出。

2 rabbitmq簡介

 2.1 rabbitmq整體介紹

       從生産—消費—以及服務端這三個方面來簡單說說 Rabbitmq,如下圖2-1-1所示:

Rabbitmq叢集,鏡像隊列和分布式原理

                                                                                          圖2-1-1所示

一條消息經由生産者producer發出,然後轉交到交換器exchange,然後又exchange路由到隊列中來做存儲。消費者連接配接隊列直接從隊列中讀取消息。我們這舉一個例子: 寄件人producer投遞一個包裹到郵局exchange。有郵局負責幫忙投遞到目的地queue中,之後再由收件人consumer拆件,這一套組織邏輯其實就是AMAQP協定的一套邏輯,注意RabbitMq中的exchange并不是一個處理子產品的程序,而僅僅類似一張路由表而已。

2.2rabbitmq相關概念

ConnectionFactory、Connection、Channel都是RabbitMQ對外提供的API中最基本的對象。Connection是RabbitMQ的socket連結,它封裝了socket協定相關部分邏輯。ConnectionFactory為Connection的制造工廠。 Channel是我們與RabbitMQ打交道的最重要的一個接口,我們大部分的業務操作是在Channel這個接口中完成的,包括定義Queue、定義Exchange、綁定Queue與Exchange、釋出消息等。

Queue

Queue(隊列)是RabbitMQ的内部對象,用于存儲消息,用下圖表示。

Rabbitmq叢集,鏡像隊列和分布式原理

RabbitMQ中的消息都隻能存儲在Queue中,生産者(下圖中的P)生産消息并最終投遞到Queue中,消費者(下圖中的C)可以從Queue中擷取消息并消費。

Rabbitmq叢集,鏡像隊列和分布式原理

多個消費者可以訂閱同一個Queue,這時Queue中的消息會被平均分攤給多個消費者進行處理,而不是每個消費者都收到所有的消息并處理。

Rabbitmq叢集,鏡像隊列和分布式原理

Message acknowledgment

在實際應用中,可能會發生消費者收到Queue中的消息,但沒有處理完成就當機(或出現其他意外)的情況,這種情況下就可能會導緻消息丢失。為了避免這種情況發生,我們可以要求消費者在消費完消息後發送一個回執給RabbitMQ,RabbitMQ收到消息回執(Message acknowledgment)後才将該消息從Queue中移除;如果RabbitMQ沒有收到回執并檢測到消費者的RabbitMQ連接配接斷開,則RabbitMQ會将該消息發送給其他消費者(如果存在多個消費者)進行處理。這裡不存在timeout概念,一個消費者處理消息時間再長也不會導緻該消息被發送給其他消費者,除非它的RabbitMQ連接配接斷開。

這裡會産生另外一個問題,如果我們的開發人員在處理完業務邏輯後,忘記發送回執給RabbitMQ,這将會導緻嚴重的bug——Queue中堆積的消息會越來越多;消費者重新開機後會重複消費這些消息并重複執行業務邏輯…

Message durability

如果我們希望即使在RabbitMQ服務重新開機的情況下,也不會丢失消息,我們可以将Queue與Message都設定為可持久化的(durable),這樣可以保證絕大部分情況下我們的RabbitMQ消息不會丢失。但依然解決不了小機率丢失事件的發生(比如RabbitMQ伺服器已經接收到生産者的消息,但還沒來得及持久化該消息時RabbitMQ伺服器就斷電了),如果我們需要對這種小機率事件也要管理起來,那麼我們要用到事務。由于這裡僅為RabbitMQ的簡單介紹,是以這裡将不講解RabbitMQ相關的事務。

Prefetch count

前面我們講到如果有多個消費者同時訂閱同一個Queue中的消息,Queue中的消息會被平攤給多個消費者。這時如果每個消息的處理時間不同,就有可能會導緻某些消費者一直在忙,而另外一些消費者很快就處理完手頭工作并一直空閑的情況。我們可以通過設定prefetchCount來限制Queue每次發送給每個消費者的消息數,比如我們設定prefetchCount=1,則Queue每次給每個消費者發送一條消息;消費者處理完這條消息後Queue會再給該消費者發送一條消息。

Rabbitmq叢集,鏡像隊列和分布式原理

Exchange

在上一節我們看到生産者将消息投遞到Queue中,實際上這在RabbitMQ中這種事情永遠都不會發生。實際的情況是,生産者将消息發送到Exchange(交換器,下圖中的X),由Exchange将消息路由到一個或多個Queue中(或者丢棄)。

Rabbitmq叢集,鏡像隊列和分布式原理

Exchange是按照什麼邏輯将消息路由到Queue的?這個将在Binding一節介紹。 RabbitMQ中的Exchange有四種類型,不同的類型有着不同的路由政策,這将在Exchange Types一節介紹。

routing key

生産者在将消息發送給Exchange的時候,一般會指定一個routing key,來指定這個消息的路由規則,而這個routing key需要與Exchange Type及binding key聯合使用才能最終生效。 在Exchange Type與binding key固定的情況下(在正常使用時一般這些内容都是固定配置好的),我們的生産者就可以在發送消息給Exchange時,通過指定routing key來決定消息流向哪裡。 RabbitMQ為routing key設定的長度限制為255 bytes。

Binding

   RabbitMQ中通過Binding将Exchange與Queue關聯起來,這樣RabbitMQ就知道如何正确地将消息路由到指定的Queue了。

Rabbitmq叢集,鏡像隊列和分布式原理

Binding key

在綁定(Binding)Exchange與Queue的同時,一般會指定一個binding key;消費者将消息發送給Exchange時,一般會指定一個routing key;當binding key與routing key相比對時,消息将會被路由到對應的Queue中。這個将在Exchange Types章節會列舉實際的例子加以說明。 在綁定多個Queue到同一個Exchange的時候,這些Binding允許使用相同的binding key。 binding key 并不是在所有情況下都生效,它依賴于Exchange Type,比如fanout類型的Exchange就會無視binding key,而是将消息路由到所有綁定到該Exchange的Queue。

Exchange Types

RabbitMQ常用的Exchange Type有fanout、direct、topic、headers這四種(AMQP規範裡還提到兩種Exchange Type,分别為system與自定義,這裡不予以描述),下面分别進行介紹。

fanout

fanout類型的Exchange路由規則非常簡單,它會把所有發送到該Exchange的消息路由到所有與它綁定的Queue中。

Rabbitmq叢集,鏡像隊列和分布式原理

上圖中,生産者(P)發送到Exchange(X)的所有消息都會路由到圖中的兩個Queue,并最終被兩個消費者(C1與C2)消費。

direct

direct類型的Exchange路由規則也很簡單,它會把消息路由到那些binding key與routing key完全比對的Queue中。

Rabbitmq叢集,鏡像隊列和分布式原理

以上圖的配置為例,我們以routingKey=”error”發送消息到Exchange,則消息會路由到Queue1(amqp.gen-S9b…,這是由RabbitMQ自動生成的Queue名稱)和Queue2(amqp.gen-Agl…);如果我們以routingKey=”info”或routingKey=”warning”來發送消息,則消息隻會路由到Queue2。如果我們以其他routingKey發送消息,則消息不會路由到這兩個Queue中。

topic

前面講到direct類型的Exchange路由規則是完全比對binding key與routing key,但這種嚴格的比對方式在很多情況下不能滿足實際業務需求。topic類型的Exchange在比對規則上進行了擴充,它與direct類型的Exchage相似,也是将消息路由到binding key與routing key相比對的Queue中,但這裡的比對規則有些不同,它約定:

routing key為一個句點号“. ”分隔的字元串(我們将被句點号“. ”分隔開的每一段獨立的字元串稱為一個單詞),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit” binding key與routing key一樣也是句點号“. ”分隔的字元串

binding key中可以存在兩種特殊字元“”與“#”,用于做模糊比對,其中“”用于比對一個單詞,“#”用于比對多個單詞(可以是零個)

Rabbitmq叢集,鏡像隊列和分布式原理

以上圖中的配置為例,routingKey=”quick.orange.rabbit”的消息會同時路由到Q1與Q2,routingKey=”lazy.orange.fox”的消息會路由到Q1,routingKey=”lazy.brown.fox”的消息會路由到Q2,routingKey=”lazy.pink.rabbit”的消息會路由到Q2(隻會投遞給Q2一次,雖然這個routingKey與Q2的兩個bindingKey都比對);routingKey=”quick.brown.fox”、routingKey=”orange”、routingKey=”quick.orange.male.rabbit”的消息将會被丢棄,因為它們沒有比對任何bindingKey。

headers

headers類型的Exchange不依賴于routing key與binding key的比對規則來路由消息,而是根據發送的消息内容中的headers屬性進行比對。 在綁定Queue與Exchange時指定一組鍵值對;當消息發送到Exchange時,RabbitMQ會取到該消息的headers(也是一個鍵值對的形式),對比其中的鍵值對是否完全比對Queue與Exchange綁定時指定的鍵值對;如果完全比對則消息會路由到該Queue,否則不會路由到該Queue。 該類型的Exchange沒有用到過(不過也應該很有用武之地),是以不做介紹。

RPC

MQ本身是基于異步的消息處理,前面的示例中所有的生産者(P)将消息發送到RabbitMQ後不會知道消費者(C)處理成功或者失敗(甚至連有沒有消費者來處理這條消息都不知道)。 但實際的應用場景中,我們很可能需要一些同步處理,需要同步等待服務端将我的消息處理完成後再進行下一步處理。這相當于RPC(Remote Procedure Call,遠端過程調用)。在RabbitMQ中也支援RPC。

Rabbitmq叢集,鏡像隊列和分布式原理

RabbitMQ中實作RPC的機制是:

用戶端發送請求(消息)時,在消息的屬性(MessageProperties,在AMQP協定中定義了14種properties,這些屬性會随着消息一起發送)中設定兩個值replyTo(一個Queue名稱,用于告訴伺服器處理完成後将通知我的消息發送到這個Queue中)和correlationId(此次請求的辨別号,伺服器處理完成後需要将此屬性返還,用戶端将根據這個id了解哪條請求被成功執行了或執行失敗)

伺服器端收到消息并處理

伺服器端處理完消息後,将生成一條應答消息到replyTo指定的Queue,同時帶上correlationId屬性

用戶端之前已訂閱replyTo指定的Queue,從中收到伺服器的應答消息後,根據其中的correlationId屬性分析哪條請求被執行了,根據執行結果進行後續業務處理

3 rabbitmq叢集方案的原理

RabbitMQ這款消息隊列中間件産品本身是基于Erlang編寫,Erlang語言天生具備分布式特性(通過同步Erlang叢集各節點的magic cookie來實作)。

是以,RabbitMQ天然支援Clustering。這使得RabbitMQ本身不需要像ActiveMQ、Kafka那樣通過ZooKeeper分别來實作HA方案和儲存叢集的中繼資料。叢集是保證可靠性的一種方式,同時可以通過水準擴充以達到增加消息吞吐量能力的目的。下面先來看下RabbitMQ叢集的整體方案:

Rabbitmq叢集,鏡像隊列和分布式原理

上面圖中采用三個節點組成了一個RabbitMQ的叢集,Exchange A的中繼資料資訊在所有節點上是一緻的,而Queue(存放消息的隊列)的完整資料則隻會存在于它所建立的那個節點上。,其他節點隻知道這個queue的metadata資訊和一個指向queue的owner node的指針。

3.1 RabbitMQ叢集中繼資料的同步

RabbitMQ叢集會始終同步四種類型的内部中繼資料(類似索引): a.隊列中繼資料:隊列名稱和它的屬性; b.交換器中繼資料:交換器名稱、類型和屬性; c.綁定中繼資料:一張簡單的表格展示了如何将消息路由到隊列; d.vhost中繼資料:為vhost内的隊列、交換器和綁定提供命名空間和安全屬性; 是以,當使用者通路其中任何一個RabbitMQ節點時,通過rabbitmqctl查詢到的queue/user/exchange/vhost等資訊都是相同的。

3.2為何RabbitMQ叢集僅采用中繼資料同步的方式

 想要實作HA方案,那将RabbitMQ叢集中的所有Queue的完整資料在所有節點上都儲存一份不就可以了麼?(可以類似MySQL的主主模式嘛)這樣子,任何一個節點出現故障或者當機不可用時,那麼使用者的用戶端隻要能連接配接至其他節點能夠照常完成消息的釋出和訂閱嘛。

設計主要還是基于叢集本身的性能和存儲空間上來考慮。

第一,存儲空間,如果每個叢集節點都擁有所有Queue的完全資料拷貝,那麼每個節點的存儲空間會非常大,叢集的消息積壓能力會非常弱(無法通過叢集節點的擴容提高消息積壓能力);

第二,性能,消息的釋出者需要将消息複制到每一個叢集節點,對于持久化消息,網絡和磁盤同步複制的開銷都會明顯增加。

3.3 RabbitMQ叢集發送/訂閱消息的基本原理

RabbitMQ叢集的工作原理圖如下:

Rabbitmq叢集,鏡像隊列和分布式原理

場景1:用戶端直接連接配接隊列所在節點

如果有一個消息生産者或者消息消費者通過amqp-client的用戶端連接配接至節點1進行消息的釋出或者訂閱,那麼此時的叢集中的消息收發隻與節點1相關,這個沒有任何問題;如果用戶端相連的是節點2或者節點3(隊列1資料不在該節點上),那麼情況又會是怎麼樣呢?

場景2:用戶端連接配接的是非隊列資料所在節點

如果消息生産者所連接配接的是節點2或者節點3,此時隊列1的完整資料不在該兩個節點上,那麼在發送消息過程中這兩個節點主要起了一個路由轉發作用,根據這兩個節點上的中繼資料(也就是上文提到的:指向queue的owner node的指針)轉發至節點1上,最終發送的消息還是會存儲至節點1的隊列1上。

同樣,如果消息消費者所連接配接的節點2或者節點3,那這兩個節點也會作為路由節點起到轉發作用,将會從節點1的隊列1中拉取消息進行消費。

在此提個問題:如果用戶端連接配接的是非隊列資料所在節點,但是隊列所在節點挂掉了,情況如何?接着往下看,用到了鏡像隊列

4 rabbitmq重要的原理概念

4.1 rabbitmq之惰性隊列

RabbitMQ從3.6.0版本開始引入了惰性隊列(Lazy Queue)的概念。惰性隊列會盡可能的将消息存入磁盤中,而在消費者消費到相應的消息時才會被加載到記憶體中,它的一個重要的設計目标是能夠支援更長的隊列,即支援更多的消息存儲。當消費者由于各種各樣的原因(比如消費者下線、當機亦或者是由于維護而關閉等)而緻使長時間内不能消費消息造成堆積時,惰性隊列就很有必要了。

預設情況下,當生産者将消息發送到RabbitMQ的時候,隊列中的消息會盡可能的存儲在記憶體之中,這樣可以更加快速的将消息發送給消費者。即使是持久化的消息,在被寫入磁盤的同時也會在記憶體中駐留一份備份。當RabbitMQ需要釋放記憶體的時候,會将記憶體中的消息換頁至磁盤中,這個操作會耗費較長的時間,也會阻塞隊列的操作,進而無法接收新的消息。雖然RabbitMQ的開發者們一直在更新相關的算法,但是效果始終不太理想,尤其是在消息量特别大的時候。

惰性隊列會将接收到的消息直接存入檔案系統中,而不管是持久化的或者是非持久化的,這樣可以減少了記憶體的消耗,但是會增加I/O的使用,如果消息是持久化的,那麼這樣的I/O操作不可避免,惰性隊列和持久化消息可謂是“最佳拍檔”。注意如果惰性隊列中存儲的是非持久化的消息,記憶體的使用率會一直很穩定,但是重新開機之後消息一樣會丢失。

隊列具備兩種模式:default和lazy。預設的為default模式,在3.6.0之前的版本無需做任何變更。lazy模式即為惰性隊列的模式,可以通過調用channel.queueDeclare方法的時候在參數中設定,也可以通過Policy的方式設定,如果一個隊列同時使用這兩種方式設定的話,那麼Policy的方式具備更高的優先級。如果要通過聲明的方式改變已有隊列的模式的話,那麼隻能先删除隊列,然後再重新聲明一個新的。

在隊列聲明的時候可以通過“x-queue-mode”參數來設定隊列的模式,取值為“default”和“lazy”。下面示例中示範了一個惰性隊列的聲明細節:

Map<String, Object> args = new HashMap<String, Object>();

args.put("x-queue-mode", "lazy");

channel.queueDeclare("myqueue", false, false, false, args);

對應的Policy設定方式為:

rabbitmqctl set_policy Lazy "^myqueue$" '{"queue-mode":"lazy"}' --apply-to queue

 惰性隊列和普通隊列相比,隻有很小的記憶體開銷。這裡很難對每種情況給出一個具體的數值,但是我們可以類比一下:當發送1千萬條消息,每條消息的大小為1KB,并且此時沒有任何的消費者,那麼普通隊列會消耗1.2GB的記憶體,而惰性隊列隻消耗1.5MB的記憶體。

據官網測試資料顯示,對于普通隊列,如果要發送1千萬條消息,需要耗費801秒,平均發送速度約為13000條/秒。如果使用惰性隊列,那麼發送同樣多的消息時,耗時是421秒,平均發送速度約為24000條/秒。出現性能偏差的原因是普通隊列會由于記憶體不足而不得不将消息換頁至磁盤。如果有消費者消費時,惰性隊列會耗費将近40MB的空間來發送消息,對于一個消費者的情況,平均的消費速度約為14000條/秒。

如果要将普通隊列轉變為惰性隊列,那麼我們需要忍受同樣的性能損耗。當轉變為惰性隊列的時候,首先需要将緩存中的消息換頁至磁盤中,然後才能接收新的消息。反之,當将一個惰性隊列轉變為普通隊列的時候,和恢複一個隊列執行同樣的操作,會将磁盤中的消息批量的導入到記憶體中。

4.2 rabbitmq之鏡像隊列

4.2.1 rabbitmq鏡像隊列介紹

假想概述:

如果RabbitMQ叢集隻有一個broker節點,那麼在該節點的失效将導緻整個服務臨時性的不可用,并且可能會導緻message的丢失(尤其是在非持久化message存儲于非持久化queue中的時候)。當然可以将所有的publish的message都設定為持久化的,并且使用持久化的queue,但是這樣仍然無法避免由于緩存導緻的問題:因為message在發送之後和被寫入磁盤并執行fsync之間存在一個雖然短暫但是會産生問題的時間窗。通過publisher的confirm機制能夠確定用戶端知道哪些message已經存入磁盤,盡管如從,一般不希望遇到因單點故障導緻的服務不可用。

如果rabbitmq叢集是由多個broker節點構成的,那麼從服務的整體可用性上來講,該叢集對于單點失效是有彈性的,但是同時也需要注意: 盡管exchange和binding能夠在單點失效問題上幸免于難,但是queue和其上持有的message卻不行,這是因為queue及其内容僅僅儲存于單個節點之上,是以一個節點的失效表現為其對應的queue不可用。

引入Rabbitmq的鏡像隊列機制,将queue鏡像到cluster中其他的節點之上。在該實作下,如果其中的一個節點失效了,queue能自動地切換到鏡像中的另一個節點以保證服務的可用性。在通常的用法中,針對每一個鏡像隊列都包含一個master和多個slave,分别對用于不同的節點。Slave會準确地按照master執行指令的順序進行指令執行,故slave與master上維護的狀态應該是相同的。除了publish外所有動作都之後會向master發送,然後由master将指令執行的結果廣播給slave們,故看似從鏡像隊列中消費操作實際上是在master上執行的的。

一旦完成了選中的slave被提升為master的動作,發送到鏡像隊列的message将不會在丢失:publish到鏡像隊列的所有消息總是被直接publish到master和所有的slave之上。這樣一旦master失效了,message仍然可以繼續發送到其他slave上。

Rabbitmq的鏡像隊列同時支援publisher, confirm和事物兩種機制。在事物機制中,隻有目前事物在全部鏡像queue中執行之後,用戶端才會收到Tx.CommitOK的消息。同樣的,在publisher confirm機制中,向publish進行目前message确認的前提是該messsage被全部鏡像所接受了

鏡像隊列設定:

鏡像隊列的配置通過添加policy完成,policy添加的指令為:

1.rabbitmqctl set_policy [-p Vhost] Name Pattern Definition [Priority]

-p Vhost: 可選參數,針對指定vhost下的queue進行設定

Name: policy的名稱

Pattern: queue的比對模式(正規表達式)

Definition:鏡像定義,包括三個部分ha-mode, ha-params, ha-sync-mode

ha-mode:指明鏡像隊列的模式,有效值為 all/exactly/nodes

all:表示在叢集中所有的節點上進行鏡像

exactly:表示在指定個數的節點上進行鏡像,節點的個數由ha-params指定

nodes:表示在指定的節點上進行鏡像,節點名稱通過ha-params指定

ha-params:ha-mode模式需要用到的參數

ha-sync-mode:進行隊列中消息的同步方式,有效值為automatic和manual

priority:可選參數,policy的優先級

例如,對隊列名稱以“queue_”開頭的所有隊列進行鏡像,并在叢集的兩個節點上完成進行,policy的設定指令為:

1 rabbitmqctl set_policy --priority 0 --apply-to queues mirror_queue "^queue_" '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'

也可以通過RabbitMQ的web管理界面設定:

Rabbitmq叢集,鏡像隊列和分布式原理

普通MQ的結構:

Rabbitmq叢集,鏡像隊列和分布式原理

通常隊列由兩部分組成:一部分是AMQQueue,負責AMQP協定相關的消息處理,即接收生産者釋出的消息、向消費者投遞消息、處理消息confirm、acknowledge等等;另一部分是BackingQueue,它提供了相關的接口供AMQQueue調用,完成消息的存儲以及可能的持久化工作等。

在RabbitMQ中BackingQueue又由5個子隊列組成:Q1, Q2, Delta, Q3和Q4。RabbitMQ中的消息一旦進入隊列,不是固定不變的,它會随着系統的負載在隊列中不斷流動,消息的不斷發生變化。與這5個子隊列對于,在BackingQueue中消息的生命周期分為4個狀态:

Alpha:消息的内容和消息索引都在RAM中。Q1和Q4的狀态。

Beta:消息的内容儲存在DISK上,消息索引儲存在RAM中。Q2和Q3的狀态。

Gamma:消息内容儲存在DISK上,消息索引在DISK和RAM都有。Q2和Q3的狀态。

Delta:消息内容和索引都在DISK上。Delta的狀态。

注意:對于持久化的消息,消息内容和消息所有都必須先儲存在DISK上,才會處于上述狀态中的一種,而Gamma狀态的消息是隻有持久化的消息才會有的狀态。

上述就是RabbitMQ的多層隊列結構的設計,我們可以看出從Q1到Q4,基本經曆RAM->DISK->RAM這樣的過程。這樣設計的好處是:當隊列負載很高的情況下,能夠通過将一部分消息由磁盤儲存來節省記憶體空間,當負載降低的時候,這部分消息又漸漸回到記憶體,被消費者擷取,使得整個隊列具有很好的彈性。下面我們就來看一下,整個消息隊列的工作流程。

引起消息流動主要有兩方面因素:其一是消費者擷取消息;其二是由于記憶體不足引起消息換出到磁盤。RabbitMQ在系統運作時會根據消息傳輸的速度計算一個目前記憶體中能夠儲存的最大消息數量(Target_RAM_Count),當記憶體中的消息數量大于該值時,就會引起消息的流動。進入隊列的消息,一般會按照Q1->Q2->Delta->Q3->Q4的順序進行流動,但是并不是每條消息都一定會經曆所有的狀态,這個取決于目前系統的負載狀況。

當消費者擷取消息時,首先會從Q4隊列中擷取消息,如果Q4擷取成功,則傳回。如果Q4為空,則嘗試從Q3擷取消息,首先系統會判斷Q3是否為空,如果為空則傳回隊列為空,即此時隊列中無消息(後續會論證)。如果不為空,則取出Q3的消息,然後判斷此時Q3和Delta隊列的長度,如果都為空,則可認為Q2、Delta、Q3、Q4全部為空(後續會論證),此時将Q1中消息直接轉移到Q4中,下次直接從Q4中擷取消息。如果Q3為空,Delta不為空,則将Delta轉移到Q3中,如果Q3不為空,則直接下次從Q3中擷取消息。在将Delta轉移到Q3的過程中,RabbitMQ是按照索引分段讀取的,首先讀取某一段,直到讀到的消息非空為止,然後判斷讀取的消息個數與Delta中的消息個數是否相等,如果相等,則斷定此時Delta中已無消息,則直接将Q2和剛讀到的消息一并放入Q3中。如果不相等,則僅将此次讀取到的消息轉移到Q3。這就是消費者引起的消息流動過程。 

消息換出的條件是記憶體中儲存的消息數量+等待ACK的消息的數量>Target_RAM_Count。當條件出發時,系統首先會判斷如果目前進入等待ACK的消息的速度大于進入隊列的消息的速度時,會先處理等待ACK的消息。

最後我們來分析一下前面遺留的兩個問題,一個是為什麼Q3隊列為空即可以認定整個隊列為空。試想如果Q3為空,Delta不空,則在Q3取出最後一條消息時,Delta上的消息就會被轉移到Q3上,Q3空沖突。如果Q2不空,則在Q3取出最後一條消息,如果Delta為空,則會将Q2的消息并入到Q3,與Q3為空沖突。如果Q1不為空,則在Q3取出最後一條消息,如果Delta和Q3均為空時,則将Q1的消息轉移到Q4中,與Q4為空沖突。這也解釋了另外一個問題,即為什麼Q3和Delta為空,Q2就為空。

通常在負載正常時,如果消息被消費的速度不小于接收新消息的速度,對于不需要保證可靠不丢的消息極可能隻會有Alpha狀态。對于durable=true的消息,它一定會進入gamma狀态,若開啟publish confirm機制,隻有到了這個階段才會确認該消息已經被接受,若消息消費速度足夠快,記憶體也充足,這些消息也不會繼續走到下一狀态。

通常在系統負載較高時,已接受到的消息若不能很快被消費掉,這些消息就會進入到很深的隊列中去,增加處理每個消息的平均開銷。因為要花更多的時間和資源處理“積壓”的消息,是以用于處理新來的消息的能力就會降低,使得後來的消息又被積壓進入很深的隊列,繼續加大處理每個消息的平均開銷,這樣情況就會越來越惡化,使得系統的處理能力大大降低。

4.2.2 鏡像隊列的實作原理

4.2.2.1 整體介紹

通常隊列由兩部分組成:一部分是amqqueue_process,負責協定相關的消息處理,即接收生産者釋出的消息、向消費者投遞消息、處理消息confirm、acknowledge等等;另一部分是backing_queue,它提供了相關的接口供amqqueue_process調用,完成消息的存儲以及可能的持久化工作等。

Rabbitmq叢集,鏡像隊列和分布式原理

鏡像隊列同樣由這兩部分組成,amqqueue_process仍舊進行協定相關的消息處理,backing_queue則是由master節點和slave節點組成的一個特殊的backing_queue。master節點和slave節點都由一組程序組成,一個負責消息廣播的gm,一個負責對gm收到的廣播消息進行回調處理。在master節點上回調處理是coordinator(協調員),在slave節點上則是mirror_queue_slave。mirror_queue_slave中包含了普通的backing_queue進行消息的存儲,master節點中backing_queue包含在mirror_queue_master中由amqqueue_process進行調用。

Rabbitmq叢集,鏡像隊列和分布式原理

注意:消息的釋出與消費都是通過master節點完成。master節點對消息進行處理的同時将消息的處理動作通過gm廣播給所有的slave節點,slave節點的gm收到消息後,通過回調交由mirror_queue_slave進行實際的處理。

4.2.2.2 gm(Guaranteed Multicast)

傳統的主從複制方式:由master節點負責向所有slave節點發送需要複制的消息,在複制過程中,如果有slave節點出現異常,master節點需要作出相應的處理;如果是master節點本身出現問題,那麼slave節點間可能會進行通信決定本次複制是否繼續。當然為了處

理各種異常情況,整個過程中的日志記錄是免不了的。然而rabbitmq中并沒有采用這種方式,而是将所有的節點形成一個循環連結清單,每個節點都會監控位于自己左右兩邊的節點,當有節點新增時,相鄰的節點保證目前廣播的消息會複制到新的節點上;當有節點失效時,相鄰的節點會接管保證本次廣播的消息會複制到所有節點。

在master節點和slave節點上的這些gm形成一個group,group的資訊會記錄在mnesia中。不同的鏡像隊列形成不同的group。消息從master節點對應的gm發出後,順着連結清單依次傳送到所有節點,由于所有節點組成一個循環連結清單,master節點對應的gm最終會收到自己發送的消息,這個時候master節點就知道消息已經複制到所有slave節點了。

4.2.2.3 重要的表結構

rabbit_queue表記錄隊列的相關資訊:

-record(q, {q, %% 隊列資訊資料結構amqqueue

exclusive_consumer, %% 目前隊列的獨有消費者

has_had_consumers, %% 目前隊列中是否有消費者的辨別

backing_queue, %% backing_queue對應的子產品名字

backing_queue_state, %% backing_queue對應的狀态結構

consumers, %% 消費者存儲的優先級隊列

expires, %% 目前隊列未使用就删除自己的時間

sync_timer_ref, %% 同步confirm的定時器,目前隊列大部分接收一次消息就要確定目前定時器的存在(200ms的定時器)

rate_timer_ref, %% 隊列中消息進入和出去的速率定時器

expiry_timer_ref, %% 隊列中未使用就删除自己的定時器

stats_timer, %% 向rabbit_event釋出資訊的資料結構狀态字段

msg_id_to_channel, %% 目前隊列程序中等待confirm的消息gb_trees結構,裡面的結構是Key:MsgId Value:{SenderPid, MsgSeqNo}

ttl, %% 隊列中設定的消息存在的時間

ttl_timer_ref, %% 隊列中消息存在的定時器

ttl_timer_expiry, %% 目前隊列頭部消息的過期時間點senders, %% 向目前隊列發送消息的rabbit_channel程序清單

dlx, %% 死亡消息要發送的exchange交換機(通過隊列聲明的參數或者policy接口來設定)

dlx_routing_key, %% 死亡消息要發送的路由規則(通過隊列聲明的參數或者policy接口來設定)

max_length, %% 目前隊列中消息的最大上限(通過隊列聲明的參數或者policy接口來設定)

max_bytes, %% 隊列中消息内容占的最大空間

args_policy_version, %% 目前隊列中參數設定對應的版本号,每設定一次都會将版本号加一

status %% 目前隊列的狀态

}).

注意:slave_pids的存儲是按照slave加入的時間來排序的,以便master節點失效時,提升

gm_group表記錄gm形成的group的相關資訊:%% 整個鏡像隊列群組的資訊,該資訊會存儲到Mnesia資料庫

-record(gm_group, { name, %% group的名稱,與queue的名稱一緻

version, %% group的版本号, 新增節點/節點失效時會遞增

members %% group的成員清單, 按照節點組成的連結清單順序進行排序

}).

view_member記錄目前GM群組中真實的視圖的資料結構:

%% 鏡像隊列群組視圖成員資料結構

-record(view_member, { id, %% 單個鏡像隊列(結構是{版本号,

該鏡像隊列的Pid})

aliases, %% 記錄id對應的左側死亡的GM程序清單

left, %% 目前鏡像隊列左邊的鏡像隊列(結構是{版本号,該鏡像隊列的Pid})

right %% 目前鏡像隊列右邊的鏡像隊列(結構是{版本号,該鏡像隊列的Pid})

}).

記錄單個GM程序中資訊的資料結構:

%% 記錄單個GM程序中資訊的資料結構

-record(member, { pending_ack, %% 待确認的消息,也就是已釋出的消息緩存的地方

last_pub, %% 最後一次釋出的消息計數

last_ack %% 最後一次确認的消息計數

}).

5 Rabbitmq出現的問題以及思考

5.1消息堆積的問題

當消息大量堆積的時候,首先大量堆積就以為大量的消息都得不到及時處理,時延問題可以忽略;其次,也不用考慮預取個數設定的過大而造成的其他消費者空閑,因為大家都一直都有消息在流入處理。消息堆積可能會引起換頁而性能下降,或者更槽糕的是觸發記憶體告警而阻塞所有的生産者。

消息堆積治理方案如下:

  • 方案一:

         丢棄政策: 設定消息保留時間或者保留大小(可以是個數或者占用大小),逾時就丢棄

  • 方案二:

Lazy Queue

  • 方案三:

    “移花接木”

第一種方案是丢棄政策,即當消息超過預定的保留時間以及目前消息堆積的個數或者是記憶體占用而選擇丢棄資料,這個可以類比于kafka的日志保留政策,當然這種情況适合于消息可靠性要求不高、可丢棄的場景。又比如Java線程池的飽和政策中就有一種是丢棄政策。Rabbitmq本身并沒有提供相應的配置和功能,這個需要外部平台的包裝。

第二種方案是前面所提及的惰性隊列,完全采用磁盤的空間來極大的增加堆積的能力(一般情況下,一條伺服器的磁盤容量比記憶體容量要大得多的多)

第三種方案,我們把它稱之為“移花接木”,下面看一張圖,

Rabbitmq叢集,鏡像隊列和分布式原理

當某個隊列中的消息嚴重堆積時,舉例:目前運作的叢集cluster1中隊列queue1的消息個數超過2kw或者占用記憶體大小超過10GB,就可以啟用shovel1将隊列queue1中的消息轉發至備份叢集cluster2中的隊列queue2中,這樣可以分攤堆積的壓力;當檢測到隊列queue1中的消息個數低于100w或者消息占用大小低于1GB時就停止shovel1,然後讓原本隊列queue1中的消費者慢慢處理剩餘的堆積;當檢測到隊列queue1中的消息個數低于100w或者消息占用大小低于1GB時就開啟shovel2将隊列queue2中暫存的消息返還給隊列queue1;當檢測到隊列queue1中的消息個數超過100w或者消息占用大小超過1GB時就将shovel2停掉,經過一個周期之後,再開啟shovel2,超過閥值時就停掉,如此反複多次直到将隊列queue2中的消息清空為止。

6參考資料

  1. Rabbitmq源碼:https://github.com/rabbitmq/rabbitmq-server
  2. Rabbitmq源碼解析:https://github.com/sky-big/RabbitMQ
  3. 參考網站資料:http://www.iocoder.cn/RabbitMQ/good-collection/
  4. 參考書籍:《持續演進的Cloud Native 雲原生架構下微服務最佳實踐》