我们看看 Spring 中的事务处理的代码,使用 Spring 管理事务有声明式和编程式两种方式,声明式事务处理通过 AOP 的实现把事物管理代码作为方面封装来横向插入到业务代码中,使得事务管理代码和业务代码解藕。在这种方式我们结合 IoC 容器和 Spirng 已有的FactoryBean 来对事务管理进行属性配置,比如传播行为,隔离级别等。其中最简单的方式就是通过配置 TransactionProxyFactoryBean来实现声明式事物;在整个源代码分析中,我们可以大致可以看到 Spring 实现声明式事物管理有这么几个部分:
- 对在上下文中配置的属性的处理,这里涉及的类是 TransactionAttributeSourceAdvisor,这是一个通知器,用它来对属性值进行处理,属性信息放在 TransactionAttribute 中来使用,而这些属性的处理往往是和对切入点的处理是结合起来的。对属性的处理放在类TransactionAttributeSource 中完成。
- 创建事物的过程,这个过程是委托给具体的事物管理器来创建的,但 Spring 通过 TransactionStatus 来传递相关的信息。
- 对事物的处理通过对相关信息的判断来委托给具体的事物管理器完成。
我们下面看看具体的实现,在 TransactionFactoryBean 中:
代码语言:javascript复制 1public class TransactionProxyFactoryBean extends AbstractSingletonProxyFactoryBean implements FactoryBean, BeanFactoryAware {
2 //这里是 Spring 事务处理而使用的 AOP 拦截器,中间封装了 Spring 对事务处理的代码来支持声明式事务处理的实现
3 private final TransactionInterceptor transactionInterceptor = new TransactionInterceptor();
4
5 private Pointcut pointcut;
6
7 //这里 Spring 把 TransactionManager 注入到 TransactionInterceptor 中去
8 public void setTransactionManager(PlatformTransactionManager transactionManager) {
9 this.transactionInterceptor.setTransactionManager(transactionManager);
10 }
11
12 //这里把在 bean 配置文件中读到的事务管理的属性信息注入到 TransactionInterceptor 中去
13 public void setTransactionAttributes(Properties transactionAttributes) {
14 this.transactionInterceptor.setTransactionAttributes(transactionAttributes);
15 }
16
17 ...中间省略了其他一些方法...
18
19 //这里创建 Spring AOP 对事务处理的 Advisor
20 protected Object createMainInterceptor() {
21 this.transactionInterceptor.afterPropertiesSet();
22 if (this.pointcut != null) {
23 //这里使用默认的通知器
24 return new DefaultPointcutAdvisor(this.pointcut, this.transactionInterceptor);
25 }else {
26 // 使用上面定义好的 TransactionInterceptor 作为拦截器,同时使用 TransactionAttributeSourceAdvisor
27 return new TransactionAttributeSourceAdvisor(this.transactionInterceptor);
28 }
29 }
30}
那什么时候 Spring 的 TransactionInterceptor 被注入到 Spring AOP 中成为 Advisor 中的一部分呢?我们看到在TransactionProxyFactoryBean 中,这个方法在 IOC 初始化 bean 的时候被执行:
代码语言:javascript复制 1public void afterPropertiesSet() {
2 ...
3 //TransactionProxyFactoryBean 实际上使用 ProxyFactory 完成 AOP 的基本功能。
4 ProxyFactory proxyFactory = new ProxyFactory();
5
6 if (this.preInterceptors != null) {
7 for (int i = 0; i < this.preInterceptors.length; i ) {
8 proxyFactory.addAdvisor(this.advisorAdapterRegistry.wrap(this.preInterceptors[i]));
9 }
10 }
11
12 //这里是 Spring 加入通知器的地方
13 //有两种通知器可以被加入 DefaultPointcutAdvisor 或者 TransactionAttributeSourceAdvisor
14 //这里把 Spring 处理声明式事务处理的 AOP 代码都放到 ProxyFactory 中去,怎样加入 advisor 我们可以参考 ProxyFactory 的父类 AdvisedSupport()
15 //由它来维护一个 advice 的链表,通过这个链表的增删改来抽象我们对整个通知器配置的增删改操作。
16 proxyFactory.addAdvisor(this.advisorAdapterRegistry.wrap(createMainInterceptor()));
17
18 if (this.postInterceptors != null) {
19 for (int i = 0; i < this.postInterceptors.length; i ) {
20 proxyFactory.addAdvisor(this.advisorAdapterRegistry.wrap(this.postInterceptors[i]));
21 }
22 }
23
24 proxyFactory.copyFrom(this);
25
26 //这里创建 AOP 的目标源
27 TargetSource targetSource = createTargetSource(this.target);
28 proxyFactory.setTargetSource(targetSource);
29
30 if (this.proxyInterfaces != null) {
31 proxyFactory.setInterfaces(this.proxyInterfaces);
32 }else if (!isProxyTargetClass()) {
33 proxyFactory.setInterfaces(ClassUtils.getAllInterfacesForClass(targetSource.getTargetClass()));
34 }
35
36 this.proxy = getProxy(proxyFactory);
37}
Spring 已经定义了一个 transctionInterceptor 作为拦截器或者 AOP advice 的实现,在 IOC 容器中定义的其他属性比如transactionManager 和事务管理的属性都会传到已经定义好的 TransactionInterceptor 那里去进行处理。以上反映了基本的 Spring AOP的定义过程,其中 pointcut 和 advice 都已经定义好,同时也通过通知器配置到 ProxyFactory 中去了。
下面让我们回到 TransactionProxyFactoryBean 中看看 TransactionAttributeSourceAdvisor 是怎样定义的,这样我们可以理解具体的属性是怎样起作用,这里我们分析一下类 TransactionAttributeSourceAdvisor:
代码语言:javascript复制 1public class TransactionAttributeSourceAdvisor extends AbstractPointcutAdvisor {
2 //和其他 Advisor 一样,同样需要定义 AOP 中的用到的 Interceptor 和 Pointcut
3 //Interceptor 使用传进来的 TransactionInterceptor
4 //而对于 pointcut,这里定义了一个内部类,参见下面的代码
5 private TransactionInterceptor transactionInterceptor;
6
7 private final TransactionAttributeSourcePointcut pointcut = new TransactionAttributeSourcePointcut();
8
9 ...
10
11//定义的 PointCut 内部类
12private class TransactionAttributeSourcePointcut extends StaticMethodMatcherPointcut implements Serializable {
13 ...
14 //方法匹配的实现,使用了 TransactionAttributeSource 类
15 public boolean matches(Method method, Class targetClass) {
16 TransactionAttributeSource tas = getTransactionAttributeSource();
17 //这里使用 TransactionAttributeSource 来对配置属性进行处理
18 return (tas != null && tas.getTransactionAttribute(method, targetClass) != null);
19 }
20 ...省略了 equal,hashcode,tostring 的代码
21}
这里我们看看属性值是怎样被读入的:AbstractFallbackTransactionAttributeSource 负责具体的属性读入任务,我们可以有两种读入方式,比如 annotation 和直接配置.我们下面看看直接配置的读入方式,在 Spring 中同时对读入的属性值进行了缓存处理,这是一个decorator 模式:
代码语言:javascript复制 1public final TransactionAttribute getTransactionAttribute(Method method, Class targetClass) {
2 //这里先查一下缓存里有没有事务管理的属性配置,如果有从缓存中取得 TransactionAttribute
3 Object cacheKey = getCacheKey(method, targetClass);
4 Object cached = this.cache.get(cacheKey);
5 if (cached != null) {
6 if (cached == NULL_TRANSACTION_ATTRIBUTE) {
7 return null;
8 }else {
9 return (TransactionAttribute) cached;
10 }
11 }else {
12 // 这里通过对方法和目标对象的信息来计算事务缓存属性
13 TransactionAttribute txAtt = computeTransactionAttribute(method, targetClass);
14 //把得到的事务缓存属性存到缓存中,下次可以直接从缓存中取得。
15 if (txAtt == null) {
16 this.cache.put(cacheKey, NULL_TRANSACTION_ATTRIBUTE);
17 }else {
18 ...
19 this.cache.put(cacheKey, txAtt);
20 }
21 return txAtt;
22 }
23}
别急,基本的处理在 computeTransactionAttribute()中:
代码语言:javascript复制 1private TransactionAttribute computeTransactionAttribute(Method method, Class targetClass) {
2 //这里检测是不是 public 方法
3 if(allowPublicMethodsOnly() && !Modifier.isPublic(method.getModifiers())) {
4 return null;
5 }
6
7 Method specificMethod = AopUtils.getMostSpecificMethod(method, targetClass);
8
9 // First try is the method in the target class.
10 TransactionAttribute txAtt = findTransactionAttribute(findAllAttributes(specificMethod));
11 if (txAtt != null) {
12 return txAtt;
13 }
14
15 // Second try is the transaction attribute on the target class.
16 txAtt = findTransactionAttribute(findAllAttributes(specificMethod.getDeclaringClass()));
17 if (txAtt != null) {
18 return txAtt;
19 }
20
21 if (specificMethod != method) {
22 // Fallback is to look at the original method.
23 txAtt = findTransactionAttribute(findAllAttributes(method));
24 if (txAtt != null) {
25 return txAtt;
26 }
27 // Last fallback is the class of the original method.
28 return findTransactionAttribute(findAllAttributes(method.getDeclaringClass()));
29 }
30 return null;
31}
经过一系列的尝试我们可以通过 findTransactionAttribute()通过调用 findAllAttribute()得到 TransactionAttribute 的对象,如果返回的是 null,这说明该方法不是我们需要事务处理的方法。
在完成把需要的通知器加到 ProxyFactory 中去的基础上,我们看看具体的看事务处理代码怎样起作用,在TransactionInterceptor 中:
代码语言:javascript复制 1public Object invoke(final MethodInvocation invocation) throws Throwable {
2 //这里得到目标对象
3 Class targetClass = (invocation.getThis() != null ? invocation.getThis().getClass() : null);
4
5 //这里同样的通过判断是否能够得到 TransactionAttribute 来决定是否对当前方法进行事务处理,有可能该属性已经被缓存,
6 //具体可以参考上面对 getTransactionAttribute 的分析,同样是通过 TransactionAttributeSource
7 final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(invocation.getMethod(), targetClass);
8 final String joinpointIdentification = methodIdentification(invocation.getMethod());
9
10 //这里判断我们使用了什么 TransactionManager
11 if (txAttr == null || !(getTransactionManager() instanceof CallbackPreferringPlatformTransactionManager)) {
12 // 这里创建事务,同时把创建事务过程中得到的信息放到 TransactionInfo 中去
13 TransactionInfo txInfo = createTransactionIfNecessary(txAttr, joinpointIdentification);
14 Object retVal = null;
15 try {
16 retVal = invocation.proceed();
17 }catch (Throwable ex) {
18 // target invocation exception
19 completeTransactionAfterThrowing(txInfo, ex);
20 throw ex;
21 }finally {
22 cleanupTransactionInfo(txInfo);
23 }
24 commitTransactionAfterReturning(txInfo);
25 return retVal;
26 }else {
27 // 使用的是 Spring 定义的 PlatformTransactionManager 同时实现了回调接口,我们通过其回调函数完成事务处理,就像我们使用编程式事务处理一样。
28 try {
29 Object result = ((CallbackPreferringPlatformTransactionManager) getTransactionManager()).execute(txAttr, new TransactionCallback() {
30 public Object doInTransaction(TransactionStatus status) {
31 //同样的需要一个 TransactonInfo
32 TransactionInfo txInfo = prepareTransactionInfo(txAttr, joinpointIdentification, status);
33 try {
34 return invocation.proceed();
35 }
36 ...这里省去了异常处理和事务信息的清理代码
37 });
38 ...
39 }
40 }
这里面涉及到事务的创建,我们可以在 TransactionAspectSupport 实现的事务管理代码:
代码语言:javascript复制 1protected TransactionInfo createTransactionIfNecessary(TransactionAttribute txAttr, final String joinpointIdentification) {
2 // If no name specified, apply method identification as transaction name.
3 if (txAttr != null && txAttr.getName() == null) {
4 txAttr = new DelegatingTransactionAttribute(txAttr) {
5 public String getName() {
6 return joinpointIdentification;
7 }
8 };
9 }
10
11 TransactionStatus status = null;
12 if (txAttr != null) {
13 //这里使用了我们定义好的事务配置信息,有事务管理器来创建事务,同时返回 TransactionInfo
14 status = getTransactionManager().getTransaction(txAttr);
15 }
16 return prepareTransactionInfo(txAttr, joinpointIdentification, status);
17}
首先通过 TransactionManager 得到需要的事务,事务的创建根据我们定义的事务配置决定,在AbstractTransactionManager 中给出一个标准的创建过程,当然创建什么样的事务还是需要具体的PlatformTransactionManager 来决定,但这里给出了创建事务的模板:
代码语言:javascript复制 1public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
2 Object transaction = doGetTransaction();
3 ...
4
5 if (definition == null) {
6 //如果事务信息没有被配置,我们使用 Spring 默认的配置方式
7 definition = new DefaultTransactionDefinition();
8 }
9
10 if (isExistingTransaction(transaction)) {
11 // Existing transaction found -> check propagation behavior to find out how to behave.
12 return handleExistingTransaction(definition, transaction, debugEnabled);
13 }
14
15 // Check definition settings for new transaction.
16 //下面就是使用配置信息来创建我们需要的事务;比如传播属性和同步属性等
17 //最后把创建过程中的信息收集起来放到 TransactionStatus 中返回;
18 if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
19 throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
20 }
21
22 // No existing transaction found -> check propagation behavior to find out how to behave.
23 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
24 throw new IllegalTransactionStateException(
25 "Transaction propagation 'mandatory' but no existing transaction found");
26 }else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
27 definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
28 definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
29 //这里是事务管理器创建事务的地方,并将创建过程中得到的信息放到 TransactionStatus 中去,包括创建出来的事务
30 doBegin(transaction, definition);
31 boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
32 return newTransactionStatus(definition, transaction, true, newSynchronization, debugEnabled, null);
33 }else {
34 boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
35 return newTransactionStatus(definition, null, false, newSynchronization, debugEnabled, null);
36 }
37}
接着通过调用 prepareTransactionInfo 完成事务创建的准备,创建过程中得到的信息存储在 TransactionInfo 对象中进行传递同时把信息和当前线程绑定;
代码语言:javascript复制 1protected TransactionInfo prepareTransactionInfo(TransactionAttribute txAttr, String joinpointIdentification, TransactionStatus status) {
2 TransactionInfo txInfo = new TransactionInfo(txAttr, joinpointIdentification);
3 if (txAttr != null) {
4 ...
5 // 同样的需要把在 getTransaction 中得到的 TransactionStatus 放到 TransactionInfo 中来。
6 txInfo.newTransactionStatus(status);
7 }else {
8 ...
9 }
10
11 // 绑定事务创建信息到当前线程
12 txInfo.bindToThread();
13 return txInfo;
14}
将创建事务的信息返回,然后看到其他的事务管理代码:
代码语言:javascript复制1protected void commitTransactionAfterReturning(TransactionInfo txInfo) {
2 if (txInfo != null && txInfo.hasTransaction()) {
3 if (logger.isDebugEnabled()) {
4 logger.debug("Invoking commit for transaction on " txInfo.getJoinpointIdentification());
5 }
6 this.transactionManager.commit(txInfo.getTransactionStatus());
7 }
8}
通过 transactionManager 对事务进行处理,包括异常抛出和正常的提交事务,具体的事务管理器由用户程序设定。
代码语言:javascript复制 1protected void completeTransactionAfterThrowing(TransactionInfo txInfo, Throwable ex) {
2 if (txInfo != null && txInfo.hasTransaction()) {
3 if (txInfo.transactionAttribute.rollbackOn(ex)) {
4 ...
5 try {
6 this.transactionManager.rollback(txInfo.getTransactionStatus());
7 }
8 ...
9 } else {
10 ...
11 try {
12 this.transactionManager.commit(txInfo.getTransactionStatus());
13 }
14 ...
15 }
16
17 protected void commitTransactionAfterReturning(TransactionInfo txInfo) {
18 if (txInfo != null && txInfo.hasTransaction()) {
19 ...
20 this.transactionManager.commit(txInfo.getTransactionStatus());
21 }
22 }
Spring 通过以上代码对 transactionManager 进行事务处理的过程进行了 AOP 包装,到这里我们看到为了方便客户实现声明式的事务处理,Spring 还是做了许多工作的。如果说使用编程式事务处理,过程其实比较清楚,我们可以参考书中的例子:
代码语言:javascript复制 1TransactionDefinition td = new DefaultTransactionDefinition();
2TransactionStatus status = transactionManager.getTransaction(td);
3try{
4 ...//这里是我们的业务方法
5}catch (ApplicationException e) {
6 transactionManager.rollback(status);
7 throw e
8}
9transactionManager.commit(status);
10...
我们看到这里选取了默认的事务配置 DefaultTransactionDefinition,同时在创建事物的过程中得到 TransactionStatus,然后通过直接调用事务管理器的相关方法就能完成事务处理。
声明式事务处理也同样实现了类似的过程,只是因为采用了声明的方法,需要增加对属性的读取处理,并且需要把整个过程整合到 Spring AOP 框架中和 IoC 容器中去的过程。
下面我们选取一个具体的 transactionManager - DataSourceTransactionManager 来看看其中事务处理的实现: 同样的通过使用 AbstractPlatformTransactionManager 使用模板方法,这些都体现了对具体平台相关的事务管理器操作的封装,比如commit:
代码语言:javascript复制 1public final void commit(TransactionStatus status) throws TransactionException {
2 ...
3 DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
4 if (defStatus.isLocalRollbackOnly()) {
5 ...
6 processRollback(defStatus);
7 return;
8 }
9 ...
10 processRollback(defStatus);
11 ...
12}
13
14processCommit(defStatus);
15}
通过对 TransactionStatus 的具体状态的判断,来决定具体的事务处理:
代码语言:javascript复制 1private void processCommit(DefaultTransactionStatus status) throws TransactionException {
2 try {
3 boolean beforeCompletionInvoked = false;
4 try {
5 triggerBeforeCommit(status);
6 triggerBeforeCompletion(status);
7 beforeCompletionInvoked = true;
8 boolean globalRollbackOnly = false;
9 if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) {
10 globalRollbackOnly = status.isGlobalRollbackOnly();
11 }
12 if (status.hasSavepoint()) {
13 ...
14 status.releaseHeldSavepoint();
15 } else if (status.isNewTransaction()) {
16 ...
17 doCommit(status);
18 }
19 ...
20 }
这些模板方法的实现由具体的 transactionManager 来实现,比如在 DataSourceTransactionManager:
代码语言:javascript复制 1protected void doCommit(DefaultTransactionStatus status) {
2 //这里得到存在 TransactionInfo 中已经创建好的事务
3 DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
4
5 //这里得到和事务绑定的数据库连接
6 Connection con = txObject.getConnectionHolder().getConnection();
7 ...
8 try {
9 //这里通过数据库连接来提交事务
10 con.commit();
11 }
12 ...
13}
14
15protected void doRollback(DefaultTransactionStatus status) {
16 DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
17 Connection con = txObject.getConnectionHolder().getConnection();
18 if (status.isDebug()) {
19 logger.debug("Rolling back JDBC transaction on Connection [" con "]");
20 }
21 try {
22 //这里通过数据库连接来回滚事务
23 con.rollback();
24 } catch (SQLException ex) {
25 throw new TransactionSystemException("Could not roll back JDBC transaction", ex);
26 }
27}
我们看到在 DataSourceTransactionManager 中最后还是交给 connection 来实现事务的提交和 rollback。整个声明式事务处理是事务处理在 Spring AOP 中的应用,我们看到了一个很好的使用 Spring AOP 的例子,在 Spring 声明式事务处理的源代码中我们可以看到:
- 怎样封装各种不同平台下的事务处理代码
- 怎样读取属性值和结合事务处理代码来完成既定的事务处理策略
- 怎样灵活的使用 SpringAOP 框架。
如果能够结合前面的 Spring AOP 的源代码来学习,理解可能会更深刻些。