天天看點

activemq 簡介 配置

文章不錯,有些和最新版本不一緻,但是可以參考,學習了。

 ActiveMQ簡介

1.  什麼是ActiveMQ

ActiveMQ是一種開源的,實作了JMS1.1規範的,面向消息(MOM)的中間件,為應用程式提供高效的、可擴充的、穩定的和安全的企業級消息通信。ActiveMQ使用Apache提供的授權,任何人都可以對其實作代碼進行修改。

ActiveMQ的設計目标是提供标準的,面向消息的,能夠跨越多語言和多系統的應用內建消息通信中間件。ActiveMQ實作了JMS标準并提供 了很多附加的特性。這些附加的特性包括,JMX管理(java Management Extensions,即java管理擴充),主從管理(master/salve,這是叢集模式的一種,主要展現在可靠性方面,當主中介(代理)出現故障,那麼從代理會替代主代理的位置,不至于使消息系統癱瘓)、消息組通信(同一組的消息,僅會送出給一個客戶進行處理)、有序消息管理(確定消息能夠按照 發送的次序被接受者接收)。消息優先級(優先級高的消息先被投遞和處理)、訂閱消息的延遲接收(訂閱消息在釋出時,如果訂閱者沒有開啟連接配接,那麼當訂閱者開啟連接配接時,消息中介将會向其送出之前的,其未處理的消息)、接收者處理過慢(可以使用動态負載平衡,将多數消息送出到處理快的接收者,這主要是對PTP 消息所說)、虛拟接收者(降低與中介的連接配接數目)、成熟的消息持久化技術(部分消息需要持久化到資料庫或檔案系統中,當中介崩潰時,資訊不會丢失)、支援遊标操作(可以處理大消息)、支援消息的轉換、通過使用Apache的Camel可以支援EIP、使用鏡像隊列的形式輕松的對消息隊列進行監控等。

2.  ActiveMQ 特性

支援JMS規範:ActiveMQ完全實作了JMS1.1規範。

JMS規範提供了同步消息和異步消息投遞方式、有且僅有一次投遞語義(指消息的接收者對一條消息必須接收到一次,并且僅有一次)、訂閱消息持久接收等。如果僅使用JMS規範,表明無論您使用的是哪家廠商的消息代理,都不會影響到您的程式。

連接配接方式的多樣化:ActiveMQ提供了廣泛的連接配接模式,包括HTTP/S、JGroups、JXTA、muticast、SSL、TCP、UDP、XMPP等。提供了如此多的連接配接模式表明了ActiveMQ具有較高的靈活性。

可插入式的持久化和安全:ActiveMQ提供了多種持久化方案,您可以根據實際需要進行選擇。同時,也提供了完整的客戶授權模式。

使用Java建立消息應用程式:最常見的使用ActiveMQ的方式就是使用Java程式來發送和接收消息。

與其他的Java容器緊密內建:ActiveMQ提供了和其它流行的Java容器的結合,包括Apache Geronimo、Apache Tomcat、JBoss、Jetty等。

用戶端API:ActiveMQ提供了多種用戶端可通路的API,包括Java、C/C++,.NET,Perl、PHP、Python、Ruby等。當然,ActiveMQ中介必須運作在Java虛拟機中,但是使用它的用戶端可以使用其他的語言來實作。

中介叢集:多個ActiveMQ中介可以一起協同工作,來完成某項複雜的工作,這被稱為網絡型中介(networkof brokers),這種類型的中介将會支援多種拓撲類型。

3.  為什麼使用ActiveMQ

在設計分布式應用程式時,應用程式間的耦合(或稱內建)方式很重要。耦合意味着兩個或者多個應用程式或系統的互相依賴關系。一種簡單的方式是在所有的應用程式中從架構上設計他們與其他應用程式間的交叉實作。這樣必然導緻,一個應用程式的改變,直接導緻另一個應用程式的改變。按照這種方式內建的應用是 一種緊耦合的應用。一個應用的改變不會影響到其他應用的內建方式被稱為是松耦合的內建方式。簡單的說,松耦合應用程式內建能夠更容易的處理不可預見的應用變化。

像COM、CORBA、DCE和EJB等應用技術使用RPC(Remote Procedural Calls,即遠端過程調用)屬于緊耦合技術。使用RPC,一個應用程式調用另一個應用程式,調用者必須阻塞,直到被調用者執行結束傳回結果資訊為止。下 圖給出了這種緊耦合技術的描述:

許多系統架構使用RPC,并且獲得了巨大的成功,但是,緊耦合的架構有着天生的缺陷。首先,這種架構将會造成系統維護管理上的巨大消費,因為,即使是很小的改動,很可能會波及到整個系統。其次,由于調用者必須阻塞式的等待被調用者傳回,如果被調用者處理過程複雜,将會嚴重影響調用者的執行效率和資源 使用率。此外,如果調用失敗,整個架構即失敗。

下圖給出一種松耦合的方式,進行架構設計:

