一.建立spring boot項目
廢話不多說。。。
二.配置application.properties
1. spring.application.name=rocketmq
2. server.port=8088
3. ###producer
4. #該應用是否啟用生産者
5. rocketmq.producer.isOnOff=on
6. #發送同一類消息的設定為同一個group,保證唯一,預設不需要設定,rocketmq會使用ip@pid(pid代表jvm名字)作為唯一标示
7. rocketmq.producer.groupName=${spring.application.name}
8. #mq的nameserver位址
9. rocketmq.producer.namesrvAddr=127.0.0.1:9876
10. #消息最大長度(位元組),預設1024*1024*4(4M)
11. rocketmq.producer.maxMessageSize=4194304
12. #發送消息逾時時間,預設3000
13. rocketmq.producer.sendMsgTimeout=3000
14. #發送消息失敗重試次數,預設2
15. rocketmq.producer.retryTimesWhenSendFailed=2
16. ###consumer
17. ##該應用是否啟用消費者
18. rocketmq.consumer.isOnOff=on
19. rocketmq.consumer.groupName=${spring.application.name}
20. #mq的nameserver位址
21. rocketmq.consumer.namesrvAddr=127.0.0.1:9876
22. #該消費者訂閱的主題和tags("*"号表示訂閱該主題下所有的tags),格式:topic~tag1||tag2||tag3;topic2~*;
23. rocketmq.consumer.topics=DemoTopic~*;
24. rocketmq.consumer.consumeThreadMin=20
25. rocketmq.consumer.consumeThreadMax=64
26. #設定一次消費消息的條數,預設為1條
27. rocketmq.consumer.consumeMessageBatchMaxSize=1
三.建立生産者
1. package cn.baocl.rocketmq.producer;
2.
3. import com.alibaba.rocketmq.client.exception.MQClientException;
4. import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
5. import org.slf4j.Logger;
6. import org.slf4j.LoggerFactory;
7. import org.springframework.beans.factory.annotation.Value;
8. import org.springframework.boot.SpringBootConfiguration;
9. import org.springframework.context.annotation.Bean;
10. import org.springframework.util.StringUtils;
11.
12. @SpringBootConfiguration
13. public class MQProducerConfiguration {
14.
15. public static final Logger LOGGER = LoggerFactory.getLogger(MQProducerConfiguration.class);
16. /**
17. * 發送同一類消息的設定為同一個group,保證唯一,預設不需要設定,rocketmq會使用ip@pid(pid代表jvm名字)作為唯一标示
18. */
19. @Value("${rocketmq.producer.groupName}")
20. private String groupName;
21. @Value("${rocketmq.producer.namesrvAddr}")
22. private String namesrvAddr;
23. /**
24. * 消息最大大小,預設4M
25. */
26. @Value("${rocketmq.producer.maxMessageSize}")
27. private Integer maxMessageSize ;
28. /**
29. * 消息發送逾時時間,預設3秒
30. */
31. @Value("${rocketmq.producer.sendMsgTimeout}")
32. private Integer sendMsgTimeout;
33. /**
34. * 消息發送失敗重試次數,預設2次
35. */
36. @Value("${rocketmq.producer.retryTimesWhenSendFailed}")
37. private Integer retryTimesWhenSendFailed;
38.
39. @Bean
40. public DefaultMQProducer getRocketMQProducer() throws Exception {
41. if (StringUtils.isEmpty(this.groupName)) {
42. throw new Exception("groupName is blank");
43. }
44. if (StringUtils.isEmpty(this.namesrvAddr)) {
45. throw new Exception("nameServerAddr is blank");
46. }
47. DefaultMQProducer producer;
48. producer = new DefaultMQProducer(this.groupName);
49. producer.setNamesrvAddr(this.namesrvAddr);
50. //如果需要同一個jvm中不同的producer往不同的mq叢集發送消息,需要設定不同的instanceName
51. //producer.setInstanceName(instanceName);
52. producer.setVipChannelEnabled(false);
53. if(this.maxMessageSize!=null){
54. producer.setMaxMessageSize(this.maxMessageSize);
55. }
56. if(this.sendMsgTimeout!=null){
57. producer.setSendMsgTimeout(this.sendMsgTimeout);
58. }
59. //如果發送消息失敗,設定重試次數,預設為2次
60. if(this.retryTimesWhenSendFailed!=null){
61. producer.setRetryTimesWhenSendFailed(this.retryTimesWhenSendFailed);
62. }
63.
64. try {
65. producer.start();
66. LOGGER.info(String.format("producer is start ! groupName:[%s],namesrvAddr:[%s]"
67. , this.groupName, this.namesrvAddr));
68. } catch (MQClientException e) {
69. LOGGER.error(String.format("producer is error {}"
70. , e.getMessage(),e));
71. throw new Exception(e);
72. }
73. return producer;
74. }
75. }
四.建立消費者
1. package cn.baocl.rocketmq.consumer;
2.
3. import cn.baocl.rocketmq.processor.MQConsumeMsgListenerProcessor;
4. import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
5. import com.alibaba.rocketmq.client.exception.MQClientException;
6. import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
7. import org.slf4j.Logger;
8. import org.slf4j.LoggerFactory;
9. import org.springframework.beans.factory.annotation.Autowired;
10. import org.springframework.beans.factory.annotation.Value;
11. import org.springframework.boot.SpringBootConfiguration;
12. import org.springframework.context.annotation.Bean;
13. import org.springframework.util.StringUtils;
14.
15.
16. @SpringBootConfiguration
17. public class MQConsumerConfiguration {
18.
19. public static final Logger LOGGER = LoggerFactory.getLogger(MQConsumerConfiguration.class);
20. @Value("${rocketmq.consumer.namesrvAddr}")
21. private String namesrvAddr;
22. @Value("${rocketmq.consumer.groupName}")
23. private String groupName;
24. @Value("${rocketmq.consumer.consumeThreadMin}")
25. private int consumeThreadMin;
26. @Value("${rocketmq.consumer.consumeThreadMax}")
27. private int consumeThreadMax;
28. @Value("${rocketmq.consumer.topics}")
29. private String topics;
30. @Value("${rocketmq.consumer.consumeMessageBatchMaxSize}")
31. private int consumeMessageBatchMaxSize;
32. @Autowired
33. private MQConsumeMsgListenerProcessor mqMessageListenerProcessor;
34.
35. @Bean
36. public DefaultMQPushConsumer testRocketMQConsumer() throws Exception {
37. if (StringUtils.isEmpty(groupName)){
38. throw new Exception("groupName is null !!!");
39. }
40. if (StringUtils.isEmpty(namesrvAddr)){
41. throw new Exception("namesrvAddr is null !!!");
42. }
43. if(StringUtils.isEmpty(topics)){
44. throw new Exception("topics is null !!!");
45. }
46. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
47. consumer.setNamesrvAddr(namesrvAddr);
48. consumer.setConsumeThreadMin(consumeThreadMin);
49. consumer.setConsumeThreadMax(consumeThreadMax);
50. consumer.registerMessageListener(mqMessageListenerProcessor);
51.
52. /**
53. * 設定Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費
54. * 如果非第一次啟動,那麼按照上次消費的位置繼續消費
55. */
56. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
57. /**
58. * 設定消費模型,叢集還是廣播,預設為叢集
59. */
60. //consumer.setMessageModel(MessageModel.CLUSTERING);
61. /**
62. * 設定一次消費消息的條數,預設為1條
63. */
64. consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);
65. try {
66. /**
67. * 設定該消費者訂閱的主題和tag,如果是訂閱該主題下的所有tag,則tag使用*;如果需要指定訂閱該主題下的某些tag,則使用||分割,例如tag1||tag2||tag3
68. */
69. String[] topicTagsArr = topics.split(";");
70. for (String topicTags : topicTagsArr) {
71. String[] topicTag = topicTags.split("~");
72. consumer.subscribe(topicTag[0],topicTag[1]);
73. }
74. consumer.start();
75. LOGGER.info("consumer is start !!! groupName:{},topics:{},namesrvAddr:{}",groupName,topics,namesrvAddr);
76. }catch (MQClientException e){
77. LOGGER.error("consumer is start !!! groupName:{},topics:{},namesrvAddr:{}",groupName,topics,namesrvAddr,e);
78. throw new Exception(e);
79. }
80. return consumer;
81. }
82. }
五.建立處理類
1. package cn.baocl.rocketmq.processor;
2.
3.
4. import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
5. import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
6. import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
7. import com.alibaba.rocketmq.common.message.MessageExt;
8. import org.slf4j.Logger;
9. import org.slf4j.LoggerFactory;
10. import org.springframework.stereotype.Component;
11. import org.springframework.util.CollectionUtils;
12.
13. import java.util.List;
14.
15. @Component
16. public class MQConsumeMsgListenerProcessor implements MessageListenerConcurrently {
17. private static final Logger logger = LoggerFactory.getLogger(MQConsumeMsgListenerProcessor.class);
18. /**
19. * 預設msgs裡隻有一條消息,可以通過設定consumeMessageBatchMaxSize參數來批量接收消息<br/>
20. * 不要抛異常,如果沒有return CONSUME_SUCCESS ,consumer會重新消費該消息,直到return CONSUME_SUCCESS
21. */
22. @Override
23. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
24. if(CollectionUtils.isEmpty(msgs)){
25. logger.info("接受到的消息為空,不處理,直接傳回成功");
26. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
27. }
28. MessageExt messageExt = msgs.get(0);
29. logger.info("接受到的消息為:"+messageExt.toString());
30. if(messageExt.getTopic().equals("你的Topic")){
31. if(messageExt.getTags().equals("你的Tag")){
32. //TODO 判斷該消息是否重複消費(RocketMQ不保證消息不重複,如果你的業務需要保證嚴格的不重複消息,需要你自己在業務端去重)
33. //TODO 擷取該消息重試次數
34. int reconsume = messageExt.getReconsumeTimes();
35. if(reconsume ==3){//消息已經重試了3次,如果不需要再次消費,則傳回成功
36. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
37. }
38. //TODO 處理對應的業務邏輯
39. }
40. }
41. // 如果沒有return success ,consumer會重新消費該消息,直到return success
42. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
43. }
44. }
六.接口調用
1. package cn.baocl.rocketmq.controllor;
2.
3. import cn.baocl.rocketmq.entity.TestVo;
4. import com.alibaba.rocketmq.client.exception.MQBrokerException;
5. import com.alibaba.rocketmq.client.exception.MQClientException;
6. import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
7. import com.alibaba.rocketmq.client.producer.SendCallback;
8. import com.alibaba.rocketmq.client.producer.SendResult;
9. import com.alibaba.rocketmq.common.message.Message;
10. import com.alibaba.rocketmq.remoting.exception.RemotingException;
11. import org.slf4j.Logger;
12. import org.slf4j.LoggerFactory;
13. import org.springframework.beans.factory.annotation.Autowired;
14. import org.springframework.web.bind.annotation.RequestMapping;
15. import org.springframework.web.bind.annotation.RestController;
16.
17.
18. @RestController
19. @RequestMapping("/test")
20. public class TestControllor {
21. private static final Logger logger = LoggerFactory.getLogger(TestControllor.class);
22.
23. /**
24. * 使用RocketMq的生産者
25. */
26. @Autowired
27. private DefaultMQProducer defaultMQProducer;
28.
29. @RequestMapping("/send")
30. public void send() throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
31. String msg = "demo msg test";;
32. logger.info("開始發送消息:" + msg);
33. Message sendMsg = new Message("DemoTopic", "DemoTag", msg.getBytes());
34. //預設3秒逾時
35. SendResult sendResult = defaultMQProducer.send(sendMsg);
36. logger.info("消息發送響應資訊:" + sendResult.toString());
37. }
38. }
如果出現 org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.99.1:10909> fail,請在生産者的配置中加入:
producer.setVipChannelEnabled(false);