天天看点

7 ActiveMQ的消息存储以及持久化

消息的持久化

       ActiveMQ的消息持久化机制有JDBC,AMQ,KahaDB和LevelDB。发送者将消息发送出去后,消息中心首先将消息存储到本地数据文件、内存数据库或者远程数据库等。再试图将消息发给接收者,成功则将消息从存储中删除,失败则继续尝试尝试发送。消息中心启动以后,要先检查指定的存储位置是否有未成功发送的消息,如果有,则会先把存储位置中的消息发出去。

ActiveMQ的消息持久化机制

(1)AMQ Message Store

       基于文件的存储机制,是以前的默认机制,现在不再使用。AMQ是一种文件存储形式,它具有写入速度快和容易恢复的特点。消息存储再一个个文件中文件的默认大小为32M,当一个文件中的消息已经全部被消费,那么这个文件将被标识为可删除,在下一个清除阶段,这个文件被删除。AMQ适用于ActiveMQ5.3之前的版本

(2)kahaDB

基于日志文件,从ActiveMQ5.4(含)开始默认的持久化插件。

配置方式,在activemq.xml中进行配置

<persistenceAdapter>
    <kahaDB directory="${activemq.data}/kahadb" journalMaxFileLength="16mb"/>
</persistenceAdapter>
           

directory : 指定持久化消息的存储目录

journalMaxFileLength : 指定保存消息的日志文件大小,具体根据你的实际应用配置

特点:

1、日志形式存储消息;

2、消息索引以B-Tree结构存储,可以快速更新;

3、完全支持JMS事务;

4、支持多种恢复机制;

原理

kahadb在消息保存目录中有4类文件和一个lock。

db-<Number>.log:KahaDB存储消息到预定义大小的数据记录文件中,文件名为db-<Number>.log。当前文件已满时,一个新的文件会随之创建,number的数值随之递增。如每32M一个文件,文件名按照数字进行编号。当不再有引用到的数据文件中的任何消息时,文件会被删除或归档,由后续的清理机制来清除文件。

db.data:包含了持久化的BTree索引,索引了消息数据记录中的消息,它是消息的索引文件,本质上是B-Tree(B树),使用B-Tree作为索引指向db-<Number>.log里存储的消息。

db.free:当前db.data文件里,哪些页面是空闲的,文件具体内容是所有空闲页的ID,方便后续建索引的时候,先从空闲页开始建立,保证索引的连续性,没有碎片。

db.redo:用来进行消息恢复,如果KahaDB消息存储在强制退出后启动,用于恢复B-Tree索引。

lock:文件锁,表示当前获得KahaDB读写权限的Broker。

(3)JDBC消息存储

使用JDBC持久化方式,数据库会创建3个表:activemq_msgs,activemq_acks和activemq_lock。

activemq_msgs用于存储消息,Queue和Topic都存储在这个表中。

如何使用:

1.添加mysql数据库的驱动包(mysql-connector-java-5.1.38.jar、commons-pool.jar、commons-dbcp.jar和commons-collections.jar到lib文件夹

2 配置持久化的方式,都是修改安装目录下conf/acticvemq.xml文件,

首先定义一个mysql-ds的MySQL数据源,然后在persistenceAdapter节点中配置jdbcPersistenceAdapter并且引用刚才定义的数据源。

<persistenceAdapter>  
    <jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="false" />  
</persistenceAdapter>  
           

3.dataSource指定持久化数据库的bean,createTablesOnStartup是否在启动的时候创建数据表,默认值是true,这样每次启动都会去创建数据表了,一般是第一次启动的时候设置为true,之后改成false。

使用MySQL配置JDBC持久化:

<beans>
    <broker brokerName="test-broker" persistent="true" xmlns="http://activemq.apache.org/schema/core">
        <persistenceAdapter>
            <jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="false"/> 
        </persistenceAdapter>
    </broker>
    <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
        <property name="driverClassName" value="com.mysql.jdbc.Driver"/> 
        <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/> 
        <property name="username" value="activemq"/> 
        <property name="password" value="activemq"/> 
        <property name="maxActive" value="200"/> 
        <property name="poolPreparedStatements" value="true"/> 
    </bean>
</beans>
           

数据库表信息

activemq_msgs用于存储消息,Queue和Topic都存储在这个表中:

ID:自增的数据库主键

CONTAINER:消息的Destination

MSGID_PROD:消息发送者客户端的主键

MSG_SEQ:是发送消息的顺序,MSGID_PROD+MSG_SEQ可以组成JMS的MessageID

EXPIRATION:消息的过期时间,存储的是从1970-01-01到现在的毫秒数

MSG:消息本体的Java序列化对象的二进制数据

PRIORITY:优先级,从0-9,数值越大优先级越高

activemq_acks用于存储订阅关系。如果是持久化Topic,订阅者和服务器的订阅关系在这个表保存:

主要的数据库字段如下:

CONTAINER:消息的Destination

SUB_DEST:如果是使用Static集群,这个字段会有集群其他系统的信息

CLIENT_ID:每个订阅者都必须有一个唯一的客户端ID用以区分

SUB_NAME:订阅者名称

SELECTOR:选择器,可以选择只消费满足条件的消息。条件可以用自定义属性实现,可支持多属性AND和OR操作

LAST_ACKED_ID:记录消费过的消息的ID。

表activemq_lock在集群环境中才有用,只有一个Broker可以获得消息,称为Master Broker,

其他的只能作为备份等待Master Broker不可用,才可能成为下一个Master Broker。

这个表用于记录哪个Broker是当前的Master Broker。

重新启动AcitiveMQ.

JDBC Message Store with ActiveMQ Journal

这种方式克服了JDBC Store的不足,JDBC每次消息过来,都需要去写库读库。ActiveMQ Journal,使用高速缓存写入技术,大大提高了性能。当消费者的速度能够及时跟上生产者消息的生产速度时,journal文件能够大大减少需要写入到DB中的消息。

举个例子:生产者生产了1000条消息,这1000条消息会保存到journal文件,如果消费者的消费速度很快的情况下,在journal文件还没有同步到DB之前,消费者已经消费了90%的以上消息,那么这个时候只需要同步剩余的10%的消息到DB。如果消费者的速度很慢,这个时候journal文件可以使消息以批量方式写到DB。

为了高性能,这种方式使用日志文件存储+数据库存储。先将消息持久到日志文件,等待一段时间再将未消费的消息持久到数据库。该方式要比JDBC性能要高。

下面是基于上面JDBC配置,再做一点修改:

<persistenceFactory>
    <journalPersistenceAdapterFactory 
        journalLogFiles="4"
        journalLogFileSize="32768"
        useJournal="true"
        useQuickJournal="true"
        dataSource="#mysql-ds"
        dataDirectory="activemq-data"/>         
</persistenceFactory>
           

(4)LevelDB消息存储

从ActiveMQ 5.6版本之后,又推出了LevelDB的持久化引擎。

目前默认的持久化方式仍然是KahaDB,不过LevelDB持久化性能高于KahaDB,可能是以后的趋势。

在ActiveMQ 5.9版本提供了基于LevelDB和Zookeeper的数据复制方式,用于Master-slave方式的首选数据复制方案。