Spring的事件机制提供了一种低耦合、无侵入的解决方式,它由三部分组成:

  • ApplicationEventPublisher:发布器,让容器拥有发布应用上下文事件的功能,包括容器启动事件、关闭事件等。
  • ApplicationListener:监听器 ,可以接收到容器事件 , 并对事件进行响应处理 。
  • ApplicationEventMulticaster:事件多播器,它负责保存所有监听器,以便在容器产生上下文事件时通知这些事件监听者。
  • ApplicationEvent:具体的事件

图片来自:https://juejin.cn/post/7140849555607650335#heading-10

1 Spring中的内置事件

  • ContextRefreshedEvent:在refresh()执行完成时触发,通知容器刷新完成。
    • 很适合我们做一些系统启动后的准备工作,此时我们就可以监听该事件,作为系统启动后初始预热的契机。
  • ContextStartedEventConfigurableApplicationContextstart()执行完成时触发
    • 该事件的触发是所有的单例bean创建完成后发布,此时实现了Lifecycle接口的bean还没有回调start(),当这些start()被调用后,才会发布ContextStartedEvent事件。
  • ContextClosedEventConfigurableApplicationContextclose()执行完成时触发
    • 此时IOC容器已经关闭,但尚未销毁所有的bean。
  • ContextStoppedEventConfigurableApplicationContextstop()执行完成时触发

2 自定义事件的使用

  • 定义事件

    自定义事件在使用上很简单,继承 ApplicationEvent 即可:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    public class MyApplicationEvent extends ApplicationEvent {
    private Long id;
    public MyApplicationEvent(Long id) {
    super(id);
    this.id = id;
    }

    public Long getId() {
    return id;
    }
    }

  • 发布自定义事件

    ApplicationContext继承了ApplicationEventPublisher,因此也拥有消息发布的能力:

    1
    applicationContext.publishEvent(new MyApplicationEvent(1L));
  • 定义监听器

    监听器需要实现ApplicationListener接口,onApplicationEvent()负责处理具体的事件

    注意:需要通过泛型参数指定处理的事件类型

    1
    2
    3
    4
    5
    6
    7
    8
    public class MyEventListener implements ApplicationListener<MyApplicationEvent> {

    @Override
    public void onApplicationEvent(MyApplicationEvent event) {
    System.out.println(event.getSource());
    }
    }

3 ApplicationListener注册过程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public void refresh() throws BeansException, IllegalStateException {
try {
...
// 初始化各种监听器
this.registerListeners();
}
}

// 注册监听器
protected void registerListeners() {
// 1: 处理context.addApplicationListener() 方式注册的监听器,并将监听器注册到广播器中,
for (ApplicationListener<?> listener : this.getApplicationListeners()) {
this.getApplicationEventMulticaster().addApplicationListener(listener);
}

// 2: 去Spring容器中获取监听器(处理扫描的或者register方式注册的),同样也是添加到广播器中
String[] listenerBeanNames = this.getBeanNamesForType(ApplicationListener.class, true, false);
for (String listenerBeanName : listenerBeanNames) {
this.getApplicationEventMulticaster().addApplicationListenerBean(listenerBeanName);
}
}

值得注意的是,IoC容器会扫描继承了ApplicationListener的监听器作为Bean并注册到容器中,但通过@EventListener方式注册的是一个方法,监听者应该是一个实例对象,那它是怎么被识别并注册呢?

3.1 @EventListener是如何工作的?

每次面对这种不同的注册方式时,适配器模式就要登场了。@EventListener的扫描和注册过程我觉得也很有参考意义,在此记录一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public void refresh() throws BeansException, IllegalStateException {
try {
..
// 普通单例Bean的实例化
this.finishBeanFactoryInitialization(beanFactory);
}
}

protected void finishBeanFactoryInitialization(ConfigurableListableBeanFactory beanFactory) {
...
// 初始化非延迟加载的单例bean
beanFactory.preInstantiateSingletons();
}

