天天看点

【Spring Cloud】分布式必学springcloud(十二)——结合SpringCloud Stream 异步调用一、前言二、什么是事件?三、异步发送四、小结

一、前言

      在上一篇博客中,小编向大家介绍了一下SpringCloud Stream,相信实践过的老铁们,肯定是能收发消息了。但是有一个问题,比如我的发送端,使用的是

@InboundChannelAdapter(value = Source.OUTPUT)

,会一直的向消息队列中发消息。在真正业务场景中,基本不会出现。下面小编就想大家介绍一下,真正的项目中通过发布事件来触发发送消息到消息队列中。

二、什么是事件?

      事件,Event。

      当一个事件发生的时候,如果有人关注这个事件,那么他会做出一定的动作。比如,假定七夕情人节马上就要到了,这个就是一个事件,关注着这个节日的情侣们,必定会导致旅馆宾馆的房源紧张,同时也会促进某种产品的销量。

      在代码中呢,当我们处理完一个充值后,我们需要向客户发送一个通知,告诉他,充值完毕。充值完成后,就是发生的时间,发送通知是要做的动作,而且这个动作是异步完成的。这种场景是非常常见的。

      在springboot中,提供了监听事件 ApplicationListener 和 ApplicationEvent 。

(1)自定义事件:继承ApplicationEvent

(2)定义事件监听器:实现ApplicationListener

(3)使用容器发布事件

三、异步发送

3.1 自定义事件

package com.zero.event;

import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import org.springframework.context.ApplicationEvent;

/**
 * 异步通知事件
 */
@Data
@ToString(callSuper = true)
@EqualsAndHashCode(callSuper = false)
public class FeeResponseEvent extends ApplicationEvent {

    public FeeResponseEvent(Object source) {
        super(source);
    }
}
           

3.2 定义事件监听器:实现ApplicationListener

package com.zero.event;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.ApplicationListener;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

/**
 * Created by Ares on 2018/4/18.
 */
@Component
@EnableBinding(Source.class)
public class Sender implements ApplicationListener<FeeResponseEvent> {

    @Autowired
    private MessageChannel output;


    /**
     * 发送事件消息到消息队列中-王雷-2018年5月18日16:20:22
     * @param event 费用事件
     */
    private void sendMessageToWorker(FeeResponseEvent event){
        output.send(MessageBuilder.withPayload(event.getSource()).build());
        System.out.println("发送成功:"+event.getSource());

    }

    /**
     * 当有发布事件发生时,执行此方法-王雷-2018年5月18日16:19:52
     * @param feeResponseEvent 费用自定义事件
     */
    @Override
    public void onApplicationEvent(FeeResponseEvent feeResponseEvent) {
        this.sendMessageToWorker(feeResponseEvent);
    }
}
           

      补充:ApplicationListener支持泛型,上面的代码是泛型的写法,下面的是非泛型的写法。

@Component
//这里注意我们直接把监听类注册成组件
public class MessageListener implements ApplicationListener {

      private final Logger logger = LoggerFactory.getLogger(MessageListener.class);

      @Autowired
      private APIMessageService apiMessageService;


      @Async
      @Override
      public void onApplicationEvent(ApplicationEvent applicationEvent) {

        if (applicationEvent instanceof ApplyEvent) {
            //做处理
        }  
    }
}
           

3.3 发布事件

@Autowired
    ApplicationContext context;
     /**
     * 发布事件-王雷-2018年5月18日16:18:56
     */
    @Override
    public void feeMessage() {
        FeeResponseEvent event = new FeeResponseEvent("ares");
        context.publishEvent(event);
    }
           

3.4 使用spring cloud stream 发送消息

      建立消息发送器:

package com.zero.service;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

/**
 * 费用事件发送器
 */
public interface SinkSender {
    String FEE="fee";
    /**
     * 费用计算完成事件消息
     * @return
     */
    @Output(FEE)
    MessageChannel feeMessage();


}
           

3.5 配置文件配置

spring:
  rabbitmq:
    host: *2.*1.**.
    port: 
    username: guest
    password: guest
  cloud:
    stream:
      bindings:
        output:
          destination: fee
          contentType: 'application/json'
           

3.6 发送测试

      启动服务后,调用发布事件,会引起事件监听监听到,然后会向mq中放数据。

3.7 使用 spring cloud stream 接收消息

      配置监听器:

      配置监听器,一直监听Exchange名字为fee的。当监听到有消息的时候,就会调用下面指定的方法进行相关的操作。

package com.zero.service;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.MessageChannel;

/**
 * Created by Ares on 2018/5/18.
 */
public interface SinkIn {
    String INPUT = "fee";

    @Input("fee")
    MessageChannel input();
}
           
package com.zero.listener;

import com.zero.service.SinkIn;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Component;

/**
 * 接收队列消息
 * @author Ares
 *
 */
@Component
@EnableBinding(SinkIn.class)
public class ReceiverMessage {

    @StreamListener(SinkIn.INPUT)
    public void receive(Object playload) {

        System.out.println("接收消息:" + playload);

    }

}
           

3.8 异步设置

      启异步配置在配置类中添加注解@EnableAsync

package com.zero;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.scheduling.annotation.EnableAsync;

@SpringBootApplication
@EnableDiscoveryClient
@EnableAsync
public class FeeMasterWebApplication {

    private static final Logger log = LoggerFactory.getLogger(FeeMasterWebApplication.class);

    public static void main(String[] args) {
        log.info("费用模块提供者启动....");
        SpringApplication.run(FeeMasterWebApplication.class, args);
        log.info("费用模块提供者启动成功....");

    }
}
           

      通过MessageListener 在onApplicationEvent上添加注解@Async,@Async也可以添加在类上。

/**
     * 当有发布事件发生时,执行此方法-王雷-2018年5月18日16:19:52
     * @param feeResponseEvent 费用自定义事件
     */
    @Override
    @Async
    public void onApplicationEvent(FeeResponseEvent feeResponseEvent) {
        this.sendMessageToWorker(feeResponseEvent);
    }
}
           

四、小结

      通过这次的使用,有一种思想可以提一下,在做架构设计的时候,必定会涉及到异步操作,所以我们可以把设计到异步操作的提取出来,单独做一个服务。这样就可以解耦合。

继续阅读