TransactionalEventListener使用场景与原理分析

2021-12-07 23:14:16 浏览数 (1)

一、背景

开发中有这样一个场景,客服业务需要接入在线能力,基于其他团队的IM底层能力构建业务层能力,也就是需要先调二方restful服务创建群聊,然后调用本地服务创建会话,并且创建会话依赖于二方服务返回的群聊信息,那么就会出现本地服务异常回滚,但是二方服务已经调用成功的情况,如果不做处理那么下次再尝试创建群聊,用户id已经存在,创建不成功,考虑到异构服务(二方服务可能是java、C 或者其他)或者异构数据(mysql、TiDB等), 分布式事务并不是一个很好的选择,这个时候我们就可以考虑在产生异常时候手动回滚二方服务的方式。

二、案例分析

今天我们要描述的是使用TransactionalEventListener来做业务补偿,TransactionalEventListener本质上是一个EventListener,依赖于Spring事件体系的支撑,我们要做的就是优先调用二方服务并返回结果,如果二方服务异常,流程终止不执行本地业务,如果二方服务正常,执行本地业务,如果本地执行成功,整个流程执行成功,如果本地执行异常,本地数据回滚,然后发出异常事件,由TransactionalEventListener执行二方数据的手动回滚或者订正。大致流程如下:

业务代码:

代码语言:javascript复制
@Transactional
public void createSession() {
    //1.创建群
    String groupId = restful api.createGroup
    //2.发布补偿事件
    this.applicationEventPublisher.publishEvent();
    //3.创建会话
    this.createSession(groupId);   
}

首先调用二方服务,然后发送补偿事件,最后调用本地服务,2和3的顺序不能颠倒,否则会导致异常终止事件发送不出去。

TransactionalEventListener事件补偿:

代码语言:javascript复制
@Slf4j
@Component
public class CreateSessionFailedListener {

    @TransactionalEventListener(phase = TransactionPhase.AFTER_ROLLBACK)
    public void onRollbackEvent(CreateSessionFailedEvent event) {
        try {
            restful api补偿
        } catch (Exception e) {
            log.error("onRollbackEvent occur error;event={}",event,e);
        }
    }
}

使用TransactionalEventListener注解写事务监听器,并且监听的时机是异常回滚,也就是本地事务出现异常回滚后触发该事件监听。

这样就能实现二方服务执行成功后,本地事务回滚,然后补偿订正二方服务数据了。当然也可以在事务的上层调用方捕获并识别异常,然后根据需要决定是否需要补偿。

三、源码&原理解析

1.监听器初始化

@TransactionalEventListener注解肯定和事务相关,那么我们就从springboot开启事务注解的地方开始分析,先看EnableTransactionManagement:

代码语言:javascript复制
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(TransactionManagementConfigurationSelector.class)
public @interface EnableTransactionManagement {
    ...
}

TransactionalEventListener导入了选择器TransactionManagementConfigurationSelector:

代码语言:javascript复制
public class TransactionManagementConfigurationSelector extends AdviceModeImportSelector<EnableTransactionManagement> {
  @Override
  protected String[] selectImports(AdviceMode adviceMode) {
    switch (adviceMode) {
      case PROXY:
        return new String[] {AutoProxyRegistrar.class.getName(),
            ProxyTransactionManagementConfiguration.class.getName()};
      case ASPECTJ:
        return new String[] {determineTransactionAspectClass()};
      default:
        return null;
    }
  }
}

默认是Proxy代理,会引入AutoProxyRegistrar和ProxyTransactionManagementConfiguration,前者我们在spring cache原理解析中已经分析过,直接看ProxyTransactionManagementConfiguration:

代码语言:javascript复制
@Configuration
public class ProxyTransactionManagementConfiguration extends AbstractTransactionManagementConfiguration {

  @Bean(name = TransactionManagementConfigUtils.TRANSACTION_ADVISOR_BEAN_NAME)
  @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
  public BeanFactoryTransactionAttributeSourceAdvisor transactionAdvisor() {
    BeanFactoryTransactionAttributeSourceAdvisor advisor = new BeanFactoryTransactionAttributeSourceAdvisor();
    advisor.setTransactionAttributeSource(transactionAttributeSource());
    advisor.setAdvice(transactionInterceptor());
    if (this.enableTx != null) {
      advisor.setOrder(this.enableTx.<Integer>getNumber("order"));
    }
    return advisor;
  }

