天天看點

RabbitMQ詳解(五):RabbitMQ整合Spring AMQP、SpringBoot、Spring Cloud Stream

八、RabbitMQ整合篇

1、RabbitMQ整合Spring AMQP詳解

1)、RabbitAdmin

  • RabbitAdmin類可以很好的操作RabbitMQ,在Spring中直接進行注入即可
  • autoStartup必須要設定為true,否則Spring容器不會加載RabbitAdmin類
  • RabbitAdmin底層實作就是從Spring容器中擷取Exchange、Binding、Routingkey以及Queue的@Bean聲明
  • 然後使用RabbitTemplate的execut方法執行對應的聲明、修改、删除等一系列RabbitMQ基礎功能操作

配置類:

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;

@Configuration
@ComponentScan("com.hand.rabbitmq")
public class RabbitMqConfig {
    //<bean id="connectionFactory"/>
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setAddresses("192.168.126.151:5672");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");
        return connectionFactory;
    }

    //ConnectionFactory形參名字和connectionFactory()方法名保持一緻
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }
}
           

測試類:

@RunWith(SpringRunner.class)
@SpringBootTest(classes = SpringbootRabbitmqApplication.class)
public class RabbitmqApplicationTests {
    @Autowired
    private RabbitAdmin rabbitAdmin;

    @Test
    public void testAdmin() {
        //建立交換機
        rabbitAdmin.declareExchange(new DirectExchange("test.direct", false, false));
        rabbitAdmin.declareExchange(new TopicExchange("test.topic", false, false));
        rabbitAdmin.declareExchange(new FanoutExchange("test.fanout", false, false));
        //建立隊列
        rabbitAdmin.declareQueue(new Queue("test.direct.queue", false));
        rabbitAdmin.declareQueue(new Queue("test.topic.queue", false));
        rabbitAdmin.declareQueue(new Queue("test.fanout.queue", false));
        //建立綁定關系 Binding構造函數的參數 隊列名稱、綁定類型、交換機名稱、綁定鍵
        rabbitAdmin.declareBinding(new Binding("test.direct.queue",
                Binding.DestinationType.QUEUE,
                "test.direct",
                "direct",
                new HashMap<>()));
        rabbitAdmin.declareBinding(
                BindingBuilder.
                        bind(new Queue("test.topic.queue", false))
                        .to(new TopicExchange("test.topic", false, false))
                        .with("user.#"));
        rabbitAdmin.declareBinding(
                BindingBuilder.
                        bind(new Queue("test.fanout.queue", false))
                        .to(new FanoutExchange("test.fanout", false, false)));
        //清空隊列資料
        rabbitAdmin.purgeQueue("test.topic.queue", false);
    }

}
           

RabbitAdmin源碼分析:

