现在的位置: 首页 > 综合 > 正文

Spring事务处理的实现

2014年02月19日 ⁄ 综合 ⁄ 共 19834字 ⁄ 字号 评论关闭

1. 编程式事务处理:

Spring 提供编程式和声明式两种事务处理方式,我们首先通过一个编程式事务的小例子了解 Spring 中编程式事务处理的基本实现:


  1. //通过DefaultTransactionDefinition对象来持有事务处理属性  
  2. TransactionDefinition td = new DefaultTransactionDefinition();  
  3. //获取事务的状态  
  4. TransactionStatus status = transactionManager.getTransaction(td);  
  5. Try{  
  6.     //调用需要进行事务处理的目标方法  
  7. }catch(ApplicationException e){  
  8.     //调用目标方法过程中产生异常,则对事务进行回滚处理  
  9.     transactionManager.rollback(status);  
  10.     throw e;  
  11. }  
  12. //成功调用目标方法之后,对事务进行提交处理  
  13. transactionManager.commit(status);  

  1. //通过DefaultTransactionDefinition对象来持有事务处理属性  
  2. TransactionDefinition td = new DefaultTransactionDefinition();  
  3. //获取事务的状态  
  4. TransactionStatus status = transactionManager.getTransaction(td);  
  5. Try{  
  6.     //调用需要进行事务处理的目标方法  
  7. }catch(ApplicationException e){  
  8.     //调用目标方法过程中产生异常,则对事务进行回滚处理  
  9.     transactionManager.rollback(status);  
  10.     throw e;  
  11. }  
  12. //成功调用目标方法之后,对事务进行提交处理  
  13. transactionManager.commit(status);  

Spring 的声明式事务处理的即开即用特性为用户提供了很大的方便,在使用 Spring 时,我们绝大多数情况下还是使用其声明式事务处理。声明式事务处理涉及 Spring 框架对事务处理的统一管理,以及对并发事务和事务属性的处理,是一个比较复杂的过程,下面我们通过源码分析,了解 Spring 框架声明式事务处理功能的具体实现。

2. 事务的创建:

对 Spring 事务拦截器 TransactionInterceptor 回调方法 invoke 的源码分析中,我们了解到在进行事务处理前,首先根据是否是CallbackPreferringPlatformTransactionManager 类型的事务处理器分别 通过下面两个方法创建事务信息对象:


  1. TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);   
  2. TransactionInfo txInfo = prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);   
  1. TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);   
  2. TransactionInfo txInfo = prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);   

