天天看点

深入理解Spring事件机制(二):事件的推送[通俗易懂]

大家好,又见面了,我是你们的朋友全栈君。

前言

Spring 从

3.x

开始支持事件机制。在 Spring 的事件机制中,我们可以令一个事件类继承

ApplicationEvent

类,然后将实现了

ApplicationListener

Bean

注册到 spring 容器,最后向

ApplicationEventPublisher

推送事件对象即可令所有订阅者收到事件。在

4.2

以后,甚至不需要实现

ApplicationListener

接口,仅需在

Bean

中方法标记

@EventListener

注解即可。

笔者将基于 Spring 源码的

5.2.x

分支,分析该功能是如何实现的。

本文是其中的第二篇文章,将分析事件是如何通过广播器推送,并被监听器接收并处理的。

在开始前,推荐先阅读前文了解一点 Spring 的注解机制或者事务机制,这将更有利于流程与一些代码的理解。

相关文章:

  • 深入理解Spring事件机制(一):广播器与监听器的初始化
  • 深入理解Spring事件机制(二):事件的推送

一、事件的推送

1、将事件推送到上下文

当我们借助 Spring 发送一个事件对象的时候,一般都通过

ApplicationEventPublisher

完成,在默认情况下,通过容器获得的

ApplicationEventPublisher

单例实际上就是

ApplicationContext

本身。

ApplicationEventPublisher

提供了两个

publishEvent

方法,一个用于发布

ApplicationEvent

事件,另一个用于发布其他事件,在

AbstractApplicationContext

中,它们都通过同一个私有方法实现:

// 推送ApplicationEvent事件
@Override
public void publishEvent(ApplicationEvent event) {
    publishEvent(event, null);
}
// 推送非ApplicationEvent事件
@Override
public void publishEvent(Object event) {
    publishEvent(event, null);
}

protected void publishEvent(Object event, @Nullable ResolvableType eventType) {
    Assert.notNull(event, "Event must not be null");

    // 如果事件对象没继承ApplicationEvent,就包装为PayloadApplicationEvent
    ApplicationEvent applicationEvent;
    if (event instanceof ApplicationEvent) {
        applicationEvent = (ApplicationEvent) event;
    }
    else {
        applicationEvent = new PayloadApplicationEvent<>(this, event);
        if (eventType == null) {
            // 获得事件对象的实际类型
            eventType = ((PayloadApplicationEvent<?>) applicationEvent).getResolvableType();
        }
    }

    // 如果上下文中早期事件列表的事件没清空,说明还在上下文初始化过程,还没有可用的广播器
    // 因此此时不直接广播事件,而是加入早期事件列表,等到广播器完成初始化后再推送
    if (this.earlyApplicationEvents != null) {
        this.earlyApplicationEvents.add(applicationEvent);
    }
    else {
        // 直接推送事件
        getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType);
    }

    // 如果有父上下文,则也向父上下文推送事件
    if (this.parent != null) {
        if (this.parent instanceof AbstractApplicationContext) {
            ((AbstractApplicationContext) this.parent).publishEvent(event, eventType);
        }
        else {
            this.parent.publishEvent(event);
        }
    }
}           

复制

这一步方法总共做了这些事:

  • 如果事件对象没有继承

    ApplicationEvent

    ,则将其包装为

    PayloadApplicationEvent

  • 若早期事件列表为空,说明还在上下文已有可用的广播器,直接通过广播器推送事件,否则就先把事件加入早期事件列表,等到广播器初始化完成后再推送;
  • 如果上下文存在父上下文,则向父上下文也推送事件;

针对早期事件列表,在容器调用

AbstractApplicationContext.refresh

方法进行初始化的过程中,早期事件列表在整个容器启动的第一个步骤

prepareRefresh

中被创建,而在非常靠后的

registerListeners

步骤中才被清空。

也就是说,当

registerListeners

还没执行前,任何向上下文推送的事件实际上都不会立刻执行,而是延迟到

registerListeners

这一步才会推送,在这一步后,向上下文推送的事件都会立刻被推送。

2、将事件推送到广播器

当上下文将事件推送到广播器时,需要调用

ApplicationEventMulticaster.multicastEvent