@Override
public void preInstantiateSingletons() throws BeansException {
List<String> beanNames = new ArrayList<>(this.beanDefinitionNames);
// 1: 完成bean的实例化
for (String beanName : beanNames) {
// 调用BeanFactory尝试获取Bean,获取不到会创建Bean
getBean(beanName);
}

// 2: 调用bean的后置处理方法
// Trigger post-initialization callback for all applicable beans...
for (String beanName : beanNames) {
// singletonInstance instanceof SmartInitializingSingleton
smartSingleton.afterSingletonsInstantiated();
}
}

这里的smartSingleton.afterSingletonsInstantiated()会触发所有适用 bean 的回调:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// EventListenerMethodProcessor.java
public void afterSingletonsInstantiated() {
for (String beanName : beanNames) {
processBean(beanName, type);
}
}

private void processBean(final String beanName, final Class<?> targetType) {
// 1: 解析bean上加了@EventListener的方法
Map<Method, EventListener> annotatedMethods = null;
try {
annotatedMethods = MethodIntrospector.selectMethods(targetType,
(MethodIntrospector.MetadataLookup<EventListener>) method ->
AnnotatedElementUtils.findMergedAnnotation(method, EventListener.class));
}
...
// 2: 遍历加了@EventListener的方法,注册为事件监听器
for (Method method : annotatedMethods.keySet()) {
for (EventListenerFactory factory : factories) {
if (factory.supportsMethod(method)) {
Method methodToUse = AopUtils.selectInvocableMethod(method, context.getType(beanName));
// 2.1 通过EventListenerFactory,将方法创建为监听器实例(ApplicationListenerMethodAdapter)
ApplicationListener<?> applicationListener = factory.createApplicationListener(beanName, targetType, methodToUse);
if (applicationListener instanceof ApplicationListenerMethodAdapter) {
((ApplicationListenerMethodAdapter) applicationListener).init(context, this.evaluator);
}
// 2.2 注册为ApplicationListener
context.addApplicationListener(applicationListener);
break;
}
}
}
}

总结一下流程:在容器中所有的bean实例化后,会再次遍历所有bean,调用afterSingletonsInstantiated()的方法,此时符合条件的EventListenerMethodProcessor就会被调用

关于@EventListener标注方法的解析时机,笔者首先想到的应该和@Bean的处理时机一致,在扫描类的时候,就解析出来加了@EventListener的方法,抽象为BeanDefinition放到容器中,后面实例化时候,和正常扫描出来的bean是一样的实例化流程。但是查找下来发现Spring并没有这样处理,而是在bean初始化后回调阶段处理的。究其原因,大概是@Bean真的是需要托付给Spring管理,而@EventListener只是一个标识,无需放入放入容器,防止对完暴露所致吧。

4 Spring如何广播消息?

ApplicationEventMulticaster负责事件的分发,我们可以看一下它的具体实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// SimpleApplicationEventMulticaster.java
public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {
...
// getApplicationListeners 获取符合的监听器
for (ApplicationListener<?> listener : getApplicationListeners(event, type)) {
// 执行每个监听器的逻辑
// 注意,这里是同步的
invokeListener(listener, event);
}
}

// invokeListener最后调用doInvokeListener
private void doInvokeListener(ApplicationListener listener, ApplicationEvent event) {
try {
// 调用监听器的onApplicationEvent方法进行处理
listener.onApplicationEvent(event);
}
}

getApplicationListeners()找到了所有匹配的监听器,我们继续跟踪看一下是如何进行事件匹配的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
protected Collection<ApplicationListener<?>> getApplicationListeners(
ApplicationEvent event, ResolvableType eventType) {
// 省略缓存相关代码
return retrieveApplicationListeners(eventType, sourceType, newRetriever);
}

private Collection<ApplicationListener<?>> retrieveApplicationListeners(
ResolvableType eventType, @Nullable Class<?> sourceType, @Nullable CachedListenerRetriever retriever) {
// 1: 获取所有的ApplicationListener
Set<ApplicationListener<?>> listeners;
Set<String> listenerBeans;
synchronized (this.defaultRetriever) {
listeners = new LinkedHashSet<>(this.defaultRetriever.applicationListeners);
listenerBeans = new LinkedHashSet<>(this.defaultRetriever.applicationListenerBeans);
}

for (ApplicationListener<?> listener : listeners) {
// 2: 遍历判断是否匹配
if (supportsEvent(listener, eventType, sourceType)) {
if (retriever != null) {
filteredListeners.add(listener);
}
allListeners.add(listener);
}
}
}