應用程式1向消息中介(MOM)發送一條消息,很可能一段時間之後,應用程式2調用MOM來收取消息。任何一個應用程式都不知道對方是否存在也不需要阻塞等待。這種通信方式大大縮減了維護開銷,因為對于一個應用程式的修改,會對其他應用程式影響極小。

ActiveMQ就是采用了上面提到的松耦合方式,是以,我們經常說應用程式發送消息僅僅是觸發後忘卻。應用程式将消息發送給ActiveMQ而并 不關心什麼時間以何種方式消息投遞給接收者。同樣的,消息接收者也不會關心消息來源于哪裡和消息是怎樣投遞給ActiveMQ的。對于多語言編寫的複雜應 用環境中,允許用戶端使用不同的程式設計語言甚至不同的消息包裝協定。ActiveMQ作為消息的中間件,允許複雜的多語言應用程式以一種一步的方式內建和交 互。是以說,ActiveMQ是一種好的,提供松散耦合的,能夠為多語言交叉應用提供內建的中間件。

4.  什麼情況下使用ActiveMQ

正如前面提到的,緊耦合應用系統存在許多問題,但是,要将緊耦合系統重構成松耦合系統是一件值得但比較繁瑣的事情。使用松耦合的主要優勢展現在将同步改為異步。使用異步通信,應用程式将從接收者回報的等待中解放出來,其他的任務可以得到執行,這樣提高了應用程式的效率。

隻要是兩個應用程式間需要通信的情況,都可以考慮使用JMS,不論這種通信是在本地的(就是通信的兩個應用程式在同一台主機上),還是分布在不同機 器上。盡管是在同一個主機上的兩個應用程式需要通信也可以使用ActiveMQ。ActiveMQ可以確定消息投遞成功并采用異步方式通信。

多個需要通信的應用程式在同一個機器上的情況下,您可以考慮在執行機上獨立運作ActiveMQ或者将ActiveMQ嵌入到Java應用服務中。 無論采用哪種方式,都可以確定應用程式能夠發送和接收消息。您可以選擇訂閱模式(pub/sub)或者采用PTP(pointto point)模式,這兩種模式都無需等待執行回報資訊。每一個應用程式都可以簡單的将消息發送給ActiveMQ,然後繼續做其他的工作;應用程式無需阻塞式等待消息的傳回。

對于分布在多台主機上的應用程式來說,可以使用多種布置政策。主要包括單一ActiveMQ執行個體和多ActiveMQ執行個體。單一ActiveMQ實 例是一個簡單解決方案。所有的應用程式都向同一個ActiveMQ中介發送和接收消息,這與上面提到的單機多服務雷同。單一的ActiveMQ可以布置到 一台單獨的主機上,也可以和其中的一些服務布置在一起。重要的是,所有的應用必須能夠直接與ActiveMQ中介進行互動,是以,你必須考慮到你的網絡設 計。

第二種情況比較複雜,但是有ActiveMQ來負責遠端通信,而不是應用程式自身。在這種場景下,每一個應用程式都會執行個體化一個 ActiveMQ(無論是嵌入式的還是獨立式的),應用程式從其本地的ActiveMQ發送和接收消息。之後這些ActiveMQ執行個體将會以一種聯合的方 式協同工作。消息将會基于每一個應用的要求在多個ActiveMQ中介間傳遞到遠端的處理者。在ActiveMQ中,這種模式被稱為netWork of brokers。采用這種模式對于處理大量的ActiveMQ消息是可行的,但是,我們往往需要減輕網絡拓撲的複雜性,這樣直接将消息投遞到遠端接收者的 ActiveMQ是不可行的。在後一種情況下,不同的協定使用可以使ActiveMQ更輕松的傳遞消息。

5. ActiveMQ傳輸效率

計算機環境

CPU:Intel(R) Cpu G530 @ 2.40GHz 2.40

Memory:2GB

HD:希捷 ST3500620AS

OS:window xp Service Pack 3

   發送10萬條長度為25的消息耗時6~7秒鐘, cpu占用量很大。

   如果持續發送不接受的話,伺服器承受到30萬時容易卡住發送達到26秒之多

   一次性接收所有的消息50萬,cpu占用100%占用時間50s左右,可以全部接收。

   接收完成後在繼續發送10萬消息占用時間6~7s

  在一次性發送50萬消息時出現問題

INFO | Usage Manager Memory Limit (524288000) reached on queue://FirstQueue.Producers will be throttled to the rate at which messages are removed from thisdestination to prevent flooding it. See

http://activemq.apache.org/producer-flow-control.html for more info

在網上找原因說是配置了發送限制,修改xml 運作時間是53s 的樣子

繼續發送50萬,一分50秒

在發送50萬

在發送50萬2分多鐘的樣子

 試着接收這200萬消息可以成功,但時間很長。

  修改xml後在接受消息的同時發送10萬消息,

  一次發送100萬耗時4分左右的樣子

接收方也可以全部接收。

  模拟伺服器斷電,非持久性模式時沒有被消費的消息不會繼續消費,全部丢失

