天天看點

消息發送和接收示範

導入依賴

<dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.0.2</version>
        </dependency>           

發送消息

消息發送步驟:

​ 建立消息消費者, 指定消費者所屬的組名

​ 指定Nameserver位址

​ 指定消費者訂閱的主題和标簽

​ 設定回調函數,編寫處理消息的方法

​ 啟動消息消費者

代碼示例:

package com.wxit.test;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

/**
 * @Author wj
 **/
//發送消息
public class RocketMQSendMessageTest {
    public static void main(String[] args) throws Exception {
        //1.建立消息生産者,并且設定生産組名
        DefaultMQProducer producer = new DefaultMQProducer("myproducer-group");

        //2. 指定Nameserver位址
        producer.setNamesrvAddr("192.168.91.4:9876");

        //3.啟動生産者
        producer.start();

        //4.建立消息對象,指定主題、标簽和消息體
        Message message = new Message("myTopic", "myTag", ("Test RocketMQ Message").getBytes());

        //5.發送消息
        SendResult sendResult = producer.send(message, 10000);
        System.out.println(sendResult);

        //6.關閉生産者
        producer.shutdown();
    }
}
           

接收消息

消息接收步驟:

​ \1. 建立消息消費者, 指定消費者所屬的組名

​ \2. 指定Nameserver位址

​ \3. 指定消費者訂閱的主題和标簽

​ \4. 設定回調函數,編寫處理消息的方法

​ \5. 啟動消息消費者

package com.wxit.test;


import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * @Author wj
 **/
//接收消息
public class RocketMQReceiveMessageTest {

    //接收消息
    public static void main(String[] args) throws Exception {

        //1 建立消費者,并且為其指定消費者組名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myproducer-group");

        //2 為消費者設定NameServer的位址
        consumer.setNamesrvAddr("192.168.91.4:9876");

        //3 指定消費者訂閱的主題和标簽
        consumer.subscribe("myTopic", "*");

        //4 設定一個回調函數,并在函數中編寫接收到消息之後的處理方法
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            //處理擷取到的消息
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                //消費邏輯
                System.out.println("Message===>" + list);

                //傳回消費成功狀态
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        //5 啟動消費者
        consumer.start();
        System.out.println("啟動消費者成功了");
    }
}
           

案例

接下來我們模拟一種場景: 下單成功之後,向下單使用者發送短信。設計圖如下:

訂單微服務發送消息

1 在 shop-order 中添加rocketmq的依賴

<dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.0.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.4.0</version>
        </dependency>           

2 添加配置

#rocketmq
rocketmq:
  name-server: 192.168.91.4:9876 #rocketMQ服務的位址
  producer:
    group: shop-order  # 生産者組           

3 編寫測試代碼

@RestController
@Slf4j
public class OrderController {

    @Autowired
    private RestTemplate restTemplate;

    @Autowired
    private OrderService orderService;

    @Autowired
    private ProductService productService;

    @Autowired
    private DiscoveryClient discoveryClient;

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    //下單 --ribbon自定義負載均衡
    //fegin
    @RequestMapping("/order/prod/{pid}")
    public Order order(@PathVariable("pid") Integer pid){
        log.info("接收到{}号商品的下單請求,接下來調用商品的微服務查詢此商品資訊",pid);

        /**
         * 調用商品微服務,查詢商品資訊
         * 問題:
         * 1.代碼可讀性不好
         * 2.程式設計風格不統一
         */
        Product product = productService.findByPid(pid);

        if (product.getPid() == -100){
            Order order = new Order();
            order.setOid(-100L);
            order.setPname("下單失敗");
            return order;
        }

        log.info("查詢到{}号商品的資訊",pid);

        //下單,建立訂單
        Order order = new Order();
        order.setUid(1);
        order.setUsername("測試使用者");

        order.setPid(pid);
        order.setPname(product.getPname());
        order.setPprice(product.getPprice());
        order.setNumber(1);

        orderService.createOrder(order);

        log.info("建立訂單成功,訂單資訊為{}",order);

        //向mq中投遞一個下單成功的資訊
        rocketMQTemplate.convertAndSend("order-topic",order);

        return order;
    }           

使用者微服務訂閱消息

1 修改 shop-user 子產品配置,導入依賴

<dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.0.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.4.0</version>
        </dependency>           

2 修改主類

package com.wxit;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;

/**
 * @Author wj
 **/
@SpringBootApplication
@EnableDiscoveryClient
public class UserApplication {

    public static void main(String[] args) {
        SpringApplication.run(UserApplication.class);
    }
}
           

3 修改配置檔案

cloud:    nacos:      discovery:        server-addr: 127.0.0.1:8848#rocrocketmq:  name-server: 192.168.91.4:9876           

4 編寫消息接收服務

package com.wxit.service;import com.wxit.domain.Order;import lombok.extern.slf4j.Slf4j;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;import org.apache.rocketmq.spring.core.RocketMQListener;import org.springframework.stereotype.Service;/** * @Author wj **/@Slf4j@Service@RocketMQMessageListener(consumerGroup = "shop-user", topic = "order-topic")public class SmsService implements RocketMQListener<Order> {    @Override    public void onMessage(Order message) {        log.info("接收到了一個訂單資訊{},接下來就可以發送短信通知了", message);    }}           

5 啟動服務,執行下單操作,觀看背景輸出

繼續閱讀