Preface
This article takes a look at Spring's TransactionSynchronizationAdapter.
Example code
public void insert(TechBook techBook) {
    bookMapper.insert(techBook);
    // runs after the tx commits, on the same thread (deferred, not asynchronous)
    TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
        @Override
        public void afterCommit() {
            System.out.println("send email after transaction commit...");
        }
    });
    System.out.println("service end");
}
TransactionSynchronizationManager.registerSynchronization registers a TransactionSynchronizationAdapter here; its afterCommit method runs the extra logic once the transaction has committed successfully.
TransactionSynchronizationAdapter
org/springframework/transaction/support/TransactionSynchronizationAdapter.java
public abstract class TransactionSynchronizationAdapter implements TransactionSynchronization, Ordered {

    @Override
    public int getOrder() {
        return Ordered.LOWEST_PRECEDENCE;
    }

    @Override
    public void suspend() {
    }

    @Override
    public void resume() {
    }

    @Override
    public void flush() {
    }

    @Override
    public void beforeCommit(boolean readOnly) {
    }

    @Override
    public void beforeCompletion() {
    }

    @Override
    public void afterCommit() {
    }

    @Override
    public void afterCompletion(int status) {
    }
}
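TransactionSynchronizationAdapter is an abstract class that implements the TransactionSynchronization and Ordered interfaces. Every callback has an empty body, so a subclass only overrides the ones it needs, and getOrder returns Ordered.LOWEST_PRECEDENCE. Since Spring 5.3 the class is deprecated, because TransactionSynchronization itself declares these callbacks as default methods.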
TransactionSynchronization
org/springframework/transaction/support/TransactionSynchronization.java
    /**
     * Invoked after transaction commit. Can perform further operations right
     * <i>after</i> the main transaction has <i>successfully</i> committed.
     * <p>Can e.g. commit further operations that are supposed to follow on a successful
     * commit of the main transaction, like confirmation messages or emails.
     * <p><b>NOTE:</b> The transaction will have been committed already, but the
     * transactional resources might still be active and accessible. As a consequence,
     * any data access code triggered at this point will still "participate" in the
     * original transaction, allowing to perform some cleanup (with no commit following
     * anymore!), unless it explicitly declares that it needs to run in a separate
     * transaction. Hence: <b>Use {@code PROPAGATION_REQUIRES_NEW} for any
     * transactional operation that is called from here.</b>
     * @throws RuntimeException in case of errors; will be <b>propagated to the caller</b>
     * (note: do not throw TransactionException subclasses here!)
     */
    default void afterCommit() {
    }
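Note that the Javadoc says a RuntimeException thrown here is propagated to the caller rather than swallowed, and that TransactionException subclasses should not be thrown here. It also recommends PROPAGATION_REQUIRES_NEW for any database operation performed in afterCommit: such code otherwise still participates in the original transaction, which will not be committed again, so its writes may never take effect.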
registerSynchronization
org/springframework/transaction/support/TransactionSynchronizationManager.java
    private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =
            new NamedThreadLocal<>("Transaction synchronizations");

    /**
     * Register a new transaction synchronization for the current thread.
     * Typically called by resource management code.
     * <p>Note that synchronizations can implement the
     * {@link org.springframework.core.Ordered} interface.
     * They will be executed in an order according to their order value (if any).
     * @param synchronization the synchronization object to register
     * @throws IllegalStateException if transaction synchronization is not active
     * @see org.springframework.core.Ordered
     */
    public static void registerSynchronization(TransactionSynchronization synchronization)
            throws IllegalStateException {
        Assert.notNull(synchronization, "TransactionSynchronization must not be null");
        Set<TransactionSynchronization> synchs = synchronizations.get();
        if (synchs == null) {
            throw new IllegalStateException("Transaction synchronization is not active");
        }
        synchs.add(synchronization);
    }
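TransactionSynchronizationManager.registerSynchronization adds the TransactionSynchronization to the current thread's synchronizations set. If synchronization is not active on that thread (typically because no transaction is in progress there), it throws IllegalStateException.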
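The thread-local behaviour described above can be illustrated with a small stand-alone model. This is only a sketch in plain Java, not Spring's code: `ToySyncRegistry` and its methods are hypothetical names, and `Runnable` stands in for `TransactionSynchronization`. It shows that a callback can be registered on the thread where synchronization was initialised, while a different thread is rejected.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Toy model (NOT Spring's implementation): a per-thread set of callbacks
// that must be initialised before anything can be registered.
final class ToySyncRegistry {
    private static final ThreadLocal<Set<Runnable>> SYNCHRONIZATIONS = new ThreadLocal<>();

    // Stands in for what a transaction manager does when a transaction begins.
    static void initSynchronization() {
        SYNCHRONIZATIONS.set(new LinkedHashSet<>());
    }

    // Stands in for the cleanup after the transaction completes.
    static void clearSynchronization() {
        SYNCHRONIZATIONS.remove();
    }

    static boolean isSynchronizationActive() {
        return SYNCHRONIZATIONS.get() != null;
    }

    static void register(Runnable callback) {
        Set<Runnable> synchs = SYNCHRONIZATIONS.get();
        if (synchs == null) {
            throw new IllegalStateException("Transaction synchronization is not active");
        }
        synchs.add(callback);
    }

    // Tries to register from a fresh thread and reports whether it was rejected.
    static boolean rejectedOnOtherThread() {
        boolean[] rejected = {false};
        Thread worker = new Thread(() -> {
            try {
                register(() -> { });
            } catch (IllegalStateException ex) {
                rejected[0] = true; // the new thread has no set of its own
            }
        });
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return rejected[0];
    }
}
```

In real Spring code, `TransactionSynchronizationManager.isSynchronizationActive()` can be checked before registering when a method may also be called outside a transaction.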
processCommit
org/springframework/transaction/support/AbstractPlatformTransactionManager.java
    private void processCommit(DefaultTransactionStatus status) throws TransactionException {
        try {
            boolean beforeCompletionInvoked = false;
            try {
                boolean unexpectedRollback = false;
                prepareForCommit(status);
                triggerBeforeCommit(status);
                triggerBeforeCompletion(status);
                beforeCompletionInvoked = true;
                if (status.hasSavepoint()) {
                    if (status.isDebug()) {
                        logger.debug("Releasing transaction savepoint");
                    }
                    unexpectedRollback = status.isGlobalRollbackOnly();
                    status.releaseHeldSavepoint();
                }
                else if (status.isNewTransaction()) {
                    if (status.isDebug()) {
                        logger.debug("Initiating transaction commit");
                    }
                    unexpectedRollback = status.isGlobalRollbackOnly();
                    doCommit(status);
                }
                else if (isFailEarlyOnGlobalRollbackOnly()) {
                    unexpectedRollback = status.isGlobalRollbackOnly();
                }
                // Throw UnexpectedRollbackException if we have a global rollback-only
                // marker but still didn't get a corresponding exception from commit.
                if (unexpectedRollback) {
                    throw new UnexpectedRollbackException(
                            "Transaction silently rolled back because it has been marked as rollback-only");
                }
            }
            catch (UnexpectedRollbackException ex) {
                // can only be caused by doCommit
                triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
                throw ex;
            }
            catch (TransactionException ex) {
                // can only be caused by doCommit
                if (isRollbackOnCommitFailure()) {
                    doRollbackOnCommitException(status, ex);
                }
                else {
                    triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
                }
                throw ex;
            }
            catch (RuntimeException | Error ex) {
                if (!beforeCompletionInvoked) {
                    triggerBeforeCompletion(status);
                }
                doRollbackOnCommitException(status, ex);
                throw ex;
            }
            // Trigger afterCommit callbacks, with an exception thrown there
            // propagated to callers but the transaction still considered as committed.
            try {
                triggerAfterCommit(status);
            }
            finally {
                triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
            }
        }
        finally {
            cleanupAfterCompletion(status);
        }
    }

    private void triggerAfterCommit(DefaultTransactionStatus status) {
        if (status.isNewSynchronization()) {
            if (status.isDebug()) {
                logger.trace("Triggering afterCommit synchronization");
            }
            TransactionSynchronizationUtils.triggerAfterCommit();
        }
    }
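In AbstractPlatformTransactionManager.processCommit, triggerAfterCommit runs once the commit has succeeded and calls TransactionSynchronizationUtils.triggerAfterCommit(). Note that this call is wrapped only in try/finally, with no catch, so an exception from triggerAfterCommit ultimately propagates to the caller, after triggerAfterCompletion has still run with STATUS_COMMITTED.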
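To make the try/finally behaviour concrete, here is a minimal sketch that mimics only the tail of processCommit in plain Java. It is a simplified model under stated assumptions, not Spring's code: `ToyCommitFlow` is a hypothetical class, a `Runnable` stands in for the afterCommit callbacks, and a list of strings records the order of events.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the tail of processCommit (NOT Spring's implementation).
final class ToyCommitFlow {

    // Commit first, then run afterCommit inside try and afterCompletion inside
    // finally. There is no catch, so an afterCommit failure leaves this method.
    static void processCommit(Runnable afterCommit, List<String> log) {
        log.add("doCommit");
        try {
            afterCommit.run();
        } finally {
            log.add("afterCompletion(STATUS_COMMITTED)");
        }
    }

    // Plays the role of the caller of the transactional method.
    static List<String> callWithFailingCallback() {
        List<String> log = new ArrayList<>();
        try {
            processCommit(() -> {
                throw new IllegalStateException("mail server down");
            }, log);
            log.add("caller: ok");
        } catch (IllegalStateException ex) {
            // The commit already happened; only the callback failed.
            log.add("caller caught: " + ex.getMessage());
        }
        return log;
    }
}
```

In this model the recorded order is commit, then afterCompletion, then the exception reaching the caller. A caller that treats such an exception as "the transaction failed" would be wrong, since the data is already committed.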
triggerAfterCommit
org/springframework/transaction/support/TransactionSynchronizationUtils.java
    public static void triggerAfterCommit() {
        invokeAfterCommit(TransactionSynchronizationManager.getSynchronizations());
    }

    public static void invokeAfterCommit(@Nullable List<TransactionSynchronization> synchronizations) {
        if (synchronizations != null) {
            for (TransactionSynchronization synchronization : synchronizations) {
                synchronization.afterCommit();
            }
        }
    }
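This loops over the synchronizations and calls afterCommit on each one; if any of them throws, the loop stops there and the remaining callbacks are not invoked.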
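The effect of one failing callback on the others can be sketched in plain Java. This is an illustrative model only: `ToyAfterCommitLoop` and the `guarded` helper are hypothetical names, and `Runnable` stands in for `TransactionSynchronization.afterCommit()`. The helper shows one possible way to keep a failure in one callback from skipping the rest, by catching inside the callback itself.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the afterCommit loop (NOT Spring's implementation).
final class ToyAfterCommitLoop {

    // Same shape as the loop above: no per-callback try/catch.
    static void invokeAll(List<Runnable> callbacks) {
        for (Runnable callback : callbacks) {
            callback.run();
        }
    }

    // Hypothetical helper: records a callback's failure instead of propagating it.
    static Runnable guarded(Runnable delegate, List<String> log) {
        return () -> {
            try {
                delegate.run();
            } catch (RuntimeException ex) {
                log.add("swallowed: " + ex.getMessage());
            }
        };
    }

    // Runs a failing callback followed by a second one, with or without the guard.
    static List<String> run(boolean guardFirst) {
        List<String> log = new ArrayList<>();
        Runnable failing = () -> {
            throw new IllegalStateException("boom");
        };
        Runnable second = () -> log.add("second ran");
        List<Runnable> callbacks = List.of(guardFirst ? guarded(failing, log) : failing, second);
        try {
            invokeAll(callbacks);
        } catch (RuntimeException ex) {
            log.add("propagated: " + ex.getMessage());
        }
        return log;
    }
}
```

Without the guard the second callback never runs and the exception reaches the caller; with it, the failure is recorded and the second callback still runs. Whether swallowing is acceptable depends on how important the post-commit work is.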
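Summary
TransactionSynchronizationManager.registerSynchronization registers a TransactionSynchronizationAdapter with the current thread's transaction, so that afterCommit can run extra logic once the transaction has committed successfully. An exception thrown there does not affect the commit, but it is not caught and has to be handled by the caller. For database operations in afterCommit, the PROPAGATION_REQUIRES_NEW propagation level is recommended; otherwise those operations may not take effect.
Doing something after the commit does not necessarily require TransactionSynchronizationManager.registerSynchronization. One alternative is an extra outer layer that calls the transactional operation: an exception propagates out, and if none is thrown the outer layer runs the post-commit work. This only works if the exception behind a rollback is not swallowed; otherwise the outer caller may believe the transaction succeeded. Another option is TransactionalEventListener, which is somewhat more elegant than TransactionSynchronizationManager.registerSynchronization.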
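The "extra outer layer" alternative can be sketched without any Spring types. This is a minimal illustration under stated assumptions: `AfterTxFacade` is a hypothetical class, and the `Supplier` stands in for a call into a transactional service method that returns only after the commit and throws if the transaction rolled back.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical outer layer: run the transactional call, then the follow-up work.
final class AfterTxFacade {

    // `transactionalCall` is assumed to return only after a successful commit
    // and to throw if the transaction rolled back.
    static <T> T callThenNotify(Supplier<T> transactionalCall, Runnable afterSuccess) {
        T result = transactionalCall.get(); // an exception here skips afterSuccess
        afterSuccess.run();
        return result;
    }

    // Simulates both outcomes; `rollback` makes the stand-in call throw.
    static List<String> demo(boolean rollback) {
        List<String> log = new ArrayList<>();
        try {
            callThenNotify(() -> {
                if (rollback) {
                    throw new IllegalStateException("rolled back");
                }
                log.add("committed");
                return 1;
            }, () -> log.add("email sent"));
        } catch (IllegalStateException ex) {
            log.add("no email: " + ex.getMessage());
        }
        return log;
    }
}
```

The sketch relies on the rollback surfacing as an exception. If the transactional method catches and swallows it, `afterSuccess` runs even though nothing was committed, which is exactly the caveat noted above.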
doc
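- How to perform asynchronous operations after a database transaction commits successfully
- Spring transaction afterCommit: principles and practice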