事务拦截器 TransactionInterceptor 回调方法 invoke 通过调用 TransactionAspectSupport 事务切面支持类中的 createTransactionIfNecessary 和prepareTransactionInfo 方法创建事务对象:


  1. //根据给定的事务属性创建事务对象  
  2. protected TransactionInfo createTransactionIfNecessary(  
  3.             PlatformTransactionManager tm, TransactionAttribute txAttr, final String joinpointIdentification) {  
  4.         //读取事务方法调用的事务配置属性  
  5.         if (txAttr != null && txAttr.getName() == null) {  
  6.             //如果事务名称为null,则使用方法的名称(事务连接点标识)作为事务名称,  
  7.             //调用一个实现DelegatingTransactionAttribute接口的匿名内部类  
  8.             txAttr = new DelegatingTransactionAttribute(txAttr) {  
  9.                 //使用方法名称作为事务名称  
  10.                 public String getName() {  
  11.                     return joinpointIdentification;  
  12.                 }  
  13.             };  
  14.         }  
  15.         //事务状态封装了事务执行的状态信息  
  16.         TransactionStatus status = null;  
  17.         if (txAttr != null) {  
  18.             if (tm != null) {  
  19.                 //事务处理器创建事务,并且返回当前事务的状态信息  
  20.                 status = tm.getTransaction(txAttr);  
  21.             }  
  22.             else {  
  23.                 if (logger.isDebugEnabled()) {  
  24.                     logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +  
  25.                             "] because no transaction manager has been configured");  
  26.                 }  
  27.             }  
  28.         }  
  29.         //准备事务信息,事务信息TransactionInfo封装了事务配置和状态信息  
  30.         return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);  
  31.     }   
  32. //准备事务信息  
  33. protected TransactionInfo prepareTransactionInfo(PlatformTransactionManager tm,  
  34.             TransactionAttribute txAttr, String joinpointIdentification, TransactionStatus status) {  
  35.         //创建事务信息对象  
  36.         TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);  
  37.         //如果事务属性不为null,需要为方法使用事务  
  38.         if (txAttr != null) {  
  39.             if (logger.isTraceEnabled()) {  
  40.                 logger.trace("Getting transaction for [" + txInfo.getJoinpointIdentification() + "]");  
  41.             }  
  42.             //为事务信息对象设置事务状态  
  43.             txInfo.newTransactionStatus(status);  
  44.         }  
  45.         //如果事务属性为null,不需要为方法使用事务  
  46.         else {  
  47.             if (logger.isTraceEnabled())  
  48.                 logger.trace("Don't need to create transaction for [" + joinpointIdentification +  
  49.                         "]: This method isn't transactional.");  
  50.         }  
  51.         //把当前创建的事务信息对象和线程绑定  
  52.         txInfo.bindToThread();  
  53.         return txInfo;  
  54.     }   
  1. //根据给定的事务属性创建事务对象  
  2. protected TransactionInfo createTransactionIfNecessary(  
  3.             PlatformTransactionManager tm, TransactionAttribute txAttr, final String joinpointIdentification) {  
  4.         //读取事务方法调用的事务配置属性  
  5.         if (txAttr != null && txAttr.getName() == null) {  
  6.             //如果事务名称为null,则使用方法的名称(事务连接点标识)作为事务名称,  
  7.             //调用一个实现DelegatingTransactionAttribute接口的匿名内部类  
  8.             txAttr = new DelegatingTransactionAttribute(txAttr) {  
  9.                 //使用方法名称作为事务名称  
  10.                 public String getName() {  
  11.                     return joinpointIdentification;  
  12.                 }  
  13.             };  
  14.         }  
  15.         //事务状态封装了事务执行的状态信息  
  16.         TransactionStatus status = null;  
  17.         if (txAttr != null) {  
  18.             if (tm != null) {  
  19.                 //事务处理器创建事务,并且返回当前事务的状态信息  
  20.                 status = tm.getTransaction(txAttr);  
  21.             }  
  22.             else {  
  23.                 if (logger.isDebugEnabled()) {  
  24.                     logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +  
  25.                             "] because no transaction manager has been configured");  
  26.                 }  
  27.             }  
  28.         }  
  29.         //准备事务信息,事务信息TransactionInfo封装了事务配置和状态信息  
  30.         return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);  
  31.     }   
  32. //准备事务信息  
  33. protected TransactionInfo prepareTransactionInfo(PlatformTransactionManager tm,  
  34.             TransactionAttribute txAttr, String joinpointIdentification, TransactionStatus status) {  
  35.         //创建事务信息对象  
  36.         TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);  
  37.         //如果事务属性不为null,需要为方法使用事务  
  38.         if (txAttr != null) {  
  39.             if (logger.isTraceEnabled()) {  
  40.                 logger.trace("Getting transaction for [" + txInfo.getJoinpointIdentification() + "]");  
  41.             }  
  42.             //为事务信息对象设置事务状态  
  43.             txInfo.newTransactionStatus(status);  
  44.         }  
  45.         //如果事务属性为null,不需要为方法使用事务  
  46.         else {  
  47.             if (logger.isTraceEnabled())  
  48.                 logger.trace("Don't need to create transaction for [" + joinpointIdentification +  
  49.                         "]: This method isn't transactional.");  
  50.         }  
  51.         //把当前创建的事务信息对象和线程绑定  
  52.         txInfo.bindToThread();  
  53.         return txInfo;  
  54.     }   

通过上面对 TransactionAspectSupport 事务切面支持类创建事务信息对象的源码分析,我们了解了在创建事务信息对象过程中创建事务状态,将创建的事务信息对象和当前线程资源绑定等基本的处理流程。在创建事务信息对象的方法中,事务处理器的 tm.getTransaction(txAttr); 是真正底层创建事务对象的方法,下面我们继续分析事务处理器创建事务对象的过程。

