一、背景
开发中有这样一个场景,客服业务需要接入在线能力,基于其他团队的IM底层能力构建业务层能力,也就是需要先调二方restful服务创建群聊,然后调用本地服务创建会话,并且创建会话依赖于二方服务返回的群聊信息,那么就会出现本地服务异常回滚,但是二方服务已经调用成功的情况,如果不做处理那么下次再尝试创建群聊,用户id已经存在,创建不成功,考虑到异构服务(二方服务可能是java、C 或者其他)或者异构数据(mysql、TiDB等), 分布式事务并不是一个很好的选择,这个时候我们就可以考虑在产生异常时候手动回滚二方服务的方式。
二、案例分析
今天我们要描述的是使用TransactionalEventListener来做业务补偿,TransactionalEventListener本质上是一个EventListener,依赖于Spring事件体系的支撑,我们要做的就是优先调用二方服务并返回结果,如果二方服务异常,流程终止不执行本地业务,如果二方服务正常,执行本地业务,如果本地执行成功,整个流程执行成功,如果本地执行异常,本地数据回滚,然后发出异常事件,由TransactionalEventListener执行二方数据的手动回滚或者订正。大致流程如下:
业务代码:
代码语言:javascript复制@Transactional
public void createSession() {
//1.创建群
String groupId = restful api.createGroup
//2.发布补偿事件
this.applicationEventPublisher.publishEvent();
//3.创建会话
this.createSession(groupId);
}
首先调用二方服务,然后发送补偿事件,最后调用本地服务,2和3的顺序不能颠倒,否则会导致异常终止事件发送不出去。
TransactionalEventListener事件补偿:
代码语言:javascript复制@Slf4j
@Component
public class CreateSessionFailedListener {
@TransactionalEventListener(phase = TransactionPhase.AFTER_ROLLBACK)
public void onRollbackEvent(CreateSessionFailedEvent event) {
try {
restful api补偿
} catch (Exception e) {
log.error("onRollbackEvent occur error;event={}",event,e);
}
}
}
使用TransactionalEventListener注解写事务监听器,并且监听的时机是异常回滚,也就是本地事务出现异常回滚后触发该事件监听。
这样就能实现二方服务执行成功后,本地事务回滚,然后补偿订正二方服务数据了。当然也可以在事务的上层调用方捕获并识别异常,然后根据需要决定是否需要补偿。
三、源码&原理解析
1.监听器初始化
@TransactionalEventListener注解肯定和事务相关,那么我们就从springboot开启事务注解的地方开始分析,先看EnableTransactionManagement:
代码语言:javascript复制@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(TransactionManagementConfigurationSelector.class)
public @interface EnableTransactionManagement {
...
}
TransactionalEventListener导入了选择器TransactionManagementConfigurationSelector:
代码语言:javascript复制public class TransactionManagementConfigurationSelector extends AdviceModeImportSelector<EnableTransactionManagement> {
@Override
protected String[] selectImports(AdviceMode adviceMode) {
switch (adviceMode) {
case PROXY:
return new String[] {AutoProxyRegistrar.class.getName(),
ProxyTransactionManagementConfiguration.class.getName()};
case ASPECTJ:
return new String[] {determineTransactionAspectClass()};
default:
return null;
}
}
}
默认是Proxy代理,会引入AutoProxyRegistrar和ProxyTransactionManagementConfiguration,前者我们在spring cache原理解析中已经分析过,直接看ProxyTransactionManagementConfiguration:
代码语言:javascript复制@Configuration
public class ProxyTransactionManagementConfiguration extends AbstractTransactionManagementConfiguration {
@Bean(name = TransactionManagementConfigUtils.TRANSACTION_ADVISOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public BeanFactoryTransactionAttributeSourceAdvisor transactionAdvisor() {
BeanFactoryTransactionAttributeSourceAdvisor advisor = new BeanFactoryTransactionAttributeSourceAdvisor();
advisor.setTransactionAttributeSource(transactionAttributeSource());
advisor.setAdvice(transactionInterceptor());
if (this.enableTx != null) {
advisor.setOrder(this.enableTx.<Integer>getNumber("order"));
}
return advisor;
}
@Bean
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public TransactionAttributeSource transactionAttributeSource() {
return new AnnotationTransactionAttributeSource();
}
@Bean
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public TransactionInterceptor transactionInterceptor() {
TransactionInterceptor interceptor = new TransactionInterceptor();
interceptor.setTransactionAttributeSource(transactionAttributeSource());
if (this.txManager != null) {
interceptor.setTransactionManager(this.txManager);
}
return interceptor;
}
}
ProxyTransactionManagementConfiguration定义了几个基础设施类,来实现事务逻辑织入,在之前的篇幅中已经不止一次分析过,此处不在赘述,我们看一下期继承关系:
ProxyTransactionManagementConfiguration继承了AbstractTransactionManagementConfiguration
类,该类定义了一个bean:
代码语言:javascript复制@Bean(name = TransactionManagementConfigUtils.TRANSACTIONAL_EVENT_LISTENER_FACTORY_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public static TransactionalEventListenerFactory transactionalEventListenerFactory() {
return new TransactionalEventListenerFactory();
}
该bean所在类定义了创建事件监听器的方法:
代码语言:javascript复制public class TransactionalEventListenerFactory implements EventListenerFactory, Ordered {
@Override
public boolean supportsMethod(Method method) {
return AnnotatedElementUtils.hasAnnotation(method, TransactionalEventListener.class);
}
@Override
public ApplicationListener<?> createApplicationListener(String beanName, Class<?> type, Method method) {
return new ApplicationListenerMethodTransactionalAdapter(beanName, type, method);
}
}
该方法根据用户使用@TransactionalEventListener注解的方法创建时间监听器代理,在应用启动的时候EventListenerMethodProcessor中调用,其原理在另外一片文章《事件驱动编程》中也分析过,然后我们看一下创建的监听器代理的实现ApplicationListenerMethodTransactionalAdapter:
代码语言:javascript复制class ApplicationListenerMethodTransactionalAdapter extends ApplicationListenerMethodAdapter {
private final TransactionalEventListener annotation;
public ApplicationListenerMethodTransactionalAdapter(String beanName, Class<?> targetClass, Method method) {
super(beanName, targetClass, method);
TransactionalEventListener ann = AnnotatedElementUtils.findMergedAnnotation(method, TransactionalEventListener.class);
if (ann == null) {
throw new IllegalStateException("No TransactionalEventListener annotation found on method: " method);
}
this.annotation = ann;
}
@Override
public void onApplicationEvent(ApplicationEvent event) {
if (TransactionSynchronizationManager.isSynchronizationActive()) {
TransactionSynchronization transactionSynchronization = createTransactionSynchronization(event);
TransactionSynchronizationManager.registerSynchronization(transactionSynchronization);
}
else if (this.annotation.fallbackExecution()) {
if (this.annotation.phase() == TransactionPhase.AFTER_ROLLBACK && logger.isWarnEnabled()) {
logger.warn("Processing " event " as a fallback execution on AFTER_ROLLBACK phase");
}
processEvent(event);
}
else {
// No transactional event execution at all
if (logger.isDebugEnabled()) {
logger.debug("No transaction is active - skipping " event);
}
}
}
}
监听器逻辑调用会调用onApplicationEvent方法,这一段逻辑比较巧妙,首先检查当前上下文是否在事务中,如果是则把监听器逻辑注册到事务同步器中,等待后续事务执行过程指定节点触发,如果没有在事务中则立即触发事件监听逻辑。
事件工厂初始化:
事务事件监听器初始化:
2.事务事件调用与监听器触发
在spring体系中我们可以直接注入事件发布器来发布事件:
代码语言:javascript复制@Autowired
protected ApplicationEventPublisher applicationEventPublisher;
看一下ApplicationEventPublisher定义:
代码语言:javascript复制@FunctionalInterface
public interface ApplicationEventPublisher {
/**
* Notify all <strong>matching</strong> listeners registered with this
* application of an application event. Events may be framework events
* (such as RequestHandledEvent) or application-specific events.
* @param event the event to publish
* @see org.springframework.web.context.support.RequestHandledEvent
*/
default void publishEvent(ApplicationEvent event) {
publishEvent((Object) event);
}
/**
* Notify all <strong>matching</strong> listeners registered with this
* application of an event.
* <p>If the specified {@code event} is not an {@link ApplicationEvent},
* it is wrapped in a {@link PayloadApplicationEvent}.
* @param event the event to publish
* @since 4.2
* @see PayloadApplicationEvent
*/
void publishEvent(Object event);
}
调用ApplicationEventPublisher#publishEvent会调用AbstractApplicationContext#publishEvent:
代码语言:javascript复制protected void publishEvent(Object event, @Nullable ResolvableType eventType) {
Assert.notNull(event, "Event must not be null");
// Decorate event as an ApplicationEvent if necessary
ApplicationEvent applicationEvent;
if (event instanceof ApplicationEvent) {
applicationEvent = (ApplicationEvent) event;
}
else {
applicationEvent = new PayloadApplicationEvent<>(this, event);
if (eventType == null) {
eventType = ((PayloadApplicationEvent) applicationEvent).getResolvableType();
}
}
// Multicast right now if possible - or lazily once the multicaster is initialized
if (this.earlyApplicationEvents != null) {
this.earlyApplicationEvents.add(applicationEvent);
}
else {
getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType);
}
// Publish event via parent context as well...
if (this.parent != null) {
if (this.parent instanceof AbstractApplicationContext) {
((AbstractApplicationContext) this.parent).publishEvent(event, eventType);
}
else {
this.parent.publishEvent(event);
}
}
}
这里是一个递归调用,当前上下文先发布事件,然后递归找父上下文发布事件,最终会调用SimpleApplicationEventMulticaster#multicastEvent来发布事件:
代码语言:javascript复制@Override
public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {
ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
for (final ApplicationListener<?> listener : getApplicationListeners(event, type)) {
Executor executor = getTaskExecutor();
if (executor != null) {
executor.execute(() -> invokeListener(listener, event));
}
else {
invokeListener(listener, event);
}
}
}
这里是从上下文中先获取监听器集合,然后如果有任务执行器就调用任务执行器执行监听器逻辑(多线程),否则当前线程调用监听器逻辑,然后看invokeListener实现:
代码语言:javascript复制protected void invokeListener(ApplicationListener<?> listener, ApplicationEvent event) {
ErrorHandler errorHandler = getErrorHandler();
if (errorHandler != null) {
try {
doInvokeListener(listener, event);
}
catch (Throwable err) {
errorHandler.handleError(err);
}
}
else {
doInvokeListener(listener, event);
}
}
继续看doInvokeListener:
代码语言:javascript复制private void doInvokeListener(ApplicationListener listener, ApplicationEvent event) {
try {
listener.onApplicationEvent(event);
}
catch (ClassCastException ex) {
String msg = ex.getMessage();
if (msg == null || matchesClassCastMessage(msg, event.getClass())) {
// Possibly a lambda-defined listener which we could not resolve the generic event type for
// -> let's suppress the exception and just log a debug message.
Log logger = LogFactory.getLog(getClass());
if (logger.isDebugEnabled()) {
logger.debug("Non-matching event type for listener: " listener, ex);
}
}
else {
throw ex;
}
}
}
到这里就比较清晰了,我们自定义事件监听器都实现了ApplicationListener接口,此处会调用监听器的onApplicationEvent方法执行自定义逻辑。
然后我们回顾一下事务事件监听器适配器实现:
代码语言:javascript复制@Override
public void onApplicationEvent(ApplicationEvent event) {
if (TransactionSynchronizationManager.isSynchronizationActive()) {
TransactionSynchronization transactionSynchronization = createTransactionSynchronization(event);
TransactionSynchronizationManager.registerSynchronization(transactionSynchronization);
}
else if (this.annotation.fallbackExecution()) {
if (this.annotation.phase() == TransactionPhase.AFTER_ROLLBACK && logger.isWarnEnabled()) {
logger.warn("Processing " event " as a fallback execution on AFTER_ROLLBACK phase");
}
processEvent(event);
}
else {
// No transactional event execution at all
if (logger.isDebugEnabled()) {
logger.debug("No transaction is active - skipping " event);
}
}
}
当上一步doInvokeListener调用到ApplicationListenerMethodTransactionalAdapter#onApplicationEvent的时候,如果检测到当前上下文有活跃的事务,那么就把监听器逻辑注册到事务中,等到事务执行到指定的节点触发监听器逻辑,否则如果检测到TransactionalEventListener.fallbackExecution属性为true(如果没有事务,是否处理事件),则直接调用处理事件逻辑,否则返回调用。
我们暂且理解为当前逻辑在事务中,先创建事务同步逻辑:
代码语言:javascript复制public TransactionSynchronizationEventAdapter(ApplicationListenerMethodAdapter listener,
ApplicationEvent event, TransactionPhase phase) {
this.listener = listener;
this.event = event;
this.phase = phase;
}
包含了事件监听器,事件类型和事务事件触发阶段。然后调用事件同步管理器把事件同步逻辑注册到事务中:
代码语言:javascript复制public static void registerSynchronization(TransactionSynchronization synchronization)
throws IllegalStateException {
Assert.notNull(synchronization, "TransactionSynchronization must not be null");
if (!isSynchronizationActive()) {
throw new IllegalStateException("Transaction synchronization is not active");
}
synchronizations.get().add(synchronization);
}
我们把事务事件监听器执行的此阶段叫做注册阶段,用时序图更清晰的分析一下其逻辑:
那事件事务监听器逻辑注册到事务生命周期成功了,什么时候触发呢?那就要回到ProxyTransactionManagementConfiguration的TransactionInterceptor了,加事务注解的逻辑执行的时候会被TransactionInterceptor拦截到,然后执行invoke逻辑:
代码语言:javascript复制public Object invoke(MethodInvocation invocation) throws Throwable {
// Work out the target class: may be {@code null}.
// The TransactionAttributeSource should be passed the target class
// as well as the method, which may be from an interface.
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
// Adapt to TransactionAspectSupport's invokeWithinTransaction...
return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
}
在invokeWithinTransaction逻辑中会调用commitTransactionAfterReturning方法:
代码语言:javascript复制protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
if (txInfo != null && txInfo.getTransactionStatus() != null) {
if (logger.isTraceEnabled()) {
logger.trace("Completing transaction for [" txInfo.getJoinpointIdentification() "]");
}
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
}
然后会调用事务管理器执行事务状态的提交逻辑:
代码语言:javascript复制@Override
public final void commit(TransactionStatus status) throws TransactionException {
if (status.isCompleted()) {
throw new IllegalTransactionStateException(
"Transaction is already completed - do not call commit or rollback more than once per transaction");
}
DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
if (defStatus.isLocalRollbackOnly()) {
if (defStatus.isDebug()) {
logger.debug("Transactional code has requested rollback");
}
processRollback(defStatus, false);
return;
}
if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
if (defStatus.isDebug()) {
logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
}
processRollback(defStatus, true);
return;
}
processCommit(defStatus);
}
如果事务已结束,异常终止,如果事务需要回滚则执行processRollback,否则执行processCommit提交,我们继续看processRollback回滚逻辑:
代码语言:javascript复制private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
try {
boolean unexpectedRollback = unexpected;
try {
//1.处理事务提交前事件监听逻辑
triggerBeforeCompletion(status);
//回滚逻辑
...
}
catch (RuntimeException | Error ex) {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
throw ex;
}
//2触发事务提交后监听逻辑
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
// Raise UnexpectedRollbackException if we had a global rollback-only marker
if (unexpectedRollback) {
throw new UnexpectedRollbackException(
"Transaction rolled back because it has been marked as rollback-only");
}
}
finally {
cleanupAfterCompletion(status);
}
}
看一下triggerAfterCompletion实现:
代码语言:javascript复制private void triggerAfterCompletion(DefaultTransactionStatus status, int completionStatus) {
if (status.isNewSynchronization()) {
List<TransactionSynchronization> synchronizations = TransactionSynchronizationManager.getSynchronizations();
TransactionSynchronizationManager.clearSynchronization();
if (!status.hasTransaction() || status.isNewTransaction()) {
if (status.isDebug()) {
logger.trace("Triggering afterCompletion synchronization");
}
// No transaction or new transaction for the current scope ->
// invoke the afterCompletion callbacks immediately
invokeAfterCompletion(synchronizations, completionStatus);
}
else if (!synchronizations.isEmpty()) {
// Existing transaction that we participate in, controlled outside
// of the scope of this Spring transaction manager -> try to register
// an afterCompletion callback with the existing (JTA) transaction.
registerAfterCompletionWithExistingTransaction(status.getTransaction(), synchronizations);
}
}
}
如果当前阶段没有事务或者新事务则执行后置回调逻辑invokeAfterCompletion:
代码语言:javascript复制protected final void invokeAfterCompletion(List<TransactionSynchronization> synchronizations, int completionStatus) {
TransactionSynchronizationUtils.invokeAfterCompletion(synchronizations, completionStatus);
}
然后调用TransactionSynchronizationUtils#invokeAfterCompletion方法:
代码语言:javascript复制public static void invokeAfterCompletion(@Nullable List<TransactionSynchronization> synchronizations,
int completionStatus) {
if (synchronizations != null) {
for (TransactionSynchronization synchronization : synchronizations) {
try {
synchronization.afterCompletion(completionStatus);
}
catch (Throwable tsex) {
logger.error("TransactionSynchronization.afterCompletion threw exception", tsex);
}
}
}
}
获取到注册到当前事务的事件列表并执行,前边我们注册的是TransactionSynchronizationEventAdapter,直接看其afterCompletion实现:
代码语言:javascript复制@Override
public void afterCompletion(int status) {
if (this.phase == TransactionPhase.AFTER_COMMIT && status == STATUS_COMMITTED) {
processEvent();
}
else if (this.phase == TransactionPhase.AFTER_ROLLBACK && status == STATUS_ROLLED_BACK) {
processEvent();
}
else if (this.phase == TransactionPhase.AFTER_COMPLETION) {
processEvent();
}
}
从@TransactionalEventListener(phase = TransactionPhase.AFTER_ROLLBACK)可以看出我们注册事件监听的截断是TransactionPhase.AFTER_ROLLBACK,逻辑会进入第二个分支调用processEvent方法:
代码语言:javascript复制protected void processEvent() {
this.listener.processEvent(this.event);
}
到这里就执行到我们自定义监听器的逻辑了,也用时序图来清晰的描述事务事件的触发时机和逻辑:
总结
我们本篇从使用和源码角度分别分析了TransactionalEventListener使用方式和实现原理,可以得出以下几个结论:
- TransactionalEventListener本质上是EventListener,依托于spring事件体系支持
- TransactionalEventListener从注册到触发依赖于事务管理器和事务的生命周期
- TransactionalEventListener适用于在事务的生命周期中特定节点做一些前置逻辑和后置补偿
对于一个事务涉及到本地和二方服务调用的场景,并且本地业务的执行依赖二方服务的结果,在本地服务出现异常发生回滚的时候,可以使用事务事件监听来做逻辑解耦和数据补偿,并且这种方式更优雅和简单。