方法,我们以默认的实现类

SimpleApplicationEventMulticaster

为例:

public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {
    // 获取事件的实际类型
    ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
    // 获取任务执行器,
    Executor executor = getTaskExecutor();
    // 获取已经注册的监听器
    for (ApplicationListener<?> listener : getApplicationListeners(event, type)) {
        if (executor != null) {
            // 通过执行器执行广播调用过程
            executor.execute(() -> invokeListener(listener, event));
        }
        else {
            invokeListener(listener, event);
        }
    }
}           

复制

这一步共干了四件事:

  • 获取事件的实际类型;
  • 获取广播器中配置的任务执行器;
  • 通过事件的实际类型获取对应的监听器;
  • 遍历监听器,在执行器中调用监听器;

更简单的概况,就是:找到事件对应的监听器,然后依次放到执行器执行。

下面我们将详细分析监听器查询与执行的过程。

二、监听器的检索

在广播器广播事件时,会调用

getApplicationListeners

方法:

protected Collection<ApplicationListener<?>> getApplicationListeners(
    ApplicationEvent event, ResolvableType eventType) {

    // 获取事件的实际类型
    Object source = event.getSource();
    Class<?> sourceType = (source != null ? source.getClass() : null);
    // 生成监听器与事件类型的缓存key
    ListenerCacheKey cacheKey = new ListenerCacheKey(eventType, sourceType);

    // 从缓存中获取事件对应的监听器检索器,若不存在则新建并加入缓存
    CachedListenerRetriever newRetriever = null;
    CachedListenerRetriever existingRetriever = this.retrieverCache.get(cacheKey);
    if (existingRetriever == null) {
        // Caching a new ListenerRetriever if possible
        if (this.beanClassLoader == null ||
            (ClassUtils.isCacheSafe(event.getClass(), this.beanClassLoader) &&
             (sourceType == null || ClassUtils.isCacheSafe(sourceType, this.beanClassLoader)))) {
            newRetriever = new CachedListenerRetriever();
            existingRetriever = this.retrieverCache.putIfAbsent(cacheKey, newRetriever);
            if (existingRetriever != null) {
                newRetriever = null;  // no need to populate it in retrieveApplicationListeners
            }
        }
    }

    // 若存在,则从检索器中获取全部监听器
    if (existingRetriever != null) {
        Collection<ApplicationListener<?>> result = existingRetriever.getApplicationListeners();
        if (result != null) {
            return result;
        }
        // If result is null, the existing retriever is not fully populated yet by another thread.
        // Proceed like caching wasn't possible for this current local attempt.
    }

    // 从检索器中获取所需的监听器
    return retrieveApplicationListeners(eventType, sourceType, newRetriever);
}           

复制

1、监听器检索器

默认的广播器

SimpleApplicationEventMulticaster

中维护了两个检索器内部类,用于管理注册到广播器的监听器,它们有不同的用途:

  • DefaultListenerRetriever

    :用于支持监听器的注册功能;
  • CachedListenerRetriever

    :用于提供基于事件类型快速查询对应监听器的缓存功能;

DefaultListenerRetriever

DefaultListenerRetriever

主要用于支持监听器的注册,他唯一的作用就是提供

getApplicationListeners

方法:

private class DefaultListenerRetriever {

    // 监听器实例
    public final Set<ApplicationListener<?>> applicationListeners = new LinkedHashSet<>();

    // 监听器Bean名称
    public final Set<String> applicationListenerBeans = new LinkedHashSet<>();

    public Collection<ApplicationListener<?>> getApplicationListeners() {
        // 先获得Bean实例
        List<ApplicationListener<?>> allListeners = new ArrayList<>(
            this.applicationListeners.size() + this.applicationListenerBeans.size());
        allListeners.addAll(this.applicationListeners);
        
        // 再根据BeanName从BeanFactory中获取监听器Bean
        if (!this.applicationListenerBeans.isEmpty()) {
            BeanFactory beanFactory = getBeanFactory();
            for (String listenerBeanName : this.applicationListenerBeans) {
                try {
                    ApplicationListener<?> listener =
                        beanFactory.getBean(listenerBeanName, ApplicationListener.class);
                    if (!allListeners.contains(listener)) {
                        allListeners.add(listener);
                    }
                }
                catch (NoSuchBeanDefinitionException ex) {
                    // Singleton listener instance (without backing bean definition) disappeared -
                    // probably in the middle of the destruction phase
                }
            }
        }
        // 对监听器排序
        AnnotationAwareOrderComparator.sort(allListeners);
        return allListeners;
    }
}           