程式會報一個連接配接關閉異常停止運作,繼續啟動伺服器運作程式,不會接收任何消息。

         模拟伺服器斷電,持久性模式時沒有被消費的消息會繼續消費

定義了消息的持久性模式後,即使關閉了伺服器,程式也會報連接配接關閉異常,但再次啟動伺服器和程式後,接收方還能繼續原來的消息再次接收。

總結

總體看來,在配置好xml的情況下,activemq對消息傳輸上還是沒有問題的,發送的消息都可以全部接收,發送多少條就接收多少條,準确度上還是有保證的,持久模式支援斷電續傳功能。雖然功能上沒有什麼問題但對cpu的占用率就比較大了,發送或接受消息的時候都達到了100%,記憶體到不會很大。 這跟自己使用機子有關系,配置好點的機子可能情況會好些。

6. ActiveMQ配置傳輸連接配接

ActiveMQ提供了廣泛的連接配接模式,包括HTTP/S、JGroups、JXTA、muticast、SSL、TCP、UDP、XMPP等。提供了如此多的連接配接模式表明了ActiveMQ具有較高的靈活性。

配置格式如下:

  <transportConnectors>

            <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->

            <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireformat.maxFrameSize=104857600"/>

            <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireformat.maxFrameSize=104857600"/>

            <transportConnector name="ssl" uri="ssl://0.0.0.0:61617?maximumConnections=1000&amp;wireformat.maxFrameSize=104857600"/>

            <transportConnector name="stomp" uri="stomp://0.0.0.0:61618?maximumConnections=1000&amp;wireformat.maxFrameSize=104857600"/>

            <transportConnector name="xmpp" uri="xmpp://0.0.0.0:61619?maximumConnections=1000&amp;wireformat.maxFrameSize=104857600"/>

        </transportConnectors>

生産者和消費着可以使用不同的傳輸協定來傳輸資訊。比如生産者用nio協定生産消息,消費者用tcp協定接收消息。

7. ActiveMQ配置網絡連接配接

當應用到Broker的叢集時,Borker與Broker的通信就用到了網絡連接配接。

配置格式如下:

<networkConnectors>

<!-- 動态連接配接方式

<networkConnector name="default-nc" uri="multicast://default"

                   dynamicOnly="true" networkTTL="3" prefetchSize="1" decreaseNetworkConsumerPriority="true"

         /> -->

         <!-- 靜态連接配接方式 <networkConnector name="host1 and host2" uri="static://(tcp://host1:61616,tcp://host2:61616)"/> -->

</networkConnectors>

8.  ActiveMQ持久化存儲模式

ActiveMq主要實作了如下幾種存儲:

1.4.1.   AMQ消息存儲—預設的消息存儲

  它是一種基于檔案存儲的消息資料庫并且不依賴第三方資料庫。配置如下

<amqPersistenceAdapter directory="${activemq.base}/data" maxFileLength="32mb"/>

1.4.2.   KahaDB 消息存儲—提供容量的提升和恢複能力

  它是一種新的消息存儲機制,配置如下

<kahaDB directory="${activemq.data}/kahadb" />

1.4.3.   JDBC 消息存儲—消息基于JDBC存儲

<persistenceAdapter>

<jdbcPersistenceAdapter  dataSource="#mysql-ds"/>

</persistenceAdapter>

<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource"

                            destroy-method="close"> <property name="driverClassName" value="com.mysql.jdbc.Driver"/>

                            <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>

                            <property name="username" value="activemq"/> <property name="password" value="activemq"/>

                            <property name="maxActive" value="200"/> <property name="poolPreparedStatements"

                            value="true"/>

                   </bean>

1.4.4.   Memory 消息存儲—基于内容的消息存儲

  ActiveMQ支援将消息儲存到記憶體中,這種情況沒有動态的緩存存在。

  這種情況的配置很簡單,隻要将Broker的“prsistent” 屬性設定為“false”即可。

1.   ActiveMQ負載均衡

ActiveMQ可以實作多個mq之間進行路由,假設有兩個mq,分别為brokerA和brokerB,當有一條消息發送到brokerA的隊列 test中,有一個用戶端連接配接到brokerB上,并且要求擷取test隊列的消息時,brokerA中隊列test的消息就會路由到brokerB上, 反之brokerB的消息也會路由到brokerA。分靜态和動态兩種配置方法,見《6  activemq的網絡連接配接》。下面給出動态配置:

<networkConnectors>

            <networkConnector uri="multicast://default" />

</networkConnectors>

 <transportConnectors>

            <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->

            <transportConnector name="openwire" uri="tcp://0.0.0.0:61618" discoveryUri="multicast://default" />

        </transportConnectors>

2.   ActiveMQ主從配置

Master-Slave模式分為三類:PureMaster Slave、Shared File System Master Slave和JDBC Master Slave。以上三種方式的叢集都不支援負載均衡,但可以解決單點故障的問題,以保證消息服務的可靠性。