@ManagedResource(description = "Admin Tasks")
public class RabbitAdmin implements AmqpAdmin, ApplicationContextAware, ApplicationEventPublisherAware,
		BeanNameAware, InitializingBean {
           

RabbitAdmin實作了InitializingBean接口,在Bean加載之後做一些設定

public interface InitializingBean {

	void afterPropertiesSet() throws Exception;

}
           
@ManagedResource(description = "Admin Tasks")
public class RabbitAdmin implements AmqpAdmin, ApplicationContextAware, ApplicationEventPublisherAware,
		BeanNameAware, InitializingBean {
	@Override
	public void afterPropertiesSet() {

		synchronized (this.lifecycleMonitor) {
            
			//如果是running或者autoStartup為false的話就直接return
			if (this.running || !this.autoStartup) {
				return;
			}

			if (this.retryTemplate == null && !this.retryDisabled) {
				this.retryTemplate = new RetryTemplate();
				this.retryTemplate.setRetryPolicy(new SimpleRetryPolicy(DECLARE_MAX_ATTEMPTS));
				ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
				backOffPolicy.setInitialInterval(DECLARE_INITIAL_RETRY_INTERVAL);
				backOffPolicy.setMultiplier(DECLARE_RETRY_MULTIPLIER);
				backOffPolicy.setMaxInterval(DECLARE_MAX_RETRY_INTERVAL);
				this.retryTemplate.setBackOffPolicy(backOffPolicy);
			}
			if (this.connectionFactory instanceof CachingConnectionFactory &&
					((CachingConnectionFactory) this.connectionFactory).getCacheMode() == CacheMode.CONNECTION) {
				this.logger.warn("RabbitAdmin auto declaration is not supported with CacheMode.CONNECTION");
				return;
			}

			// Prevent stack overflow...
			final AtomicBoolean initializing = new AtomicBoolean(false);

			this.connectionFactory.addConnectionListener(connection -> {

				if (!initializing.compareAndSet(false, true)) {
					// If we are already initializing, we don't need to do it again...
					return;
				}
				try {
					
					if (this.retryTemplate != null) {
						this.retryTemplate.execute(c -> {
                            //初始化
							initialize();
							return null;
						});
					}
					else {
						initialize();
					}
				}
				finally {
					initializing.compareAndSet(true, false);
				}

			});

			this.running = true;

		}
	}     
            
	@Override // NOSONAR complexity
	public void initialize() {

		if (this.applicationContext == null) {
			this.logger.debug("no ApplicationContext has been set, cannot auto-declare Exchanges, Queues, and Bindings");
			return;
		}

		this.logger.debug("Initializing declarations");
        
        //聲明了Exchange、Queue、Binding三個集合
		Collection<Exchange> contextExchanges = new LinkedList<Exchange>(
				this.applicationContext.getBeansOfType(Exchange.class).values());
		Collection<Queue> contextQueues = new LinkedList<Queue>(
				this.applicationContext.getBeansOfType(Queue.class).values());
		Collection<Binding> contextBindings = new LinkedList<Binding>(
				this.applicationContext.getBeansOfType(Binding.class).values());

		processLegacyCollections(contextExchanges, contextQueues, contextBindings);
		//将Bean類型是Exchange、Queue、Binding的添加到集合中
        processDeclarables(contextExchanges, contextQueues, contextBindings);

		final Collection<Exchange> exchanges = filterDeclarables(contextExchanges);
		final Collection<Queue> queues = filterDeclarables(contextQueues);
		final Collection<Binding> bindings = filterDeclarables(contextBindings);

        //将Exchange、Queue集合循環拼接成RabbitMQ能夠識别的方式
		for (Exchange exchange : exchanges) {
			if ((!exchange.isDurable() || exchange.isAutoDelete())  && this.logger.isInfoEnabled()) {
				this.logger.info("Auto-declaring a non-durable or auto-delete Exchange ("
						+ exchange.getName()
						+ ") durable:" + exchange.isDurable() + ", auto-delete:" + exchange.isAutoDelete() + ". "
						+ "It will be deleted by the broker if it shuts down, and can be redeclared by closing and "
						+ "reopening the connection.");
			}
		}

		for (Queue queue : queues) {
			if ((!queue.isDurable() || queue.isAutoDelete() || queue.isExclusive()) && this.logger.isInfoEnabled()) {
				this.logger.info("Auto-declaring a non-durable, auto-delete, or exclusive Queue ("
						+ queue.getName()
						+ ") durable:" + queue.isDurable() + ", auto-delete:" + queue.isAutoDelete() + ", exclusive:"
						+ queue.isExclusive() + ". "
						+ "It will be redeclared if the broker stops and is restarted while the connection factory is "
						+ "alive, but all messages will be lost.");
			}
		}

		if (exchanges.size() == 0 && queues.size() == 0 && bindings.size() == 0) {
			this.logger.debug("Nothing to declare");
			return;
		}
        
        //調用rabbitTemplate.execute在RabbitMQ上建立Exchange、Queue、Binding
		this.rabbitTemplate.execute(channel -> {
			declareExchanges(channel, exchanges.toArray(new Exchange[exchanges.size()]));
			declareQueues(channel, queues.toArray(new Queue[queues.size()]));
			declareBindings(channel, bindings.toArray(new Binding[bindings.size()]));
			return null;
		});
		this.logger.debug("Declarations finished");

	}       

private void processDeclarables(Collection<Exchange> contextExchanges, Collection<Queue> contextQueues,
			Collection<Binding> contextBindings) {
		Collection<Declarables> declarables = this.applicationContext.getBeansOfType(Declarables.class, false, true)
				.values();
		declarables.forEach(d -> {
			d.getDeclarables().forEach(declarable -> {
				if (declarable instanceof Exchange) {
					contextExchanges.add((Exchange) declarable);
				}
				else if (declarable instanceof Queue) {
					contextQueues.add((Queue) declarable);
				}
				else if (declarable instanceof Binding) {
					contextBindings.add((Binding) declarable);
				}
			});
		});
	}            
           

2)、RabbitTemplate(消息模闆)

  • 在與Spring AMQP整合的時候進行發送消息的關鍵類
  • 提供了豐富的發送消息方法,包括可靠性投遞消息方法、回調監聽消息接口ConfirmCallback、傳回值确認接口ReturnCallback等等。同樣需要注入到Spring容器中,然後直接使用

配置類:

//ConnectionFactory形參名字和connectionFactory()方法名保持一緻
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate=new RabbitTemplate(connectionFactory);
        return rabbitTemplate;
    }
           

