天天看点

ActiveMQ集群搭建ActiveMQ集群搭建1.使用shared filesystem Master-Slave方式主从集群2.使用shared database Master-Slave方式主从集群3.使用Replicated LevelDB Store方式主从集群(常用)

ActiveMQ集群搭建

1.使用shared filesystem Master-Slave方式主从集群

通过共享存储目录(kahaDB)来实现master和slave的主从信息同步所有ActiveMQ的broker都在不断地获取共享目录的控制权,哪个broker抢到了控制权,它就成为master,它将锁定该目录,其他broker就只能成为slave。

当master主出现故障后,剩下的slave从将再进行争夺共享目录的控制权,谁抢到共享目录的控制权,谁就成为主,其他没有抢到控制权的称为从。

由于他们是基于共享目录,所以当主出现故障后,其上没有被消费的消息在接下来产生的新的master主中可以继续进行消费。

这种方式客户端访问的都是主,从只是起到了一个备份访问的作用

实现步骤:

(这里都是采用伪集群部署)

  1. 先复制3个MQ
    ActiveMQ集群搭建ActiveMQ集群搭建1.使用shared filesystem Master-Slave方式主从集群2.使用shared database Master-Slave方式主从集群3.使用Replicated LevelDB Store方式主从集群(常用)
  2. 配置每个activeMQ的conf /activemq.xml文件中的共享目录
<persistenceAdapter>
    <!--<kahaDB directory="${activemq.data}/kahadb"/>-->
    <kahaDB directory="/opt/kahadb"/>
</persistenceAdapter>
           

修改完持久化目录后,需要在/opt目录下创建该目录

ActiveMQ集群搭建ActiveMQ集群搭建1.使用shared filesystem Master-Slave方式主从集群2.使用shared database Master-Slave方式主从集群3.使用Replicated LevelDB Store方式主从集群(常用)

3. 配置每个activeMQ的conf /activemq.xml文件中的端口

因为是单机伪集群部署,为了避免端口号的冲突,前三个地址端口+1,后两个端口地址-1

第一个ActiveMQ
<transportConnector name="openwire" uri="tcp://0.0.0.0:61617?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5673?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1882?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
第二个ActiveMQ
<transportConnector name="openwire" uri="tcp://0.0.0.0:61618?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5674?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61615?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1881?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61612?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
第三个ActiveMQ
<transportConnector name="openwire" uri="tcp://0.0.0.0:61619?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5675?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1880?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61611?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
           
maximumConnections 最大连接数;
wireFormat.maxFrameSize 表示一个完整消息的最大数据量,单位byte;
0.0.0.0表示任意ip
           
  1. 修改conf/jetty.xml文件的jetty服务器端口(管理控制台)
第一个ActiveMQ
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
    <!-- the default port number for the web console -->
    <property name="host" value="0.0.0.0"/>
    <property name="port" value="8162"/>
</bean>
第二个ActiveMQ
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
    <!-- the default port number for the web console -->
    <property name="host" value="0.0.0.0"/>
    <property name="port" value="8163"/>
</bean>
第三个ActiveMQ
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
    <!-- the default port number for the web console -->
    <property name="host" value="0.0.0.0"/>
    <property name="port" value="8164"/>
</bean>
           
  1. 启动三台ActiveMQ进行测试

2.使用shared database Master-Slave方式主从集群

该方式与shared filesystem方式类似,只是共享的存储介质由文件系统改成了数据库。

实现步骤:

  1. 安装3个ActiveMQ
    ActiveMQ集群搭建ActiveMQ集群搭建1.使用shared filesystem Master-Slave方式主从集群2.使用shared database Master-Slave方式主从集群3.使用Replicated LevelDB Store方式主从集群(常用)
  2. 配置每个activeMQ的conf /activemq.xml文件中的持久化适配器是jdbc数据库方式
<persistenceAdapter>
    <jdbcPersistenceAdapter dataSource="#mysql-ds"/>
</persistenceAdapter>
           
  1. 配置每个数据库连接池

    注意:连接池的配置需要配置在的外面

