天天看點

五分鐘帶你玩轉rocketMQ(三)spring boot整合rocketMQ

一.建立spring boot項目

廢話不多說。。。

二.配置application.properties

1. spring.application.name=rocketmq
2. server.port=8088
3. ###producer
4. #該應用是否啟用生産者
5. rocketmq.producer.isOnOff=on
6. #發送同一類消息的設定為同一個group,保證唯一,預設不需要設定,rocketmq會使用ip@pid(pid代表jvm名字)作為唯一标示
7. rocketmq.producer.groupName=${spring.application.name}
8. #mq的nameserver位址
9. rocketmq.producer.namesrvAddr=127.0.0.1:9876
10. #消息最大長度 預設1024*4(4M)
11. rocketmq.producer.maxMessageSize=4096
12. #發送消息逾時時間,預設3000
13. rocketmq.producer.sendMsgTimeout=3000
14. #發送消息失敗重試次數,預設2
15. rocketmq.producer.retryTimesWhenSendFailed=2
16. ###consumer
17. ##該應用是否啟用消費者
18. rocketmq.consumer.isOnOff=on
19. rocketmq.consumer.groupName=${spring.application.name}
20. #mq的nameserver位址
21. rocketmq.consumer.namesrvAddr=127.0.0.1:9876
22. #該消費者訂閱的主題和tags("*"号表示訂閱該主題下所有的tags),格式:topic~tag1||tag2||tag3;topic2~*;
23. rocketmq.consumer.topics=DemoTopic~*;
24. rocketmq.consumer.consumeThreadMin=20
25. rocketmq.consumer.consumeThreadMax=64
26. #設定一次消費消息的條數,預設為1條
27. rocketmq.consumer.consumeMessageBatchMaxSize=1      

三.建立生産者

1. package cn.baocl.rocketmq.producer;
2. 
3. import com.alibaba.rocketmq.client.exception.MQClientException;
4. import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
5. import org.slf4j.Logger;
6. import org.slf4j.LoggerFactory;
7. import org.springframework.beans.factory.annotation.Value;
8. import org.springframework.boot.SpringBootConfiguration;
9. import org.springframework.context.annotation.Bean;
10. import org.springframework.util.StringUtils;
11. 
12. @SpringBootConfiguration
13. public class MQProducerConfiguration {
14. 
15. public static final Logger LOGGER = LoggerFactory.getLogger(MQProducerConfiguration.class);
16. /**
17.      * 發送同一類消息的設定為同一個group,保證唯一,預設不需要設定,rocketmq會使用ip@pid(pid代表jvm名字)作為唯一标示
18.      */
19. @Value("${rocketmq.producer.groupName}")
20. private String groupName;
21. @Value("${rocketmq.producer.namesrvAddr}")
22. private String namesrvAddr;
23. /**
24.      * 消息最大大小,預設4M
25.      */
26. @Value("${rocketmq.producer.maxMessageSize}")
27. private Integer maxMessageSize ;
28. /**
29.      * 消息發送逾時時間,預設3秒
30.      */
31. @Value("${rocketmq.producer.sendMsgTimeout}")
32. private Integer sendMsgTimeout;
33. /**
34.      * 消息發送失敗重試次數,預設2次
35.      */
36. @Value("${rocketmq.producer.retryTimesWhenSendFailed}")
37. private Integer retryTimesWhenSendFailed;
38. 
39. @Bean
40. public DefaultMQProducer getRocketMQProducer() throws Exception {
41. if (StringUtils.isEmpty(this.groupName)) {
42. throw new Exception("groupName is blank");
43.         }
44. if (StringUtils.isEmpty(this.namesrvAddr)) {
45. throw new Exception("nameServerAddr is blank");
46.         }
47.         DefaultMQProducer producer;
48.         producer = new DefaultMQProducer(this.groupName);
49.         producer.setNamesrvAddr(this.namesrvAddr);
50. //如果需要同一個jvm中不同的producer往不同的mq叢集發送消息,需要設定不同的instanceName
51. //producer.setInstanceName(instanceName);
52.         producer.setVipChannelEnabled(false);
53. if(this.maxMessageSize!=null){
54.             producer.setMaxMessageSize(this.maxMessageSize);
55.         }
56. if(this.sendMsgTimeout!=null){
57.             producer.setSendMsgTimeout(this.sendMsgTimeout);
58.         }
59. //如果發送消息失敗,設定重試次數,預設為2次
60. if(this.retryTimesWhenSendFailed!=null){
61.             producer.setRetryTimesWhenSendFailed(this.retryTimesWhenSendFailed);
62.         }
63. 
64. try {
65.             producer.start();
66.             LOGGER.info(String.format("producer is start ! groupName:[%s],namesrvAddr:[%s]"
67.                     , this.groupName, this.namesrvAddr));
68.         } catch (MQClientException e) {
69.             LOGGER.error(String.format("producer is error {}"
70.                     , e.getMessage(),e));
71. throw new Exception(e);
72.         }
73. return producer;
74.     }
75. }      