測試方法:

@Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSendMessage() {
        //建立消息
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.getHeaders().put("desc", "資訊描述");
        messageProperties.getHeaders().put("type", "自定義消息類型");
        Message message = new Message("Hello RabbitMQ".getBytes(), messageProperties);
        //發送消息
        rabbitTemplate.convertAndSend("topic001", "spring.amqp", message, new MessagePostProcessor() {
            //在執行消息轉換後添加/修改标題或屬性然後在進行發送
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().getHeaders().put("desc", "額外修改的資訊描述");
                message.getMessageProperties().getHeaders().put("attr", "額外新添加的屬性");
                return message;
            }
        });
    }
           
RabbitMQ詳解(五):RabbitMQ整合Spring AMQP、SpringBoot、Spring Cloud Stream

rabbitTemplate其他的方法:

@Test
    public void testSendMessage2() {
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType("text/plain");
        Message message = new Message("mq消息1".getBytes(), messageProperties);
        rabbitTemplate.send("topic001", "spring.amqp", message);

        rabbitTemplate.convertAndSend("topic001", "spring.amqp", "mq消息2");
    }
           

3)、SimpleMessageListenerContainer(簡單消息監聽容器)

  • 對這個類進行設定,對于消費者的配置項,這個類都可以滿足
  • 監聽隊列(多個隊列)、自動啟動、自動聲明功能
  • 設定事務特性、事務管理器、事務屬性、事務容量(并發)、是否開始事務、復原消息等
  • 設定消費者數量、最大最小數量、批量消費
  • 設定消息确認和自動确認模式、是否重回隊列、異常捕獲handler函數
  • 設定消費者标簽生成政策、是否獨占模式、消費者屬性等
  • 設定具體的監聽器、消息轉換器等等

注意:SimpleMessageListenerContainer可以進行動态設定,比如在運作中的應用可以動态的修改其消費者數量的大小、接收消息的模式等

配置類:

//ConnectionFactory形參名字和connectionFactory()方法名保持一緻
    @Bean
    public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueues(queue001(), queue002(), queue003(), queue_image(), queue_pdf());
        //目前消費者數量
        container.setConcurrentConsumers(1);
        //最大消費者數量
        container.setMaxConcurrentConsumers(5);
        //設定拒絕消息時的預設行為 true:則重新發送消息 false:則不會重新發送消息
        container.setDefaultRequeueRejected(false);
        /**
         * 設定消息接收确認模式
         * - AcknowledgeMode.NONE:不确認
         * - AcknowledgeMode.MANUAL:自動确認
         * - AcknowledgeMode.AUTO:手動确認
         */
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        //消費端的标簽政策
        container.setConsumerTagStrategy(new ConsumerTagStrategy() {
            @Override
            public String createConsumerTag(String queue) {
                return queue + "_" + UUID.randomUUID();
            }
        });
        //設定消息監聽
        container.setMessageListener(new ChannelAwareMessageListener() {
            @Override
            public void onMessage(Message message, Channel channel) throws Exception {
                String msg = new String(message.getBody());
                System.out.println("消費者:" + msg);
            }
        });
        return container;
    }
           
RabbitMQ詳解(五):RabbitMQ整合Spring AMQP、SpringBoot、Spring Cloud Stream

4)、MessageListenerAdapter(消息監聽擴充卡)

1)擴充卡方式一:自定義方法名稱和參數類型

配置類:

