消息的持久化
ActiveMQ的消息持久化機制有JDBC,AMQ,KahaDB和LevelDB。發送者将消息發送出去後,消息中心首先将消息存儲到本地資料檔案、記憶體資料庫或者遠端資料庫等。再試圖将消息發給接收者,成功則将消息從存儲中删除,失敗則繼續嘗試嘗試發送。消息中心啟動以後,要先檢查指定的存儲位置是否有未成功發送的消息,如果有,則會先把存儲位置中的消息發出去。
ActiveMQ的消息持久化機制
(1)AMQ Message Store
基于檔案的存儲機制,是以前的預設機制,現在不再使用。AMQ是一種檔案存儲形式,它具有寫入速度快和容易恢複的特點。消息存儲再一個個檔案中檔案的預設大小為32M,當一個檔案中的消息已經全部被消費,那麼這個檔案将被辨別為可删除,在下一個清除階段,這個檔案被删除。AMQ适用于ActiveMQ5.3之前的版本
(2)kahaDB
基于日志檔案,從ActiveMQ5.4(含)開始預設的持久化插件。
配置方式,在activemq.xml中進行配置
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb" journalMaxFileLength="16mb"/>
</persistenceAdapter>
directory : 指定持久化消息的存儲目錄
journalMaxFileLength : 指定儲存消息的日志檔案大小,具體根據你的實際應用配置
特點:
1、日志形式存儲消息;
2、消息索引以B-Tree結構存儲,可以快速更新;
3、完全支援JMS事務;
4、支援多種恢複機制;
原理
kahadb在消息儲存目錄中有4類檔案和一個lock。
db-<Number>.log:KahaDB存儲消息到預定義大小的資料記錄檔案中,檔案名為db-<Number>.log。目前檔案已滿時,一個新的檔案會随之建立,number的數值随之遞增。如每32M一個檔案,檔案名按照數字進行編号。當不再有引用到的資料檔案中的任何消息時,檔案會被删除或歸檔,由後續的清理機制來清除檔案。
db.data:包含了持久化的BTree索引,索引了消息資料記錄中的消息,它是消息的索引檔案,本質上是B-Tree(B樹),使用B-Tree作為索引指向db-<Number>.log裡存儲的消息。
db.free:目前db.data檔案裡,哪些頁面是空閑的,檔案具體内容是所有空閑頁的ID,友善後續建索引的時候,先從空閑頁開始建立,保證索引的連續性,沒有碎片。
db.redo:用來進行消息恢複,如果KahaDB消息存儲在強制退出後啟動,用于恢複B-Tree索引。
lock:檔案鎖,表示目前獲得KahaDB讀寫權限的Broker。
(3)JDBC消息存儲
使用JDBC持久化方式,資料庫會建立3個表:activemq_msgs,activemq_acks和activemq_lock。
activemq_msgs用于存儲消息,Queue和Topic都存儲在這個表中。
如何使用:
1.添加mysql資料庫的驅動包(mysql-connector-java-5.1.38.jar、commons-pool.jar、commons-dbcp.jar和commons-collections.jar到lib檔案夾
2 配置持久化的方式,都是修改安裝目錄下conf/acticvemq.xml檔案,
首先定義一個mysql-ds的MySQL資料源,然後在persistenceAdapter節點中配置jdbcPersistenceAdapter并且引用剛才定義的資料源。
<persistenceAdapter>
<jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="false" />
</persistenceAdapter>
3.dataSource指定持久化資料庫的bean,createTablesOnStartup是否在啟動的時候建立資料表,預設值是true,這樣每次啟動都會去建立資料表了,一般是第一次啟動的時候設定為true,之後改成false。
使用MySQL配置JDBC持久化:
<beans>
<broker brokerName="test-broker" persistent="true" xmlns="http://activemq.apache.org/schema/core">
<persistenceAdapter>
<jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="false"/>
</persistenceAdapter>
</broker>
<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
<property name="username" value="activemq"/>
<property name="password" value="activemq"/>
<property name="maxActive" value="200"/>
<property name="poolPreparedStatements" value="true"/>
</bean>
</beans>
資料庫表資訊
activemq_msgs用于存儲消息,Queue和Topic都存儲在這個表中:
ID:自增的資料庫主鍵
CONTAINER:消息的Destination
MSGID_PROD:消息發送者用戶端的主鍵
MSG_SEQ:是發送消息的順序,MSGID_PROD+MSG_SEQ可以組成JMS的MessageID
EXPIRATION:消息的過期時間,存儲的是從1970-01-01到現在的毫秒數
MSG:消息本體的Java序列化對象的二進制資料
PRIORITY:優先級,從0-9,數值越大優先級越高
activemq_acks用于存儲訂閱關系。如果是持久化Topic,訂閱者和伺服器的訂閱關系在這個表儲存:
主要的資料庫字段如下:
CONTAINER:消息的Destination
SUB_DEST:如果是使用Static叢集,這個字段會有叢集其他系統的資訊
CLIENT_ID:每個訂閱者都必須有一個唯一的用戶端ID用以區分
SUB_NAME:訂閱者名稱
SELECTOR:選擇器,可以選擇隻消費滿足條件的消息。條件可以用自定義屬性實作,可支援多屬性AND和OR操作
LAST_ACKED_ID:記錄消費過的消息的ID。
表activemq_lock在叢集環境中才有用,隻有一個Broker可以獲得消息,稱為Master Broker,
其他的隻能作為備份等待Master Broker不可用,才可能成為下一個Master Broker。
這個表用于記錄哪個Broker是目前的Master Broker。
重新啟動AcitiveMQ.
JDBC Message Store with ActiveMQ Journal
這種方式克服了JDBC Store的不足,JDBC每次消息過來,都需要去寫庫讀庫。ActiveMQ Journal,使用高速緩存寫入技術,大大提高了性能。當消費者的速度能夠及時跟上生産者消息的生産速度時,journal檔案能夠大大減少需要寫入到DB中的消息。
舉個例子:生産者生産了1000條消息,這1000條消息會儲存到journal檔案,如果消費者的消費速度很快的情況下,在journal檔案還沒有同步到DB之前,消費者已經消費了90%的以上消息,那麼這個時候隻需要同步剩餘的10%的消息到DB。如果消費者的速度很慢,這個時候journal檔案可以使消息以批量方式寫到DB。
為了高性能,這種方式使用日志檔案存儲+資料庫存儲。先将消息持久到日志檔案,等待一段時間再将未消費的消息持久到資料庫。該方式要比JDBC性能要高。
下面是基于上面JDBC配置,再做一點修改:
<persistenceFactory>
<journalPersistenceAdapterFactory
journalLogFiles="4"
journalLogFileSize="32768"
useJournal="true"
useQuickJournal="true"
dataSource="#mysql-ds"
dataDirectory="activemq-data"/>
</persistenceFactory>
(4)LevelDB消息存儲
從ActiveMQ 5.6版本之後,又推出了LevelDB的持久化引擎。
目前預設的持久化方式仍然是KahaDB,不過LevelDB持久化性能高于KahaDB,可能是以後的趨勢。
在ActiveMQ 5.9版本提供了基于LevelDB和Zookeeper的資料複制方式,用于Master-slave方式的首選資料複制方案。