一、概述
自SpringCloud2020開始,官方就标記了@StreamListener、@Input等注解為過期類,新增了函數式消息接收機制。到SpringCloud2022,過期類全部被移除,隻能使用函數式消息接收機制。本文通過前後版本配置對比進行SpringCloudStream4版本下的消息配置介紹。
二、SpringCloud2020及之前版本
2.1公共配置
Pipeline
public interface TestPipeline {
String INPUT_TEST = "input_test";
String OUTPUT_TEST = "output_test";
@Input(ServiceWorkerPipeline.INPUT_TEST)
SubscribableChannel listen();
@Output(ServiceWorkerPipeline.OUTPUT_TEST)
MessageChannel send();
}
2.2生産者配置
application.properties
spring.rabbitmq.addresses=amqp://localhost:5672
spring.rabbitmq.password=guest
spring.rabbitmq.username=guest
spring.rabbitmq.connection-timeout=100000
spring.cloud.stream.default-binder=rabbit
spring.cloud.stream.bindings.output_test.binder=rabbit
spring.cloud.stream.bindings.output_test.destination=test
spring.cloud.stream.bindings.output_test.content-type=application/json
發送消息
@Component
public class AsyncMsgUtil {
private final Logger LOGGER = LoggerFactory.getLogger(AsyncMsgUtil.class);
@Autowired
private TestPipeline testPipeline;
public void sendEmailMsg(String email, String title, TemplateKey4Mail key, Map<String, Object> args) {
try {
this.LOGGER.info("email={}", email);
Message<MailMsgData> message = MessageBuilder.build(new MailMsgData(email, title, key, args), null);
boolean flag = this.testPipeline.send().send(message);
this.LOGGER.info("send status={}", flag);
} catch (Exception e) {
this.LOGGER.error("ERROR", e);
}
}
}
2.3消費者配置
application.properties
spring.rabbitmq.addresses=amqp://localhost:5672
spring.rabbitmq.password=guest
spring.rabbitmq.username=guest
spring.rabbitmq.connection-timeout=100000
spring.cloud.stream.default-binder=rabbit
spring.cloud.stream.bindings.output_test.binder=rabbit
spring.cloud.stream.bindings.output_test.destination=test
spring.cloud.stream.bindings.output_test.content-type=application/json
消費消息代碼
@StreamListener(target = TestPipeline.INPUT_TEST, condition =
"headers['action']=='test'")
public void test(User user,@Header(value = "caller") Long callerId) {
//do something
}
2.4總結
通過以上示例,可以看到,SpringCloud2020及之前版本,是通過一個Pipeline類聲明輸入輸出通道。聲明時主要聲明INPUT和OUTPUT,INPUT的值對應生産者配置檔案裡spring.cloud.stream.bindings.<binding name>.*中的binding name;同理,OUTPUT的值也對應消費者配置檔案裡的binding name。
另外消費消息時,通過注解@StreamListener指定通道,以及篩選指定header的消息。被注解的方法,不能有傳回值,參數會自動注入。
三、SpringCloud2020及之後版本
3.1生産者配置
application.properties
spring.rabbitmq.addresses=amqp://localhost:5672
spring.rabbitmq.password=guest
spring.rabbitmq.username=guest
spring.rabbitmq.connection-timeout=100000
spring.cloud.stream.default-binder=rabbit
發送消息
@Component
public class AsyncMsgUtil {
private final Logger LOGGER = LoggerFactory.getLogger(AsyncMsgUtil.class);
@Autowired
private StreamBridge bridge;
public void sendEmailMsg(String email, String title, TemplateKey4Mail key, Map<String, Object> args) {
try {
this.LOGGER.info("email={}", email);
Message<MailMsgData> message = MessageBuilder.build(new MailMsgData(email, title, key, args), null);
boolean flag = this.bridge.send("output_test-in-0", message);
this.LOGGER.info("send status={}", flag);
} catch (Exception e) {
this.LOGGER.error("ERROR", e);
}
}
}
3.2消費者配置
application.properties
spring.rabbitmq.addresses=amqp://localhost:5672
spring.rabbitmq.password=guest
spring.rabbitmq.username=guest
spring.rabbitmq.connection-timeout=100000
spring.cloud.stream.default-binder=rabbit
spring.cloud.function.definition=input_test
#此配置消息持久化,若不需要可注釋
spring.cloud.stream.bindings.input_test-in-0.group=worker
消費消息代碼
@Bean
Consumer<JsonResult<User>> input_test() throws Exception {
return jsonResult -> {
System.out.println("testmq-----" + jsonResult);
};
}
3.3總結
相比于之前,減少了很多配置,但是減少的前提是增大了約定。主要是:
1、bridge.send("output_test-in-0", message)中“output_test-in-0”對應生産者配置檔案中spring.cloud.stream.bindings.<binding name>.*的binding name,生産者可隻配置rabbit資訊,其它通過消費者配置實作最少化配置。
2、消費者取消@StreamListener,使用普通的@Bean注解,是以配置裡要指定哪些是rabbit的消費者,使用配置spring.cloud.function.definition指定,配置值是方法名,多個以英文分号進行。
3、消費者配置spring.cloud.stream.bindings.<binding name>.*中的binding name命名有特定規則:<binding name>_in_0,同時<binding name>也是@ Bean注解的方法名。
4、如果生産者0配置,那麼消費者配置時需要額外配置下destination屬性,比如生産者發送消息的aaa,那麼消費者的配置應該為:
spring.rabbitmq.addresses=amqp://localhost:5672
spring.rabbitmq.password=guest
spring.rabbitmq.username=guest
spring.rabbitmq.connection-timeout=100000
spring.cloud.stream.default-binder=rabbit
spring.cloud.function.definition=input_test
#此配置消息持久化,若不需要可注釋
spring.cloud.stream.bindings.input_test-in-0.destination=aaa
spring.cloud.stream.bindings.input_test-in-0.group=worker
5、函數式無法進行同一個通道按header接收消息,這塊和之前不一緻,是以生産者發送消息時,最好用枚舉,避免随意發送而沒地接收的情況。
6、如果生産者在發生消息時按照<binding name>_in_0格式發送,且消息不需要持久化,那麼生産者和消費者可以都隻配置rabbit資訊,不用配置其他任何資訊。