天天看点

SpringBoot实现Rabbit消息队列

1. maven 引用 Rabbit

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
           

2. 配置RabbitConfig,可放在Application 同级

package com.onem2;

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.onem2.base.common.util.Config;
import com.onem2.common.controller.RabbitComponent;

/**
 * @ClassName: RabbitConfig
 * @Description: 注册消息队列
 * @author heliang
 * @date 2017-12-13 上午10:21:33
 * @version V2.1 * Update Logs: * Name: * Date: * Description: 初始化
 */
@Configuration
public class RabbitConfig {
    /** 消息交换机名称 */
    public static final String EXCHANGE = "bank-log-exchange";
    /** 路由键1 */
    public static final String ROUTINGKEY_LOG = "bank-log-queue";

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setAddresses("xxx.xx.xxx.xx");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername(Config.getProperty("rabbit.username"));
        connectionFactory.setPassword(Config.getProperty("rabbit.password"));
        connectionFactory.setVirtualHost(Config
                .getProperty("rabbit.virtualHost"));
        connectionFactory.setPublisherConfirms(true); 
        return connectionFactory;
    }

    /**
     * 接受消息的监听,这个监听客户交易流水的消息 针对消费者配置
     * 
     * @return
     */
    @Bean
    public SimpleMessageListenerContainer messageContainer1(
            ConnectionFactory connectionFactory, RabbitComponent rabbitComponent) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(
                connectionFactory);
        container.setQueues(queue());
        container.setExposeListenerChannel(true);
        container.setMaxConcurrentConsumers(10);
        container.setConcurrentConsumers(10);
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 设置确认模式手工确认
        container.setMessageListener(rabbitComponent);
        return container;
    }

    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(EXCHANGE);
    }

    /** 创建队列 */
    @Bean
    public Queue queue() {
        return new Queue(ROUTINGKEY_LOG, true);
    }

    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(topicExchange())
                .with(ROUTINGKEY_LOG);
    }

}
           

3. 实现消息队列接收,如果不需要接收,只要发送,只需要取消掉messageContainer1这个方法就行了。

RabbitComponent 为接收处理方案

package com.onem2.common.controller;

import java.util.Map;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.alibaba.fastjson.JSONObject;
import com.onem2.capacity.bank.biz.BankReceiveLogBizService;
import com.rabbitmq.client.Channel;

@Component
public class RabbitComponent implements ChannelAwareMessageListener {

    @Autowired
    private BankReceiveLogBizService bankReceiveLogBizService;

    public void onMessage(Message message, Channel channel) throws Exception {
        try {
            String boby = new String(message.getBody(), "utf-8");// 转换消息,我们是使用json数据格式
            
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),
                    true);// 已消费
        } catch (Exception e) {
            e.printStackTrace();
            
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),
                    false);// 未消费,放回队列
        }

    }
}
           

4. 实现消息发送 

@Autowired
private AmqpTemplate rabbitTemplate;



this.rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE,
            			RabbitConfig.ROUTINGKEY_LOG, log);
           

就这么简单,如果需要广播方式,请去参考rabbit的其他路由配置。

继续阅读