@Bean
    public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueues(queue001(), queue002(), queue003(), queue_image(), queue_pdf());
        //目前消費者數量
        container.setConcurrentConsumers(1);
        //最大消費者數量
        container.setMaxConcurrentConsumers(5);
        //設定拒絕消息時的預設行為 true:則重新發送消息 false:則不會重新發送消息
        container.setDefaultRequeueRejected(false);
        /**
         * 設定消息接收确認模式
         * - AcknowledgeMode.NONE:不确認
         * - AcknowledgeMode.MANUAL:自動确認
         * - AcknowledgeMode.AUTO:手動确認
         */
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        //消費端的标簽政策
        container.setConsumerTagStrategy(new ConsumerTagStrategy() {
            @Override
            public String createConsumerTag(String queue) {
                return queue + "_" + UUID.randomUUID();
            }
        });
        //擴充卡方式
        MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
        //MessageListenerAdapter自定義方法名
        adapter.setDefaultListenerMethod("consumeMessage");
        //添加轉換器:從位元組數組轉換為String,到MessageDelegate的時候調用參數為String類型的方法
        adapter.setMessageConverter(new TextMessageConverter());
        container.setMessageListener(adapter);
        return container;
    }
           
public class MessageDelegate {
    public void handleMessage(byte[] messageBody) {
        System.out.println("預設方法, 消息内容:" + new String(messageBody));
    }

    public void consumeMessage(byte[] messageBody) {
        System.out.println("位元組數組方法, 消息内容:" + new String(messageBody));
    }

    public void consumeMessage(String messageBody) {
        System.out.println("字元串方法, 消息内容:" + messageBody);
    }
}
           
public class TextMessageConverter implements MessageConverter {
    //Java對象轉換成Message對象的方式
    @Override
    public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
        return new Message(object.toString().getBytes(), messageProperties);
    }

    //Message對象轉換成Java對象的方式
    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        String contentType = message.getMessageProperties().getContentType();
        if (contentType != null && contentType.contains("text")) {
            return new String(message.getBody());
        }
        return null;
    }
}
           

2)擴充卡方式二:隊列的名稱和方法名稱也可以進行一一的比對

配置類:

@Bean
    public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueues(queue001(), queue002(), queue003(), queue_image(), queue_pdf());
        //目前消費者數量
        container.setConcurrentConsumers(1);
        //最大消費者數量
        container.setMaxConcurrentConsumers(5);
        //設定拒絕消息時的預設行為 true:則重新發送消息 false:則不會重新發送消息
        container.setDefaultRequeueRejected(false);
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        //消費端的标簽政策
        container.setConsumerTagStrategy(new ConsumerTagStrategy() {
            @Override
            public String createConsumerTag(String queue) {
                return queue + "_" + UUID.randomUUID();
            }
        });
        //擴充卡方式:隊列的名稱和方法名稱也可以進行一一的比對
        MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
        Map<String, String> queueOrTagToMethodName = new HashMap<>();
        queueOrTagToMethodName.put("queue001","method1");
        queueOrTagToMethodName.put("queue002","method2");
        adapter.setQueueOrTagToMethodName(queueOrTagToMethodName);
        adapter.setMessageConverter(new TextMessageConverter());
        container.setMessageListener(adapter);
        return container;
    }
           

MessageListenerAdapter源碼分析:

public class MessageListenerAdapter extends AbstractAdaptableMessageListener {

    //将隊列名和方法名進行比對,将指定隊列适配到指定方法裡
	private final Map<String, String> queueOrTagToMethodName = new HashMap<String, String>();

	public static final String ORIGINAL_DEFAULT_LISTENER_METHOD = "handleMessage";

	//new MessageListenerAdapter(new MessageDelegate())時傳入的委托對象,用于處理消息
	private Object delegate;

    //預設的監聽方法的名字為handleMessage,如果要自定義MessageListenerAdapter方法名預設為handleMessage
	private String defaultListenerMethod = ORIGINAL_DEFAULT_LISTENER_METHOD;    
           

5)、MessageConverter(消息轉換器)

在進行發送消息的時候,正常情況下消息體為二進制的資料方式進行傳輸,如果希望内部幫我們進行轉換,或者指定自定義的轉換器,就需要用到MessageConverter

自定義消息轉換器,需要實作MessageConverter接口,重寫toMessage(Java對象轉換成Message對象的方式)和fromMessage(Message對象轉換成Java對象的方式)方法

  • Json轉換器:Jackson2JsonMessageConverter可以進行Java對象的轉換功能
  • DefaultJackson2JavaTypeMapper映射器:可以進行Java對象的映射關系
  • 自定義二進制轉換器:比如圖檔類型、PDF、PPT、流媒體

1)Jackson2JsonMessageConverter

配置類:

