天天看點

Spring cloud stream【入門介紹】

案例代碼:https://github.com/q279583842q/springcloud-e-book

 在實際開發過程中,服務與服務之間通信經常會使用到消息中間件,而以往使用了哪個中間件比如RabbitMQ,那麼該中間件和系統的耦合性就會非常高,如果我們要替換為Kafka那麼變動會比較大,這時我們可以使用SpringCloudStream來整合我們的消息中間件,來降低系統和中間件的耦合性。

一、什麼是SpringCloudStream

 官方定義 Spring Cloud Stream 是一個建構消息驅動微服務的架構。

 應用程式通過 inputs 或者 outputs 來與 Spring Cloud Stream 中binder 互動,通過我們配置來 binding ,而 Spring Cloud Stream 的 binder 負責與消息中間件互動。是以,我們隻需要搞清楚如何與 Spring Cloud Stream 互動就可以友善使用消息驅動的方式。

 通過使用Spring Integration來連接配接消息代理中間件以實作消息事件驅動。Spring Cloud Stream 為一些供應商的消息中間件産品提供了個性化的自動化配置實作,引用了釋出-訂閱、消費組、分區的三個核心概念。目前僅支援RabbitMQ、Kafka。

二、Stream 解決了什麼問題?

 Stream解決了開發人員無感覺的使用消息中間件的問題,因為Stream對消息中間件的進一步封裝,可以做到代碼層面對中間件的無感覺,甚至于動态的切換中間件(rabbitmq切換為kafka),使得微服務開發的高度解耦,服務可以關注更多自己的業務流程

官網結構圖

Spring cloud stream【入門介紹】
Spring cloud stream【入門介紹】

三、消息驅動入門案例

 我們通過一個入門案例來示範下通過stream來整合RabbitMQ來實作消息的異步通信的效果,是以首先要開啟RabbitMQ服務,RabbitMQ不清楚的請參考此文:https://dpb-bobokaoya-sm.blog.csdn.net/article/details/90409404

1.建立消息發送者服務

1.1 建立項目

 建立一個SpringCloud項目

Spring cloud stream【入門介紹】
1.2 pom檔案

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.13.RELEASE</version>
    </parent>
    <groupId>com.bobo</groupId>
    <artifactId>stream-sender</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>Dalston.SR5</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-eureka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>      

1.3 配置檔案

 配置檔案中除了必要的服務名稱,端口和Eureka的資訊外我們還要添加RabbitMQ的注冊資訊

spring.application.name=stream-sender
server.port=9060
#設定服務注冊中心位址,指向另一個注冊中心
eureka.client.serviceUrl.defaultZone=http://dpb:123456@eureka1:8761/eureka/,http://dpb:123456@eureka2:8761/eureka/

#rebbitmq 連結資訊
spring.rabbitmq.host=192.168.88.150
spring.rabbitmq.port=5672
spring.rabbitmq.username=dpb
spring.rabbitmq.password=123
spring.rabbitmq.virtualHost=/      

1.4 建立消費發送者接口

 建立一個發送消息的接口。具體如下:方法名稱自定義,傳回類型必須是SubscribableChannel,在Output注解中指定交換器名稱。

/**
 * 發送消息的接口
 * @author dengp
 *
 */
public interface ISendeService {

    /**
     * 指定輸出的交換器名稱
     * @return
     */
    @Output("dpb-exchange")
    SubscribableChannel send();
}      

1.5 啟動類

 在啟動類中通過@EnableBinding注解綁定我們建立的接口類。

@SpringBootApplication
@EnableEurekaClient
// 綁定我們剛剛建立的發送消息的接口類型
@EnableBinding(value={ISendeService.class})
public class StreamSenderStart {

    public static void main(String[] args) {
        SpringApplication.run(StreamSenderStart.class, args);
    }
}      

2.建立消息消費者服務

2.1 建立項目

Spring cloud stream【入門介紹】

2.2 pom檔案

 添加的依賴和發送消息的服務是一緻的

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.13.RELEASE</version>
    </parent>
    <groupId>com.bobo</groupId>
    <artifactId>stream-receiver</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>Dalston.SR5</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-eureka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>      

2.3 配置檔案

 注意修改服務名稱和端口

spring.application.name=stream-receiver
server.port=9061
#設定服務注冊中心位址,指向另一個注冊中心
eureka.client.serviceUrl.defaultZone=http://dpb:123456@eureka1:8761/eureka/,http://dpb:123456@eureka2:8761/eureka/

#rebbitmq 連結資訊
spring.rabbitmq.host=192.168.88.150
spring.rabbitmq.port=5672
spring.rabbitmq.username=dpb
spring.rabbitmq.password=123
spring.rabbitmq.virtualHost=/      

2.4 建立接收消息的接口

 此接口和發送消息的接口相似,注意使用的是@Input注解。

/**
 * 接收消息的接口
 * @author dengp
 *
 */
public interface IReceiverService {

    /**
     * 指定接收的交換器名稱
     * @return
     */
    @Input("dpb-exchange")
    SubscribableChannel receiver();
}
      

2.5 建立處理消息的處理類

 注意此類并不是實作上面建立的接口。而是通過@EnableBinding來綁定我們建立的接口,同時通過@StreamListener注解來監聽dpb-exchange對應的消息服務

/**
 * 具體接收消息的處理類
 * @author dengp
 *
 */
@Service
@EnableBinding(IReceiverService.class)
public class ReceiverService {

    @StreamListener("dpb-exchange")
    public void onReceiver(byte[] msg){
        System.out.println("消費者:"+new String(msg));
    }
}      

2.6 啟動類

 同樣要添加@EnableBinding注解

@SpringBootApplication
@EnableEurekaClient
@EnableBinding(value={IReceiverService.class})
public class StreamReceiverStart {

    public static void main(String[] args) {
        SpringApplication.run(StreamReceiverStart.class, args);
    }
}      

3.編寫測試代碼

 通過單元測試來測試服務。

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.context.junit4.SpringRunner;

import com.bobo.stream.StreamSenderStart;
import com.bobo.stream.sender.ISendeService;

@RunWith(SpringRunner.class)
@SpringBootTest(classes=StreamSenderStart.class)
public class StreamTest {
    
    @Autowired
    private ISendeService sendService;

    @Test
    public void testStream(){
        String msg = "hello stream ...";
        // 将需要發送的消息封裝為Message對象
        Message message = MessageBuilder
                                .withPayload(msg.getBytes())
                                .build();
        sendService.send().send(message );
    }
}      

啟動消息消費者後,執行測試代碼。結果如下:

Spring cloud stream【入門介紹】

消息接收者擷取到了發送者發送的消息,同時我們在RabbitMQ的web界面也可以看到相關的資訊

Spring cloud stream【入門介紹】

總結

 我們同stream實作了消息中間件的使用,我們發現隻有在兩處位址和RabbitMQ有耦合,第一處是pom檔案中的依賴,第二處是application.properties中的RabbitMQ的配置資訊,而在具體的業務進行中并沒有出現任何RabbitMQ相關的代碼,這時如果我們要替換為Kafka的話我們隻需要将這兩處換掉即可,即實作了中間件和服務的高度解耦。