复制

该检索器支持直接注册

Bean

实例,或者只注册

BeanName

,当调用

getApplicationListeners

将会全量的获得已注册的监听器实例。

CachedListenerRetriever

CachedListenerRetriever

用于提供基于事件类型快速查询对应监听器的缓存功能,它总是在

DefaultListenerRetriever

创建后,向广播器推送事件的时候才会被创建,而当向广播器注册监听器时,如果已有该缓存检索器,则它们全部都会被销毁,等待下一次推送时间时再被创建,从而实现刷新缓存的功能:

private class CachedListenerRetriever {

    @Nullable
    public volatile Set<ApplicationListener<?>> applicationListeners;

    @Nullable
    public volatile Set<String> applicationListenerBeans;

    @Nullable
    public Collection<ApplicationListener<?>> getApplicationListeners() {
        Set<ApplicationListener<?>> applicationListeners = this.applicationListeners;
        Set<String> applicationListenerBeans = this.applicationListenerBeans;
        if (applicationListeners == null || applicationListenerBeans == null) {
            // Not fully populated yet
            return null;
        }

        List<ApplicationListener<?>> allListeners = new ArrayList<>(
            applicationListeners.size() + applicationListenerBeans.size());
        allListeners.addAll(applicationListeners);
        if (!applicationListenerBeans.isEmpty()) {
            BeanFactory beanFactory = getBeanFactory();
            for (String listenerBeanName : applicationListenerBeans) {
                try {
                    allListeners.add(beanFactory.getBean(listenerBeanName, ApplicationListener.class));
                }
                catch (NoSuchBeanDefinitionException ex) {
                    // Singleton listener instance (without backing bean definition) disappeared -
                    // probably in the middle of the destruction phase
                }
            }
        }
        if (!applicationListenerBeans.isEmpty()) {
            // 对监听器排序
            AnnotationAwareOrderComparator.sort(allListeners);
        }
        return allListeners;
    }
}           

复制

而每一个

CachedListenerRetriever

都会有一个对应的

ListenerCacheKey

,这个类重写了

hashCode

equals

方法,用来在 Map 集合中根据事件类型与事件 source 作为 key:

private static final class ListenerCacheKey implements Comparable<ListenerCacheKey> {
    private final ResolvableType eventType;
    @Nullable
    private final Class<?> sourceType;
}           

复制

2、获取监听器

getApplicationListeners

方法中,主要用于创建一个

ListenerCacheKey

对应的

CachedListenerRetriever

缓存实例,不过实际上此时

CachedListenerRetriever

仍然还是空的,需要在最后通过

retrieveApplicationListeners

方法从

DefaultListenerRetriever

中找到对应的监听器,然后将他们都刷进缓存:

private Collection<ApplicationListener<?>> retrieveApplicationListeners(
    ResolvableType eventType, @Nullable Class<?> sourceType, @Nullable CachedListenerRetriever retriever) {

    List<ApplicationListener<?>> allListeners = new ArrayList<>();
    Set<ApplicationListener<?>> filteredListeners = (retriever != null ? new LinkedHashSet<>() : null);
    Set<String> filteredListenerBeans = (retriever != null ? new LinkedHashSet<>() : null);

    Set<ApplicationListener<?>> listeners;
    Set<String> listenerBeans;
    synchronized (this.defaultRetriever) {
        listeners = new LinkedHashSet<>(this.defaultRetriever.applicationListeners);
        listenerBeans = new LinkedHashSet<>(this.defaultRetriever.applicationListenerBeans);
    }

    // 遍历监听器实例,如果支持处理该事件
    for (ApplicationListener<?> listener : listeners) {
        if (supportsEvent(listener, eventType, sourceType)) {
            // 有检索器缓存
            if (retriever != null) {
                filteredListeners.add(listener);
            }
            // 无检索器缓存
            allListeners.add(listener);
        }
    }

    // 遍历仅注册了BeanName的监听器,并将其中支持处理该事件的监听器BeanFactory中取出
    if (!listenerBeans.isEmpty()) {
        ConfigurableBeanFactory beanFactory = getBeanFactory();
        for (String listenerBeanName : listenerBeans) {
            try {
                if (supportsEvent(beanFactory, listenerBeanName, eventType)) {
                    ApplicationListener<?> listener =
                        beanFactory.getBean(listenerBeanName, ApplicationListener.class);
                    if (!allListeners.contains(listener) && supportsEvent(listener, eventType, sourceType)) {
                        if (retriever != null) {
                            // 如果是单例Bean,可以直接取出备用
                            if (beanFactory.isSingleton(listenerBeanName)) {
                                filteredListeners.add(listener);
                            }
                            // 如果不是单例Bean,则不保证后续会不会再调整,因此只记录BeanName,等要用的时候再获取
                            else {
                                filteredListenerBeans.add(listenerBeanName);
                            }
                        }
                        allListeners.add(listener);
                    }
                }
                else {
                    // 移除不匹配该事件的监听器
                    Object listener = beanFactory.getSingleton(listenerBeanName);
                    if (retriever != null) {
                        filteredListeners.remove(listener);
                    }
                    allListeners.remove(listener);
                }
            }
            catch (NoSuchBeanDefinitionException ex) {
                // Singleton listener instance (without backing bean definition) disappeared -
                // probably in the middle of the destruction phase
            }
        }
    }

    // 对监听器排序
    AnnotationAwareOrderComparator.sort(allListeners);
    if (retriever != null) {
        if (filteredListenerBeans.isEmpty()) {
            retriever.applicationListeners = new LinkedHashSet<>(allListeners);
            retriever.applicationListenerBeans = filteredListenerBeans;
        }
        else {
            retriever.applicationListeners = filteredListeners;
            retriever.applicationListenerBeans = filteredListenerBeans;
        }
    }
    return allListeners;
}           

复制

而这里多次调用的

support

方法按顺序依次则有:

protected boolean supportsEvent(
    ApplicationListener<?> listener, ResolvableType eventType, @Nullable Class<?> sourceType) {

    GenericApplicationListener smartListener = (listener instanceof GenericApplicationListener ?
                                                (GenericApplicationListener) listener : new GenericApplicationListenerAdapter(listener));
    return (smartListener.supportsEventType(eventType) && smartListener.supportsSourceType(sourceType));
}

private boolean supportsEvent(
    ConfigurableBeanFactory beanFactory, String listenerBeanName, ResolvableType eventType) {

    // 获取监听器类型,并判断监听器是否支持处理该事件:
    Class<?> listenerType = beanFactory.getType(listenerBeanName);
    // 1.GenericApplicationListener与SmartApplicationListener默认支持所有ApplicationEvent,
    // 因此如果监听器是上述任意一种,直接返回ture
    if (listenerType == null || GenericApplicationListener.class.isAssignableFrom(listenerType) ||
        SmartApplicationListener.class.isAssignableFrom(listenerType)) {
        return true;
    }
    // 2.获取监听器ApplicationListener的类泛型,判断是否为该类事件,不是就直击返回false
    if (!supportsEvent(listenerType, eventType)) {
        return false;
    }
    try {
        // 3.如果还不行,尝试直接从BeanFactory获取BeanDefinition信息,然后转为ApplicationListener后再获取它在类泛型上声明的事件类型
        BeanDefinition bd = beanFactory.getMergedBeanDefinition(listenerBeanName);
        ResolvableType genericEventType = bd.getResolvableType().as(ApplicationListener.class).getGeneric();
        // 如果能获得事件类型就判断,不能说明实际上支持所有类型,都返回ture
        return (genericEventType == ResolvableType.NONE || genericEventType.isAssignableFrom(eventType));
    }
    catch (NoSuchBeanDefinitionException ex) {
        // Ignore - no need to check resolvable type for manually registered singleton
        return true;
    }
}