@Bean
    public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueues(queue001(), queue002(), queue003(), queue_image(), queue_pdf());
        //目前消費者數量
        container.setConcurrentConsumers(1);
        //最大消費者數量
        container.setMaxConcurrentConsumers(5);
        //設定拒絕消息時的預設行為 true:則重新發送消息 false:則不會重新發送消息
        container.setDefaultRequeueRejected(false);
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        //消費端的标簽政策
        container.setConsumerTagStrategy(new ConsumerTagStrategy() {
            @Override
            public String createConsumerTag(String queue) {
                return queue + "_" + UUID.randomUUID();
            }
        });
        //支援json格式的轉換器
        MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
        adapter.setDefaultListenerMethod("consumeMessage");
        Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
        adapter.setMessageConverter(jackson2JsonMessageConverter);
        container.setMessageListener(adapter);
        return container;
    }
           

對應MessageDelegate中的方法:

public void consumeMessage(Map messageBody) {
        System.out.println("map方法, 消息内容:" + messageBody);
    }
           

測試方法:

@Test
    public void testSendJsonMessage() throws Exception {
        Order order = new Order("001","消息訂單","描述資訊");
        ObjectMapper mapper = new ObjectMapper();
        String json = mapper.writeValueAsString(order);
        MessageProperties messageProperties = new MessageProperties();
        //這裡一定要修改contentType為application/json
        messageProperties.setContentType("application/json");
        Message message = new Message(json.getBytes(), messageProperties);
        rabbitTemplate.send("topic001", "spring.order", message);
    }
           

2)Jackson2JsonMessageConverter & DefaultJackson2JavaTypeMapper支援Java對象的轉換

配置類:

@Bean
    public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueues(queue001(), queue002(), queue003(), queue_image(), queue_pdf());
        //目前消費者數量
        container.setConcurrentConsumers(1);
        //最大消費者數量
        container.setMaxConcurrentConsumers(5);
        //設定拒絕消息時的預設行為 true:則重新發送消息 false:則不會重新發送消息
        container.setDefaultRequeueRejected(false);
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        //消費端的标簽政策
        container.setConsumerTagStrategy(new ConsumerTagStrategy() {
            @Override
            public String createConsumerTag(String queue) {
                return queue + "_" + UUID.randomUUID();
            }
        });
        //Jackson2JsonMessageConverter & DefaultJackson2JavaTypeMapper支援Java對象的轉換
        MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
        adapter.setDefaultListenerMethod("consumeMessage");
        Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
        DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();
        //否則會抛出異常The class '...' is not in the trusted packages: [java.util, java.lang]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
        //預設隻支援java.util和java.lang包下的類
        javaTypeMapper.setTrustedPackages("*");
        jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);
        adapter.setMessageConverter(jackson2JsonMessageConverter);
        container.setMessageListener(adapter);
        return container;
    }
           

對應MessageDelegate中的方法:

public void consumeMessage(Order order) {
        System.out.println("order對象, 消息内容, id: " + order.getId() +
                ", name: " + order.getName() +
                ", content: " + order.getContent());
    }
           

測試方法:

@Test
    public void testSendJavaMessage() throws Exception {
        Order order = new Order("001","消息訂單","描述資訊");
        ObjectMapper mapper = new ObjectMapper();
        String json = mapper.writeValueAsString(order);
        MessageProperties messageProperties = new MessageProperties();
        //這裡一定要修改contentType為application/json
        messageProperties.setContentType("application/json");
        //__TypeId__指定類的全路徑
        messageProperties.getHeaders().put("__TypeId__", "com.hand.rabbitmq.domain.Order");
        Message message = new Message(json.getBytes(), messageProperties);
        rabbitTemplate.send("topic001", "spring.order", message);
    }
           

3)DefaultJackson2JavaTypeMapper & Jackson2JsonMessageConverter支援java對象多映射轉換

配置類:

@Bean
    public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueues(queue001(), queue002(), queue003(), queue_image(), queue_pdf());
        //目前消費者數量
        container.setConcurrentConsumers(1);
        //最大消費者數量
        container.setMaxConcurrentConsumers(5);
        //設定拒絕消息時的預設行為 true:則重新發送消息 false:則不會重新發送消息
        container.setDefaultRequeueRejected(false);
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        //消費端的标簽政策
        container.setConsumerTagStrategy(new ConsumerTagStrategy() {
            @Override
            public String createConsumerTag(String queue) {
                return queue + "_" + UUID.randomUUID();
            }
        });
        //DefaultJackson2JavaTypeMapper & Jackson2JsonMessageConverter支援java對象多映射轉換
        MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
        adapter.setDefaultListenerMethod("consumeMessage");
        Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
        DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();
        Map<String, Class<?>> idClassMapping = new HashMap<>();
        idClassMapping.put("order", Order.class);
        idClassMapping.put("packaged", Packaged.class);
        javaTypeMapper.setIdClassMapping(idClassMapping);
        jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);
        adapter.setMessageConverter(jackson2JsonMessageConverter);
        container.setMessageListener(adapter);
        return container;
    }
           

