案例代碼:https://github.com/q279583842q/springcloud-e-book
在實際開發過程中,服務與服務之間通信經常會使用到消息中間件,而以往使用了哪個中間件比如RabbitMQ,那麼該中間件和系統的耦合性就會非常高,如果我們要替換為Kafka那麼變動會比較大,這時我們可以使用SpringCloudStream來整合我們的消息中間件,來降低系統和中間件的耦合性。
一、什麼是SpringCloudStream
官方定義 Spring Cloud Stream 是一個建構消息驅動微服務的架構。
應用程式通過 inputs 或者 outputs 來與 Spring Cloud Stream 中binder 互動,通過我們配置來 binding ,而 Spring Cloud Stream 的 binder 負責與消息中間件互動。是以,我們隻需要搞清楚如何與 Spring Cloud Stream 互動就可以友善使用消息驅動的方式。
通過使用Spring Integration來連接配接消息代理中間件以實作消息事件驅動。Spring Cloud Stream 為一些供應商的消息中間件産品提供了個性化的自動化配置實作,引用了釋出-訂閱、消費組、分區的三個核心概念。目前僅支援RabbitMQ、Kafka。
二、Stream 解決了什麼問題?
Stream解決了開發人員無感覺的使用消息中間件的問題,因為Stream對消息中間件的進一步封裝,可以做到代碼層面對中間件的無感覺,甚至于動态的切換中間件(rabbitmq切換為kafka),使得微服務開發的高度解耦,服務可以關注更多自己的業務流程
官網結構圖
三、消息驅動入門案例
我們通過一個入門案例來示範下通過stream來整合RabbitMQ來實作消息的異步通信的效果,是以首先要開啟RabbitMQ服務,RabbitMQ不清楚的請參考此文:https://dpb-bobokaoya-sm.blog.csdn.net/article/details/90409404
1.建立消息發送者服務
1.1 建立項目
建立一個SpringCloud項目
1.2 pom檔案<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.13.RELEASE</version>
</parent>
<groupId>com.bobo</groupId>
<artifactId>stream-sender</artifactId>
<version>0.0.1-SNAPSHOT</version>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Dalston.SR5</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-eureka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
1.3 配置檔案
配置檔案中除了必要的服務名稱,端口和Eureka的資訊外我們還要添加RabbitMQ的注冊資訊
spring.application.name=stream-sender
server.port=9060
#設定服務注冊中心位址,指向另一個注冊中心
eureka.client.serviceUrl.defaultZone=http://dpb:123456@eureka1:8761/eureka/,http://dpb:123456@eureka2:8761/eureka/
#rebbitmq 連結資訊
spring.rabbitmq.host=192.168.88.150
spring.rabbitmq.port=5672
spring.rabbitmq.username=dpb
spring.rabbitmq.password=123
spring.rabbitmq.virtualHost=/
1.4 建立消費發送者接口
建立一個發送消息的接口。具體如下:方法名稱自定義,傳回類型必須是SubscribableChannel,在Output注解中指定交換器名稱。
/**
* 發送消息的接口
* @author dengp
*
*/
public interface ISendeService {
/**
* 指定輸出的交換器名稱
* @return
*/
@Output("dpb-exchange")
SubscribableChannel send();
}
1.5 啟動類
在啟動類中通過@EnableBinding注解綁定我們建立的接口類。
@SpringBootApplication
@EnableEurekaClient
// 綁定我們剛剛建立的發送消息的接口類型
@EnableBinding(value={ISendeService.class})
public class StreamSenderStart {
public static void main(String[] args) {
SpringApplication.run(StreamSenderStart.class, args);
}
}
2.建立消息消費者服務
2.1 建立項目
2.2 pom檔案
添加的依賴和發送消息的服務是一緻的
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.13.RELEASE</version>
</parent>
<groupId>com.bobo</groupId>
<artifactId>stream-receiver</artifactId>
<version>0.0.1-SNAPSHOT</version>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Dalston.SR5</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-eureka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
2.3 配置檔案
注意修改服務名稱和端口
spring.application.name=stream-receiver
server.port=9061
#設定服務注冊中心位址,指向另一個注冊中心
eureka.client.serviceUrl.defaultZone=http://dpb:123456@eureka1:8761/eureka/,http://dpb:123456@eureka2:8761/eureka/
#rebbitmq 連結資訊
spring.rabbitmq.host=192.168.88.150
spring.rabbitmq.port=5672
spring.rabbitmq.username=dpb
spring.rabbitmq.password=123
spring.rabbitmq.virtualHost=/
2.4 建立接收消息的接口
此接口和發送消息的接口相似,注意使用的是@Input注解。
/**
* 接收消息的接口
* @author dengp
*
*/
public interface IReceiverService {
/**
* 指定接收的交換器名稱
* @return
*/
@Input("dpb-exchange")
SubscribableChannel receiver();
}
2.5 建立處理消息的處理類
注意此類并不是實作上面建立的接口。而是通過@EnableBinding來綁定我們建立的接口,同時通過@StreamListener注解來監聽dpb-exchange對應的消息服務
/**
* 具體接收消息的處理類
* @author dengp
*
*/
@Service
@EnableBinding(IReceiverService.class)
public class ReceiverService {
@StreamListener("dpb-exchange")
public void onReceiver(byte[] msg){
System.out.println("消費者:"+new String(msg));
}
}
2.6 啟動類
同樣要添加@EnableBinding注解
@SpringBootApplication
@EnableEurekaClient
@EnableBinding(value={IReceiverService.class})
public class StreamReceiverStart {
public static void main(String[] args) {
SpringApplication.run(StreamReceiverStart.class, args);
}
}
3.編寫測試代碼
通過單元測試來測試服務。
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.context.junit4.SpringRunner;
import com.bobo.stream.StreamSenderStart;
import com.bobo.stream.sender.ISendeService;
@RunWith(SpringRunner.class)
@SpringBootTest(classes=StreamSenderStart.class)
public class StreamTest {
@Autowired
private ISendeService sendService;
@Test
public void testStream(){
String msg = "hello stream ...";
// 将需要發送的消息封裝為Message對象
Message message = MessageBuilder
.withPayload(msg.getBytes())
.build();
sendService.send().send(message );
}
}
啟動消息消費者後,執行測試代碼。結果如下:
消息接收者擷取到了發送者發送的消息,同時我們在RabbitMQ的web界面也可以看到相關的資訊
總結
我們同stream實作了消息中間件的使用,我們發現隻有在兩處位址和RabbitMQ有耦合,第一處是pom檔案中的依賴,第二處是application.properties中的RabbitMQ的配置資訊,而在具體的業務進行中并沒有出現任何RabbitMQ相關的代碼,這時如果我們要替換為Kafka的話我們隻需要将這兩處換掉即可,即實作了中間件和服務的高度解耦。