2.1.  PureMaster Slave

    需要兩個Broker,一個作為Master,另一個作為Slave,運作時,Slave通過網絡實時從Master處複制資料,同時,如果Slave和 Master失去連接配接,Slave就會自動更新為Master,繼續為用戶端提供消息服務,這種方式的Slave隻能有一個。模型如圖所示:

這種方式的主備不需要對Master Broker做特殊的配置,隻要在Slave Broker中指定他的Master就可以了,指定Master有兩種方式,最簡單的配置就是在broker節點中添加masterConnectorURI=”tcp://localhost:61616″即可,還有一種方式就是添加一個services節點,可以指定 連接配接的使用者名和密碼,配置如下:

<services>

  <masterConnector remoteURI= "tcp://localhost:61616" userName="system" password="manager"/>

</services>

2.2.  SharedFile System Master Slave

        SharedFile System Master Slave就是利用共享檔案系統做ActiveMQ叢集,是基于ActiveMQ的預設資料庫kahaDB完成的,kahaDB的底層是檔案系統。這種方 式的叢集,Slave的個數沒有限制,哪個ActiveMQ執行個體先擷取共享檔案的鎖,那個執行個體就是Master,其它的ActiveMQ執行個體就是 Slave,當目前的Master失效,其它的Slave就會去競争共享檔案鎖,誰競争到了誰就是Master。這種模式的好處就是當Master失效時 不用手動去配置,隻要有足夠多的Slave。如果各個ActiveMQ執行個體需要運作在不同的機器,就需要用到分布式檔案系統了。模式如圖所示:

2.3.  JDBCMaster Slave

        JDBCMaster Slave模式和Shared File Sysytem Master Slave模式的原理是一樣的,隻是把共享檔案系統換成了共享資料庫。我們隻需在所有的ActiveMQ的主配置檔案中添加資料源,所有的資料源都指向同 一個資料庫,然後修改持久化擴充卡。這種方式的叢集相對Shared File System Master Slave更加簡單,更加容易地進行分布式部署,但是如果資料庫失效,那麼所有的ActiveMQ執行個體都将失效。

3.   ActiveMQ攔截器使用

在ActiveMQ中使用攔截器和過濾器的使用多采用插件的形式實作,繼承BrokerFilter實作BrokerPlugin接口類。BrokerFilter實質一個實作Broker接口的類。

3.1.  日志攔截

日志攔截器是Broker的一個攔截器,預設的日志級别為INFO。你如你想改變日志的級别。這個日志攔截器支援Commons-log和Log4j兩種日志。

<plugins>

      <loggingBrokerPlugin logAll="true" logConnectionEvents="false"/>

    </plugins>

部分參數如下:

屬性名稱 預設值 描述
logAll false 記錄所有事件的日志
logMessageEvents false 記錄消息事件日志
logConnectionEvents True 記錄連接配接事件日志
logTransactionEvents false 記錄消息事務事件日志
logConsumerEvents false 記錄消息消費者事件日志
logProducerEvents false 記錄消息生産者事件日志
logInternalEvents false

3.2.   統計攔截器

StatisticsPlugin插件被用作檢測Broker中統計的插件。注意消息必須包含replyTo的消息頭, 如果是在JMS那麼需要采用jmsReplyTo消息頭,否則消息将被統計忽略。ReplyTo消息頭包含了你想檢查統計的消息。統計消息是一個 MapMessage.

      檢查Broker的資訊,僅僅需要一個名稱為ActiveMQ.Statistics.Broker并且有一個replyTo的消息頭的Destination。為了檢測所有destination,你需要一個名稱為ActiveMQ.Statistics.Destination.<destination-name>或者 ActiveMQ.Statistics.Destination.<wildcard-expression>并且有一個replyTo的 消息頭。如果許多Destination比對相關的模糊比對表達式,那麼統計的消息将被發送到replyTo的Destination.

<plugins>

      <statisticsBrokerPlugin/>

</plugins>

4.   ActiveMQ安全配置

ActiveMQ也可以對各個主題和隊列設定使用者名和密碼,配置如下:

<plugins>

  <!-- Configure authentication; Username, passwords and groups -->

  <simpleAuthenticationPlugin>

      <users>

          <authenticationUser username="system" password="manager" groups="users,admins"/>

          <authenticationUser username="user" password="password" groups="users"/>

          <authenticationUser username="guest" password="password" groups="guests"/>

          <authenticationUser username="testUser" password="123456" groups="testGroup"/>

      </users>

  </simpleAuthenticationPlugin>

  <!--  Lets configure a destination based authorization mechanism -->

  <authorizationPlugin>

    <map>

      <authorizationMap>

        <authorizationEntries>

          <authorizationEntry queue="queue.group.uum" read="users" write="users" admin="users" />

          <authorizationEntry queue=">" read="admins" write="admins" admin="admins" />

          <authorizationEntry queue="USERS.>" read="users" write="users" admin="users" />

          <authorizationEntry queue="GUEST.>" read="guests" write="guests,users" admin="guests,users" />

          <authorizationEntry queue="TEST.Q" read="guests" write="guests" />

          <authorizationEntry queue="test" read=" testGroup " write=" testGroup " />

          <authorizationEntry topic=">" read="admins" write="admins" admin="admins" />

          <authorizationEntry topic="USERS.>" read="users" write="users" admin="users" />

          <authorizationEntry topic="GUEST.>" read="guests" write="guests,users" admin="guests,users" />

          <authorizationEntry topic="ActiveMQ.Advisory.>" read="guests,users ,testGroup" write="guests,users ,testGroup " admin="guests,users ,testGroup "/>

        </authorizationEntries>

      </authorizationMap>

    </map>

  </authorizationPlugin>