對應MessageDelegate中的方法:

public void consumeMessage(Order order) {
        System.out.println("order對象, 消息内容, id: " + order.getId() +
                ", name: " + order.getName() +
                ", content: " + order.getContent());
    }

    public void consumeMessage(Packaged pack) {
        System.out.println("package對象, 消息内容, id: " + pack.getId() +
                ", name: " + pack.getName() +
                ", content: " + pack.getDescription());
    }
           

測試方法:

@Test
    public void testSendMappingMessage() throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        Order order = new Order("001","消息訂單","描述資訊");
        String json1 = mapper.writeValueAsString(order);
        MessageProperties messageProperties1 = new MessageProperties();
        //這裡注意一定要修改contentType為 application/json
        messageProperties1.setContentType("application/json");
        messageProperties1.getHeaders().put("__TypeId__", "order");
        Message message1 = new Message(json1.getBytes(), messageProperties1);
        rabbitTemplate.send("topic001", "spring.order", message1);
        Packaged pack = new Packaged("002","包裹消息","包裹描述資訊");
        String json2 = mapper.writeValueAsString(pack);
        MessageProperties messageProperties2 = new MessageProperties();
        //這裡一定要修改contentType為application/json
        messageProperties2.setContentType("application/json");
        messageProperties2.getHeaders().put("__TypeId__", "packaged");
        Message message2 = new Message(json2.getBytes(), messageProperties2);
        rabbitTemplate.send("topic001", "spring.pack", message2);
    }
           

4)ContentTypeDelegatingMessageConverter全局轉換器

配置類:

@Bean
    public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueues(queue001(), queue002(), queue003(), queue_image(), queue_pdf());
        //目前消費者數量
        container.setConcurrentConsumers(1);
        //最大消費者數量
        container.setMaxConcurrentConsumers(5);
        //設定拒絕消息時的預設行為 true:則重新發送消息 false:則不會重新發送消息
        container.setDefaultRequeueRejected(false);
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        //消費端的标簽政策
        container.setConsumerTagStrategy(new ConsumerTagStrategy() {
            @Override
            public String createConsumerTag(String queue) {
                return queue + "_" + UUID.randomUUID();
            }
        });
        MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
        adapter.setDefaultListenerMethod("consumeMessage");
        //全局的轉換器
        ContentTypeDelegatingMessageConverter convert = new ContentTypeDelegatingMessageConverter();
        TextMessageConverter textConvert = new TextMessageConverter();
        convert.addDelegate("text", textConvert);
        convert.addDelegate("html/text", textConvert);
        convert.addDelegate("xml/text", textConvert);
        convert.addDelegate("text/plain", textConvert);
        Jackson2JsonMessageConverter jsonConvert = new Jackson2JsonMessageConverter();
        convert.addDelegate("json", jsonConvert);
        convert.addDelegate("application/json", jsonConvert);
        ImageMessageConverter imageConverter = new ImageMessageConverter();
        convert.addDelegate("image/png", imageConverter);
        convert.addDelegate("image", imageConverter);
        PDFMessageConverter pdfConverter = new PDFMessageConverter();
        convert.addDelegate("application/pdf", pdfConverter);
        adapter.setMessageConverter(convert);
        container.setMessageListener(adapter);
        return container;
    }
           

2、RabbitMQ整合SpringBoot詳解

1)生産端整合

核心配置:

spring.rabbitmq.addresses=192.168.126.151:5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000
#開啟Publisher Confirm機制
spring.rabbitmq.publisher-confirms=true
#開啟Publisher Return機制
spring.rabbitmq.publisher-returns=true
#啟用強制消息,設定為false收不到Publisher Return機制傳回的消息
spring.rabbitmq.template.mandatory=false
           
  • publisher-confirms,實作一個監聽器用于監聽Broker端給我們傳回的确認消息:RabbitTemplate.ConfirmCallback
  • publisher-returns,保證消息對Broker端是可達的,如果出現路由鍵不可達的情況,則使用監聽器對不可達的消息進行後續的處理,保證消息的路由成功:RabbitTemplate.ReturnCallback
  • 在發送消息的時候對template進行配置mandatory=true保證監聽有效
  • 生産端還可以配置其他屬性,比如發送重試、逾時時間、次數、間隔等
