hello你好,我是辰兮,很高兴你能来阅读,本篇继续分享消息队列的实践案例,分享给初学者,大家一起进步!
文章目录
-
-
- 一、文章序言
- 二、代码详解
-
一、文章序言
整理几篇消息队列文章的初衷就是初学时网上很多案例很零碎,代码也是,为了更好的便于初学者学习就将一个完整的案例整理下来,代码完全真实可用,请自行实践!
消息队列
关于初次学习消息队列我们一定要了解它的应用场景,为什么使用?以及如何去使用?整理本篇文章为了分享给初学者给他提供一个详细案例,如下代码可以直接实践!
消息队列最核心的三个点:解耦、异步、削峰。
参考文章:消息队列作用(解耦、异步、削峰)图详解
消息队列也设计到生产者,消费者原理可以简单的了解一下
参考文章:生产者消费者问题-代码详解(Java多线程)
二、代码详解
生产者的相关代码
/**
* 编写消息的生产者
*/
@Component
public class MsgProducer implements RabbitTemplate.ConfirmCallback {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
//由于rabbitTemplate的scope属性设置为ConfigurableBeanFactory.SCOPE_PROTOTYPE,所以不能自动注入
private RabbitTemplate rabbitTemplate;
/**
* 构造方法注入rabbitTemplate
*/
@Autowired
public MsgProducer(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
rabbitTemplate.setConfirmCallback(this); //rabbitTemplate如果为单例的话,那回调就是最后设置的内容
}
/**
* 发送消息方法一个交换机配一个路由配一个队列
* @param content
*/
public void sendMsg(String content) {
CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
//把消息放入ROUTINGKEY_A对应的队列当中去,对应的是队列A
rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_A, RabbitConfig.ROUTINGKEY_A, content, correlationId);
rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_B, RabbitConfig.ROUTINGKEY_B, content, correlationId);
}
/**
* 广播模式
* @param content
*/
public void sendAll(String content) {
rabbitTemplate.convertAndSend("fanoutExchange","", content);
}
/**
* 回调
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
//logger.info(" 回调id:" + correlationData);
if (ack) {
logger.info("消息成功消费");
} else {
logger.info("消息消费失败:" + cause);
}
}
}
关于Controller层的相关代码
@RestController
public class SendController {
@Autowired
private MsgProducer msgProducer;
@RequestMapping(value = "/send",method = RequestMethod.GET)
public void send(int length){
for (int i=1;i<=length;i++){
msgProducer.sendMsg("这是我发送的第"+i+"个信息");
}
}
@RequestMapping(value = "/sendAll",method = RequestMethod.GET)
public void sendAll(int length){
for (int i=1;i<=length;i++){
msgProducer.sendAll("这是我发送的第"+i+"个信息");
}
}
}
消费者的相关代码
@Component
@RabbitListener(queues = RabbitConfig.QUEUE_A)
public class MsgReceiver_one {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@RabbitHandler
public void process(String content) {
logger.info("消费者one接收处理队列A当中的消息: " + content);
}
}
这两个分别监听队列A和队列B
@Component
@RabbitListener(queues = RabbitConfig.QUEUE_B)
public class MsgReceiver_two {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@RabbitHandler
public void process(String content) {
logger.info("消费者two接收处理队列A当中的消息: " + content);
}
}
测试:在浏览器输入访问第一个路径输入相关参数
这里第一个路径是生产者分别发送给队列A和B
看到控制台打印相关信息
测试:在浏览器输入访问第二个路径和输入相关参数
这里第二个路径是以广播的形式发布都可以消费
看到控制台打印相关信息
http://localhost:15672/#/queues
项目配置文件的相关信息
@Configuration
public class RabbitConfig {
@Value("${spring.rabbitmq.host}")
private String host;
@Value("${spring.rabbitmq.port}")
private int port;
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
public static final String FANOUT_EXCHANGE="fanoutExchange";
//交换机
public static final String EXCHANGE_A = "my-mq-exchange_A";
public static final String EXCHANGE_B = "my-mq-exchange_B";
public static final String EXCHANGE_C = "my-mq-exchange_C";
//队列
public static final String QUEUE_A = "QUEUE_A";
public static final String QUEUE_B = "QUEUE_B";
public static final String QUEUE_C = "QUEUE_C";
//路由关键字 key
public static final String ROUTINGKEY_A = "spring-boot-routingKey_A";
public static final String ROUTINGKEY_B = "spring-boot-routingKey_B";
public static final String ROUTINGKEY_C = "spring-boot-routingKey_C";
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host,port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
//connectionFactory.setVirtualHost("/test");
connectionFactory.setPublisherConfirms(true);
return connectionFactory;
}
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
//必须是prototype类型
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
return template;
}
/**
* 针对消费者配置
* 1. 设置交换机类型
* 2. 将队列绑定到交换机
FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念
HeadersExchange :通过添加属性key-value匹配
DirectExchange:按照routingkey分发到指定队列
TopicExchange:多关键字匹配
*/
@Bean
public DirectExchange defaultExchangeA() {
return new DirectExchange(EXCHANGE_A);
}
@Bean
public DirectExchange defaultExchangeB() {
return new DirectExchange(EXCHANGE_B);
}
@Bean
public DirectExchange directExchangeC(){
return new DirectExchange(EXCHANGE_C);
}
/**
* 获取队列A
* @return
*/
@Bean
public Queue queueA() {
return new Queue(QUEUE_A, true); //队列持久
}
@Bean
public Queue queueB() {
return new Queue(QUEUE_B, true); //队列持久
}
@Bean
public Queue queueC() {
return new Queue(QUEUE_C, true); //队列持久
}
/**
* 队列绑定交换机
* @return
*/
@Bean
public Binding bindingA() {
return BindingBuilder.bind(queueA()).to(defaultExchangeA()).with(RabbitConfig.ROUTINGKEY_A);
}
@Bean
public Binding bindingB(){
return BindingBuilder.bind(queueB()).to(defaultExchangeB()).with(RabbitConfig.ROUTINGKEY_B);
}
@Bean
public Binding bindingC(){
return BindingBuilder.bind(queueC()).to(directExchangeC()).with(RabbitConfig.ROUTINGKEY_C);
}
//配置fanout_exchange
//fanout只能支持统一广播
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange(RabbitConfig.FANOUT_EXCHANGE);
}
//把所有的队列都绑定到这个交换机上去
@Bean
Binding bindingExchangeA(FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queueA()).to(fanoutExchange);
}
@Bean
Binding bindingExchangeB(FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queueB()).to(fanoutExchange);
}
@Bean
Binding bindingExchangeC(FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queueC()).to(fanoutExchange);
}
}
小结:
1.首先创建一个配置类 然后 创建队列
2.创建生产者 生产者要用消息队列的对象,然后传递你想传递的消息
3.创建控制层 填好路径 ,创建生产者对象,然后发送信息
4.消费者,填写好消费队列的名称,监听准备开始消费
The best investment is to invest in yourself.
2020.10.07 希望你们奔赴在自己的热爱里!