</plugins>

5.   ActiveMQ中NetWorkConnctor屬性

    <networkConnector name="bridge" uri="static://(tcp://host1:61616)"

                duplex="true" conduitSubscriptions="true"

                decreaseNetworkConsumerPriority="false" >

            </networkConnector>

屬性名稱 預設值 描述
Duplex True 表示雙向可以通信
ConduitSubscriptions False 表示每個Consumer 上都會收到所有的發送的消息
DynamicOnly false 消息将被動态的轉接的在其他Broker的consumer上
PrefetchSize 1000 指定消息的數量
ConduitSubscriptons true
excludedDestinations 指定排除的位址
DynamiccallyincludedDestinations 包括的位址
StaticcallyincludedDestinations 靜态的包括消息位址
DecreaseNetwordConsumerPriority false 消費者優先權
NetworkTTl 1

6.   ActiveMQ消息遊标

當producer發送的持久化消息到達broker 之後,broker 首先會把它儲存在持

久存儲中。接下來,如果發現目前有活躍的 consumer,而且這個consumer 消費消息的速度能跟上producer 生産消息的速度,那麼ActiveMQ 會直接把消息傳遞給 broker 内部跟這個consumer關聯的 dispatch(派遣、排程) queue;如果目前沒有活躍的 consumer或者 consumer 消費消息的速度跟不上producer 生産消息的速度,那麼 ActiveMQ 會使用Pending Message Cursors 儲存對消息的引用。在需要的時候,PendingMessage Cursors 把消息引用傳遞給broker 内部跟這個consumer關聯的 dispatch queue。以下是兩種Pending (未解決)Message Cursors:

   · VM Cursor。在記憶體中儲存消息的引用。

   · File Cursor。首先在記憶體中儲存消息的引用,如果記憶體使用量達到上限, 那麼會把消息引用儲存到臨時檔案中。

對于 topic,可以使用的pendingSubscriberPolicy 有vmCursor 和 fileCursor。可以使用的PendingDurableSubscriberMessageStoragePolicy 有 vmDurableCursor 和fileDurableSubscriberCursor。以下是ActiveMQ 配置檔案 的一個例子:

Xml 代碼

<destinationPolicy>

        <policyMap>

        <policyEntries>

                  <policyEntry     topic="org.apache.>">

                      <pendingSubscriberPolicy>

                          <vmCursor  />

                        </pendingSubscriberPolicy>

                       <PendingDurableSubscriberMessageStoragePolicy>

                           <vmDurableCursor/>

                       </PendingDurableSubscriberMessageStoragePolicy>

                 </policyEntry>

           </policyEntries>

        </policyMap>

    </destinationPolicy>

  對于 queue,可以使用的pendingQueuePolicy 有vmQueueCursor 和 fileQueueCursor。以下是ActiveMQ 配置檔案的一個例子:

<destinationPolicy>

        <policyMap>

             <policyEntries>

                  <policyEntry     queue="org.apache.>">

                       <pendingQueuePolicy>

                            <vmQueueCursor  />

                       </pendingQueuePolicy>

                 </policyEntry>

            </policyEntries>

         </policyMap>

    </destinationPolicy>

7.   ActiveMQ嚴格排程政策

有時候需要保證不同的 topic consumer 以相同的順序接收消息。通常

ActiveMQ 會保證 topic consumer 以相同的順序接收來自同一個producer 的消息。

然而,由于多線程和異步處理,不同的 topic consumer可能會以不同的順序接收

來自不同producer 的消息。例如有兩個producer,分别是P 和Q。差不多是同一

時間内,P 發送了P1、P2 和P3 三個消息;Q 發送了Q1 和Q2 兩個消息。兩個不同

的consumer可能會以以下順序接收到消息:

        consumer1: P1 P2 Q1 P3 Q2

        consumer2: P1 Q1 Q2 P2 P3

        Strict order dispatch policy 會保證每個 topic consumer會以相同的

順序接收消息,代價是性能上的損失。以下是采用了strict order dispatch policy

後,兩個不同的 consumer可能以以下的順序接收消息:

        consumer1: P1 P2 Q1 P3 Q2

        consumer2: P1 P2 Q1 P3 Q2

      以下是ActiveMQ 配置檔案的一個例子:

   <destinationPolicy>

         <policyMap>

             <policyEntries>

                <policyEntry   topic="FOO.>">

                     <dispatchPolicy>

                        <strictOrderDispatchPolicy  />

                      </dispatchPolicy>

                  </policyEntry>

             </policyEntries>

        </policyMap>

   </destinationPolicy>