  @Bean
  @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
  public TransactionAttributeSource transactionAttributeSource() {
    return new AnnotationTransactionAttributeSource();
  }

  @Bean
  @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
  public TransactionInterceptor transactionInterceptor() {
    TransactionInterceptor interceptor = new TransactionInterceptor();
    interceptor.setTransactionAttributeSource(transactionAttributeSource());
    if (this.txManager != null) {
      interceptor.setTransactionManager(this.txManager);
    }
    return interceptor;
  }

}

ProxyTransactionManagementConfiguration定义了几个基础设施类,来实现事务逻辑织入,在之前的篇幅中已经不止一次分析过,此处不在赘述,我们看一下期继承关系:

ProxyTransactionManagementConfiguration继承了AbstractTransactionManagementConfiguration

类,该类定义了一个bean:

代码语言:javascript复制
@Bean(name = TransactionManagementConfigUtils.TRANSACTIONAL_EVENT_LISTENER_FACTORY_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public static TransactionalEventListenerFactory transactionalEventListenerFactory() {
  return new TransactionalEventListenerFactory();
}

该bean所在类定义了创建事件监听器的方法:

代码语言:javascript复制
public class TransactionalEventListenerFactory implements EventListenerFactory, Ordered {

  @Override
  public boolean supportsMethod(Method method) {
    return AnnotatedElementUtils.hasAnnotation(method, TransactionalEventListener.class);
  }
  @Override
  public ApplicationListener<?> createApplicationListener(String beanName, Class<?> type, Method method) {
    return new ApplicationListenerMethodTransactionalAdapter(beanName, type, method);
  }
}

该方法根据用户使用@TransactionalEventListener注解的方法创建时间监听器代理,在应用启动的时候EventListenerMethodProcessor中调用,其原理在另外一片文章《事件驱动编程》中也分析过,然后我们看一下创建的监听器代理的实现ApplicationListenerMethodTransactionalAdapter:

代码语言:javascript复制
class ApplicationListenerMethodTransactionalAdapter extends ApplicationListenerMethodAdapter {

  private final TransactionalEventListener annotation;

  public ApplicationListenerMethodTransactionalAdapter(String beanName, Class<?> targetClass, Method method) {
    super(beanName, targetClass, method);
    TransactionalEventListener ann = AnnotatedElementUtils.findMergedAnnotation(method, TransactionalEventListener.class);
    if (ann == null) {
      throw new IllegalStateException("No TransactionalEventListener annotation found on method: "   method);
    }
    this.annotation = ann;
  }

  @Override
  public void onApplicationEvent(ApplicationEvent event) {
    if (TransactionSynchronizationManager.isSynchronizationActive()) {
      TransactionSynchronization transactionSynchronization = createTransactionSynchronization(event);
      TransactionSynchronizationManager.registerSynchronization(transactionSynchronization);
    }
    else if (this.annotation.fallbackExecution()) {
      if (this.annotation.phase() == TransactionPhase.AFTER_ROLLBACK && logger.isWarnEnabled()) {
        logger.warn("Processing "   event   " as a fallback execution on AFTER_ROLLBACK phase");
      }
      processEvent(event);
    }
    else {
      // No transactional event execution at all
      if (logger.isDebugEnabled()) {
        logger.debug("No transaction is active - skipping "   event);
      }
    }
  }
}

监听器逻辑调用会调用onApplicationEvent方法,这一段逻辑比较巧妙,首先检查当前上下文是否在事务中,如果是则把监听器逻辑注册到事务同步器中,等待后续事务执行过程指定节点触发,如果没有在事务中则立即触发事件监听逻辑。

事件工厂初始化:

事务事件监听器初始化:

2.事务事件调用与监听器触发

在spring体系中我们可以直接注入事件发布器来发布事件:

代码语言:javascript复制
@Autowired
protected ApplicationEventPublisher applicationEventPublisher;

看一下ApplicationEventPublisher定义:

代码语言:javascript复制
@FunctionalInterface
public interface ApplicationEventPublisher {
  /**
   * Notify all <strong>matching</strong> listeners registered with this
   * application of an application event. Events may be framework events
   * (such as RequestHandledEvent) or application-specific events.
   * @param event the event to publish
   * @see org.springframework.web.context.support.RequestHandledEvent
   */
  default void publishEvent(ApplicationEvent event) {
    publishEvent((Object) event);
  }
  /**
   * Notify all <strong>matching</strong> listeners registered with this
   * application of an event.
   * <p>If the specified {@code event} is not an {@link ApplicationEvent},
   * it is wrapped in a {@link PayloadApplicationEvent}.
   * @param event the event to publish
   * @since 4.2
   * @see PayloadApplicationEvent
   */
  void publishEvent(Object event);
}

调用ApplicationEventPublisher#publishEvent会调用AbstractApplicationContext#publishEvent:

代码语言:javascript复制
protected void publishEvent(Object event, @Nullable ResolvableType eventType) {
  Assert.notNull(event, "Event must not be null");
  // Decorate event as an ApplicationEvent if necessary
  ApplicationEvent applicationEvent;
  if (event instanceof ApplicationEvent) {
    applicationEvent = (ApplicationEvent) event;
  }
  else {
    applicationEvent = new PayloadApplicationEvent<>(this, event);
    if (eventType == null) {
      eventType = ((PayloadApplicationEvent) applicationEvent).getResolvableType();
    }
  }
  // Multicast right now if possible - or lazily once the multicaster is initialized
  if (this.earlyApplicationEvents != null) {
    this.earlyApplicationEvents.add(applicationEvent);
  }
  else {
    getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType);
  }

  // Publish event via parent context as well...
  if (this.parent != null) {
    if (this.parent instanceof AbstractApplicationContext) {
      ((AbstractApplicationContext) this.parent).publishEvent(event, eventType);
    }
    else {
      this.parent.publishEvent(event);
    }
  }
}

