天天看点

【Spring Cloud】分布式必学springcloud(十一)——消息驱动 SpringCloud Stream一、前言二、什么是Spring Cloud Stream?三、快速上手四、小结

一、前言

      在前面的博客中,基本上已经把springcloud系列的大部分都介绍过了。如果有太明白的小白,还是建议从小编的第一篇博客进行学习。

      在这篇博客中,小白向大家介绍一个消息事件驱动框架——Spring Cloud Stream。

二、什么是Spring Cloud Stream?

      首先要说说消息驱动和事件驱动:

假设系统是这样的:处理A事后还有B事要处理

  • 事件驱动,告诉处理A事的程序B事是如何做的,在A事处理完后,直接调用处理B事的程序(或接口)来处理B事。
  • 消息驱动,处理完A事,放个消息在某个地方,意思是我处理完A事了,此时,处理A的程序已经完事大吉了。至于何时,如何处理B事,由另一个程序根据那个消息来处理。

      事件模式耦合高,同模块内好用;消息模式耦合低,跨模块好用。事件模式集成其它语言比较繁琐,消息模式集成其他语言比较轻松。事件是侵入式设计,霸占你的主循环;消息是非侵入式设计,将主循环该怎样设计的自由留给用户。

      Spring Cloud Stream 就是一个可以使得微服务拥有消息驱动的能力的框架。提供了消息驱动的机制,它通过Spring Integration来连接消息中间件以实现消息驱动。并为消息中间件提供了个性化的自动化配置,引入了发布-订阅、消费组和分区三个核心概念。

三、快速上手

3.1 准备工作

      还是老样子,依旧是依托上一篇博客整理的内容,小编把上几次的demo都已经上传到git,大家可以自行下载。

      https://github.com/AresKingCarry/SpringCloudDemo

      另外既然是消息驱动,自然需要用到消息中间件,目前为止,spring cloud stream只支持rabbitmq和kafuka。这里呢,小编继续使用rabbitmq。

3.2 消息接收端

      建立一个新的springboot项目,命名为stream。

【Spring Cloud】分布式必学springcloud(十一)——消息驱动 SpringCloud Stream一、前言二、什么是Spring Cloud Stream?三、快速上手四、小结

3.3 添加对stream 的依赖

      因为使用的是rabbitmq,所以添加

spring-cloud-starter-stream-rabbit

      spring-cloud-starter-stream-rabbit依赖是Spring Cloud Stream对RabbitMQ的封装,这里边也包含了对RabbitMQ的自动化配置,比如连接的RabbitMQ的默认地址就是localhost,默认端口就是5672,默认用户名是guest,默认密码也是guest,由于我们的RabbitMQ都是采用了默认配置,所以这里的配置可以不去修改,一样也可以运行。如果小伙伴需要修改,则和上篇文章一样,直接在application.properties中修改即可。

<dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
           

3.4 创建接收器

      建立一个类SinkRecevier,用来接收RabbitMQ发送来的消息:

package com.wl.stream.listener;

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;

/**
 * Created by Ares on 2018/4/18.
 */
@EnableBinding(Sink.class)
public class SinkRecevier {
    @StreamListener(Sink.INPUT)
    public void receive(Object payload){
        System.out.println("接收到消息:"+payload);
    }
}
           

      代码说明:

@EnableBinding

注解,绑定消息通道。该注解用来指定一个或者多个定义了

@Input

@Output

注解的接口。

      在代码中,我们通过@EnableBinding(Sink.class),绑定了Sink接口,Sink接口是Spring Cloud 中默认绑定输入通道,除此之外,还有绑定输出通道Source,还有绑定输入输出通道的Processor通道。除了Spring Cloud定义的接口外,我们也可以自定义。

      @StreamListener注解是将被修饰的方法注册为消息中间件上数据流的事件监听器,注解中的属性值对应了监听的消息通道名。

      在代码中,我们通过receive方注册为input消息通道的处理方法,当监听到input消息通道的消息的时候,receive方法会运行。

3.7 修改配置文件

      在配置文件中,我们要配置要监听的rabbitmq的地址,用户名等。

server:
  port: 
spring:
  rabbitmq:
    host: 
    port: 
    username: admin
    password: admin
  application:
    name: stream
  cloud:
    stream:
      bindings:
        input:
          destination: trade
          contentType: 'application/json'
           

      其中:

      cloud.stream.bindings.input.destination: trade

      cloud.stream.bindings.input.contentType: ‘application/json’

      分别指明了要监听的队列为trade,监听的数据格式为json。

3.6 运行项目

      我们运行项目,启动代码为默认代码,不用修改,直接运行就可以了

      运行后,在mq的监控平台会发现,队列中多了一条记录,

trade.anonymous.XK5Pb5dZTwSoly1iowTDVQ

,这个就是我们监听的队列。

【Spring Cloud】分布式必学springcloud(十一)——消息驱动 SpringCloud Stream一、前言二、什么是Spring Cloud Stream?三、快速上手四、小结

      我们点击这条队列,在队列中发送一条消息:

【Spring Cloud】分布式必学springcloud(十一)——消息驱动 SpringCloud Stream一、前言二、什么是Spring Cloud Stream?三、快速上手四、小结

      在控制平台中,会看到监听打印出来的信息:

【Spring Cloud】分布式必学springcloud(十一)——消息驱动 SpringCloud Stream一、前言二、什么是Spring Cloud Stream?三、快速上手四、小结

      这样就简单的实现了接收端的搭建。下面我们进行发送端的搭建。

3.7 搭建发送端

      同样我们需要建立一个springboot项目,streamsend。

【Spring Cloud】分布式必学springcloud(十一)——消息驱动 SpringCloud Stream一、前言二、什么是Spring Cloud Stream?三、快速上手四、小结

3.8 添加依赖

      通接收端,添加对rabbitmq的依赖:

<dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
           

3.9 编写发送端的代码

      建立一个类sender:

package com.wl.streamsend.sender;

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.annotation.InboundChannelAdapter;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * Created by Ares on 2018/4/18.
 */
@EnableBinding(Source.class)
public class Sender {
    @InboundChannelAdapter(value = Source.OUTPUT)
    public String timerMessageSource() {
        String format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
        System.out.println(format);
        return format;
    }
}
           

      代码说明:

      同样用到了上文提到的注解@EnableBinding,因为是要发送消息,所以绑定了发送接口Source。

      另外,还有@InboundChannelAdapter注解,发送接口。

3.10 修改配置文件

      和接收端一样,绑定指定的队列。

server:
  port: 
spring:
  rabbitmq:
    host: 
    port: 
    username: admin
    password: admin
  application:
    name: stream-send
  cloud:
    stream:
      bindings:
        output:
          destination: trade
          contentType: 'application/json'

           

3.11 发送消息

      启动类也是默认的,没有做修改。启动项目。

      会看到接收和发送控制台打印出日志。

【Spring Cloud】分布式必学springcloud(十一)——消息驱动 SpringCloud Stream一、前言二、什么是Spring Cloud Stream?三、快速上手四、小结

四、小结

      通过这次学习,发现springcloud已经利用注解把很多代码省略,效果非常好,开发人员通过简单的配置注解,就可以简单的开发,约定大于配置。

继续阅读