導入依賴
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.2</version>
</dependency>
發送消息
消息發送步驟:
建立消息消費者, 指定消費者所屬的組名
指定Nameserver位址
指定消費者訂閱的主題和标簽
設定回調函數,編寫處理消息的方法
啟動消息消費者
代碼示例:
package com.wxit.test;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
/**
* @Author wj
**/
//發送消息
public class RocketMQSendMessageTest {
public static void main(String[] args) throws Exception {
//1.建立消息生産者,并且設定生産組名
DefaultMQProducer producer = new DefaultMQProducer("myproducer-group");
//2. 指定Nameserver位址
producer.setNamesrvAddr("192.168.91.4:9876");
//3.啟動生産者
producer.start();
//4.建立消息對象,指定主題、标簽和消息體
Message message = new Message("myTopic", "myTag", ("Test RocketMQ Message").getBytes());
//5.發送消息
SendResult sendResult = producer.send(message, 10000);
System.out.println(sendResult);
//6.關閉生産者
producer.shutdown();
}
}
接收消息
消息接收步驟:
\1. 建立消息消費者, 指定消費者所屬的組名
\2. 指定Nameserver位址
\3. 指定消費者訂閱的主題和标簽
\4. 設定回調函數,編寫處理消息的方法
\5. 啟動消息消費者
package com.wxit.test;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
/**
* @Author wj
**/
//接收消息
public class RocketMQReceiveMessageTest {
//接收消息
public static void main(String[] args) throws Exception {
//1 建立消費者,并且為其指定消費者組名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myproducer-group");
//2 為消費者設定NameServer的位址
consumer.setNamesrvAddr("192.168.91.4:9876");
//3 指定消費者訂閱的主題和标簽
consumer.subscribe("myTopic", "*");
//4 設定一個回調函數,并在函數中編寫接收到消息之後的處理方法
consumer.registerMessageListener(new MessageListenerConcurrently() {
//處理擷取到的消息
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
//消費邏輯
System.out.println("Message===>" + list);
//傳回消費成功狀态
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//5 啟動消費者
consumer.start();
System.out.println("啟動消費者成功了");
}
}
案例
接下來我們模拟一種場景: 下單成功之後,向下單使用者發送短信。設計圖如下:
訂單微服務發送消息
1 在 shop-order 中添加rocketmq的依賴
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.2</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.4.0</version>
</dependency>
2 添加配置
#rocketmq
rocketmq:
name-server: 192.168.91.4:9876 #rocketMQ服務的位址
producer:
group: shop-order # 生産者組
3 編寫測試代碼
@RestController
@Slf4j
public class OrderController {
@Autowired
private RestTemplate restTemplate;
@Autowired
private OrderService orderService;
@Autowired
private ProductService productService;
@Autowired
private DiscoveryClient discoveryClient;
@Autowired
private RocketMQTemplate rocketMQTemplate;
//下單 --ribbon自定義負載均衡
//fegin
@RequestMapping("/order/prod/{pid}")
public Order order(@PathVariable("pid") Integer pid){
log.info("接收到{}号商品的下單請求,接下來調用商品的微服務查詢此商品資訊",pid);
/**
* 調用商品微服務,查詢商品資訊
* 問題:
* 1.代碼可讀性不好
* 2.程式設計風格不統一
*/
Product product = productService.findByPid(pid);
if (product.getPid() == -100){
Order order = new Order();
order.setOid(-100L);
order.setPname("下單失敗");
return order;
}
log.info("查詢到{}号商品的資訊",pid);
//下單,建立訂單
Order order = new Order();
order.setUid(1);
order.setUsername("測試使用者");
order.setPid(pid);
order.setPname(product.getPname());
order.setPprice(product.getPprice());
order.setNumber(1);
orderService.createOrder(order);
log.info("建立訂單成功,訂單資訊為{}",order);
//向mq中投遞一個下單成功的資訊
rocketMQTemplate.convertAndSend("order-topic",order);
return order;
}
使用者微服務訂閱消息
1 修改 shop-user 子產品配置,導入依賴
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.2</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.4.0</version>
</dependency>
2 修改主類
package com.wxit;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
/**
* @Author wj
**/
@SpringBootApplication
@EnableDiscoveryClient
public class UserApplication {
public static void main(String[] args) {
SpringApplication.run(UserApplication.class);
}
}
3 修改配置檔案
cloud: nacos: discovery: server-addr: 127.0.0.1:8848#rocrocketmq: name-server: 192.168.91.4:9876
4 編寫消息接收服務
package com.wxit.service;import com.wxit.domain.Order;import lombok.extern.slf4j.Slf4j;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;import org.apache.rocketmq.spring.core.RocketMQListener;import org.springframework.stereotype.Service;/** * @Author wj **/@Slf4j@Service@RocketMQMessageListener(consumerGroup = "shop-user", topic = "order-topic")public class SmsService implements RocketMQListener<Order> { @Override public void onMessage(Order message) { log.info("接收到了一個訂單資訊{},接下來就可以發送短信通知了", message); }}
5 啟動服務,執行下單操作,觀看背景輸出