4.建立消費者

1. package cn.baocl.rocketmq.consumer;
2. 
3. import cn.baocl.rocketmq.processor.MQConsumeMsgListenerProcessor;
4. import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
5. import com.alibaba.rocketmq.client.exception.MQClientException;
6. import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
7. import org.slf4j.Logger;
8. import org.slf4j.LoggerFactory;
9. import org.springframework.beans.factory.annotation.Autowired;
10. import org.springframework.beans.factory.annotation.Value;
11. import org.springframework.boot.SpringBootConfiguration;
12. import org.springframework.context.annotation.Bean;
13. import org.springframework.util.StringUtils;
14. 
15. 
16. @SpringBootConfiguration
17. public class MQConsumerConfiguration {
18. 
19. public static final Logger LOGGER = LoggerFactory.getLogger(MQConsumerConfiguration.class);
20. @Value("${rocketmq.consumer.namesrvAddr}")
21. private String namesrvAddr;
22. @Value("${rocketmq.consumer.groupName}")
23. private String groupName;
24. @Value("${rocketmq.consumer.consumeThreadMin}")
25. private int consumeThreadMin;
26. @Value("${rocketmq.consumer.consumeThreadMax}")
27. private int consumeThreadMax;
28. @Value("${rocketmq.consumer.topics}")
29. private String topics;
30. @Value("${rocketmq.consumer.consumeMessageBatchMaxSize}")
31. private int consumeMessageBatchMaxSize;
32. @Autowired
33. private MQConsumeMsgListenerProcessor mqMessageListenerProcessor;
34. 
35. @Bean
36. public DefaultMQPushConsumer testRocketMQConsumer() throws Exception {
37. if (StringUtils.isEmpty(groupName)){
38. throw new Exception("groupName is null !!!");
39.         }
40. if (StringUtils.isEmpty(namesrvAddr)){
41. throw new Exception("namesrvAddr is null !!!");
42.         }
43. if(StringUtils.isEmpty(topics)){
44. throw new Exception("topics is null !!!");
45.         }
46.         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
47.         consumer.setNamesrvAddr(namesrvAddr);
48.         consumer.setConsumeThreadMin(consumeThreadMin);
49.         consumer.setConsumeThreadMax(consumeThreadMax);
50.         consumer.registerMessageListener(mqMessageListenerProcessor);
51. 
52. /**
53.          * 設定Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費
54.          * 如果非第一次啟動,那麼按照上次消費的位置繼續消費
55.          */
56.         consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
57. /**
58.          * 設定消費模型,叢集還是廣播,預設為叢集
59.          */
60. //consumer.setMessageModel(MessageModel.CLUSTERING);
61. /**
62.          * 設定一次消費消息的條數,預設為1條
63.          */
64.         consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);
65. try {
66. /**
67.              * 設定該消費者訂閱的主題和tag,如果是訂閱該主題下的所有tag,則tag使用*;如果需要指定訂閱該主題下的某些tag,則使用||分割,例如tag1||tag2||tag3
68.              */
69.             String[] topicTagsArr = topics.split(";");
70. for (String topicTags : topicTagsArr) {
71.                 String[] topicTag = topicTags.split("~");
72.                 consumer.subscribe(topicTag[0],topicTag[1]);
73.             }
74.             consumer.start();
75.             LOGGER.info("consumer is start !!! groupName:{},topics:{},namesrvAddr:{}",groupName,topics,namesrvAddr);
76.         }catch (MQClientException e){
77.             LOGGER.error("consumer is start !!! groupName:{},topics:{},namesrvAddr:{}",groupName,topics,namesrvAddr,e);
78. throw new Exception(e);
79.         }
80. return consumer;
81.     }
82. }      