protected boolean supportsEvent(Class<?> listenerType, ResolvableType eventType) {
    // 获取在监听器泛型声明的事件类型,确认这个事件类型是否与当前事件为一类事件
    ResolvableType declaredEventType = GenericApplicationListenerAdapter.resolveDeclaredEventType(listenerType);
    return (declaredEventType == null || declaredEventType.isAssignableFrom(eventType));
}

static ResolvableType resolveDeclaredEventType(Class<?> listenerType) {
    ResolvableType eventType = eventTypeCache.get(listenerType);
    if (eventType == null) {
        // 将监听器转为ApplicationListener,获取其类泛型声明的事件类型
        eventType = ResolvableType.forClass(listenerType).as(ApplicationListener.class).getGeneric();
        eventTypeCache.put(listenerType, eventType);
    }
    return (eventType != ResolvableType.NONE ? eventType : null);
}           

复制

这里的逻辑有点长,不过主要目标还是很清晰的:

  • 先根据事件的类型和

    source

    获取

    CachedListenerRetriever

    缓存,如果没缓存就先创建;
  • DefaultListenerRetriever

    获取全部监听器:
    1. 如果能获取实例,就直接获得实例;
    2. 不能获得实例,就通过

      BeanName

      BeanFactory

      里取;
  • 判断这些监听器支不支持处理当前推送的事件:
    1. 如果能拿到实例,并且如果实现了

      GenericApplicationListener

      ,就直接调用

      supportsEventType

      supportsSourceType

      方法判断;
    2. 如果不能拿到实例,那就尝试拿到实际的类型,并且通过泛型确认是否支持当前推送的事件;
  • 获得全部支持该事件的监听器后,再将其刷入

    CachedListenerRetriever

    缓存,下次再来就直接从缓存里头取;

三、监听器的执行

监听器的执行分为两步,第一步是在广播器中调用监听器的

onApplicationEvent

,第二步是在

onApplicationEvent

调用真正的处理逻辑,这里根据监听器类型的不同而具有不同的实现,这里我们重点关注注解式监听器。

1、广播器的广播

监听器的执行对应

multicastEvent

中的这一段代码:

// 获取任务执行器,
Executor executor = getTaskExecutor();
// 获取已经注册的监听器
for (ApplicationListener<?> listener : getApplicationListeners(event, type)) {
    if (executor != null) {
        // 通过执行器异步执行广播调用过程
        executor.execute(() -> invokeListener(listener, event));
    }
    else {
        // 如果通过当前线程执行广播调用过程
        invokeListener(listener, event);
    }
}

protected void invokeListener(ApplicationListener<?> listener, ApplicationEvent event) {
    ErrorHandler errorHandler = getErrorHandler();
    if (errorHandler != null) {
        try {
            // 发生异常时调用
            doInvokeListener(listener, event);
        }
        catch (Throwable err) {
            errorHandler.handleError(err);
        }
    }
    else {
        doInvokeListener(listener, event);
    }
}

private void doInvokeListener(ApplicationListener listener, ApplicationEvent event) {
    try {
        // 调用 onApplicationEvent 方法
        listener.onApplicationEvent(event);
    }
    catch (ClassCastException ex) {
        String msg = ex.getMessage();
        if (msg == null || matchesClassCastMessage(msg, event.getClass())) {
            // Possibly a lambda-defined listener which we could not resolve the generic event type for
            // -> let's suppress the exception and just log a debug message.
            Log logger = LogFactory.getLog(getClass());
            if (logger.isTraceEnabled()) {
                logger.trace("Non-matching event type for listener: " + listener, ex);
            }
        }
        else {
            throw ex;
        }
    }
}           

复制

这里的逻辑非常简单:

  • 获取全部支持处理该事件的监听器,然后依次调用

    ApplicationListener.onApplicationEvent

    方法;
  • 如果广播器有设置线程池,则调用监听器的时候就在线程池里调用,否则在当前线程里调用;
  • 当调用发生异常时,就调用广播器中注册的异常处理器

    ErrorHandler

2、普通监听器的执行

我们以最基本的注解式监听器

ApplicationListenerMethodAdapter

为例,当调用它的

onApplicationEvent

方法时:

@Override
public void onApplicationEvent(ApplicationEvent event) {
    processEvent(event);
}

