一、背景
我们大部分人的编程习惯都是线性编程,所谓线性编程就是一个请求涉及到A,B,C,D等n个有顺序关系的操作在编码处理层面都是顺序性的,这样会导致随着业务的发展,依赖A操作结果的业务越来越多,请求处理会出现A->B->C->D->E....等很多个操作和A操作耦合在一起,会直接导致接口的rt变高,另外业务层面边界变得模糊,各个业务线的逻辑相互穿插,相互强依赖.
先举几个例子:
- 订单支付成功,需要核销优惠券,需要扣积分,需要通知仓库履约
- 自来水公司通知来水了,小明可以洗衣服了,妈妈可以做饭了,爸爸可以上厕所了
- 放学了,同学们可以出去玩了,老师可以回家了,门卫可以关校门了
- 客服上线了,可以接待在线会话了,也可以处理离线工单了
我们用编程思维逐个分析一下上述案例:订单支付成功后,核销用户优惠券,然后扣除积分,然后调履约系统发货,才算支付完成;自来水公司广播来水了,等小明洗完衣服、小明妈妈做完饭和爸爸上完厕所才算广播结束;放学铃声响了,同学们都去玩了,老师骑着二八大杠到家了,门卫大爷把门锁上了才算放学;客服切换工作状态为在线后,要等在线会话分配完成,离线工单分单完成才算上线成功.
软件编程讲究高内聚低耦合,也就是领域边界问题,细想一下前边几种场景前后有没有必然的同步依赖关系,再抽象一点,每一个业务场景、每一个操作都有自己的业务主体和操作主线,也就是说理论上只关心自己领域边界内的操作完成,对于其他的附带操作不是必然的,如果后续有操作依赖其结果,那么可以通过消息或者事件订阅来做逻辑层面解耦和操作层面异步化.
也即是操作主线完成操作业务主体领域内的业务,把结果发布出去,有依赖该结果的业务,自己订阅并在领域内消化处理该事件,理论上适用于业务边界清晰和若依赖性场景,如果是强依赖型需要自己做一致性保证或者业务层面重新抽象和定于领域边界问题,考虑把强依赖性的放到同一个业务领域.
二、概念
1.事件
事件是对操作行为的抽象,比如上述案例中的订单支付成功、放学铃响和客服上线等等,是基于当前业务变更产生的广播通知,周边业务可以基于此操作行为通知完成自己业务领域内的操作。
2.通知
前边有提到某个业务完成自己操作后,需要将结果发布出去,对于依赖此结果完成后置操作的场景订阅事件完成自己的业务,发布通知又分为应用内和应用外,应用外通知基本上都是借助于消息中间件来发布,应用内也就是把事件发布出去,或者发布到应用内特有的容器或者队列,供依赖方消费。
3.监听器
监听器是对某种业务侧封装,订阅自己感兴趣的事件,在接收到事件通知后完成自己领域内的操作,比如在线会话监听器在接收到客服上线事件通知后,触发会话分配操作。
三、Jdk观察者模式
jdk1.0就引入了观察者模式,有两个核心的类:Observable和Observer.
- Observable 是一个事件操作封装类,继承该类的类被定义为一个可被观察的对象或者数据源,在完成当前业务的操作后可以根据需要把变更事件发布出去
- Observer 是一个接口,实现该接口的类被定义为一个观察者,也就是前边说的监听器,在接收到变更事件的时候会被Observable调用
1.实现
新建被观察者:
代码语言:javascript复制@Slf4j
public class TestObservable extends Observable {
public TestObservable() {
//1.添加观察者
this.addObserver(new TestObserver());
}
public void doSomething() {
log.info("TestObserve.doSomething currentThread={}",Thread.currentThread().getName());
//2.发布变更通知
this.setChanged();
this.notifyObservers();
//this.notifyObservers();
}
}
新建观察者
代码语言:javascript复制@Slf4j
public class TestObserver implements Observer {
@Override
public void update(Observable o, Object arg) {
log.info("TestObserver.update receive change event;Observable={},arg={},currentThread={}",o,arg,Thread.currentThread().getName());
}
}
编写测试代码并执行:
代码语言:javascript复制public static void main(String[] args) {
TestObservable observable = new TestObservable();
observable.doSomething();
}
整个实现比较简单,但是过程中也有一些点我们需要注意并记录下来用于后续分析:
- 被观察者必须继承Observable类,受限于java类单继承规则,如果业务类需要继承其他业务类做代码和逻辑复用,这里不能接受
- 观察者装配与业务代码耦合,背离IOC原则
- 同步操作,从截图中打印的结果来看,被观察者和观察者的业务逻辑执行是同一个线程,如果观察者异常直接影响到被观察者主逻辑运行
- 泛化事件上下文,Observer的update方法第二个参数是Object类型,观察者无法通过事件类型来做业务逻辑隔离
2.原理
jdk自带的观察者模式实现我们主要分析一下Observable类就好,先看代码实现:
代码语言:javascript复制public class Observable {
private boolean changed = false;
private Vector<Observer> obs;
public Observable() {
obs = new Vector<>();
}
public synchronized void addObserver(Observer o) {
if (o == null)
throw new NullPointerException();
if (!obs.contains(o)) {
obs.addElement(o);
}
}
public void notifyObservers() {
notifyObservers(null);
}
public void notifyObservers(Object arg) {
Object[] arrLocal;
synchronized (this) {
if (!changed)
return;
arrLocal = obs.toArray();
clearChanged();
}
for (int i = arrLocal.length-1; i>=0; i--)
((Observer)arrLocal[i]).update(this, arg);
}
}
该类实现很简单,有几个关键信息:obs(观察者容器),addObserver(添加观察者),notifyObservers(事件通知),没有什么好分析的,就是在合适的时机将观察者添加到被观察者容器中,被观察者发生变更后调用notifyObservers方法把容器中的观察者都执行一遍。
3.优缺点
上述代码是jdk1.0石器时代的代码,除了学习观察者模式和做案例讲解,估计罕有人用,当然互联网发展到今天我们还是以批判性的眼光来分析一下其优缺点.
优点: 实现简单易懂
缺点: 被观察者必须继承Observable类,观察/被观察关系装配与逻辑代码耦合,同步操作,泛化上下文
4.改进
对于继承Observable类无能为力,除非自己重新写一套基于接口的来绕开单继承规则,那么我们可以从观察者/被观察者关系装配和同步操作这两个点着手考虑开进方案.
首先为每一种事件类型定义对应的监听器:
代码语言:javascript复制public interface TestObservableListener extends Observer {}
然后观察者实现监听器接口并注入应用容器:
代码语言:javascript复制@Slf4j
@Component
public class TestObserver implements TestObservableListener {
@Override
public void update(Observable o, Object arg) {
log.info("TestObserver.update receive change event;Observable={},arg={},currentThread={}",o,arg,Thread.currentThread().getName());
}
}
被观察者实现ApplicationContextAware接口,借鸡生蛋从应用容器取出指定类型观察者放入被观察者容器:
代码语言:javascript复制@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
Map<String,TestObservableListener> map = applicationContext.getBeansOfType(TestObservableListener.class);
map.forEach((k,v) -> this.addObserver(v));
}
使用线程池异步通知变更事件:
代码语言:javascript复制public void doSomething() {
log.info("TestObserve.doSomething currentThread={}",Thread.currentThread().getName());
ThreadPoolUtil.COMMON_POOL.execute(() -> {
this.setChanged();
this.notifyObservers();
});
}
拉出来溜一圈:
通过改进实现了观察者与被观察者关系装配分离以及监听器操作异步化,能满足比较简单的业务场景,但是只能说还是很难用。
四、Jdk属性变更事件
jdk1.1版本引入了事件模式,比较简单,有两个关键的类:EventObject和EventListener.
- EventObject 事件类,不同的事件类型各自继承该类做定制化
- EventListener 监听器接口,实现该接口的类被定义为事件监听器
但是这个版本只是引入了事件的概念,在编码层面并没有本质性的改善,后续版本引入属性变更事件,实用性也好了起来,有三个比较核心的类:PropertyChangeEvent/PropertyChangeSupport/PropertyChangeListener.
- PropertyChangeEvent类继承于EventObject,新增了一些定制化属性,属性名、老的值和新的值
- PropertyChangeListener也是一个接口,继承于EventListener,新增了propertyChange方法,接收PropertyChangeEvent事件
- PropertyChangeSupport是一个工具类,提供了属性变更相关操作的抽象,提供了承载监听器的容器
1.实现
新建监听器:
代码语言:javascript复制@Slf4j
public class SimplePropertyChangeListener implements PropertyChangeListener {
@Override
public void propertyChange(PropertyChangeEvent evt) {
log.info("SimplePropertyChangeListener.propertyChange evt={},thread={}",evt,Thread.currentThread().getName());
}
}
新建业务类:
代码语言:javascript复制@Slf4j
public class EventManager {
private PropertyChangeSupport propertyChangeSupport = new PropertyChangeSupport(this);
public EventManager() {
this.propertyChangeSupport.addPropertyChangeListener(new SimplePropertyChangeListener());
}
private String str;
public void setStr(String str) {
log.info("EventManager.setStr;thread={}",Thread.currentThread().getName());
String oldStr = this.str;
this.str = str;
this.propertyChangeSupport.firePropertyChange("str",oldStr,str);
}
}
测试:
代码语言:javascript复制public static void main(String[] args) {
new EventManager().setStr("hello");
}
实现也简单,过程中也有一些点我们需要注意并记录下来用于后续分析:
- 业务类自己维护与监听器的关系,背离IOC的业务类只关注与业务逻辑处理,bean依赖关系交给应用容器管理
- 同步操作,从截图看,业务类和监听器业务逻辑执行是同一个线程,如果观察者异常直接影响到被观察者主逻辑运行
2.原理
其他两个类比较简单,直接分析PropertyChangeSupport类:
代码语言:javascript复制public class PropertyChangeSupport implements Serializable {
private PropertyChangeListenerMap map = new PropertyChangeListenerMap();
public PropertyChangeSupport(Object sourceBean) {
if (sourceBean == null) {
throw new NullPointerException();
}
source = sourceBean;
}
public void addPropertyChangeListener(PropertyChangeListener listener) {
if (listener == null) {
return;
}
if (listener instanceof PropertyChangeListenerProxy) {
PropertyChangeListenerProxy proxy =
(PropertyChangeListenerProxy)listener;
// Call two argument add method.
addPropertyChangeListener(proxy.getPropertyName(),
proxy.getListener());
} else {
this.map.add(null, listener);
}
}
public void firePropertyChange(String propertyName, Object oldValue, Object newValue) {
if (oldValue == null || newValue == null || !oldValue.equals(newValue)) {
firePropertyChange(new PropertyChangeEvent(this.source, propertyName, oldValue, newValue));
}
}
public void firePropertyChange(PropertyChangeEvent event) {
Object oldValue = event.getOldValue();
Object newValue = event.getNewValue();
if (oldValue == null || newValue == null || !oldValue.equals(newValue)) {
String name = event.getPropertyName();
PropertyChangeListener[] common = this.map.get(null);
PropertyChangeListener[] named = (name != null)
? this.map.get(name)
: null;
fire(common, event);
fire(named, event);
}
}
private static void fire(PropertyChangeListener[] listeners, PropertyChangeEvent event) {
if (listeners != null) {
for (PropertyChangeListener listener : listeners) {
listener.propertyChange(event);
}
}
}
}
该类内部维护一个map,用于存放监听器,在业务类调用firePropertyChange方法后,从容器中取出符合条件的监听器循环调用propertyChange方法。
3.优缺点
优点: 实现简单,PropertyChangeSupport封装了监听器容器和对一些复杂操作做了透明化处理
缺点: 需要手动将监听器添加到PropertyChangeSupport维护的容器中,同步操作
4.改进
改进方案和观察者模式类似,利用spring容器的扩展点将业务方和监听器关系交给容器维护,在触发fire的时候利用多线程。
做一下思维发散,我们可以继承PropertyChangeSupport重写fire方法,在这里做多线程异步化。
五、Guava事件总线
EventBus是谷歌开源的实现事件驱动编程的事件总线,并且提供了基于注解的编码方式,对于需要实现应用内业务解耦的场景,是一个不错的选择,常用的有有两个类和一个注解:EventBus、AsyncEventBus和@Subscribe.
- EventBus 事件总线,封装了监听器容器和事件发布逻辑
- AsyncEventBus 是EventBus的异步实现,大部分场景业务解耦之后,监听器的执行结果不影响事件发布者的主逻辑,使用异步事件总线后开启新线程执行监听器逻辑,不阻塞事件发布者逻辑
- @Subscribe注解,该注解作用于方法,将该方法连同宿主类封装成监听器,在事件发布后,事件主线从监听器容器中根据事件类型获取对应的监听器列表并执行逻辑
1.实现
创建事件:
代码语言:javascript复制@Data
class TestEvent1 {
private int message;
public TestEvent1(int message) {
this.message = message;
}
}
创建事件监听器:
代码语言:javascript复制@Slf4j
class EventListener {
@Subscribe
public void onEvent(TestEvent1 event) {
log.info("EventListener onEvent event={},thread={}",event,Thread.currentThread().getName());
}
}
guava的事件总线设计和实现都已经比较成熟,能够满足绝大多数应用内业务解耦的诉求,但是有些点还是对我们不够友好,在编码层面还是要对其做一些封装处理。
- 业务类自己维护与监听器的关系,背离IOC原则
- 同步操作,从截图看,业务类和监听器业务逻辑执行是同一个线程,如果观察者异常直接影响到被观察者主逻辑运行,当然guava为了解决这一点新增了AsyncEventBus来做异步化
2.原理
guava时间总线的核心类就是EventBus,我们剪取我们用到的方法看一下其实现:
代码语言:javascript复制public class EventBus {
private static final Logger logger = Logger.getLogger(EventBus.class.getName());
private final String identifier;
private final Executor executor;
private final SubscriberExceptionHandler exceptionHandler;
private final SubscriberRegistry subscribers = new SubscriberRegistry(this);
private final Dispatcher dispatcher;
public EventBus() {
this("default");
}
public EventBus(String identifier) {
this(
identifier,
MoreExecutors.directExecutor(),
Dispatcher.perThreadDispatchQueue(),
LoggingHandler.INSTANCE);
}
public EventBus(SubscriberExceptionHandler exceptionHandler) {
this(
"default",
MoreExecutors.directExecutor(),
Dispatcher.perThreadDispatchQueue(),
exceptionHandler);
}
EventBus(
String identifier,
Executor executor,
Dispatcher dispatcher,
SubscriberExceptionHandler exceptionHandler) {
this.identifier = checkNotNull(identifier);
this.executor = checkNotNull(executor);
this.dispatcher = checkNotNull(dispatcher);
this.exceptionHandler = checkNotNull(exceptionHandler);
}
public void register(Object object) {
subscribers.register(object);
}
public void post(Object event) {
Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
if (eventSubscribers.hasNext()) {
dispatcher.dispatch(event, eventSubscribers);
} else if (!(event instanceof DeadEvent)) {
// the event had no subscribers and was not itself a DeadEvent
post(new DeadEvent(this, event));
}
}
}
identifier是时间总线的身份信息,subscribers是事件监听器容器,dispatcher是事件派发器,我们看一下构造器和方法:
构造器:
代码语言:javascript复制EventBus(
String identifier,
Executor executor,
Dispatcher dispatcher,
SubscriberExceptionHandler exceptionHandler) {
this.identifier = checkNotNull(identifier);
this.executor = checkNotNull(executor);
this.dispatcher = checkNotNull(dispatcher);
this.exceptionHandler = checkNotNull(exceptionHandler);
}
创建事件总线指定标识符,监听器业务执行器,事件派发器,异常处理器.
注册监听器:
代码语言:javascript复制public void register(Object object) {
subscribers.register(object);
}
调用了SubscriberRegistry监听器注册中心的注册方法:
代码语言:javascript复制void register(Object listener) {
Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);
for (Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
Class<?> eventType = entry.getKey();
Collection<Subscriber> eventMethodsInListener = entry.getValue();
CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);
if (eventSubscribers == null) {
CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<>();
eventSubscribers =
MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet);
}
eventSubscribers.addAll(eventMethodsInListener);
}
}
先调用了findAllSubscribers方法,从监听器所属类解析出使用@Subscribe注解的方法并封装成Subscriber放入Multimap返回.
代码语言:javascript复制 private Multimap<Class<?>, Subscriber> findAllSubscribers(Object listener) {
Multimap<Class<?>, Subscriber> methodsInListener = HashMultimap.create();
Class<?> clazz = listener.getClass();
for (Method method : getAnnotatedMethods(clazz)) {
Class<?>[] parameterTypes = method.getParameterTypes();
Class<?> eventType = parameterTypes[0];
methodsInListener.put(eventType, Subscriber.create(bus, listener, method));
}
return methodsInListener;
}
然后将返回的Multimap<Event,Subscriber>打平转换成Map<Event,List<Subscriber>>,遍历Map使用CopyOnWriteArraySet做事件的读写分离,然后把监听器注册到subscribers中.
发布事件:
代码语言:javascript复制 public void post(Object event) {
Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
if (eventSubscribers.hasNext()) {
dispatcher.dispatch(event, eventSubscribers);
} else if (!(event instanceof DeadEvent)) {
// the event had no subscribers and was not itself a DeadEvent
post(new DeadEvent(this, event));
}
}
先从监听器注册中心根据事件类型获取监听器列表,然后调用派发器发布事件,选取一种派发器LegacyAsyncDispatcher看一下实现:
代码语言:javascript复制@Override
void dispatch(Object event, Iterator<Subscriber> subscribers) {
checkNotNull(event);
while (subscribers.hasNext()) {
queue.add(new EventWithSubscriber(event, subscribers.next()));
}
EventWithSubscriber e;
while ((e = queue.poll()) != null) {
e.subscriber.dispatchEvent(e.event);
}
}
将事件封装成EventWithSubscriber放入本地队列,然后再派发出去,最后再看一下Subscriber的事件派发实现:
代码语言:javascript复制 final void dispatchEvent(final Object event) {
executor.execute(
new Runnable() {
@Override
public void run() {
try {
invokeSubscriberMethod(event);
} catch (InvocationTargetException e) {
bus.handleSubscriberException(e.getCause(), context(event));
}
}
});
}
此处是执行事件监听器逻辑的地方,如果创建事件总线的时候传入了线程池,那么这里会使用线程池的的线程执行,如果没有传入线程池,EventBus会使用MoreExecutors.directExecutor()作为Executor,而该Executor直接使用业务线程执行监听器逻辑,也就是同步的.
代码语言:javascript复制enum DirectExecutor implements Executor {
INSTANCE;
@Override
public void execute(Runnable command) {
command.run();
}
@Override
public String toString() {
return "MoreExecutors.directExecutor()";
}
}
然后我们再简单分析一下AsyncEventBus:
代码语言:javascript复制public class AsyncEventBus extends EventBus {
public AsyncEventBus(String identifier, Executor executor) {
super(identifier, executor, Dispatcher.legacyAsync(), LoggingHandler.INSTANCE);
}
public AsyncEventBus(Executor executor, SubscriberExceptionHandler subscriberExceptionHandler) {
super("default", executor, Dispatcher.legacyAsync(), subscriberExceptionHandler);
}
public AsyncEventBus(Executor executor) {
super("default", executor, Dispatcher.legacyAsync(), LoggingHandler.INSTANCE);
}
}
可以理解为多了需要传入线程池的构造器.
3.优缺点
优点: 接入简单,设计成熟,可以根据事件类型将监听器归类
缺点: 需要手动将监听器添加到EventBus事件监听器注册中心
4.改进
自定义事件监听器接口,主要用于在spring容器管理的时候归类处理:
代码语言:javascript复制public interface IEventListener {}
自定义事件监听器:
代码语言:javascript复制@Subscribe
public void handle(CloseSessionEvent event) {
log.info("CloseSessionEventListener.handle receive event={}",event);
}
继承AsyncEventBus自定义异步事件总线,利用spring容器将事件监听器自动注册到事件总线:
代码语言:javascript复制@Slf4j
@Service
public class EventBusHelper extends AsyncEventBus implements ApplicationContextAware {
public static final String DEFAULT_EVENT_BUS = "async_event_bus";
private static final AtomicBoolean init = new AtomicBoolean(false);
private static final Executor DEFAULT_EXECUTOR = ThreadPoolUtil.COMMON_POOL;
public EventBusHelper() {
this(DEFAULT_EVENT_BUS);
}
public EventBusHelper(String identifier) {
this(identifier,DEFAULT_EXECUTOR);
}
public EventBusHelper(String identifier,Executor executor) {
super(identifier,executor);
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
if(init.get()) {
return;
}
Map<String,IEventListener> beanMap = applicationContext.getBeansOfType(IEventListener.class);
beanMap.forEach((k,v) -> this.register(v));
}
}
这样就实现了事件监听器管理与业务代码解耦,在业务类中引入EventBusHelper调用post发布事件即可,其他的已经交给IOC处理.
六、Spring事件驱动
spring作为除了jdk之外最被广泛使用的基础构件,每个模块之间也大量使用了事件驱动编程,并且留出了扩展点供开发者使用,在满足开闭原则的情况下,开发者可以写很少的代码就能复用spring的事件驱动编程.
我们实际用到的有3个类和一个注解:ApplicationEvent、ApplicationListener、ApplicationEventPublisher和@EventListener注解.
- ApplicationEvent 是一个抽象类,继承于EventObject,用户自定义事件继承ApplicationEvent类做个性化实现.
- ApplicationListener 是一个接口,继承了EventListener,实现该接口的类被定义为事件监听器,监听器类通过事件类型归类.
- ApplicationEventPublisher 也是一个接口,业务类可以实现该接口新增事件发布者身份,并注入发布事件能力.
- @EventListener是注解,将该注解标记在类方法上会被spring容器自动装配成事件监听器(EventListenerMethodProcessor实现),和实现ApplicationListener接口作用等同.
1.实现
创建事件类:
代码语言:javascript复制public class TestSwitchStateEvent extends ApplicationEvent {
private Integer originStatus;
private Integer targetStatus;
private String casAccount;
/**
* Create a new ApplicationEvent.
*
* @param targetStatus the object on which the event initially occurred (never {@code null})
*/
public TestSwitchStateEvent(Integer targetStatus) {
super(targetStatus);
}
public TestSwitchStateEvent(Integer originStatus, Integer targetStatus, String casAccount) {
this(targetStatus);
this.targetStatus = targetStatus;
this.originStatus = originStatus;
this.casAccount = casAccount;
}
}
新增监听器:
代码语言:javascript复制@Slf4j
@Component
public class Test3SwitchStateListener implements ApplicationListener<TestSwitchStateEvent> {
@Override
public void onApplicationEvent(TestSwitchStateEvent event) {
log.info("Test3SwitchStateListener.switchStateListener receive event={}",event);
}
}
编写事件发布逻辑,spring容器已经初始化了ApplicationEventPublisher,只需要实现ApplicationEventPublisherAware接口注入线程事件发布器:
代码语言:javascript复制@Slf4j
@Component
public class TestApplicationEventPublisher implements ApplicationEventPublisherAware, InitializingBean {
private ApplicationEventPublisher applicationEventPublisher;
@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.applicationEventPublisher = applicationEventPublisher;
}
public void doSomething() {
log.info("TestApplicationEventPublisher.doSomething thread={}",Thread.currentThread().getName());
this.applicationEventPublisher.publishEvent(new TestSwitchStateEvent(2,1,"Typhoon"));
}
@Override
public void afterPropertiesSet() throws Exception {
this.doSomething();
}
}
启动测试,事件正常发布和订阅:
调整一下监听器实现,使用注解方式:
代码语言:javascript复制@Slf4j
@Component
public class Test3SwitchStateListener {
@EventListener
public void onApplicationEvent(TestSwitchStateEvent event) {
log.info("Test3SwitchStateListener.switchStateListener receive event={},thread={}",event,Thread.currentThread().getName());
}
}
和实现接口的方式效果一样,但是从截图中能看出事件发布者和监听器都使用相同线程执行,当然这不一定算是问题,但是在我们业务场景中对于事件驱动的使用主要是为了业务解耦,既然解耦了为什么不用异步?
2.原理
spring事件驱动的原理大致从两个点分析,@EventListener解析成监听器逻辑和事件发送逻辑。
EventListenerMethodProcessor负责将@EventListener注解的方法解析封装成ApplicationListener,看其核心方法processBean:
代码语言:javascript复制private void processBean(final String beanName, final Class<?> targetType) {
if (!this.nonAnnotatedClasses.contains(targetType) && !isSpringContainerClass(targetType)) {
Map<Method, EventListener> annotatedMethods = null;
try {
annotatedMethods = MethodIntrospector.selectMethods(targetType,
(MethodIntrospector.MetadataLookup<EventListener>) method ->
AnnotatedElementUtils.findMergedAnnotation(method, EventListener.class));
}
catch (Throwable ex) {
}
if (CollectionUtils.isEmpty(annotatedMethods)) {
this.nonAnnotatedClasses.add(targetType);
}
else {
// Non-empty set of methods
ConfigurableApplicationContext context = this.applicationContext;
Assert.state(context != null, "No ApplicationContext set");
List<EventListenerFactory> factories = this.eventListenerFactories;
Assert.state(factories != null, "EventListenerFactory List not initialized");
for (Method method : annotatedMethods.keySet()) {
for (EventListenerFactory factory : factories) {
if (factory.supportsMethod(method)) {
Method methodToUse = AopUtils.selectInvocableMethod(method, context.getType(beanName));
ApplicationListener<?> applicationListener =
factory.createApplicationListener(beanName, targetType, methodToUse);
if (applicationListener instanceof ApplicationListenerMethodAdapter) {
((ApplicationListenerMethodAdapter) applicationListener).init(context, this.evaluator);
}
context.addApplicationListener(applicationListener);
break;
}
}
}
}
}
}
解析到@EventListener注解的方法后,DefaultEventListenerFactory将其封装成ApplicationListenerMethodAdapter(实现了ApplicationListener接口),然后放到ApplicationContext容器中备用.
接着看一下事件发布逻辑,ApplicationEventPublisher直接调用了AbstractApplicationContext的pulishEvent方法:
代码语言:javascript复制 @Override
public void publishEvent(ApplicationEvent event) {
publishEvent(event, null);
}
继续看重载方法publishEvent:
代码语言:javascript复制protected void publishEvent(Object event, @Nullable ResolvableType eventType) {
Assert.notNull(event, "Event must not be null");
ApplicationEvent applicationEvent;
if (event instanceof ApplicationEvent) {
applicationEvent = (ApplicationEvent) event;
}
else {
applicationEvent = new PayloadApplicationEvent<>(this, event);
if (eventType == null) {
eventType = ((PayloadApplicationEvent) applicationEvent).getResolvableType();
}
}
// Multicast right now if possible - or lazily once the multicaster is initialized
if (this.earlyApplicationEvents != null) {
this.earlyApplicationEvents.add(applicationEvent);
}
else {
//1.发布事件
getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType);
}
// 2.父容器发布事件
if (this.parent != null) {
if (this.parent instanceof AbstractApplicationContext) {
((AbstractApplicationContext) this.parent).publishEvent(event, eventType);
}
else {
this.parent.publishEvent(event);
}
}
}
代码中标注了两个关键点,当前容器发布事件和如果有父容器也同样发布事件(处理逻辑类似),继续看发布事件逻辑,ApplicationEventMulticaster接口只有一个默认的实现SimpleApplicationEventMulticaster:
代码语言:javascript复制@Override
public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {
ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
for (final ApplicationListener<?> listener : getApplicationListeners(event, type)) {
Executor executor = getTaskExecutor();
if (executor != null) {
executor.execute(() -> invokeListener(listener, event));
}
else {
invokeListener(listener, event);
}
}
}
这里逻辑也不复杂,解析事件类型,然后根据事件类型从容器中获取监听器列表,然后逐个调用监听器的onApplicationEvent方法,需要注意的是如果有线程池就放到线程池中执行.
3.优缺点
优点: 接入简单,设计成熟,可以根据事件类型将监听器归类,支持注解
缺点: 异步化支持不好,如果要支持异步要开启另外一个套件@EnableAsync
4.改进
既然提到改进,那按照我的使用习惯,我并不想为了支持异步化然后又在应用维度开启一个全局化的能力@EnableAsync,那就改成自定义实现,用一个异步化事件监听器 自定义线程池解决.
新增异步监听器注解:
代码语言:javascript复制@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface AsyncEventListener {
@AliasFor("classes")
Class<?>[] value() default {};
@AliasFor("value")
Class<?>[] classes() default {};
String condition() default "";
}
新增异步事件监听适配器(无法复用ApplicationListenerMethodAdapter):
代码语言:javascript复制@Slf4j
public class AsyncApplicationListenerMethodAdapter implements GenericApplicationListener {
public static final String DEFAULT_TASK_EXECUTOR_BEAN_NAME = "taskExecutor";
private final String beanName;
private final Method method;
private final Method targetMethod;
private final AnnotatedElementKey methodKey;
private final List<ResolvableType> declaredEventTypes;
@Nullable
private final String condition;
private final int order;
@Nullable
private ApplicationContext applicationContext;
@Nullable
private EventExpressionEvaluator evaluator;
public AsyncApplicationListenerMethodAdapter(String beanName, Class<?> targetClass, Method method) {
this.beanName = beanName;
this.method = BridgeMethodResolver.findBridgedMethod(method);
this.targetMethod = (!Proxy.isProxyClass(targetClass) ?
AopUtils.getMostSpecificMethod(method, targetClass) : this.method);
this.methodKey = new AnnotatedElementKey(this.targetMethod, targetClass);
AsyncEventListener ann = AnnotatedElementUtils.findMergedAnnotation(this.targetMethod, AsyncEventListener.class);
this.declaredEventTypes = resolveDeclaredEventTypes(method, ann);
this.condition = (ann != null ? ann.condition() : null);
this.order = resolveOrder(this.targetMethod);
}
private static List<ResolvableType> resolveDeclaredEventTypes(Method method, @Nullable AsyncEventListener ann) {
int count = method.getParameterCount();
if (count > 1) {
throw new IllegalStateException(
"Maximum one parameter is allowed for event listener method: " method);
}
if (ann != null) {
Class<?>[] classes = ann.classes();
if (classes.length > 0) {
List<ResolvableType> types = new ArrayList<>(classes.length);
for (Class<?> eventType : classes) {
types.add(ResolvableType.forClass(eventType));
}
return types;
}
}
if (count == 0) {
throw new IllegalStateException(
"Event parameter is mandatory for event listener method: " method);
}
return Collections.singletonList(ResolvableType.forMethodParameter(method, 0));
}
void init(ApplicationContext applicationContext, EventExpressionEvaluator evaluator) {
this.applicationContext = applicationContext;
this.evaluator = evaluator;
}
@Override
public void onApplicationEvent(ApplicationEvent event) {
processEvent(event);
}
public void processEvent(ApplicationEvent event) {
Object[] args = resolveArguments(event);
if (shouldHandle(event, args)) {
Executor executor = this.getDefaultExecutor(applicationContext);
executor.execute(() -> {
log.info("AsyncApplicationListenerMethodAdapter.processEvent async execute current thread={}",Thread.currentThread().getName());
Object result = doInvoke(args);
if (result != null) {
handleResult(result);
}
else {
log.trace("No result object given - no result to handle");
}
});
}
}
protected void handleResult(Object result) {
if (result.getClass().isArray()) {
Object[] events = ObjectUtils.toObjectArray(result);
for (Object event : events) {
publishEvent(event);
}
}
else if (result instanceof Collection<?>) {
Collection<?> events = (Collection<?>) result;
for (Object event : events) {
publishEvent(event);
}
}
else {
publishEvent(result);
}
}
private void publishEvent(@Nullable Object event) {
if (event != null) {
Assert.notNull(this.applicationContext, "ApplicationContext must not be null");
this.applicationContext.publishEvent(event);
}
}
}
分析一下processEvent方法,先调用getDefaultExecutor获取Executor然后执行监听器逻辑,看一下getDefaultExecutor实现:
代码语言:javascript复制 protected Executor getDefaultExecutor(@Nullable ApplicationContext applicationContext) {
if (applicationContext != null) {
try {
return applicationContext.getBean(TaskExecutor.class);
}
catch (NoUniqueBeanDefinitionException ex) {
log.warn("Could not find unique TaskExecutor bean", ex);
try {
return applicationContext.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
}
catch (NoSuchBeanDefinitionException ex2) {
}
}
catch (NoSuchBeanDefinitionException ex) {
log.debug("Could not find default TaskExecutor bean", ex);
try {
return applicationContext.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
}
catch (NoSuchBeanDefinitionException ex2) {
log.info("No task executor bean found for async processing: "
"no bean of type TaskExecutor and no bean named 'taskExecutor' either");
}
// Giving up -> either using local default executor or none at all...
}
}
return ThreadPoolUtil.COMMON_POOL;
}
看过spring异步编程源码的看到这里会很熟悉,这里借用了其实现思想和逻辑,优先从容器中获取TaskExecutor,如果没有根据指定的名称获取,如果最后还获取不到线程池,那么返回指定的兜底线程池来做异步逻辑.
新建异步事件监听器工厂:
代码语言:javascript复制public class AsyncEventListenerFactory implements EventListenerFactory, Ordered {
private int order = LOWEST_PRECEDENCE;
public void setOrder(int order) {
this.order = order;
}
@Override
public int getOrder() {
return this.order;
}
public boolean supportsMethod(Method method) {
return AnnotatedElementUtils.hasAnnotation(method, AsyncEventListener.class);
}
@Override
public ApplicationListener<?> createApplicationListener(String beanName, Class<?> type, Method method) {
return new AsyncApplicationListenerMethodAdapter(beanName, type, method);
}
}
新增异步EventListenerMethodProcessor用于解析@AsyncEventListener.
代码语言:javascript复制public class AsyncEventListenerMethodProcessor implements SmartInitializingSingleton, ApplicationContextAware, BeanFactoryPostProcessor {
private void processBean(final String beanName, final Class<?> targetType) {
if (!this.nonAnnotatedClasses.contains(targetType) && !isSpringContainerClass(targetType)) {
Map<Method, AsyncEventListener> annotatedMethods = null;
try {
annotatedMethods = MethodIntrospector.selectMethods(targetType,
(MethodIntrospector.MetadataLookup<AsyncEventListener>) method ->
AnnotatedElementUtils.findMergedAnnotation(method, AsyncEventListener.class));
}
catch (Throwable ex) {
}
if (CollectionUtils.isEmpty(annotatedMethods)) {
this.nonAnnotatedClasses.add(targetType);
}
else {
// Non-empty set of methods
ConfigurableApplicationContext context = this.applicationContext;
Assert.state(context != null, "No ApplicationContext set");
List<EventListenerFactory> factories = this.eventListenerFactories;
Assert.state(factories != null, "EventListenerFactory List not initialized");
for (Method method : annotatedMethods.keySet()) {
for (EventListenerFactory factory : factories) {
if ( factory.supportsMethod(method)) {
Method methodToUse = AopUtils.selectInvocableMethod(method, context.getType(beanName));
ApplicationListener<?> applicationListener =
factory.createApplicationListener(beanName, targetType, methodToUse);
if (applicationListener instanceof AsyncApplicationListenerMethodAdapter) {
((AsyncApplicationListenerMethodAdapter) applicationListener).init(context, this.evaluator);
}
context.addApplicationListener(applicationListener);
break;
}
}
}
if (logger.isDebugEnabled()) {
logger.debug(annotatedMethods.size() " @EventListener methods processed on bean '"
beanName "': " annotatedMethods);
}
}
}
}
}
解析@AsyncEventListener注解的方法并封装成AsyncApplicationListenerMethodAdapter添加到容器中.
是骡子是马拉出来遛遛,使用@AsyncEventListener注解方法:
代码语言:javascript复制@Component
@Slf4j
public class Test2SwitchStateListener {
@AsyncEventListener
protected void switchStateListener(TestSwitchStateEvent event) {
log.info("Test2SwitchStateListener.switchStateListener receive event={}",event);
}
}
执行成功并且开启了新线程异步执行.
七、总结
本篇介绍了事件驱动编程和几种常见的事件编程实现,对于设计和实现维度来说,guava的事件总线和spring事件驱动都比较成熟,功能比较完善,能够满足大部分业务场景,对于使用spring全家桶的应用可以直接使用spring事件驱动编程,其他情况下也没有严格意义上的孰优孰劣之分,看个人和团队使用习惯.
简单总结一下,事件驱动有三个重要概念:事件、事件发布者和事件监听者,事件驱动解决的是应用内部业务解耦,实现的时候注意要做异步化.