import com.hand.rabbitmq.domain.Order;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import java.util.Map;
import java.util.UUID;

@Component
public class RabbitSender {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            System.out.println("correlationData:" + correlationData);
            System.out.println("ack:" + ack);
            if (!ack) {
                System.out.println("異常處理");
            }
        }
    };

    final RabbitTemplate.ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
        @Override
        public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText, String exchange, String routingKey) {
            System.out.println("return exchange:" + exchange + ",routingKey:" + routingKey + ",replyCode:" + replyCode + ",replyText:" + replyText);
        }
    };

    public void send(Object message, Map<String, Object> properties) {
        MessageHeaders mhs = new MessageHeaders(properties);
        Message msg = MessageBuilder.createMessage(message, mhs);
        rabbitTemplate.setConfirmCallback(confirmCallback);
        rabbitTemplate.setReturnCallback(returnCallback);
        //最好是id +時間戳 全局唯一
        CorrelationData cd = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend("exchange-1", "springboot.hello", msg, cd);
    }

    //Order需要實作序列化接口
    public void sendOrder(Order order) {
        rabbitTemplate.setConfirmCallback(confirmCallback);
        rabbitTemplate.setReturnCallback(returnCallback);
        //最好是id +時間戳 全局唯一
        CorrelationData cd = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend("exchange-2", "springboot.hello", order, cd);
    }
}
           

2)消費端整合

核心配置:

#确認模式為手動确認
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.simple.concurrency=1
spring.rabbitmq.listener.simple.max-concurrency=5
           
  • 首先配置手工确認模式,用于ACK的手工确認,這樣可以保證消息的可靠性送達,或者在消費端消費失敗的時候可以做到重回隊列、根據業務記錄日志等處理
  • 可以設定消費端的監聽個數和最大個數,用于控制消費端的并發情況
  • 消費端監聽使用@RabbitListener注解,@RabbitListener是一個組合注解,裡面可以注解配置@Queue、@QueueBinding、@Exchange直接通過這個組合注解一次性解決消費端交換機、隊列、綁定、路由并且配置監聽功能等

配置資訊:

spring.rabbitmq.listener.order.queue.name=queue-2
spring.rabbitmq.listener.order.queue.durable=true
spring.rabbitmq.listener.order.exchange.name=exchange-2
spring.rabbitmq.listener.order.exchange.durable=true
spring.rabbitmq.listener.order.exchange.type=topic
spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions=true
spring.rabbitmq.listener.order.key=springboot.*
           
import com.hand.rabbitmq.domain.Order;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import java.util.Map;

@Component
public class RabbitReceiver {

    //如果沒有可以自動建立交換機、隊列、綁定、路由
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "queue-1", durable = "true"),
            exchange = @Exchange(value = "exchange-1", durable = "true", type = "topic", ignoreDeclarationExceptions = "true"),
            key = "springboot.*"
    ))
    @RabbitHandler
    public void onMessage(Message message, Channel channel) throws Exception {
        System.out.println("消費端:" + message.getPayload());
        Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
        //手工ACK
        channel.basicAck(deliveryTag, false);
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "${spring.rabbitmq.listener.order.queue.name}",
                    durable = "${spring.rabbitmq.listener.order.queue.durable}"),
            exchange = @Exchange(value = "${spring.rabbitmq.listener.order.exchange.name}",
                    durable = "${spring.rabbitmq.listener.order.exchange.durable}",
                    type = "${spring.rabbitmq.listener.order.exchange.type}",
                    ignoreDeclarationExceptions = "${spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions}"),
            key = "${spring.rabbitmq.listener.order.key}"
    ))
    @RabbitHandler
    //Order需要實作序列化接口
    public void onMessage(@Payload Order order, @Headers Map<String, Object> headers, Channel channel) throws Exception {
        System.out.println("消費端:" + order);
        Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
        //手工ACK
        channel.basicAck(deliveryTag, false);
    }
}
           

3、RabbitMQ整合Spring Cloud Stream詳解

Spring Cloud Stream核心架構圖:

RabbitMQ詳解(五):RabbitMQ整合Spring AMQP、SpringBoot、Spring Cloud Stream