public void processEvent(ApplicationEvent event) {
    Object[] args = resolveArguments(event);
    if (shouldHandle(event, args)) {
        Object result = doInvoke(args);
        if (result != null) {
            handleResult(result);
        }
        else {
            logger.trace("No result object given - no result to handle");
        }
    }
}           

复制

条件判断

此过程平平无奇,其中

shouldHandle

方法用于根据在

@EventListener

中的

condition

属性指定的

SpEL

表达式确定是否需要真正调用注解方法:

private boolean shouldHandle(ApplicationEvent event, @Nullable Object[] args) {
    if (args == null) {
        return false;
    }
    String condition = getCondition();
    if (StringUtils.hasText(condition)) {
        Assert.notNull(this.evaluator, "EventExpressionEvaluator must not be null");
        return this.evaluator.condition(
            condition, event, this.targetMethod, this.methodKey, args, this.applicationContext);
    }
    return true;
}           

复制

至于

evaluator

则是 Spring 基于

SpEL

表达式封装的一个用于事件的工具类

EventExpressionEvaluator

,关于这个可以单独了解

SpelExpressionParser

StandardEvaluationContext

Expression

这三个类,或者直接了解 Spring 的

SpEL

表达式相关功能。

3、返回值支持

相比于普通的编程式监听器,注解式监听器还会多处一步对返回值的处理。我们以

ApplicationListenerMethodAdapter

为例,当

doInvoke

以后,若注解方法返回值不为null,则会尝试通过

handleResult

对返回值进行处理:

protected void handleResult(Object result) {
    if (reactiveStreamsPresent && new ReactiveResultHandler().subscribeToPublisher(result)) {
        if (logger.isTraceEnabled()) {
            logger.trace("Adapted to reactive result: " + result);
        }
    }
    // 1、返回值是CompletionStage
    else if (result instanceof CompletionStage) {
        ((CompletionStage<?>) result).whenComplete((event, ex) -> {
            // 发生异常
            if (ex != null) {
                handleAsyncError(ex);
            }
            // 当完成后若有返回值,则继续尝试处理返回值
            else if (event != null) {
                publishEvent(event);
            }
        });
    }
    // 2、返回值是ListenableFuture
    else if (result instanceof ListenableFuture) {
        ((ListenableFuture<?>) result).addCallback(this::publishEvents, this::handleAsyncError);
    }
    // 3、返回值是普通对象、数组或者集合
    else {
        publishEvents(result);
    }
}           

复制

当监听器方法有返回值的时候,这里有三种处理:

  • 返回值是

    CompletionStage

    ,继续完成下一步异步调用;
  • 返回值是

    ListenableFuture

    ,调用回调方法;
  • 返回值返回值是对象、数组或集合,尝试作为将其作为事件对象发送;

返回值是CompletionStage

当看到了

CompletionStage

的时候,很容易联想到基于它实现的

CompletableFuture

。它表示一个待完成的异步任务,在

ApplicationListenerMethodAdapter

中,监听器会通过如下代码,为其追加任务完成后的回调:

// 当这一步完成后,获取执行结构
((CompletionStage<?>) result).whenComplete((event, ex) -> {
    // 如果发生异常
    if (ex != null) {
        handleAsyncError(ex);
    }
    // 当完成后若有返回值,则继续尝试处理返回值
    else if (event != null) {
        publishEvent(event);
    }
});           

复制

返回值是ListenableFuture

ListenableFuture

也是一个异步任务回调接口,它的用法与 Guava 中的

ListenableFuture

几乎完全一致,处理的逻辑与上文的返回值是

CompletionStage

的情况也完全一致,就是追加任务完成后的回调:

((ListenableFuture<?>) result).addCallback(this::publishEvents, this::handleAsyncError);           

复制

当任务的返回值不为 null 的时候,就尝试处理它的返回值。

返回值是普通对象、数组或集合

当对象是普通对象的时候,监听器会尝试将返回值也作为一个事件推送。而如果是数组或者集合,会先将其摊平,然后把其中的每一个元素都取出尝试作为事件推送:

