天天看點

SpringCloudStream4-rabbit配置

作者:Java尚浩宇

一、概述

自SpringCloud2020開始,官方就标記了@StreamListener、@Input等注解為過期類,新增了函數式消息接收機制。到SpringCloud2022,過期類全部被移除,隻能使用函數式消息接收機制。本文通過前後版本配置對比進行SpringCloudStream4版本下的消息配置介紹。

二、SpringCloud2020及之前版本

2.1公共配置

Pipeline

public interface TestPipeline {

	String INPUT_TEST = "input_test";
	String OUTPUT_TEST = "output_test";

	@Input(ServiceWorkerPipeline.INPUT_TEST)
	SubscribableChannel listen();

	
	@Output(ServiceWorkerPipeline.OUTPUT_TEST)
	MessageChannel send();
}           
SpringCloudStream4-rabbit配置
SpringCloudStream4-rabbit配置

2.2生産者配置

application.properties

spring.rabbitmq.addresses=amqp://localhost:5672
spring.rabbitmq.password=guest
spring.rabbitmq.username=guest
spring.rabbitmq.connection-timeout=100000
spring.cloud.stream.default-binder=rabbit

spring.cloud.stream.bindings.output_test.binder=rabbit
spring.cloud.stream.bindings.output_test.destination=test
spring.cloud.stream.bindings.output_test.content-type=application/json           
SpringCloudStream4-rabbit配置
SpringCloudStream4-rabbit配置

發送消息

@Component
public class AsyncMsgUtil {
	private final Logger LOGGER = LoggerFactory.getLogger(AsyncMsgUtil.class);
	@Autowired
	private TestPipeline testPipeline;

	public void sendEmailMsg(String email, String title, TemplateKey4Mail key, Map<String, Object> args) {
		try {
			this.LOGGER.info("email={}", email);
			Message<MailMsgData> message = MessageBuilder.build(new MailMsgData(email, title, key, args), null);
			boolean flag = this.testPipeline.send().send(message);
			this.LOGGER.info("send status={}", flag);
		} catch (Exception e) {
			this.LOGGER.error("ERROR", e);
		}
	}
}
           
SpringCloudStream4-rabbit配置
SpringCloudStream4-rabbit配置

2.3消費者配置

application.properties

spring.rabbitmq.addresses=amqp://localhost:5672
spring.rabbitmq.password=guest
spring.rabbitmq.username=guest
spring.rabbitmq.connection-timeout=100000
spring.cloud.stream.default-binder=rabbit

spring.cloud.stream.bindings.output_test.binder=rabbit
spring.cloud.stream.bindings.output_test.destination=test
spring.cloud.stream.bindings.output_test.content-type=application/json           
SpringCloudStream4-rabbit配置
SpringCloudStream4-rabbit配置

消費消息代碼

@StreamListener(target = TestPipeline.INPUT_TEST, condition =
	 "headers['action']=='test'")
public void test(User user,@Header(value = "caller") Long callerId) {
      //do something
}           
SpringCloudStream4-rabbit配置
SpringCloudStream4-rabbit配置

2.4總結

通過以上示例,可以看到,SpringCloud2020及之前版本,是通過一個Pipeline類聲明輸入輸出通道。聲明時主要聲明INPUT和OUTPUT,INPUT的值對應生産者配置檔案裡spring.cloud.stream.bindings.<binding name>.*中的binding name;同理,OUTPUT的值也對應消費者配置檔案裡的binding name。

另外消費消息時,通過注解@StreamListener指定通道,以及篩選指定header的消息。被注解的方法,不能有傳回值,參數會自動注入。

三、SpringCloud2020及之後版本

3.1生産者配置

application.properties

spring.rabbitmq.addresses=amqp://localhost:5672
spring.rabbitmq.password=guest
spring.rabbitmq.username=guest
spring.rabbitmq.connection-timeout=100000
spring.cloud.stream.default-binder=rabbit           
SpringCloudStream4-rabbit配置
SpringCloudStream4-rabbit配置

發送消息

@Component
public class AsyncMsgUtil {
	private final Logger LOGGER = LoggerFactory.getLogger(AsyncMsgUtil.class);
	@Autowired
	private StreamBridge bridge;

	public void sendEmailMsg(String email, String title, TemplateKey4Mail key, Map<String, Object> args) {
		try {
			this.LOGGER.info("email={}", email);
			Message<MailMsgData> message = MessageBuilder.build(new MailMsgData(email, title, key, args), null);
			boolean flag = this.bridge.send("output_test-in-0", message);
			this.LOGGER.info("send status={}", flag);
		} catch (Exception e) {
			this.LOGGER.error("ERROR", e);
		}
	}
}
           
SpringCloudStream4-rabbit配置
SpringCloudStream4-rabbit配置

3.2消費者配置

application.properties

spring.rabbitmq.addresses=amqp://localhost:5672
spring.rabbitmq.password=guest
spring.rabbitmq.username=guest
spring.rabbitmq.connection-timeout=100000
spring.cloud.stream.default-binder=rabbit

spring.cloud.function.definition=input_test
#此配置消息持久化,若不需要可注釋
spring.cloud.stream.bindings.input_test-in-0.group=worker           
SpringCloudStream4-rabbit配置
SpringCloudStream4-rabbit配置

消費消息代碼

@Bean
	Consumer<JsonResult<User>> input_test() throws Exception {
		return jsonResult -> {
			System.out.println("testmq-----" + jsonResult);
		};
	}           
SpringCloudStream4-rabbit配置
SpringCloudStream4-rabbit配置

3.3總結

相比于之前,減少了很多配置,但是減少的前提是增大了約定。主要是:

1、bridge.send("output_test-in-0", message)中“output_test-in-0”對應生産者配置檔案中spring.cloud.stream.bindings.<binding name>.*的binding name,生産者可隻配置rabbit資訊,其它通過消費者配置實作最少化配置。

2、消費者取消@StreamListener,使用普通的@Bean注解,是以配置裡要指定哪些是rabbit的消費者,使用配置spring.cloud.function.definition指定,配置值是方法名,多個以英文分号進行。

3、消費者配置spring.cloud.stream.bindings.<binding name>.*中的binding name命名有特定規則:<binding name>_in_0,同時<binding name>也是@ Bean注解的方法名。

4、如果生産者0配置,那麼消費者配置時需要額外配置下destination屬性,比如生産者發送消息的aaa,那麼消費者的配置應該為:

spring.rabbitmq.addresses=amqp://localhost:5672
spring.rabbitmq.password=guest
spring.rabbitmq.username=guest
spring.rabbitmq.connection-timeout=100000
spring.cloud.stream.default-binder=rabbit

spring.cloud.function.definition=input_test
#此配置消息持久化,若不需要可注釋
spring.cloud.stream.bindings.input_test-in-0.destination=aaa
spring.cloud.stream.bindings.input_test-in-0.group=worker           
SpringCloudStream4-rabbit配置
SpringCloudStream4-rabbit配置

5、函數式無法進行同一個通道按header接收消息,這塊和之前不一緻,是以生産者發送消息時,最好用枚舉,避免随意發送而沒地接收的情況。

6、如果生産者在發生消息時按照<binding name>_in_0格式發送,且消息不需要持久化,那麼生産者和消費者可以都隻配置rabbit資訊,不用配置其他任何資訊。

繼續閱讀