8.   ActiveMQ輪轉排程政策

介紹過 ActiveMQ 的prefetch(預讀取)機制,ActiveMQ 的預設參數是針對處理大量消息時的高性能和高吞吐量而設定的。是以預設的prefetch參數比較大,而且預設

的dispatchpolicies 會嘗試盡可能快的填滿 prefetch緩沖。然而在有些情況下,

例如隻有少量的消息而且單個消息的處理時間比較長,那麼在預設的 prefetch和

dispatch policies下,這些少量的消息總是傾向于被分發到個别的consumer 上。

這樣就會因為負載的不均衡配置設定而導緻處理時間的增加。

       Round robin dispatch policy 會嘗試平均分發消息,以下是 ActiveMQ配

置檔案的一個例子:

<destinationPolicy>     

      <policyMap>     

         <policyEntries>     

             <policyEntry  topic="FOO.>">     

                  <dispatchPolicy>     

                     <roundRobinDispatchPolicy  />     

                </dispatchPolicy>     

             </policyEntry>     

          </policyEntries>     

   </policyMap>     

</destinationPolicy>

9.   ActiveMQ Async Sends  

Acivemq 支援異步和同步發送消息。在 ActiveMQ4.0 以上,所有的異步或同步對

于Consumer 來說是變得可配置了。預設是在 ConnectionFactory、Connection、

Connection URI等方面配置對于一個基于Destination  的Consumer來說。

