天天看點

RocketMq消息隊列使用

最近在看消息隊列架構 ,alibaba的RocketMQ單機支援1萬以上的持久化隊列,支援諸多特性,

目前RocketMQ在阿裡集團被廣泛應用在訂單,交易,充值,流計算,消息推送,日志流式處理,binglog分發等場景

比kafka還是有過之無不及,其實kafka文檔很豐富

但RocketMQ網上的文章太少,找不到相關的操作教程

于是研究了下源碼 做個單機操作的教程,如果你也對此有興趣不妨共同研究

下載下傳源碼的位址 

https://github.com/alibaba/RocketMQ/releases

  • 首選通過在java項目裡面Maven依賴方式引用RocketMQ Java SDK
    <dependency>
        <groupId>com.alibaba.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>3.2.6</version>
    </dependency>           

Downloads

在linux 下用wget 下載下傳源碼然後解壓出來

RocketMq消息隊列使用

在runserver.sh裡面可以配置 jvm啟動的參數 JAVA_OPT_1="-server -Xms4g -Xmx4g -Xmn2g -XX:PermSize=128m -XX:MaxPermSize=320m"  

可以 vi runserver.sh

RocketMq消息隊列使用

分别給 mqnamesrv mqbroker play.sh 執行的權限

chmod +x  mqnamersrv 

chmod +x  mqbroker 

chmod +x  play.sh 

下面紅線框的這段 指令輸入錯誤了,忽略不用看

RocketMq消息隊列使用

通過 nohup sh mqnamesrv& 啟動 RocketMq

目前沒看到結束的指令,也沒找到相關的介紹,

我這裡用的 ps -ef|grep rocketmq  查到程序pid

然後kill pid号

或則pkill -9 java [慎用]

用jps -v 檢視下java程序的參數

RocketMq消息隊列使用

 rocketmq啟動後監聽 9876端口,這裡還是在看源碼裡面看到的,資料實在是太少了

RocketMq消息隊列使用

在防火牆配置裡面加上 9876端口,設定iptables對外開放

RocketMq消息隊列使用

部署Broker 

nohup sh mqbroker -n "127.0.0.1:9876" -c ../conf/2m-2s-async/broker-a.properties & 

這裡ip換成本機的就是單機執行個體,如果配置主從 這裡可以配其他的ip

 Master和Slave的配置檔案參考conf目錄下的配置檔案

 Master與Slave通過指定相同的brokerName參數來配對,Master的BrokerId必須是0,Slave的BrokerId必須是大于0的數

 一個Master下面可以挂載多個Slave,同一Master下的多個Slave通過指定不同的BrokerId來區分

 部署一Master一Slave,叢集采用異步複制方式:

 Master: nohup sh mqbroker -n "192.168.1.23:9876" -c ../conf/2m-2s-async/broker-a.properties &  

Slave:   nohup sh mqbroker -n "192.168.1.23:9876" -c ../conf/2m-2s-async/broker-a-s.properties &  

RocketMq消息隊列使用

package

com.pgsqlmybatis.common.rocketmq;

/*

***************************************************************

* 公司名稱    :

* 系統名稱    :信用管家專業版

* 類 名 稱    :Ios管道idfa統計,推廣統計用

* 功能描述    :

* 業務描述    :

* 作 者 名    :@Author Royal

* 開發日期    :2016-05-15

* Created     :IntelliJ IDEA

***************************************************************

* 修改日期    :

* 修 改 者    :

* 修改内容    :

***************************************************************

*/

import

com.alibaba.rocketmq.client.producer.DefaultMQProducer;

import

com.alibaba.rocketmq.client.producer.SendResult;

import

com.alibaba.rocketmq.common.message.Message;

public

class

Producer {

public

static

void

main(String[] args) {

DefaultMQProducer producer = 

new

DefaultMQProducer(

"Producer"

);

producer.setNamesrvAddr(

"xxxxxxxxxx:9876"

);

try

{

producer.start();

String pushMsg=

"kafka activeMq rocketMq 消息隊列使用1"

;

Message msg = 

new

Message(

"PushTopic"

,

"push"

,

"1"

,

pushMsg.getBytes(

"UTF-8"

));

SendResult result = producer.send(msg);

System.out.println(

"id:"

+ result.getMsgId() +

" result:"

+ result.getSendStatus());

String pushMsg2=

"海量級消息記錄單機測試2"

;

msg = 

new

Message(

"PushTopic"

,

"push"

,

"2"

,pushMsg2.getBytes(

"UTF-8"

));

result = producer.send(msg);

System.out.println(

"id:"

+ result.getMsgId() +

" result:"

+ result.getSendStatus());

String pushMsg3=

"海量級消息記錄單機測試3"

;

msg = 

new

Message(

"PullTopic"

,

"pull"

,

"1"

,pushMsg3.getBytes());

result = producer.send(msg);

System.out.println(

"id:"

+ result.getMsgId() +

" result:"

+ result.getSendStatus());

catch

(Exception e) {

e.printStackTrace();

finally

{

producer.shutdown();

}

}

}

  

啟動生成者

RocketMq消息隊列使用

啟動消費者

package

com.pgsqlmybatis.common.rocketmq;

/*

***************************************************************

* 公司名稱    :

* 系統名稱    :信用管家專業版

* 類 名 稱    :Ios管道idfa統計,推廣統計用

* 功能描述    :

* 業務描述    :

* 作 者 名    :@Author Royal

* 開發日期    :2016-05-15

* Created     :IntelliJ IDEA

***************************************************************

* 修改日期    :

* 修 改 者    :

* 修改内容    :

***************************************************************

*/

import

java.io.UnsupportedEncodingException;

import

java.util.List;

import

com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;

import

com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;

import

com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;

import

com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;

import

com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;

import

com.alibaba.rocketmq.common.message.Message;

import

com.alibaba.rocketmq.common.message.MessageExt;

public

class

Consumer {

public

static

void

main(String[] args){

DefaultMQPushConsumer consumer =

new

DefaultMQPushConsumer(

"PushConsumer"

);

consumer.setNamesrvAddr(

"xxxxxxxxxxxx:9876"

);

try

{

consumer.subscribe(

"PushTopic"

"push"

);

/**

* 設定Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費<br>

* 如果非第一次啟動,那麼按照上次消費的位置繼續消費

*/

consumer.setConsumeFromWhere(

ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

consumer.registerMessageListener(

new

MessageListenerConcurrently() {

public

ConsumeConcurrentlyStatus consumeMessage(

List<MessageExt> list,

ConsumeConcurrentlyContext Context) {

Message msg = list.get(

);

System.out.println(msg.toString());

String recString= 

null

;

try

{

recString = 

new

String(msg.getBody() ,

"UTF-8"

);

catch

(UnsupportedEncodingException e) {

e.printStackTrace();

}

System.out.println(recString);

return

ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

}

}

);

consumer.start();

catch

(Exception e) {

e.printStackTrace();

}

}

}

   

RocketMq消息隊列使用
RocketMq消息隊列使用
RocketMq消息隊列使用

以上為單機執行個體配置

如果你遇到什麼問題可以私信我,如果覺得此文對你很有幫助,點下贊推薦下額^_^ 

參考:http://blog.csdn.net/a19881029/article/details/34446629

        http://sofar.blog.51cto.com/353572/1540874

        http://blog.csdn.net/loongshawn/article/details/51086876

 RocketMq最佳實踐 《RocketMQ原理簡介》 分布式開放消息系統(RocketMQ)的原理與實踐 《RocketMQ使用者指南》