private void publishEvents(Object result) {
    // 如果是数组,就将数组取出然后依次作为事件发送
    if (result.getClass().isArray()) {
        Object[] events = ObjectUtils.toObjectArray(result);
        for (Object event : events) {
            publishEvent(event);
        }
    }
    // 如果是集合,也平铺后依次将其中元素作为事件发送
    else if (result instanceof Collection<?>) {
        Collection<?> events = (Collection<?>) result;
        for (Object event : events) {
            publishEvent(event);
        }
    }
    // 直接将返回值作为事件发送
    else {
        publishEvent(result);
    }
}

private void publishEvent(@Nullable Object event) {
    if (event != null) {
        Assert.notNull(this.applicationContext, "ApplicationContext must not be null");
        this.applicationContext.publishEvent(event);
    }
}           

复制

4、事务支持

由于编程式监听器天生就是支持事务的,因此 Spring 只单纯为注解式监听器专门准备的事务监听器方法适配器

ApplicationListenerMethodTransactionalAdapter

,它由监听器工厂

TransactionalEventListenerFactory

生产,与一个

@TransactionalEventListener

注解对应。

@TransactionalEventListener

@TransactionalEventListener

是一个基于 Spring 元注解机制扩展的注解,它在

@EventListener

的基础上扩展了一些事务相关的配置:

@EventListener
public @interface TransactionalEventListener {
    
    // 当当前事务进行到哪个阶段时,调用该监听器,若fallbackExecution为true时无意义
    TransactionPhase phase() default TransactionPhase.AFTER_COMMIT;
    
    // 当前没有事务的时候,是否调用该监听器
    boolean fallbackExecution() default false;
    
    // 监听的事件类型
    @AliasFor(annotation = EventListener.class, attribute = "classes")
    Class<?>[] value() default {};
    @AliasFor(annotation = EventListener.class, attribute = "classes")
    Class<?>[] classes() default {};
    
    // 用于判断是否处理事件的SpEL表达式
    String condition() default "";

}           

复制

ApplicationListenerMethodTransactionalAdapter

事务监听器方法适配器

ApplicationListenerMethodTransactionalAdapter

继承了非事务的适配器

ApplicationListenerMethodAdapter

,但是基于

TransactionSynchronization

在监听器的

onApplicationEvent

方法做了一些事务的处理:

class ApplicationListenerMethodTransactionalAdapter extends ApplicationListenerMethodAdapter {

    private final TransactionalEventListener annotation;


    public ApplicationListenerMethodTransactionalAdapter(String beanName, Class<?> targetClass, Method method) {
        super(beanName, targetClass, method);
        TransactionalEventListener ann = AnnotatedElementUtils.findMergedAnnotation(method, TransactionalEventListener.class);
        if (ann == null) {
            throw new IllegalStateException("No TransactionalEventListener annotation found on method: " + method);
        }
        this.annotation = ann;
    }


    @Override
    public void onApplicationEvent(ApplicationEvent event) {
        // 如果全局事务管理器可以用,则创建一个TransactionSynchronizationEventAdapter同步事务,
        // 并注册到全局事务管理器
        if (TransactionSynchronizationManager.isSynchronizationActive() &&
            TransactionSynchronizationManager.isActualTransactionActive()) {
            TransactionSynchronization transactionSynchronization = createTransactionSynchronization(event);
            TransactionSynchronizationManager.registerSynchronization(transactionSynchronization);
        }
        // 如果全局事务管理器不可用,并且在注解中配置了fallbackExecution为true,即没有事务也调用监听器
        else if (this.annotation.fallbackExecution()) {
            if (this.annotation.phase() == TransactionPhase.AFTER_ROLLBACK && logger.isWarnEnabled()) {
                logger.warn("Processing " + event + " as a fallback execution on AFTER_ROLLBACK phase");
            }
            // 处理器事件
            processEvent(event);
        }
        else {
            // No transactional event execution at all
            if (logger.isDebugEnabled()) {
                logger.debug("No transaction is active - skipping " + event);
            }
        }
    }

    private TransactionSynchronization createTransactionSynchronization(ApplicationEvent event) {
        return new TransactionSynchronizationEventAdapter(this, event, this.annotation.phase());
    }


    private static class TransactionSynchronizationEventAdapter extends TransactionSynchronizationAdapter {