这里是一个递归调用,当前上下文先发布事件,然后递归找父上下文发布事件,最终会调用SimpleApplicationEventMulticaster#multicastEvent来发布事件:

代码语言:javascript复制
@Override
public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {
  ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
  for (final ApplicationListener<?> listener : getApplicationListeners(event, type)) {
    Executor executor = getTaskExecutor();
    if (executor != null) {
      executor.execute(() -> invokeListener(listener, event));
    }
    else {
      invokeListener(listener, event);
    }
  }
}

这里是从上下文中先获取监听器集合,然后如果有任务执行器就调用任务执行器执行监听器逻辑(多线程),否则当前线程调用监听器逻辑,然后看invokeListener实现:

代码语言:javascript复制
protected void invokeListener(ApplicationListener<?> listener, ApplicationEvent event) {
  ErrorHandler errorHandler = getErrorHandler();
  if (errorHandler != null) {
    try {
      doInvokeListener(listener, event);
    }
    catch (Throwable err) {
      errorHandler.handleError(err);
    }
  }
  else {
    doInvokeListener(listener, event);
  }
}

继续看doInvokeListener:

代码语言:javascript复制
private void doInvokeListener(ApplicationListener listener, ApplicationEvent event) {
  try {
    listener.onApplicationEvent(event);
  }
  catch (ClassCastException ex) {
    String msg = ex.getMessage();
    if (msg == null || matchesClassCastMessage(msg, event.getClass())) {
      // Possibly a lambda-defined listener which we could not resolve the generic event type for
      // -> let's suppress the exception and just log a debug message.
      Log logger = LogFactory.getLog(getClass());
      if (logger.isDebugEnabled()) {
        logger.debug("Non-matching event type for listener: "   listener, ex);
      }
    }
    else {
      throw ex;
    }
  }
}

到这里就比较清晰了,我们自定义事件监听器都实现了ApplicationListener接口,此处会调用监听器的onApplicationEvent方法执行自定义逻辑。

然后我们回顾一下事务事件监听器适配器实现:

代码语言:javascript复制
@Override
public void onApplicationEvent(ApplicationEvent event) {
  if (TransactionSynchronizationManager.isSynchronizationActive()) {
    TransactionSynchronization transactionSynchronization = createTransactionSynchronization(event);
    TransactionSynchronizationManager.registerSynchronization(transactionSynchronization);
  }
  else if (this.annotation.fallbackExecution()) {
    if (this.annotation.phase() == TransactionPhase.AFTER_ROLLBACK && logger.isWarnEnabled()) {
      logger.warn("Processing "   event   " as a fallback execution on AFTER_ROLLBACK phase");
    }
    processEvent(event);
  }
  else {
    // No transactional event execution at all
    if (logger.isDebugEnabled()) {
      logger.debug("No transaction is active - skipping "   event);
    }
  }
}

当上一步doInvokeListener调用到ApplicationListenerMethodTransactionalAdapter#onApplicationEvent的时候,如果检测到当前上下文有活跃的事务,那么就把监听器逻辑注册到事务中,等到事务执行到指定的节点触发监听器逻辑,否则如果检测到TransactionalEventListener.fallbackExecution属性为true(如果没有事务,是否处理事件),则直接调用处理事件逻辑,否则返回调用。

我们暂且理解为当前逻辑在事务中,先创建事务同步逻辑:

代码语言:javascript复制
public TransactionSynchronizationEventAdapter(ApplicationListenerMethodAdapter listener,
    ApplicationEvent event, TransactionPhase phase) {

  this.listener = listener;
  this.event = event;
  this.phase = phase;
}

包含了事件监听器,事件类型和事务事件触发阶段。然后调用事件同步管理器把事件同步逻辑注册到事务中:

代码语言:javascript复制
public static void registerSynchronization(TransactionSynchronization synchronization)
    throws IllegalStateException {

  Assert.notNull(synchronization, "TransactionSynchronization must not be null");
  if (!isSynchronizationActive()) {
    throw new IllegalStateException("Transaction synchronization is not active");
  }
  synchronizations.get().add(synchronization);
}

我们把事务事件监听器执行的此阶段叫做注册阶段,用时序图更清晰的分析一下其逻辑:

那事件事务监听器逻辑注册到事务生命周期成功了,什么时候触发呢?那就要回到ProxyTransactionManagementConfiguration的TransactionInterceptor了,加事务注解的逻辑执行的时候会被TransactionInterceptor拦截到,然后执行invoke逻辑:

代码语言:javascript复制
public Object invoke(MethodInvocation invocation) throws Throwable {
  // Work out the target class: may be {@code null}.
  // The TransactionAttributeSource should be passed the target class
  // as well as the method, which may be from an interface.
  Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);

  // Adapt to TransactionAspectSupport's invokeWithinTransaction...
  return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
}

在invokeWithinTransaction逻辑中会调用commitTransactionAfterReturning方法:

代码语言:javascript复制
protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
  if (txInfo != null && txInfo.getTransactionStatus() != null) {
    if (logger.isTraceEnabled()) {
      logger.trace("Completing transaction for ["   txInfo.getJoinpointIdentification()   "]");
    }
    txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
  }
}

然后会调用事务管理器执行事务状态的提交逻辑:

代码语言:javascript复制
@Override
public final void commit(TransactionStatus status) throws TransactionException {
  if (status.isCompleted()) {
    throw new IllegalTransactionStateException(
        "Transaction is already completed - do not call commit or rollback more than once per transaction");
  }
  DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
  if (defStatus.isLocalRollbackOnly()) {
    if (defStatus.isDebug()) {
      logger.debug("Transactional code has requested rollback");
    }
    processRollback(defStatus, false);
    return;
  }
  if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
    if (defStatus.isDebug()) {
      logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
    }
    processRollback(defStatus, true);
    return;
  }
  processCommit(defStatus);
}

如果事务已结束,异常终止,如果事务需要回滚则执行processRollback,否则执行processCommit提交,我们继续看processRollback回滚逻辑:

代码语言:javascript复制
private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
  try {
    boolean unexpectedRollback = unexpected;
    try {
            //1.处理事务提交前事件监听逻辑
      triggerBeforeCompletion(status);
            //回滚逻辑
            ...      
    }
    catch (RuntimeException | Error ex) {
      triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
      throw ex;
    }
    //2触发事务提交后监听逻辑
    triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
    // Raise UnexpectedRollbackException if we had a global rollback-only marker
    if (unexpectedRollback) {
      throw new UnexpectedRollbackException(
          "Transaction rolled back because it has been marked as rollback-only");
    }
  }
  finally {
    cleanupAfterCompletion(status);
  }
}

看一下triggerAfterCompletion实现:

代码语言:javascript复制
private void triggerAfterCompletion(DefaultTransactionStatus status, int completionStatus) {
  if (status.isNewSynchronization()) {
    List<TransactionSynchronization> synchronizations = TransactionSynchronizationManager.getSynchronizations();
    TransactionSynchronizationManager.clearSynchronization();
    if (!status.hasTransaction() || status.isNewTransaction()) {
      if (status.isDebug()) {
        logger.trace("Triggering afterCompletion synchronization");
      }
      // No transaction or new transaction for the current scope ->
      // invoke the afterCompletion callbacks immediately
      invokeAfterCompletion(synchronizations, completionStatus);
    }
    else if (!synchronizations.isEmpty()) {
      // Existing transaction that we participate in, controlled outside
      // of the scope of this Spring transaction manager -> try to register
      // an afterCompletion callback with the existing (JTA) transaction.
      registerAfterCompletionWithExistingTransaction(status.getTransaction(), synchronizations);
    }
  }
}

如果当前阶段没有事务或者新事务则执行后置回调逻辑invokeAfterCompletion:

代码语言:javascript复制
protected final void invokeAfterCompletion(List<TransactionSynchronization> synchronizations, int completionStatus) {
  TransactionSynchronizationUtils.invokeAfterCompletion(synchronizations, completionStatus);
}

然后调用TransactionSynchronizationUtils#invokeAfterCompletion方法:

代码语言:javascript复制
public static void invokeAfterCompletion(@Nullable List<TransactionSynchronization> synchronizations,
    int completionStatus) {

  if (synchronizations != null) {
    for (TransactionSynchronization synchronization : synchronizations) {
      try {
        synchronization.afterCompletion(completionStatus);
      }
      catch (Throwable tsex) {
        logger.error("TransactionSynchronization.afterCompletion threw exception", tsex);
      }
    }
  }
}

获取到注册到当前事务的事件列表并执行,前边我们注册的是TransactionSynchronizationEventAdapter,直接看其afterCompletion实现:

代码语言:javascript复制
@Override
public void afterCompletion(int status) {
  if (this.phase == TransactionPhase.AFTER_COMMIT && status == STATUS_COMMITTED) {
    processEvent();
  }
  else if (this.phase == TransactionPhase.AFTER_ROLLBACK && status == STATUS_ROLLED_BACK) {
    processEvent();
  }
  else if (this.phase == TransactionPhase.AFTER_COMPLETION) {
    processEvent();
  }
}

从@TransactionalEventListener(phase = TransactionPhase.AFTER_ROLLBACK)可以看出我们注册事件监听的截断是TransactionPhase.AFTER_ROLLBACK,逻辑会进入第二个分支调用processEvent方法:

代码语言:javascript复制
protected void processEvent() {
  this.listener.processEvent(this.event);
}

到这里就执行到我们自定义监听器的逻辑了,也用时序图来清晰的描述事务事件的触发时机和逻辑:

总结

我们本篇从使用和源码角度分别分析了TransactionalEventListener使用方式和实现原理,可以得出以下几个结论:

  • TransactionalEventListener本质上是EventListener,依托于spring事件体系支持
  • TransactionalEventListener从注册到触发依赖于事务管理器和事务的生命周期
  • TransactionalEventListener适用于在事务的生命周期中特定节点做一些前置逻辑和后置补偿

对于一个事务涉及到本地和二方服务调用的场景,并且本地业务的执行依赖二方服务的结果,在本地服务出现异常发生回滚的时候,可以使用事务事件监听来做逻辑解耦和数据补偿,并且这种方式更优雅和简单。

0 人点赞