3. 抽象事务管理器 AbstractPlatformTransactionManager 获取事务:

抽象事务管理器 AbstractPlatformTransactionManager 提供了创建事务的模板,这个模板会被具体的事务处理器所使用,抽象事务管理器根据事务属性配置和当前线程绑定信息对事务是否需要创建以及如何创建进行一些通用的处理,然后把事务创建的底层细节交给具体的事务处理器实现。抽象事务管理器创建事务的模板方法如下:


  1. public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {  
  2.         //doGetTransaction()方法是抽象方法,具体的实现由具体的事务处理器提供  
  3.         Object transaction = doGetTransaction();  
  4.         boolean debugEnabled = logger.isDebugEnabled();  
  5.         //如果没有配置事务属性,则使用默认的事务属性  
  6.         if (definition == null) {  
  7.             definition = new DefaultTransactionDefinition();  
  8.         }  
  9.         //检查当前线程是否存在事务  
  10.         if (isExistingTransaction(transaction)) {  
  11.             //处理已存在的事务  
  12.             return handleExistingTransaction(definition, transaction, debugEnabled);  
  13.         }  
  14.         //检查事务属性中timeout超时属性设置是否合理  
  15.         if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {  
  16.             throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());  
  17.         }  
  18.         //对事务属性中配置的事务传播特性处理  
  19.         //如果事务传播特性配置的是mandatory,当前没有事务存在,抛出异常  
  20.         if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {  
  21.             throw new IllegalTransactionStateException(  
  22.                     "No existing transaction found for transaction marked with propagation 'mandatory'");  
  23.         }  
  24.         //如果事务传播特性为required、required_new或nested  
  25.         else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||  
  26.                 definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||  
  27.             definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {  
  28.             SuspendedResourcesHolder suspendedResources = suspend(null);  
  29.             if (debugEnabled) {  
  30.                 logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);  
  31.             }  
  32.             //创建事务  
  33.             try {  
  34.                 //不激活和当前线程绑定的事务,因为事务传播特性配置要求创建新的事务  
  35.                 boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);  
  36.                 //创建一个新的事务状态  
  37.                 DefaultTransactionStatus status = newTransactionStatus(  
  38.                         definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);  
  39.                 //创建事务的调用,具体实现由具体的事务处理器提供  
  40.                 doBegin(transaction, definition);  
  41.                 //初始化和同步事务状态  
  42.                 prepareSynchronization(status, definition);  
  43.                 return status;  
  44.             }  
  45.             catch (RuntimeException ex) {  
  46.                 resume(null, suspendedResources);  
  47.                 throw ex;  
  48.             }  
  49.             catch (Error err) {  
  50.                 resume(null, suspendedResources);  
  51.                 throw err;  
  52.             }  
  53.         }  
  54.         else {  
  55.         //创建空事务,针对supported类型的事务传播特性,激活和当前线程绑定的事务  
  56.             boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);  
  57.             //准备事务状态  
  58.             return prepareTransactionStatus(definition, nulltrue, newSynchronization, debugEnabled, null);  
  59.         }  
  60.     }  
  61. //准备事务状态  
  62. protected final DefaultTransactionStatus prepareTransactionStatus(  
  63.             TransactionDefinition definition, Object transaction, boolean newTransaction,  
  64.             boolean newSynchronization, boolean debug, Object suspendedResources) {  
  65.         //创建事务状态  
  66.         DefaultTransactionStatus status = newTransactionStatus(  
  67.                 definition, transaction, newTransaction, newSynchronization, debug, suspendedResources);  
  68.         //准备事务状态  
  69.         prepareSynchronization(status, definition);  
  70.         return status;  
  71.     }  
  72.     //创建事务状态  
  73.     protected DefaultTransactionStatus newTransactionStatus(  
  74.             TransactionDefinition definition, Object transaction, boolean newTransaction,  
  75.             boolean newSynchronization, boolean debug, Object suspendedResources) {  
  76.         //判断是否是新事务,如果是新事务,则需要把事务属性存放到当前线程中  
  77.         boolean actualNewSynchronization = newSynchronization &&  
  78.         !TransactionSynchronizationManager.isSynchronizationActive();  
  79.         return new DefaultTransactionStatus(  
  80.                 transaction, newTransaction, actualNewSynchronization,  
  81.                 definition.isReadOnly(), debug, suspendedResources);  
  82.     }  
  83.     //初始化事务属性  
  84.     protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {  
  85.         if (status.isNewSynchronization()) {  
  86.         //设置当前是否有活跃事务   TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());  
  87.         //设置当前事务隔离级别    TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(  
  88.                     (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) ?  
  89.                             definition.getIsolationLevel() : null);  
  90.             TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());  
  91.     //设置当前事务名称  TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());  
  92.             TransactionSynchronizationManager.initSynchronization();  
  93.         }  
  94.     }  

  1. public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {  
  2.         //doGetTransaction()方法是抽象方法,具体的实现由具体的事务处理器提供  
  3.         Object transaction = doGetTransaction();  
  4.         boolean debugEnabled = logger.isDebugEnabled();  
  5.         //如果没有配置事务属性,则使用默认的事务属性  
  6.         if (definition == null) {  
  7.             definition = new DefaultTransactionDefinition();  
  8.         }  
  9.         //检查当前线程是否存在事务  
  10.         if (isExistingTransaction(transaction)) {  
  11.             //处理已存在的事务  
  12.             return handleExistingTransaction(definition, transaction, debugEnabled);  
  13.         }  
  14.         //检查事务属性中timeout超时属性设置是否合理  
  15.         if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {  
  16.             throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());  
  17.         }  
  18.         //对事务属性中配置的事务传播特性处理  
  19.         //如果事务传播特性配置的是mandatory,当前没有事务存在,抛出异常  
  20.         if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {  
  21.             throw new IllegalTransactionStateException(  
  22.                     "No existing transaction found for transaction marked with propagation 'mandatory'");  
  23.         }  
  24.         //如果事务传播特性为required、required_new或nested  
  25.         else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||  
  26.                 definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||  
  27.             definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {  
  28.             SuspendedResourcesHolder suspendedResources = suspend(null);  
  29.             if (debugEnabled) {  
  30.                 logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);  
  31.             }  
  32.             //创建事务  
  33.             try {  
  34.                 //不激活和当前线程绑定的事务,因为事务传播特性配置要求创建新的事务  
  35.                 boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);  
  36.                 //创建一个新的事务状态  
  37.                 DefaultTransactionStatus status = newTransactionStatus(  
  38.                         definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);  
  39.                 //创建事务的调用,具体实现由具体的事务处理器提供  
  40.                 doBegin(transaction, definition);  
  41.                 //初始化和同步事务状态  
  42.                 prepareSynchronization(status, definition);  
  43.                 return status;  
  44.             }  
  45.             catch (RuntimeException ex) {  
  46.                 resume(null, suspendedResources);  
  47.                 throw ex;  
  48.             }  
  49.             catch (Error err) {  
  50.                 resume(null, suspendedResources);  
  51.                 throw err;  
  52.             }  
  53.         }  
  54.         else {  
  55.         //创建空事务,针对supported类型的事务传播特性,激活和当前线程绑定的事务  
  56.             boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);  
  57.             //准备事务状态  
  58.             return prepareTransactionStatus(definition, nulltrue, newSynchronization, debugEnabled, null);  
  59.         }  
  60.     }  
  61. //准备事务状态  
  62. protected final DefaultTransactionStatus prepareTransactionStatus(  
  63.             TransactionDefinition definition, Object transaction, boolean newTransaction,  
  64.             boolean newSynchronization, boolean debug, Object suspendedResources) {  
  65.         //创建事务状态  
  66.         DefaultTransactionStatus status = newTransactionStatus(  
  67.                 definition, transaction, newTransaction, newSynchronization, debug, suspendedResources);  
  68.         //准备事务状态  
  69.         prepareSynchronization(status, definition);  
  70.         return status;  
  71.     }  
  72.     //创建事务状态  
  73.     protected DefaultTransactionStatus newTransactionStatus(  
  74.             TransactionDefinition definition, Object transaction, boolean newTransaction,  
  75.             boolean newSynchronization, boolean debug, Object suspendedResources) {  
  76.         //判断是否是新事务,如果是新事务,则需要把事务属性存放到当前线程中  
  77.         boolean actualNewSynchronization = newSynchronization &&  
  78.         !TransactionSynchronizationManager.isSynchronizationActive();  
  79.         return new DefaultTransactionStatus(  
  80.                 transaction, newTransaction, actualNewSynchronization,  
  81.                 definition.isReadOnly(), debug, suspendedResources);  
  82.     }  
  83.     //初始化事务属性  
  84.     protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {  
  85.         if (status.isNewSynchronization()) {  
  86.         //设置当前是否有活跃事务   TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());  
  87.         //设置当前事务隔离级别    TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(  
  88.                     (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) ?  
  89.                             definition.getIsolationLevel() : null);  
  90.             TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());  
  91.     //设置当前事务名称  TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());  
  92.             TransactionSynchronizationManager.initSynchronization();  
  93.         }  
  94.     }  

