1. 在 Maven 中引入 RabbitMQ 依赖(spring-boot-starter-amqp)
<!-- Spring Boot starter for AMQP. No <version> is given here, so it is presumably managed by the Spring Boot parent/BOM (confirm against the project's pom). -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2. 配置 RabbitConfig,可以放在与 Application 启动类同级的包下
package com.onem2;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.onem2.base.common.util.Config;
import com.onem2.common.controller.RabbitComponent;
/**
 * Registers the message-queue beans: the connection factory, the listener
 * container, the topic exchange, the queue and the binding between them.
 *
 * @ClassName: RabbitConfig
 * @Description: registers the message queue
 * @author heliang
 * @date 2017-12-13 10:21:33 AM
 * @version V2.1 * Update Logs: * Name: * Date: * Description: initialization
 */
@Configuration
public class RabbitConfig {

    /** Name of the message exchange. */
    public static final String EXCHANGE = "bank-log-exchange";

    /** Routing key 1; the same value is also used as the queue name. */
    public static final String ROUTINGKEY_LOG = "bank-log-queue";

    /**
     * Builds the caching connection factory. The address and port are fixed
     * here, while the credentials and virtual host are read from
     * {@code Config}; publisher confirms are switched on.
     *
     * @return the configured connection factory
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setAddresses("xxx.xx.xxx.xx");
        factory.setPort(5672);
        factory.setUsername(Config.getProperty("rabbit.username"));
        factory.setPassword(Config.getProperty("rabbit.password"));
        factory.setVirtualHost(Config.getProperty("rabbit.virtualHost"));
        factory.setPublisherConfirms(true);
        return factory;
    }

    /**
     * Consumer-side configuration: the listener container that receives the
     * customer transaction-log messages and hands them to the given listener.
     *
     * @param connectionFactory the connection factory the container uses
     * @param rabbitComponent   the listener that handles each delivery
     * @return the configured listener container
     */
    @Bean
    public SimpleMessageListenerContainer messageContainer1(
            ConnectionFactory connectionFactory, RabbitComponent rabbitComponent) {
        SimpleMessageListenerContainer listenerContainer =
                new SimpleMessageListenerContainer(connectionFactory);
        listenerContainer.setQueues(queue());
        listenerContainer.setExposeListenerChannel(true);
        // The maximum is set before the base consumer count, both to 10.
        listenerContainer.setMaxConcurrentConsumers(10);
        listenerContainer.setConcurrentConsumers(10);
        // Manual acknowledgement: the listener must ack/nack every delivery itself.
        listenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        listenerContainer.setMessageListener(rabbitComponent);
        return listenerContainer;
    }

    /**
     * Declares the topic exchange named {@link #EXCHANGE}.
     *
     * @return the topic exchange
     */
    @Bean
    public TopicExchange topicExchange() {
        TopicExchange exchange = new TopicExchange(EXCHANGE);
        return exchange;
    }

    /**
     * Creates the queue, named after {@link #ROUTINGKEY_LOG}.
     *
     * @return the queue
     */
    @Bean
    public Queue queue() {
        final boolean durable = true;
        return new Queue(ROUTINGKEY_LOG, durable);
    }

    /**
     * Binds the queue to the topic exchange using {@link #ROUTINGKEY_LOG} as
     * the routing key.
     *
     * @return the binding
     */
    @Bean
    public Binding binding() {
        Queue logQueue = queue();
        TopicExchange exchange = topicExchange();
        Binding logBinding = BindingBuilder.bind(logQueue).to(exchange).with(ROUTINGKEY_LOG);
        return logBinding;
    }
}
3. 实现消息接收。如果只需要发送、不需要接收消息,去掉 RabbitConfig 中的 messageContainer1 方法即可。
RabbitComponent 是消息接收的处理类:
package com.onem2.common.controller;
import java.util.Map;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.alibaba.fastjson.JSONObject;
import com.onem2.capacity.bank.biz.BankReceiveLogBizService;
import com.rabbitmq.client.Channel;
@Component
public class RabbitComponent implements ChannelAwareMessageListener {
@Autowired
private BankReceiveLogBizService bankReceiveLogBizService;
public void onMessage(Message message, Channel channel) throws Exception {
try {
String boby = new String(message.getBody(), "utf-8");// 转换消息,我们是使用json数据格式
channel.basicAck(message.getMessageProperties().getDeliveryTag(),
true);// 已消费
} catch (Exception e) {
e.printStackTrace();
channel.basicAck(message.getMessageProperties().getDeliveryTag(),
false);// 未消费,放回队列
}
}
}
4. 实现消息发送
// Inject the AMQP template used to publish messages.
@Autowired
private AmqpTemplate rabbitTemplate;
// Publish to the exchange and routing key declared in RabbitConfig.
// `log` is the payload object; it is defined outside this snippet.
this.rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE,
RabbitConfig.ROUTINGKEY_LOG, log);
就这么简单。如果需要广播方式(例如 fanout 交换机),请参考 RabbitMQ 的其他交换机与路由配置。