<bean id="mysql-ds" class="com.alibaba.druid.pool.DruidDataSource" destroy-method="close">
    <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
    <property name="url" value="jdbc:mysql://localhost:3306/activemq?useUnicode=true&amp;characterEncoding=utf8&amp;useSSL=false"/>
    <property name="username" value="root"/>
    <property name="password" value="123456"/>
</bean>
           
  1. 在每个ActiveMQ的lib目录下加入mysql的驱动包和数据库连接池Druid包
  2. 启动MySQL数据库,并创建activemq数据库
  3. 配置每个activeMQ的conf /activemq.xml文件中的端口

    为了避免端口号的冲突,前三个地址端口+1,后两个端口地址-1

第一个ActiveMQ
<transportConnector name="openwire" uri="tcp://0.0.0.0:61617?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5673?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1882?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
第二个ActiveMQ
<transportConnector name="openwire" uri="tcp://0.0.0.0:61618?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5674?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61615?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1881?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61612?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
第三个ActiveMQ
<transportConnector name="openwire" uri="tcp://0.0.0.0:61619?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5675?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1880?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61611?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
           
  1. 修改conf/jetty.xml文件的jetty服务器端口(管理控制台)
第一个ActiveMQ
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
    <!-- the default port number for the web console -->
    <property name="host" value="0.0.0.0"/>
    <property name="port" value="8162"/>
</bean>
第二个ActiveMQ
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
    <!-- the default port number for the web console -->
    <property name="host" value="0.0.0.0"/>
    <property name="port" value="8163"/>
</bean>
第三个ActiveMQ
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
    <!-- the default port number for the web console -->
    <property name="host" value="0.0.0.0"/>
    <property name="port" value="8164"/>
</bean>
           
  1. 启动三台ActiveMQ,测试验证

3.使用Replicated LevelDB Store方式主从集群(常用)

基于可复制的LevelDB存储方式的集群,这种集群方式是ActiveMQ5.9版本以后新增的特性,它使用ZooKeeper从一组broker中协调选择一个broker作为master主,其他broker作为slave从的模式。所有slave从节点通过复制master主节点的消息来实现消息同步,当主出现故障后,没有被消费的消息在从服务器上也同步了一份,所以不会有消息的丢失。LevelDB 是 Google开发的一套用于持久化数据的高性能kv数据库,ActiveMQ利用该数据库进行数据的存储。只有master 接受客户端连接,slave不接受客户端连接,Master的所有存储操作都将被复制到slaves。在这个模式中,需要有半数以上的broker是正常的,集群才是可用的,超过半数broker故障,ZooKeeper的选举算法将不能选择master,从而导致集群不可用。

架构图:

ActiveMQ集群搭建ActiveMQ集群搭建1.使用shared filesystem Master-Slave方式主从集群2.使用shared database Master-Slave方式主从集群3.使用Replicated LevelDB Store方式主从集群(常用)

实现步骤:

  1. 安装3个ActiveMQ
    ActiveMQ集群搭建ActiveMQ集群搭建1.使用shared filesystem Master-Slave方式主从集群2.使用shared database Master-Slave方式主从集群3.使用Replicated LevelDB Store方式主从集群(常用)
  2. 配置每个activeMQ的conf /activemq.xml文件中的持久化适配器是replicatedLevelDB方式
<persistenceAdapter>
<replicatedLevelDB
    replicas="3"
    bind="tcp://0.0.0.0:0"
    zkAddress="localhost:2181"/>
</persistenceAdapter>
           

参数说明:

replicas :集群中存在的节点的数目

bind :当该节点成为master后,将使用该bind配置的ip和端口进行数据复制

zkAddress :ZooKeeper的地址

3. 启动ZooKeeper服务器

ActiveMQ集群搭建ActiveMQ集群搭建1.使用shared filesystem Master-Slave方式主从集群2.使用shared database Master-Slave方式主从集群3.使用Replicated LevelDB Store方式主从集群(常用)

4. 启动三台ActiveMQ,测试验证

注意: 这种方式,不适合集群太大,也就是activemq不能太多,因为多个activemq之间需要复制消息,这个比较耗资源,占用网络,建议3、5台