通过对上面创建事务对象的源码分析,我们看到这抽象事务管理器获取事务对象的这个模板方法主要功能是处理事务属性中配置的事务传播特性,对于判断是否存在事务的 isExistingTransaction 方法和创建事务对象的 doBegin 方法,均委派给具体的事务处理器实现。

4. 抽象事务管理器 AbstractPlatformTransactionManager 处理已存在的事务:

对于新事务的处理相对比较简单,只需根据事务属性配置创建,同时将事务隔离级别等属性保存到事务绑定的线程资源中。而对于已存在的事务处理相对比较复杂一些,在抽象事务管理器 AbstractPlatformTransactionManager 中通过 handleExistingTransaction 方法来处理已存在的事务:


  1. private TransactionStatus handleExistingTransaction(  
  2.             TransactionDefinition definition, Object transaction, boolean debugEnabled)  
  3.             throws TransactionException {  
  4.         //如果事务传播特性为:never,则抛出异常  
  5.         if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {  
  6.             throw new IllegalTransactionStateException(  
  7.                     "Existing transaction found for transaction marked with propagation 'never'");  
  8.         }  
  9.         //如果事务传播特性是not_supported,同时当前线程存在事务,则将事务挂起  
  10.         if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {  
  11.             if (debugEnabled) {  
  12.                 logger.debug("Suspending current transaction");  
  13.             }  
  14.             //挂起事务  
  15.             Object suspendedResources = suspend(transaction);  
  16.             boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);  
  17.             //创建非事务的事务状态,让方法非事务地执行  
  18.             return prepareTransactionStatus(  
  19.                     definition, nullfalse, newSynchronization, debugEnabled, suspendedResources);  
  20.         }  
  21.         //如果事务传播特性是required_new,则创建新事务,同时把当前线程中存在的  
  22. //事务挂起  
  23.         if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {  
  24.             if (debugEnabled) {  
  25.                 logger.debug("Suspending current transaction, creating new transaction with name [" +  
  26.                         definition.getName() + "]");  
  27.             }  
  28.             //挂起已存在的事务  
  29.             SuspendedResourcesHolder suspendedResources = suspend(transaction);  
  30.             try {  
  31.                 boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);  
  32.                 //将挂起的事务状态保存起来  
  33.                 DefaultTransactionStatus status = newTransactionStatus(  
  34.                         definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);  
  35.                 //创建新事务  
  36.                 doBegin(transaction, definition);  
  37.                 prepareSynchronization(status, definition);  
  38.                 return status;  
  39.             }  
  40.             catch (RuntimeException beginEx) {  
  41.                 resumeAfterBeginException(transaction, suspendedResources, beginEx);  
  42.                 throw beginEx;  
  43.             }  
  44.             catch (Error beginErr) {  
  45.                 resumeAfterBeginException(transaction, suspendedResources, beginErr);  
  46.                 throw beginErr;  
  47.             }  
  48.         }  
  49.         //如果事务传播特性是nested嵌套事务  
  50.         if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {  
  51.             //如果不允许事务嵌套,则抛出异常  
  52.             

抱歉!评论已关闭.