衆所周之,如果你想傳遞給 Slow Consumer 那麼你可能使用異步的消息傳遞,但是對于 FastConsumer 你可能使用同步發送消息。(這樣可以避免同步和上下文切換額外的增加Queue 堵塞花費。如果對于一個 Slow Consumer,你使用同步發送消息可能出現Producer 堵塞等顯現。

ActiveMQ預設設定 dispatcheAsync=true是最好的性能設定。如果你處理的是

Slow Consumer 則使用dispatcheAsync=true,反之,那你使用的是 Fast Consumer則使用dispatcheAsync=false。

用ConnectionURI 來配置Async如下:

ActiveMQConnectionFactory("tcp://locahost:61616?jms.useAsyncSend=true");

用ConnectionFactory配置Async如下:

((ActiveMQConnectionFactory)connectionFactory).setUseAsyncSend(true);

用Connection配置Async 如下:

((ActiveMQConnection)connection).setUseAsyncSend(true);

10.        ActiveMQ預設支援批量确認消息

ActiveMQ預設支援批量确認消息。由于批量确認會提高性能,是以這是預設的确認方式。如果希望在應用程式中禁止經過優化的确認方式,那麼可以采用如下方法:

cf = new ActiveMQConnectionFactory ("tcp://locahost:61616?jms.optimizeAcknowledge=false"); 

((ActiveMQConnectionFactory)connectionFactory).setOptimizeAcknowledge(false);   ((ActiveMQConnection)connection).setOptimizeAcknowledge(false); 

11.        ActiveMQ消息類型

(一)Blob Message

(二)Advisory Message

(三)ActiveMQ Stream

(四)Transformer message

(五)TextMessage

(六)MapMessage

(七)BytesMessage

(八)StreamMessage

(九)ObjectMessage

(十)Message

12.        ActiveMQ  destination

12.1.     ActiveMQ的混合發送模式

允許一個虛拟的destination代表多個destinations,多個destination之間用“,”分割。

Java代碼:

Queue queue = new ActiveMQQueue("USERS.First,USERS.Sconder");

如果需要不同類型的destination,需要加上字首queue:// 或topic://

Queue queue = new ActiveMQQueue("USERS.First,USERS.Sconder,topic://USERS.topic1");

配置如下:

<amq:destinationInterceptors>

          <amq:virtualDestinationInterceptor>

          <amq:virtualDestinations>

          <amq:compositeQueue name="MY.QUEUE.A">

           <amq:forwardTo>

             <amq:queue physicalName="MY.QUEUE.B"></amq:queue>

             <amq:topic physicalName="MY.TOPIC.A"></amq:topic>

           </amq:forwardTo>

          </amq:compositeQueue>

          <amq:virtualTopic/>

          </amq:virtualDestinations>

          </amq:virtualDestinationInterceptor>

        </amq:destinationInterceptors>

12.2.     ActiveMQ的接收Mirrored模式

每個 queue 中的消息隻能被一個consumer 消費。然而,有時候你可能希望能

夠監視生産者和消費者之間的消息流。你可以通過使用Virtual Destinations 來

建立一個virtual queue 來把消息轉發到多個 queues 中。但是 為系統中每個

queue 都進行如此的配置可能會很麻煩。

        ActiveMQ 支援Mirrored Queues。Broker 會把發送到某個 queue 的所有消

息轉發到一個名稱類似的topic,是以監控程式可以訂閱這個mirrored queue

topic。為了啟用Mirrored Queues,首先要将BrokerService 的useMirroredQueues

屬性設定成 true,然後可以通過destinationInterceptors 設定其它屬性,如

mirror topic 的字首,預設是"VirtualTopic.Mirror."。以下是ActiveMQ 配置文

件的一個例子:

<broker xmlns="http://activemq.apache.org/schema/core"

       brokerName="localhost" dataDirectory="${activemq.data}"

       useMirroredQueues="true">

       <amq:destinationInterceptors>

            <amq:mirroredQueue copyMessage="true" postfix="Mirror.Topic">

            </amq:mirroredQueue>

       </amq:destinationInterceptors>

12.3.     ActiveMQ的接收Wildcards模式

Wildcards 用來支援聯合的名字分層體系(federated namehierarchies)。它不是JMS 規範的一部分,而是ActiveMQ的擴充。ActiveMQ 支援以下三種wildcards:

   · "." 用于作為路徑上名字間的分隔符。

   · "*" 用于比對路徑上的任何名字。

   · ">" 用于遞歸地比對任何以這個名字開始的 destination。

12.4.     ActiveMQ虛拟主題

13.        ActiveMQ 消費者特性

13.1.     獨有消費者或者獨有隊列

Queue 中的消息是按照順序被分發到 consumers 的。然而,當你有多個consumers同時從相同的 queue中提取消息時,你将失去這個保證。因為這些消息是被多個線程并發的處理。有的時候,保證消息按照順序處理是很重要的。Broker會從多個 consumers中挑選一個 consumer來處理 queue中所有的消息,進而保證了消息的有序處理。如果這個 consumer失效,那麼 broker會自動切換到其它的 consumer。 可以通過 Destination Options 來建立一個 Exclusive Consumer:

queue  =  new  ActiveMQQueue("TEST.QUEUE?consumer.exclusive=true");     

consumer  =  session.createConsumer(queue);

如果存在 Exclusive Consumer 和普通的Consumer,那麼 Broker會首先把消息發送給Exclusive Consumer。除非該獨有消費者死亡。

13.2.     Message Group

Message Groups 可以看成是一種并發的Exclusive Consumer。跟所有的消息都由唯一的 consumer 處理不同,JMS 消息屬性JMSXGroupID 被用來區分 message group。Message Groups 特性保證所有具有相同JMSXGroupID 的消息會被分發到相同的 consumer(隻要這個consumer 保持active)。

在一個消息被分發到 consumer之前,broker首先檢查消息JMSXGroupID屬性。如果存在,那麼 broker 會檢查是否有某個 consumer擁有這個 message group。如果沒有,那麼 broker會選擇一個consumer,并将它關聯到這個 message group。此後,這個 consumer 會接收這個message group 的所有消息,直到:

 Consumer 被關閉;

 Message group 被關閉。通過發送一個消息,并設定這個消息的JMSXGroupSeq 為-1。

開啟Message Group:

TextMessage message = session.createTextMessage("ActiveMq 發送的消息");

message.setStringProperty("JMSXFroupID", "TEST_GROUP_A");

關閉Message Group:

TextMessage message = session.createTextMessage("ActiveMq 發送的消息");

message.setStringProperty("JMSXFroupID", "TEST_GROUP_A");

message.setIntProperty("JMSXGroupSeq", -1);

13.3.     Message Slelectors

JMS Selectors 用于在訂閱中,基于消息屬性和Xpath 文法對進行消息的過濾。

consumer  =  session.createConsumer(destination,  "JMSType  =  'car'  AND  weight  >  2500");

在JMSSelectors 表達式中,可以使用 IN、NOT IN、LIKE 等,例如:

       LIKE '12%3' ('123' true,'12993' true,'1234' false)

       LIKE 'l_se' ('lose' true,'loose' false)

       LIKE '\_%' ESCAPE '\' ('_foo' true,'foo' false)

       需要注意的是,JMS Selectors表達式中的日期和時間需要使用标準的long型毫秒值。另外表達式中的屬性不會自動進行類型轉換,例如:

myMessage.setStringProperty("NumberOfOrders", "2");    

"NumberOfOrders > 1" 求值結果是false。

13.4.     消息的重新傳遞和死信隊列

ActiveMQ需要重新傳遞消息需要 Client 有以下幾種操作:

1.  Client 用了transactions 和調用了rollback()在session 中。

2.  Client 用了transactiions 和在調用commit()之前關閉。

3.  Client 在CLIENT_ACKNOWLEDGE 的傳遞模式下在 session 中調用了

recover()。

隻有最後一個事物送出後,消息才能發送到 broker 上,事物沒有送出前,整

個傳遞消息仍處于事物中。一旦復原,恢複以前情況。在 broker 端不知道消息是

否處于重新傳遞狀态,這将會造成消息分發開銷。

預設,aciaveMQ 中死隊列被聲明為“ActivemMQ.DLQ”,所有不能消費的消

息被傳遞到該死隊列中。你可以在 acivemq.xml中配置individualDeadLetterStrategy屬性

   <destinationPolicy>

     <policyMap>

       <policyEntries>

         <policyEntry queue= "> " >

           <deadLetterStrategy>

             <individualDeadLetterStrategy

               queuePrefix= "DLQ."  useQueueForQueueMessages= "true" />

           </deadLetterStrategy>

         </policyEntry>

       </policyEntries>

     </policyMap>

</destinationPolicy>

有時需要直接删除過期的消息而不需要發送到死隊列中,xml 可以使用屬性

processExpired=false 來設定

<destinationPolicy>

    <policyMap>

      <policyEntries>

        <policyEntry queue= "> " >

          <deadLetterStrategy>

            <sharedDeadLetterStrategy processExpired= "false" />

          </deadLetterStrategy>

        </policyEntry>

      </policyEntries>

    </policyMap>

   </destinationPolicy>

存放非持久消息到死隊列中

預設情況下,Activemq 不會把非持久的死消息發送到死隊列中。

 非持久性如果你想把非持久的消息發送到死隊列中,需要設定屬性

processNonPersistent=“true”

<destinationPolicy>

    <policyMap>

      <policyEntries>

        <policyEntry queue= "> " >

          <deadLetterStrategy>

            <sharedDeadLetterStrategy processNonPersistent= "true" />

          </deadLetterStrategy>

        </policyEntry>

      </policyEntries>

    </policyMap>

   </destinationPolicy>

13.5.     消息優先級值

JMS JMSPriority  定義了十個消息優先級值,0  是最低的優先級, 9  是最高的優先級。另外,用戶端應當将0‐4  看作普通優先級,5‐9  看作加急優先級.

配置如下:

queue = new ActiveMQQueue("TEST.QUEUE?consumer.priority=10");

consumer = session.createConsumer(queue);

13.6.     慢消息處理機制

目前ActiveMQ 使用 Pending Message Limit Strategy來解決慢消息帶來的性能問題。除了prefetch buffer之外,你還要配置緩存消息的上限,超過這個上限後,新消息到來時會丢棄舊消息。通過在配置檔案的 destination map 中配置PendingMessageLimitStrategy,可以為不用的 topic namespace 配置不同的政策。

A:Pending Message Limit Strategy(等待消息限制政策)目前有以下兩種:

  1. ConstantPendingMessageLimitStrategy

Limit 可以設定 0、>0、-1三種方式:

0表示:不額外的增加其預存大小。

>0表示:在額外的增加其預存大小。

-1表示:不增加預存也不丢棄舊的消息。

這個政策使用常量限制:

<constantPendingMessageLimitStrategy limit="50"/>
  1. PrefetchRatePendingMessageLimitStrategy

這種政策是利用Consumer 的之前的預存的大小乘以其倍數等于現在的預存大小。

<prefetchRatePendingMessageLimitStrategy multiplier="2.5"/>

在以上兩種方式中,如果設定 0 意味着除了 prefetch 之外不再緩存消息;如果設定-1

意味着禁止丢棄消息。

此外,你還可以配置消息的丢棄政策,目前有以下兩種: 

oldestMessageEvictionStrategy。這個政策丢棄最舊的消息。 

 oldestMessageWithLowestPriorityEvictionStrategy。這個政策丢棄最舊的,

而且具有最低優先級的消息。 

以下是個ActiveMQ配置檔案的例子:

<destinationPolicy> 

 <policyMap> 

 <policyEntries> 

 <policyEntry topic="PRICES.>"> 

<subscriptionRecoveryPolicy> 

<timedSubscriptionRecoveryPolicy recoverDuration="10000" /> 

</subscriptionRecoveryPolicy> 

<pendingMessageLimitStrategy> 

<constantPendingMessageLimitStrategy limit="10"/> 

</pendingMessageLimitStrategy> 

 </policyEntry> 

</policyEntries> 

</policyMap> 

</destinationPolicy>

13.7.     消費者追溯消息

ActiveMQ支援6種恢複政策,可以自行選擇使用不同的政策

(一)  <fixedCountSubscriptionRecoveryPolicy>

這種政策限制在基于一個靜态的計數中對于主題(Topic)消息緩存的數量。

(二)  <fixedSizedSubscriptionRecoveryPolicy>

這種政策限制在記憶體使用量中對于主題(Topic)消息緩存的數量。這是

ActiveMQ 的預設持久恢複政策。你可以選擇設定 cache的大小來應用與所

有的主題[Topic]。

(三)  <lastImageSubscriptionRecoveryPolicy>

這種政策僅僅保持發送到主題(Topic)的最後一個消息。

(四)  <noSubscriptionRecoveryPolicy>

這種政策是不儲存主題消息,不需要任何配置

(五)  <queryBasedSubscriptionRecoveryPolicy>

這種政策基于一個 JMS屬性選擇器應用到所有的消息來設定其消息緩存的

大小

(六)  <timedSubscriptionRecoveryPolicy>

這種政策是基于應用到每個消息的過期時間來限制其消息緩存數量。提示

這種消息的生命周期時間來源于消息發送者設定其 timeToLive 參數

轉自:http://www.cnblogs.com/tomcattd/p/3198099.html