        private final ApplicationListenerMethodAdapter listener;

        private final ApplicationEvent event;

        private final TransactionPhase phase;

        public TransactionSynchronizationEventAdapter(ApplicationListenerMethodAdapter listener,
                                                      ApplicationEvent event, TransactionPhase phase) {

            this.listener = listener;
            this.event = event;
            this.phase = phase;
        }

        @Override
        public int getOrder() {
            return this.listener.getOrder();
        }

        @Override
        public void beforeCommit(boolean readOnly) {
            // 如果注解中配置了调用阶段为TransactionPhase.BEFORE_COMMIT,则事务提交前调用
            if (this.phase == TransactionPhase.BEFORE_COMMIT) {
                processEvent();
            }
        }

        @Override
        public void afterCompletion(int status) {
            // 事务提交时调用
            if (this.phase == TransactionPhase.AFTER_COMMIT && status == STATUS_COMMITTED) {
                processEvent();
            }
            // 事务回滚时调用
            else if (this.phase == TransactionPhase.AFTER_ROLLBACK && status == STATUS_ROLLED_BACK) {
                processEvent();
            }
            // 事务提交完毕后调用
            else if (this.phase == TransactionPhase.AFTER_COMPLETION) {
                processEvent();
            }
        }

        protected void processEvent() {
            this.listener.processEvent(this.event);
        }
    }

}           

复制

这里的逻辑也很直白:

  • 当调用方法时

    onApplicationEvent

    方法时,判断事务是否可用;
  • 事务不可用,并且

    @TransactionalEventListener

    fallbackExecution

    属性为

    true

    ,则直接调用注解放方法;
  • 若事务可用,则创建一个实现了

    TransactionSynchronization

    接口的事务同步操作

    TransactionSynchronizationEventAdapter

    ,并将其注册到

    TransactionSynchronizationManager

  • TransactionSynchronizationManager

    在事务进行到指定阶段后,会调用

    TransactionSynchronizationEventAdapter

    的对应回调方法;
  • TransactionSynchronizationEventAdapter

    在回调方法中,再确认当前事务阶段与在

    @TransactionalEventListener

    phase

    属性指定的阶段是否一致,若一致则调用注解方法;

因此此处也不难理解为什么广播器进行广播的时候,若指定了线程池则事务会失效了,因为具体到监听器适配器调用时,通过

TransactionSynchronizationManager

注册的事务是当前线程池中的工作线程的事务,而调用广播器的主线程的事务与其不是一个事务,因此监听器中事务回滚不会令主线程的事务一并回滚。

总结

当我们向

ApplicationContext

推送事件时:

  • ApplicationContext

    会将事件推送至内部持有的广播器实例

    SimpleApplicationEventMulticaster

  • SimpleApplicationEventMulticaster

    在内部维护了两个检索器,分别是

    DefaultListenerRetriever

    CachedListenerRetriever

    前者用于提供监听器注册和全量查询功能,后者用于提供基于事件类型与事件源类型对监听器进行快速查询的缓存,

    当推送事件时,会先通过

    DefaultListenerRetriever

    全量查询,然后将其加入

    CachedListenerRetriever

    缓存,下次查询时直接从缓存中获取订阅了事件的监听器;
  • 当获得订阅了事件的监听器后,会尝试调用监听器的

    onApplicationEvent

    方法:
    1. 若广播器中注册了线程池,则会直接把操作提交到线程池中执行;
    2. 若广播器中没有注册线程,则会直接在当前线程执行;
  • 监听器被调用的时候,处理基本内的事件处理,而注解时监听器还额外支持一些功能,比如:
    1. 如果使用了

      @TransactionalEventListener

      注解会生成支持事务的监听器

      ApplicationListenerMethodTransactionalAdapter

      ,该监听器会在调用前向事务管理器注册同步事务,从而获得事务支持;
    2. 默认的注解式监听器生成

      ApplicationListenerMethodAdapter

      这种类型的监听器,该类监听器在绑定的注解方法有返回值时,会尝试将返回值也作为一个事件发送,而如果是集合或者数组,则会摊平后尝试将每一个元素都作为事件发生;

发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/170757.html原文链接:https://javaforall.cn