Spring Cloud Stream通過定義綁定器Binder作為中間層,完美地實作了應用程式與消息中間件之間的隔離。通過向應用程式暴露統一的Channel通道,使得應用程式不需要再考慮各種不同的消息中間件的實作。當需要更新消息中間件,或是更換其他消息中間件産品時,隻需要更換對應的Binder綁定器而不需要修改任何的應用邏輯

RabbitMQ詳解(五):RabbitMQ整合Spring AMQP、SpringBoot、Spring Cloud Stream

上圖中黃色的為RabbitMQ的部分,綠色的部分為Spring Cloud Stream在生産者和消費者添加了一層中間件

  • @EnableBinding:value參數指定用于定義綁定消息通道的接口,在應用啟動時實作對定義消息通道的綁定
  • @Output:輸出注解,用于定義發送消息接口
  • @Input:輸入注解,用于定義消息的消費者接口
  • @StreamListener:用于定義監聽方法的注解

使用Spring Cloud Stream不能實作可靠性的投遞,會存在少量消息丢失的問題(為了兼顧Kafka)

添加依賴:

<dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
            <version>2.1.0.RELEASE</version>
        </dependency>
           

Spring Cloud Stream提供了Sink、Source和Processor三個預設實作消息通道的接口

public interface Sink {

	String INPUT = "input";

	@Input(Sink.INPUT)
	SubscribableChannel input();

}
           
public interface Source {

	String OUTPUT = "output";

	@Output(Source.OUTPUT)
	MessageChannel output();

}
           
public interface Processor extends Source, Sink {

}
           

1)生産端整合

server.port=8001

spring.application.name=producer
spring.cloud.stream.bindings.output_channel.destination=exchange-3
spring.cloud.stream.bindings.output_channel.group=queue-3
spring.cloud.stream.bindings.output_channel.binder=rabbit_cluster

spring.cloud.stream.binders.rabbit_cluster.type=rabbit
spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.addresses=192.168.126.151:5672
spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.username=guest
spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.password=guest
spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.virtual-host=/
           
public interface MqMessageSource {
    String OUTPUT_CHANNEL = "output_channel";

    @Output(MqMessageSource.OUTPUT_CHANNEL)
    MessageChannel output();
}
           
@EnableBinding(MqMessageSource.class)
@Service
public class RabbitmqSender {

    @Autowired
    private MqMessageSource mqMessageSource;

    /**
     * 發送消息
     */
    public String sendMessage(Object message, Map<String, Object> properties) {
        try {
            MessageHeaders mhs = new MessageHeaders(properties);
            Message msg = MessageBuilder.createMessage(message, mhs);
            boolean sendStatus = mqMessageSource.output().send(msg);
            System.out.println("發送資料:" + message + ",sendStatus: " + sendStatus);
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException(e.getMessage());
        }
        return null;
    }

}
           

2)消費端整合

server.port=8002

spring.application.name=consumer
spring.cloud.stream.bindings.input_channel.destination=exchange-3
spring.cloud.stream.bindings.input_channel.group=queue-3
spring.cloud.stream.bindings.input_channel.binder=rabbit_cluster
spring.cloud.stream.bindings.input_channel.consumer.concurrency=1
spring.cloud.stream.rabbit.bindings.input_channel.consumer.requeue-rejected=false
spring.cloud.stream.rabbit.bindings.input_channel.consumer.acknowledge-mode=MANUAL
spring.cloud.stream.rabbit.bindings.input_channel.consumer.recovery-interval=3000
spring.cloud.stream.rabbit.bindings.input_channel.consumer.durable-subscription=true
spring.cloud.stream.rabbit.bindings.input_channel.consumer.max-concurrency=5

spring.cloud.stream.binders.rabbit_cluster.type=rabbit
spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.addresses=192.168.126.151:5672
spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.username=guest
spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.password=guest
spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.virtual-host=/
           
public interface MqMessageSink {
    String INPUT_CHANNEL = "input_channel";

    @Input(MqMessageSink.INPUT_CHANNEL)
    SubscribableChannel input();
}
           
@EnableBinding(MqMessageSink.class)
@Service
public class RabbitmqReceiver {

    @StreamListener(MqMessageSink.INPUT_CHANNEL)
    public void receiver(Message message) throws Exception {
        Channel channel = (Channel) message.getHeaders().get(AmqpHeaders.CHANNEL);
        Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
        System.out.println("Input Stream 1 接受資料:" + message);
        channel.basicAck(deliveryTag, false);
    }
}