天天看點

7 ActiveMQ的消息存儲以及持久化

消息的持久化

       ActiveMQ的消息持久化機制有JDBC,AMQ,KahaDB和LevelDB。發送者将消息發送出去後,消息中心首先将消息存儲到本地資料檔案、記憶體資料庫或者遠端資料庫等。再試圖将消息發給接收者,成功則将消息從存儲中删除,失敗則繼續嘗試嘗試發送。消息中心啟動以後,要先檢查指定的存儲位置是否有未成功發送的消息,如果有,則會先把存儲位置中的消息發出去。

ActiveMQ的消息持久化機制

(1)AMQ Message Store

       基于檔案的存儲機制,是以前的預設機制,現在不再使用。AMQ是一種檔案存儲形式,它具有寫入速度快和容易恢複的特點。消息存儲再一個個檔案中檔案的預設大小為32M,當一個檔案中的消息已經全部被消費,那麼這個檔案将被辨別為可删除,在下一個清除階段,這個檔案被删除。AMQ适用于ActiveMQ5.3之前的版本

(2)kahaDB

基于日志檔案,從ActiveMQ5.4(含)開始預設的持久化插件。

配置方式,在activemq.xml中進行配置

<persistenceAdapter>
    <kahaDB directory="${activemq.data}/kahadb" journalMaxFileLength="16mb"/>
</persistenceAdapter>
           

directory : 指定持久化消息的存儲目錄

journalMaxFileLength : 指定儲存消息的日志檔案大小,具體根據你的實際應用配置

特點:

1、日志形式存儲消息;

2、消息索引以B-Tree結構存儲,可以快速更新;

3、完全支援JMS事務;

4、支援多種恢複機制;

原理

kahadb在消息儲存目錄中有4類檔案和一個lock。

db-<Number>.log:KahaDB存儲消息到預定義大小的資料記錄檔案中,檔案名為db-<Number>.log。目前檔案已滿時,一個新的檔案會随之建立,number的數值随之遞增。如每32M一個檔案,檔案名按照數字進行編号。當不再有引用到的資料檔案中的任何消息時,檔案會被删除或歸檔,由後續的清理機制來清除檔案。

db.data:包含了持久化的BTree索引,索引了消息資料記錄中的消息,它是消息的索引檔案,本質上是B-Tree(B樹),使用B-Tree作為索引指向db-<Number>.log裡存儲的消息。

db.free:目前db.data檔案裡,哪些頁面是空閑的,檔案具體内容是所有空閑頁的ID,友善後續建索引的時候,先從空閑頁開始建立,保證索引的連續性,沒有碎片。

db.redo:用來進行消息恢複,如果KahaDB消息存儲在強制退出後啟動,用于恢複B-Tree索引。

lock:檔案鎖,表示目前獲得KahaDB讀寫權限的Broker。

(3)JDBC消息存儲

使用JDBC持久化方式,資料庫會建立3個表:activemq_msgs,activemq_acks和activemq_lock。

activemq_msgs用于存儲消息,Queue和Topic都存儲在這個表中。

如何使用:

1.添加mysql資料庫的驅動包(mysql-connector-java-5.1.38.jar、commons-pool.jar、commons-dbcp.jar和commons-collections.jar到lib檔案夾

2 配置持久化的方式,都是修改安裝目錄下conf/acticvemq.xml檔案,

首先定義一個mysql-ds的MySQL資料源,然後在persistenceAdapter節點中配置jdbcPersistenceAdapter并且引用剛才定義的資料源。

<persistenceAdapter>  
    <jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="false" />  
</persistenceAdapter>  
           

3.dataSource指定持久化資料庫的bean,createTablesOnStartup是否在啟動的時候建立資料表,預設值是true,這樣每次啟動都會去建立資料表了,一般是第一次啟動的時候設定為true,之後改成false。

使用MySQL配置JDBC持久化:

<beans>
    <broker brokerName="test-broker" persistent="true" xmlns="http://activemq.apache.org/schema/core">
        <persistenceAdapter>
            <jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="false"/> 
        </persistenceAdapter>
    </broker>
    <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
        <property name="driverClassName" value="com.mysql.jdbc.Driver"/> 
        <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/> 
        <property name="username" value="activemq"/> 
        <property name="password" value="activemq"/> 
        <property name="maxActive" value="200"/> 
        <property name="poolPreparedStatements" value="true"/> 
    </bean>
</beans>
           

資料庫表資訊

activemq_msgs用于存儲消息,Queue和Topic都存儲在這個表中:

ID:自增的資料庫主鍵

CONTAINER:消息的Destination

MSGID_PROD:消息發送者用戶端的主鍵

MSG_SEQ:是發送消息的順序,MSGID_PROD+MSG_SEQ可以組成JMS的MessageID

EXPIRATION:消息的過期時間,存儲的是從1970-01-01到現在的毫秒數

MSG:消息本體的Java序列化對象的二進制資料

PRIORITY:優先級,從0-9,數值越大優先級越高

activemq_acks用于存儲訂閱關系。如果是持久化Topic,訂閱者和伺服器的訂閱關系在這個表儲存:

主要的資料庫字段如下:

CONTAINER:消息的Destination

SUB_DEST:如果是使用Static叢集,這個字段會有叢集其他系統的資訊

CLIENT_ID:每個訂閱者都必須有一個唯一的用戶端ID用以區分

SUB_NAME:訂閱者名稱

SELECTOR:選擇器,可以選擇隻消費滿足條件的消息。條件可以用自定義屬性實作,可支援多屬性AND和OR操作

LAST_ACKED_ID:記錄消費過的消息的ID。

表activemq_lock在叢集環境中才有用,隻有一個Broker可以獲得消息,稱為Master Broker,

其他的隻能作為備份等待Master Broker不可用,才可能成為下一個Master Broker。

這個表用于記錄哪個Broker是目前的Master Broker。

重新啟動AcitiveMQ.

JDBC Message Store with ActiveMQ Journal

這種方式克服了JDBC Store的不足,JDBC每次消息過來,都需要去寫庫讀庫。ActiveMQ Journal,使用高速緩存寫入技術,大大提高了性能。當消費者的速度能夠及時跟上生産者消息的生産速度時,journal檔案能夠大大減少需要寫入到DB中的消息。

舉個例子:生産者生産了1000條消息,這1000條消息會儲存到journal檔案,如果消費者的消費速度很快的情況下,在journal檔案還沒有同步到DB之前,消費者已經消費了90%的以上消息,那麼這個時候隻需要同步剩餘的10%的消息到DB。如果消費者的速度很慢,這個時候journal檔案可以使消息以批量方式寫到DB。

為了高性能,這種方式使用日志檔案存儲+資料庫存儲。先将消息持久到日志檔案,等待一段時間再将未消費的消息持久到資料庫。該方式要比JDBC性能要高。

下面是基于上面JDBC配置,再做一點修改:

<persistenceFactory>
    <journalPersistenceAdapterFactory 
        journalLogFiles="4"
        journalLogFileSize="32768"
        useJournal="true"
        useQuickJournal="true"
        dataSource="#mysql-ds"
        dataDirectory="activemq-data"/>         
</persistenceFactory>
           

(4)LevelDB消息存儲

從ActiveMQ 5.6版本之後,又推出了LevelDB的持久化引擎。

目前預設的持久化方式仍然是KahaDB,不過LevelDB持久化性能高于KahaDB,可能是以後的趨勢。

在ActiveMQ 5.9版本提供了基于LevelDB和Zookeeper的資料複制方式,用于Master-slave方式的首選資料複制方案。