5.建立處理類

1. package cn.baocl.rocketmq.processor;
2. 
3. 
4. import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
5. import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
6. import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
7. import com.alibaba.rocketmq.common.message.MessageExt;
8. import org.slf4j.Logger;
9. import org.slf4j.LoggerFactory;
10. import org.springframework.stereotype.Component;
11. import org.springframework.util.CollectionUtils;
12. 
13. import java.util.List;
14. 
15. @Component
16. public class MQConsumeMsgListenerProcessor implements MessageListenerConcurrently {
17. private static final Logger logger = LoggerFactory.getLogger(MQConsumeMsgListenerProcessor.class);
18. /**
19.      *  預設msgs裡隻有一條消息,可以通過設定consumeMessageBatchMaxSize參數來批量接收消息<br/>
20.      *  不要抛異常,如果沒有return CONSUME_SUCCESS ,consumer會重新消費該消息,直到return CONSUME_SUCCESS
21.      */
22. @Override
23. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
24. if(CollectionUtils.isEmpty(msgs)){
25.             logger.info("接受到的消息為空,不處理,直接傳回成功");
26. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
27.         }
28. MessageExt messageExt = msgs.get(0);
29.         logger.info("接受到的消息為:"+messageExt.toString());
30. if(messageExt.getTopic().equals("你的Topic")){
31. if(messageExt.getTags().equals("你的Tag")){
32. //TODO 判斷該消息是否重複消費(RocketMQ不保證消息不重複,如果你的業務需要保證嚴格的不重複消息,需要你自己在業務端去重)
33. //TODO 擷取該消息重試次數
34. int reconsume = messageExt.getReconsumeTimes();
35. if(reconsume ==3){//消息已經重試了3次,如果不需要再次消費,則傳回成功
36. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
37.                 }
38. //TODO 處理對應的業務邏輯
39.             }
40.         }
41. // 如果沒有return success ,consumer會重新消費該消息,直到return success
42. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
43.     }
44. }      

6.接口調用

1. package cn.baocl.rocketmq.controllor;
2. 
3. import cn.baocl.rocketmq.entity.TestVo;
4. import com.alibaba.rocketmq.client.exception.MQBrokerException;
5. import com.alibaba.rocketmq.client.exception.MQClientException;
6. import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
7. import com.alibaba.rocketmq.client.producer.SendCallback;
8. import com.alibaba.rocketmq.client.producer.SendResult;
9. import com.alibaba.rocketmq.common.message.Message;
10. import com.alibaba.rocketmq.remoting.exception.RemotingException;
11. import org.slf4j.Logger;
12. import org.slf4j.LoggerFactory;
13. import org.springframework.beans.factory.annotation.Autowired;
14. import org.springframework.web.bind.annotation.RequestMapping;
15. import org.springframework.web.bind.annotation.RestController;
16. 
17. 
18. @RestController
19. @RequestMapping("/test")
20. public class TestControllor {
21. private static final Logger logger = LoggerFactory.getLogger(TestControllor.class);
22. 
23. /**
24.      * 使用RocketMq的生産者
25.      */
26. @Autowired
27. private DefaultMQProducer defaultMQProducer;
28. 
29. @RequestMapping("/send")
30. public void send() throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
31. String msg = "demo msg test";;
32.         logger.info("開始發送消息:" + msg);
33. Message sendMsg = new Message("DemoTopic", "DemoTag", msg.getBytes());        
34. //預設3秒逾時
35. SendResult sendResult = defaultMQProducer.send(sendMsg);
36.         logger.info("消息發送響應資訊:" + sendResult.toString());
37.     }
38. }      

如果出現org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.99.1:10909> fail  在生産者中加入

producer.setVipChannelEnabled(false);