ActiveMQ集群搭建
1.使用shared filesystem Master-Slave方式主从集群
通过共享存储目录(kahaDB)来实现master和slave的主从信息同步所有ActiveMQ的broker都在不断地获取共享目录的控制权,哪个broker抢到了控制权,它就成为master,它将锁定该目录,其他broker就只能成为slave。
当master主出现故障后,剩下的slave从将再进行争夺共享目录的控制权,谁抢到共享目录的控制权,谁就成为主,其他没有抢到控制权的称为从。
由于他们是基于共享目录,所以当主出现故障后,其上没有被消费的消息在接下来产生的新的master主中可以继续进行消费。
这种方式客户端访问的都是主,从只是起到了一个备份访问的作用
实现步骤:
(这里都是采用伪集群部署)
- 先复制3个MQ
- 配置每个activeMQ的conf /activemq.xml文件中的共享目录
<persistenceAdapter>
<!--<kahaDB directory="${activemq.data}/kahadb"/>-->
<kahaDB directory="/opt/kahadb"/>
</persistenceAdapter>
修改完持久化目录后,需要在/opt目录下创建该目录
3. 配置每个activeMQ的conf /activemq.xml文件中的端口
因为是单机伪集群部署,为了避免端口号的冲突,前三个地址端口+1,后两个端口地址-1
第一个ActiveMQ
<transportConnector name="openwire" uri="tcp://0.0.0.0:61617?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5673?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1882?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
第二个ActiveMQ
<transportConnector name="openwire" uri="tcp://0.0.0.0:61618?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5674?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61615?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1881?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61612?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
第三个ActiveMQ
<transportConnector name="openwire" uri="tcp://0.0.0.0:61619?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5675?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1880?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61611?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
maximumConnections 最大连接数;
wireFormat.maxFrameSize 表示一个完整消息的最大数据量,单位byte;
0.0.0.0表示任意ip
- 修改conf/jetty.xml文件的jetty服务器端口(管理控制台)
第一个ActiveMQ
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
<!-- the default port number for the web console -->
<property name="host" value="0.0.0.0"/>
<property name="port" value="8162"/>
</bean>
第二个ActiveMQ
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
<!-- the default port number for the web console -->
<property name="host" value="0.0.0.0"/>
<property name="port" value="8163"/>
</bean>
第三个ActiveMQ
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
<!-- the default port number for the web console -->
<property name="host" value="0.0.0.0"/>
<property name="port" value="8164"/>
</bean>
- 启动三台ActiveMQ进行测试
2.使用shared database Master-Slave方式主从集群
该方式与shared filesystem方式类似,只是共享的存储介质由文件系统改成了数据库。
实现步骤:
- 安装3个ActiveMQ
- 配置每个activeMQ的conf /activemq.xml文件中的持久化适配器是jdbc数据库方式
<persistenceAdapter>
<jdbcPersistenceAdapter dataSource="#mysql-ds"/>
</persistenceAdapter>
-
配置每个数据库连接池
注意:连接池的配置需要配置在的外面
<bean id="mysql-ds" class="com.alibaba.druid.pool.DruidDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://localhost:3306/activemq?useUnicode=true&characterEncoding=utf8&useSSL=false"/>
<property name="username" value="root"/>
<property name="password" value="123456"/>
</bean>
- 在每个ActiveMQ的lib目录下加入mysql的驱动包和数据库连接池Druid包
- 启动MySQL数据库,并创建activemq数据库
-
配置每个activeMQ的conf /activemq.xml文件中的端口
为了避免端口号的冲突,前三个地址端口+1,后两个端口地址-1
第一个ActiveMQ
<transportConnector name="openwire" uri="tcp://0.0.0.0:61617?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5673?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1882?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
第二个ActiveMQ
<transportConnector name="openwire" uri="tcp://0.0.0.0:61618?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5674?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61615?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1881?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61612?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
第三个ActiveMQ
<transportConnector name="openwire" uri="tcp://0.0.0.0:61619?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5675?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1880?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61611?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
- 修改conf/jetty.xml文件的jetty服务器端口(管理控制台)
第一个ActiveMQ
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
<!-- the default port number for the web console -->
<property name="host" value="0.0.0.0"/>
<property name="port" value="8162"/>
</bean>
第二个ActiveMQ
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
<!-- the default port number for the web console -->
<property name="host" value="0.0.0.0"/>
<property name="port" value="8163"/>
</bean>
第三个ActiveMQ
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
<!-- the default port number for the web console -->
<property name="host" value="0.0.0.0"/>
<property name="port" value="8164"/>
</bean>
- 启动三台ActiveMQ,测试验证
3.使用Replicated LevelDB Store方式主从集群(常用)
基于可复制的LevelDB存储方式的集群,这种集群方式是ActiveMQ5.9版本以后新增的特性,它使用ZooKeeper从一组broker中协调选择一个broker作为master主,其他broker作为slave从的模式。所有slave从节点通过复制master主节点的消息来实现消息同步,当主出现故障后,没有被消费的消息在从服务器上也同步了一份,所以不会有消息的丢失。LevelDB 是 Google开发的一套用于持久化数据的高性能kv数据库,ActiveMQ利用该数据库进行数据的存储。只有master 接受客户端连接,slave不接受客户端连接,Master的所有存储操作都将被复制到slaves。在这个模式中,需要有半数以上的broker是正常的,集群才是可用的,超过半数broker故障,ZooKeeper的选举算法将不能选择master,从而导致集群不可用。
架构图:
实现步骤:
- 安装3个ActiveMQ
- 配置每个activeMQ的conf /activemq.xml文件中的持久化适配器是replicatedLevelDB方式
<persistenceAdapter>
<replicatedLevelDB
replicas="3"
bind="tcp://0.0.0.0:0"
zkAddress="localhost:2181"/>
</persistenceAdapter>
参数说明:
replicas :集群中存在的节点的数目
bind :当该节点成为master后,将使用该bind配置的ip和端口进行数据复制
zkAddress :ZooKeeper的地址
3. 启动ZooKeeper服务器
4. 启动三台ActiveMQ,测试验证
注意: 这种方式,不适合集群太大,也就是activemq不能太多,因为多个activemq之间需要复制消息,这个比较耗资源,占用网络,建议3、5台