生産者
配置生産者
/**
* @author BNTang
*/
@Configuration
public class RoutingTopicConfig {
/**
* 聲明交換機
*
* @return 交換機
*/
@Bean
public TopicExchange topicExchange() {
return new TopicExchange("topics");
}
/**
* 聲明隊列1 綁定info和warm
*
* @return 隊列1
*/
@Bean
public Queue topicQueue1() {
return new Queue("topicQueue1");
}
/**
* 聲明隊列2
*
* @return 隊列2
*/
@Bean
public Queue topicQueue2() {
return new Queue("topicQueue2");
}
/**
* 把隊列1 綁定到交換機裡面指定user.*的路由key
*
* @return 綁定之後的一個關系
*/
@Bean
public Binding binding1() {
return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("user.*");
}
/**
* 把隊列2 綁定到交換機裡面指定user.#的路由key
*
* @return 綁定之後的一個關系
*/
@Bean
public Binding binding2() {
return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("user.#");
}
}
發送消息
@Test
public void testTopic() {
this.rabbitTemplate.convertAndSend("topics", "user.save", "user.save 的消息");
this.rabbitTemplate.convertAndSend("topics", "user.save.findAll", "user.save.findAll 的消息");
this.rabbitTemplate.convertAndSend("topics", "user", "user 的消息");
}
消費者
消費消息
/**
* @author BNTang
*/
@Component
public class RoutingTopicConsumer {
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue("topicQueue1"),
key = {"user.*"},
exchange = @Exchange(name = "topics", type = ExchangeTypes.TOPIC)
)
})
public void receive1(String message) {
System.out.println("消費者【1】接收到【user.*】消息:" + message);
}
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue("topicQueue2"),
key = {"user.#"},
exchange = @Exchange(name = "topics", type = ExchangeTypes.TOPIC)
)
})
public void receive2(String message) {
System.out.println("消費者【2】接收到【user.#】消息:" + message);
}
}
測試方式同之前章節中的一樣。