Spring 中的事件机制
本文内容纲要:
说到事件机制,可能脑海中最先浮现的就是日常使用的各种listener,listener去监听事件源,如果被监听的事件有变化就会通知listener,从而针对变化做相应的动作。这些listener是怎么实现的呢?说listener之前,我们先从设计模式开始讲起。
观察者模式
观察者模式一般包含以下几个对象:
- Subject:被观察的对象。它提供一系列方法来增加和删除观察者对象,同时它定义了通知方法notify()。目标类可以是接口,也可以是抽象类或具体类。
- ConcreteSubject:具体的观察对象。Subject的具体实现类,在这里实现通知事件。
- Observer:观察者。这里是抽象的观察者,观察者有一个或者多个。
- ConcreteObserver:具体的观察者。在这里维护观察对象的具体操作。
按照观察者对象,我们来写一个简单的观察者示例,定义一个气象中心,发布气象信息,观察者是各个电视台,订阅气象中心的信息,有新增的气象信息发布的时候,及时播报。
定义气象中心:
publicinterfaceWeatherCenter{
voidpublishWeatherInfo();
}
定义观察者对象:
publicinterfaceObserver{
voidsendWeatherWarning();
}
定义具体的观察者:
publicclassBeijingTvObserverimplementsObserver{
@Override
publicvoidsendWeatherWarning(){
System.out.println("北京卫视天气预报开始了");
}
}
中央电视台:
publicclassCCTVObserverimplementsObserver{
@Override
publicvoidsendWeatherWarning(){
System.out.println("中央电视台天气预报开始了");
}
}
现在发布北京的气象信息:
publicclassBeijingWeatherimplementsWeatherCenter{
privateList<Observer>observerArrayList=newArrayList<>();
@Override
publicvoidpublishWeatherInfo(){
for(Observerobserver:observerArrayList){
observer.sendWeatherWarning();
}
}
}
这时候给所有的订阅者推送一条气象发布消息,那么他们就收到最新的气象预报。
总结一下观察者模式的核心就是:事件中心持有所有的订阅者,每当事件发生时循环通知所有的订阅者。
当然上面我写的比较简单,你也可以在事件中心写一个注册订阅者的方法,每当有新的订阅者加入就调用该方法注册。
Java中的事件机制
Java中提供了基本的事件处理基类:
- EventObject:所有事件状态对象都将从其派生的根类;
- EventListener:所有事件侦听器接口必须扩展的标记接口;
具体使用方式可以用一个非常经典的开门案例来讲解:
首先创建一个开/关门事件:
importjava.util.EventObject;
/**
*@authorrickiyang
*@date2019-12-05
*@DescTODO
*/
publicclassDoorEventextendsEventObject{
privateIntegerdoorStatus;
publicDoorEvent(Objectsource){
super(source);
}
publicDoorEvent(Objectsource,Integerstatus){
super(source);
this.doorStatus=status;
}
publicvoidsetStatus(Integerstatus){
this.doorStatus=status;
}
publicIntegergetStatus(){
returndoorStatus;
}
}
所有的事件都继承EventObject。
然后创建监听器:
publicinterfaceDoorListenerextendsEventListener{
voidDoorEvent(DoorEventdoorEvent);
}
所有的监听器都要实现EventListener。
继续创建具体的开门/关门的监听器:
publicclassCloseDoorListenerimplementsDoorListener{
@Override
publicvoidDoorEvent(DoorEventdoorEvent){
IntegeropenStatus=doorEvent.getStatus();
if(0==openStatus){
System.out.println("thedoorisclose");
}
}
}
开门:
publicclassOpenDoorListenerimplementsDoorListener{
@Override
publicvoidDoorEvent(DoorEventdoorEvent){
IntegeropenStatus=doorEvent.getStatus();
if(1==openStatus){
System.out.println("thedoorisopen");
}
}
}
有了监听器和事件之后,下一步就是用上他们。还记得上面的观察者模式嘛,同样的使用方式:
/**
*将所有的listener保存起来
*
*@return
*/
publicstaticList<DoorListener>getAllListener(){
List<DoorListener>list=Lists.newArrayList();
list.add(newOpenDoorListener());
list.add(newCloseDoorListener());
returnlist;
}
publicstaticvoidmain(String[]args){
DoorEventopen=newDoorEvent("open",1);
List<DoorListener>listeners=getAllListener();
for(DoorListenerlistener:listeners){
listener.DoorEvent(open);
}
}
Spring中的事件机制
在Spring容器中通过ApplicationEvent
类和ApplicationListener
接口来处理事件,如果某个bean
实现ApplicationListener
接口并被部署到容器中,那么每次对应的ApplicationEvent
被发布到容器中都会通知该bean
,这是典型的观察者模式。
Spring的事件默认是同步的,即调用publishEvent
方法发布事件后,它会处于阻塞状态,直到onApplicationEvent
接收到事件并处理返回之后才继续执行下去,这种单线程同步的好处是可以进行事务管理。
先展示一下使用方式,我们拿用户登录来举例。首先来创建一个事件:
importorg.springframework.context.ApplicationEvent;
/**
*@authorrickiyang
*@date2019-12-04
*@DescTODO
*/
publicclassUserRegisterEventextendsApplicationEvent{
publicUserRegisterEvent(Objectsource){
super(source);
}
}
然后创建监听器去监听这个事件:
importcom.alibaba.fastjson.JSON;
importcom.rickiyang.learn.entity.User;
importorg.springframework.context.ApplicationListener;
importorg.springframework.stereotype.Component;
/**
*@authorrickiyang
*@date2019-12-05
*@Desc插入用户信息
*/
@Component
publicclassUserInsertListenerimplementsApplicationListener<UserRegisterEvent>{
@Override
publicvoidonApplicationEvent(UserRegisterEventuserRegisterEvent){
Stringsource=(String)userRegisterEvent.getSource();
Useruser=JSON.parseObject(source,User.class);
//insertdb
}
}
创建一个用户注册成功之后插入用户信息的监听器。
importcom.alibaba.fastjson.JSON;
importcom.rickiyang.learn.entity.User;
importorg.springframework.context.ApplicationListener;
importorg.springframework.stereotype.Component;
/**
*@authorrickiyang
*@date2019-12-05
*@Desc用户注册成功发送短信
*/
@Component
publicclassNotifyUserListenerimplementsApplicationListener<UserRegisterEvent>{
@Override
publicvoidonApplicationEvent(UserRegisterEventuserRegisterEvent){
Stringsource=(String)userRegisterEvent.getSource();
Useruser=JSON.parseObject(source,User.class);
//sendsms
}
}
创建注册成功发送通知短信的监听器。
importcom.alibaba.fastjson.JSON;
importcom.rickiyang.learn.entity.User;
importorg.springframework.context.ApplicationListener;
importorg.springframework.stereotype.Component;
/**
*@authorrickiyang
*@date2019-12-05
*@Desc用户注册成功给用户生成推荐商品
*/
@Component
publicclassRecommendListenerimplementsApplicationListener<UserRegisterEvent>{
@Override
publicvoidonApplicationEvent(UserRegisterEventuserRegisterEvent){
Stringsource=(String)userRegisterEvent.getSource();
Useruser=JSON.parseObject(source,User.class);
//generaterecommendcommodity
}
}
创建用户注册成功之后给用户推荐商品的事件。
用户注册事件的监听器创建完毕,那么接下来就发布事件等待监听器监听就行。在Spring中提供了ApplicationEventPublisherAware接口,从名称上看就知道是ApplicationEventPublisher的适配器类,用法就是你在业务类中实现该接口,然后使用ApplicationEventPublisher#publishEvent发布你的事件即可。
packagecom.rickiyang.learn.controller.test;
importcom.alibaba.fastjson.JSON;
importcom.rickiyang.learn.entity.User;
importorg.springframework.context.ApplicationEventPublisher;
importorg.springframework.context.ApplicationEventPublisherAware;
importorg.springframework.stereotype.Service;
/**
*@authorrickiyang
*@date2019-12-04
*@DescTODO
*/
@Service
publicclassUserRegisterPublisherServiceimplementsApplicationEventPublisherAware{
privateApplicationEventPublisherapplicationEventPublisher;
@Override
publicvoidsetApplicationEventPublisher(ApplicationEventPublisherapplicationEventPublisher){
this.applicationEventPublisher=applicationEventPublisher;
}
publicvoidinsert(Useruser){
UserRegisterEventevent=newUserRegisterEvent(JSON.toJSONString(user));
applicationEventPublisher.publishEvent(event);
}
}
调用insert方法就可以发布事件,写一个test测试一下:
importcom.rickiyang.learn.entity.User;
importorg.junit.Test;
importorg.junit.runner.RunWith;
importorg.springframework.boot.test.context.SpringBootTest;
importorg.springframework.test.context.junit4.SpringRunner;
importjavax.annotation.Resource;
@RunWith(SpringRunner.class)
@SpringBootTest
publicclassUserRegisterPublisherServiceTest{
@Resource
privateUserRegisterPublisherServiceuserRegisterPublisherService;
@Test
publicvoidtest1(){
Userbuild=User.builder().name("1").sex(1).phone("123456789").build();
userRegisterPublisherService.insert(build);
}
}
可以看到3个监听器都打印出来了:
发送短信
商品推荐
插入用户
有个问题不知道大家发现没,监听器的发布顺序是按照bean自然装载的顺序执行的,如果我们的bean是有序的应该怎么办呢?别怕,Spring自然考虑到这个问题。
SmartApplicationListener实现有序的监听
SmartApplicationListener接口继承了ApplicationListener,使用全局的ApplicationEvent作为监听的事件对象。之所以能提供顺序性,是因为继承了Ordered类,实现了排序的逻辑。另外添加了两个方法#supportsEventType、#supportsSourceType来作为区分是否是我们监听的事件,只有这两个方法同时返回true时才会执行onApplicationEvent方法。
packagecom.rickiyang.learn.controller.test;
importcom.rickiyang.learn.entity.User;
importorg.springframework.context.ApplicationEvent;
importorg.springframework.context.event.SmartApplicationListener;
importorg.springframework.stereotype.Component;
/**
*@authorrickiyang
*@date2019-12-05
*@DescTODO
*/
@Component
publicclassUserInsert1ListenerimplementsSmartApplicationListener{
@Override
publicbooleansupportsEventType(Class<?extendsApplicationEvent>aClass){
returnaClass==UserRegisterEvent.class;
}
@Override
publicbooleansupportsSourceType(Class<?>sourceType){
returnsourceType==User.class;
}
/**
*数字越小优先级越高
*默认值为2147483647
*@return
*/
@Override
publicintgetOrder(){
return8;
}
@Override
publicvoidonApplicationEvent(ApplicationEventapplicationEvent){
UserRegisterEventevent=(UserRegisterEvent)applicationEvent;
//inserttodb
}
}
如果你有对多个监听器做排序的需求,那么你只用在getOrder方法中指定当前的排序级别即可。数字越大优先级越低,默认的排序级别是2147483647,你可以自己调整。
Spring对事件监听机制的注解支持
Spring4.2
之后,ApplicationEventPublisher
自动被注入到容器中,不再需要显示实现Aware
接口。
importcom.alibaba.fastjson.JSON;
importcom.rickiyang.learn.entity.User;
importorg.springframework.context.ApplicationEventPublisher;
importorg.springframework.context.ApplicationEventPublisherAware;
importorg.springframework.stereotype.Service;
importjavax.annotation.Resource;
/**
*@authorrickiyang
*@date2019-12-04
*@DescTODO
*/
@Service
publicclassUserRegisterPublisher1Service{
@Resource
privateApplicationEventPublisherapplicationEventPublisher;
publicvoidinsert(Useruser){
UserRegisterEventevent=newUserRegisterEvent(JSON.toJSONString(user));
applicationEventPublisher.publishEvent(event);
}
}
创建listener也就不需要显式的继承ApplicationListener或者SmartApplicationListener,使用@EventListener注解即可:
importcom.alibaba.fastjson.JSON;
importcom.rickiyang.learn.entity.User;
importorg.springframework.context.event.EventListener;
importorg.springframework.core.annotation.Order;
importorg.springframework.stereotype.Service;
/**
*@authorrickiyang
*@date2019-12-07
*@DescTODO
*/
@Service
publicclassUserInfoCheckListener{
@Order(8)
@EventListener(classes=UserRegisterEvent.class)
publicvoidcheckUserInfo(UserRegisterEventevent){
Stringsource=(String)event.getSource();
Useruser=JSON.parseObject(source,User.class);
//todocheckuserinfo
}
}
如果你想使用顺序性的listener,那么只需要使用@Order注解就可以了。
异步事件的支持
上面说过Spring事件机制默认是同步阻塞的,如果ApplicationEventPublisher发布事件之后他会一直阻塞等待listener响应,多个listener的情况下前面的没有执行完后面的一直被阻塞。如果我们的应用场景是:用户订单完成之后异步发货,检查快递信息,这些操作是没有必要返回结果给用户的。
这种情况下,我们是不是想到可以使用异步线程的方式来处理。你可以把listener中的处理流程做一个异步线程,或者利用Spring提供的线程池注解@Async来实现异步线程。
要使用@Async之前需要先开启线程池,在启动类上添加@EnableAsync注解即可。线程池支持配置模式,如果你不想使用默认的线程池配置,可以手动指定:
packagecom.rickiyang.learn.controller.test;
importcom.google.common.util.concurrent.ThreadFactoryBuilder;
importorg.springframework.context.annotation.Bean;
importorg.springframework.context.annotation.Configuration;
importorg.springframework.scheduling.annotation.EnableAsync;
importorg.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
importjava.util.concurrent.*;
/**
*@authorrickiyang
*@date2019-12-07
*@DescTODO
*/
@Configuration
@EnableAsync
publicclassAsyncConfig{
@Bean("userInfoPool")
publicExecutorgetExecutor(){
ThreadFactorynamedThreadFactory=newThreadFactoryBuilder()
.setNameFormat("consumer-queue-thread-%d").build();
ThreadPoolTaskExecutorexecutor=newThreadPoolTaskExecutor();
//线程池维护线程的最少数量
executor.setCorePoolSize(5);
//线程池维护线程的最大数量
executor.setMaxPoolSize(10);
//缓存队列
executor.setQueueCapacity(25);
//线程名
executor.setThreadFactory(namedThreadFactory);
//线程池初始化
executor.initialize();
returnexecutor;
}
}
手动配置一个beanname为userInfoPool的线程池,接下来使用@Async注解使用线程池:
packagecom.rickiyang.learn.controller.test;
importcom.alibaba.fastjson.JSON;
importcom.rickiyang.learn.entity.User;
importorg.springframework.context.event.EventListener;
importorg.springframework.core.annotation.Order;
importorg.springframework.scheduling.annotation.Async;
importorg.springframework.stereotype.Service;
/**
*@authorrickiyang
*@date2019-12-07
*@DescTODO
*/
@Service
publicclassUserInfoCheckListener{
@Async("userInfoPool")
@Order(8)
@EventListener(classes=UserRegisterEvent.class)
publicvoidcheckUserInfo(UserRegisterEventevent){
Stringsource=(String)event.getSource();
Useruser=JSON.parseObject(source,User.class);
System.out.println("asyncdeel");
//todocheckuserinfo
}
}
这样我们就把UserInfoCheckListener变成了异步任务。
Spring中的事件机制分析
上面从基本的发布订阅设计模式到Java提供的基本的事件处理基类,再拓展到Spring中如何使用事件机制来拓展代码,整条线还是很清晰。讲完了我们应该如何在业务代码中使用发布订阅模式,我们也来分析一下Spring是如何实现发布订阅模式的,看看人家的代码功底。
在Spring中提供了Event的基类:ApplicationEvent,如果事件要想被Spring监听那么就必须继承该类,同样该类也继承了Java中的事件基类:EventObject。
有了事件源,我们要定义事件监听者用于处理事件,所有的事件监听者都要继承org.springframework.context.ApplicationListener
接口:
/**
*Interfacetobeimplementedbyapplicationeventlisteners.
*Basedonthestandard{@codejava.util.EventListener}interface
*fortheObserverdesignpattern.
*
*<p>AsofSpring3.0,anApplicationListenercangenericallydeclaretheeventtype
*thatitisinterestedin.WhenregisteredwithaSpringApplicationContext,events
*willbefilteredaccordingly,withthelistenergettinginvokedformatchingevent
*objectsonly.
*
*@authorRodJohnson
*@authorJuergenHoeller
*@param<E>thespecificApplicationEventsubclasstolistento
*@seeorg.springframework.context.event.ApplicationEventMulticaster
*/
publicinterfaceApplicationListener<EextendsApplicationEvent>extendsEventListener{
}
ApplicationListener提供了一个基于ApplicationEvent的泛型,所以你指定了某个类的监听者只会处理该类型的event。
上面我们说了Spring是基于ApplicationEventPublisher来发布事件,那么监听器是如何获取到事件呢?
注意到ApplicationListener上面的注释写到:@param<E>thespecificApplicationEventsubclasstolistentoApplicationEventMulticaster
,从名称上看这个类的作用应该是用于事件广播。
ApplicationEventMulticaster是一个接口,提供了如下方法:
- addApplicationListener(ApplicationListener<?>listener):新增一个listener;
- addApplicationListenerBean(StringlistenerBeanName):新增一个listener,参数为beanname;
- removeApplicationListener(ApplicationListener<?>listener):删除listener;
- removeApplicationListenerBean(StringlistenerBeanName):根据beanname删除listener;
- multicastEvent(ApplicationEventevent):广播事件;
- multicastEvent(ApplicationEventevent,@NullableResolvableTypeeventType):广播事件,指定事件的source类型。
从接口的方法看,该类的作用就是添加监听器然后对所有监听器或者指定监听器发送事件进行处理。
ApplicationEventMulticaster有两个实现类:
- SimpleApplicationEventMulticaster
- AbstractApplicationEventMulticaster
因为AbstractApplicationEventMulticaster是一个抽象类,并且SimpleApplicationEventMulticaster也继承了了SimpleApplicationEventMulticaster,所以我们直接看SimpleApplicationEventMulticaster:
publicabstractclassAbstractApplicationEventMulticaster
implementsApplicationEventMulticaster,BeanClassLoaderAware,BeanFactoryAware{
privatefinalListenerRetrieverdefaultRetriever=newListenerRetriever(false);
finalMap<ListenerCacheKey,ListenerRetriever>retrieverCache=newConcurrentHashMap<>(64);
@Override
publicvoidaddApplicationListener(ApplicationListener<?>listener){
synchronized(this.retrievalMutex){
//Explicitlyremovetargetforaproxy,ifregisteredalready,
//inordertoavoiddoubleinvocationsofthesamelistener.
ObjectsingletonTarget=AopProxyUtils.getSingletonTarget(listener);
if(singletonTargetinstanceofApplicationListener){
this.defaultRetriever.applicationListeners.remove(singletonTarget);
}
this.defaultRetriever.applicationListeners.add(listener);
this.retrieverCache.clear();
}
}
......
......
}
#addApplicationListener
方法用于新增监听器,新增的逻辑主要在这一句:
defaultRetriever.applicationListeners.add(listener);
继续看ListenerRetriever的实现:
privateclassListenerRetriever{
publicfinalSet<ApplicationListener<?>>applicationListeners=newLinkedHashSet<>();
publicfinalSet<String>applicationListenerBeans=newLinkedHashSet<>();
privatefinalbooleanpreFiltered;
publicListenerRetriever(booleanpreFiltered){
this.preFiltered=preFiltered;
}
publicCollection<ApplicationListener<?>>getApplicationListeners(){
List<ApplicationListener<?>>allListeners=newArrayList<>(
this.applicationListeners.size()+this.applicationListenerBeans.size());
allListeners.addAll(this.applicationListeners);
if(!this.applicationListenerBeans.isEmpty()){
BeanFactorybeanFactory=getBeanFactory();
for(StringlistenerBeanName:this.applicationListenerBeans){
try{
ApplicationListener<?>listener=beanFactory.getBean(listenerBeanName,ApplicationListener.class);
if(this.preFiltered||!allListeners.contains(listener)){
allListeners.add(listener);
}
}
catch(NoSuchBeanDefinitionExceptionex){
//Singletonlistenerinstance(withoutbackingbeandefinition)disappeared-
//probablyinthemiddleofthedestructionphase
}
}
}
if(!this.preFiltered||!this.applicationListenerBeans.isEmpty()){
AnnotationAwareOrderComparator.sort(allListeners);
}
returnallListeners;
}
}
看到没,最终还是持有了一个applicationListeners的集合,跟我们的发布订阅设计模式一样。
剩下的逻辑就好去解释,顺着咱们前面讲过的发布订阅模式的使用套路撸下去就行,事件广播的方法#multicastEvent
不外乎就是遍历所有的监听器进行匹配。
总结
这一篇讲的发布订阅模式以及在Spring中的使用在日常开发中只要稍加注意你就会发现对改善代码流程的影响还是挺大。写代码有90%的时间我们都是在写同步代码,因为不用动脑子,顺着该有的流程撸就完事。这样带来的后果就是你真的只是在搬砖!
有的时候停下来,从业务逻辑跳出来拿半个小时想想你应该如何让这这一次搬砖有点技术含量。或许从此刻开始,搬砖也会与众不同。
本文内容总结:
原文链接:https://www.cnblogs.com/rickiyang/p/12001524.html