protected boolean supportsEvent(
ApplicationListener<?> listener, ResolvableType eventType, @Nullable Class<?> sourceType) {
GenericApplicationListener smartListener = (listener instanceof GenericApplicationListener ?
(GenericApplicationListener) listener : new GenericApplicationListenerAdapter(listener));
// supportsEventType 根据ApplicationListener的泛型, 和事件类型,看是否匹配
// supportsSourceType 根据事件源类型,判断是否匹配
return (smartListener.supportsEventType(eventType) && smartListener.supportsSourceType(sourceType));
}

以上就是ApplicationEventMulticaster分发事件的大致逻辑,本质上就是通过泛型查找到所有匹配的监听器,然后在一个for循环中调用监听器的事件处理方法

但是这里也有一个问题,这种方式事件的处理是同步的,可能会存在发布阻塞的问题,并且有性能问题。有没有办法能够使用异步机制来处理消息呢?

5 异步处理事件

Spring提供了两种异步处理事件的方式:

5.1 给ApplicationEventMulticaster添加线程池

通过setTaskExecutor()方法可以给ApplicationEventMulticaster设置线程池,这样的做法是全局生效的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Override
public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {
// 获取执行线程池
Executor executor = getTaskExecutor();
for (ApplicationListener<?> listener : getApplicationListeners(event, type)) {
// 如果存在线程池,使用线程池异步执行
if (executor != null) {
executor.execute(() -> invokeListener(listener, event));
}
// 如果不存在线程池,同步执行
else {
invokeListener(listener, event);
}
}
}

public void setTaskExecutor(@Nullable Executor taskExecutor) {
this.taskExecutor = taskExecutor;
}

5.2 使用@Async

还有一种方式是通过给监听者的onApplicationEvent()添加@Async注解使其变成异步的,这种方式只对单个方法生效,使用方式就跟我们平时定义的异步方法一样,这里就不多赘述了

6 全局异常处理

ApplicationEventMulticaster分发事件的时候发生异常怎么办,后面的监听器还能执行吗?

Spring事件的处理,默认是同步依次执行。那如果前面的监听器出现了异常,并且没有处理异常,会对后续的监听器还能顺利接收该事件吗?其实不能的,因为异常中断了事件的发送了

如果设置了异步执行,因为不是一个线程执行,是不会互相影响的。

Spring提供了ErrorHandler来方便我们对消息处理异常进行统一处理:

1
2
3
ApplicationEventMulticaster multicaster = context.getBean(AbstractApplicationContext.APPLICATION_EVENT_MULTICASTER_BEAN_NAME, ApplicationEventMulticaster.class);
((SimpleApplicationEventMulticaster) multicaster).setErrorHandler(t -> System.out.println(t));

7 总结

Spring事件机制存在什么问题?

我认为主要的问题还是公共线程池造成的问题:

  1. 发布阻塞

    监听器的执行速度会互相影响、甚至会发生阻塞。假如某一个监听器执行的很慢,把线程池中线程都占用了,此时其他的事件虽然发布但没有资源执行,只能在缓存队列等待线程释放
    其实这里可以参考Netty的boss-work工作模型,广播器只负责分发事件,调度执行监听器的逻辑交给由具体的work线程负责会更合适。

  2. 无法定制监听器执行线程池

    由于每种事件产生的数量、处理逻辑、处理速度差异化可能很大,所以每个监听器都有适合自己场景的线程数,为每个监听器配置单独的线程池尤为重要。Spring事件机制无法单独为监听器设置线程池,只能共用线程池,无法做到精准控制,线程拥堵或者线程浪费出现的几率极大

    虽然我们也可以在接收到事件后使用自定义的线程池处理,但是我们更希望简单化配置就能支持


参考: