这一次搞懂Spring事务是如何传播的
前言
上一篇分析了事务注解的解析过程,本质上是将事务封装为切面加入到AOP的执行链中,因此会调用到MethodInceptor的实现类的invoke方法,而事务切面的Interceptor就是TransactionInterceptor,所以本篇直接从该类开始。
正文
事务切面的调用过程
publicObjectinvoke(MethodInvocationinvocation)throwsThrowable{ //Workoutthetargetclass:maybe{@codenull}. //TheTransactionAttributeSourceshouldbepassedthetargetclass //aswellasthemethod,whichmaybefromaninterface. Class>targetClass=(invocation.getThis()!=null?AopUtils.getTargetClass(invocation.getThis()):null); //AdapttoTransactionAspectSupport'sinvokeWithinTransaction... returninvokeWithinTransaction(invocation.getMethod(),targetClass,invocation::proceed); }
这个方法本身没做什么事,主要是调用了父类的invokeWithinTransaction方法,注意最后一个参数,传入的是一个lambda表达式,而这个表达式中的调用的方法应该不陌生,在分析AOP调用链时,就是通过这个方法传递到下一个切面或是调用被代理实例的方法,忘记了的可以回去看看。
protectedObjectinvokeWithinTransaction(Methodmethod,@NullableClass>targetClass, finalInvocationCallbackinvocation)throwsThrowable{ //Ifthetransactionattributeisnull,themethodisnon-transactional. //获取事务属性类AnnotationTransactionAttributeSource TransactionAttributeSourcetas=getTransactionAttributeSource(); //获取方法上面有@Transactional注解的属性 finalTransactionAttributetxAttr=(tas!=null?tas.getTransactionAttribute(method,targetClass):null); //获取事务管理器 finalPlatformTransactionManagertm=determineTransactionManager(txAttr); finalStringjoinpointIdentification=methodIdentification(method,targetClass,txAttr); if(txAttr==null||!(tminstanceofCallbackPreferringPlatformTransactionManager)){ TransactionInfotxInfo=createTransactionIfNecessary(tm,txAttr,joinpointIdentification); ObjectretVal=null; try{ //调用proceed方法 retVal=invocation.proceedWithInvocation(); } catch(Throwableex){ //targetinvocationexception //事务回滚 completeTransactionAfterThrowing(txInfo,ex); throwex; } finally{ cleanupTransactionInfo(txInfo); } //事务提交 commitTransactionAfterReturning(txInfo); returnretVal; } //省略了else }
这个方法逻辑很清晰,一目了然,if里面就是对声明式事务的处理,先调用createTransactionIfNecessary方法开启事务,然后通过invocation.proceedWithInvocation调用下一个切面,如果没有其它切面了,就是调用被代理类的方法,出现异常就回滚,否则提交事务,这就是Spring事务切面的执行过程。但是,我们主要要搞懂的就是在这些方法中是如何管理事务以及事务在多个方法之间是如何传播的。
事务的传播性概念
传播性是Spring自己搞出来的,数据库是没有的,因为涉及到方法间的调用,那么必然就需要考虑事务在这些方法之间如何流转,所以Spring提供了7个传播属性供选择,可以将其看成两大类,即是否支持当前事务:
支持当前事务(在同一个事务中):
PROPAGATION_REQUIRED:支持当前事务,如果不存在,就新建一个事务。
PROPAGATION_MANDATORY:支持当前事务,如果不存在,就抛出异常。
PROPAGATION_SUPPORTS:支持当前事务,如果不存在,就不使用事务。
不支持当前事务(不在同一个事务中):
PROPAGATION_NEVER:以非事务的方式运行,如果有事务,则抛出异常。
PROPAGATION_NOT_SUPPORTED:以非事务的方式运行,如果有事务,则挂起当前事务。
PROPAGATION_REQUIRES_NEW:新建事务,如果有事务,挂起当前事务(两个事务相互独立,父事务回滚不影响子事务)。
PROPAGATION_NESTED:如果当前事务存在,则嵌套事务执行(指必须依存父事务,子事务不能单独提交且父事务回滚则子事务也必须回滚,而子事务若回滚,父事务可以回滚也可以捕获异常)。如果当前没有事务,则进行与PROPAGATION_REQUIRED类似的操作。
别看属性这么多,实际上我们主要用的是PROPAGATION_REQUIRED默认属性,一些特殊业务下可能会用到PROPAGATION_REQUIRES_NEW以及PROPAGATION_NESTED。
下面我会假设一个场景,并主要分析这三个属性。
publicclassA{ @Autowired privateBb; @Transactional publicvoidaddA(){ b.addB(); } } publicclassB{ @Transactional publicvoidaddB(){ //doSomething... } }
上面我创建了A、B两个类,每个类中有一个事务方法,使用了声明式事务并采用的默认传播属性,在A中调用了B的方法。
当请求来了调用addA时,首先调用的是代理对象的方法,因此会进入createTransactionIfNecessary方法开启事务:
protectedTransactionInfocreateTransactionIfNecessary(@NullablePlatformTransactionManagertm, @NullableTransactionAttributetxAttr,finalStringjoinpointIdentification){ //Ifnonamespecified,applymethodidentificationastransactionname. if(txAttr!=null&&txAttr.getName()==null){ txAttr=newDelegatingTransactionAttribute(txAttr){ @Override publicStringgetName(){ returnjoinpointIdentification; } }; } TransactionStatusstatus=null; if(txAttr!=null){ if(tm!=null){ //开启事务,这里重点看 status=tm.getTransaction(txAttr); } else{ } } //创建事务信息对象,记录新老事务信息对象 returnprepareTransactionInfo(tm,txAttr,joinpointIdentification,status); }
实际上开启事务是通过AbstractPlatformTransactionManager做的,而这个类是一个抽象类,具体实例化的对象就是我们在项目里常配置的DataSourceTransactionManager对象。
publicfinalTransactionStatusgetTransaction(@NullableTransactionDefinitiondefinition)throwsTransactionException{ //这里重点看,.DataSourceTransactionObject拿到对象 Objecttransaction=doGetTransaction(); //Cachedebugflagtoavoidrepeatedchecks. booleandebugEnabled=logger.isDebugEnabled(); if(definition==null){ //Usedefaultsifnotransactiondefinitiongiven. definition=newDefaultTransactionDefinition(); } //第一次进来connectionHolder为空的,所以不存在事务 if(isExistingTransaction(transaction)){ //Existingtransactionfound->checkpropagationbehaviortofindouthowtobehave. returnhandleExistingTransaction(definition,transaction,debugEnabled); } //Checkdefinitionsettingsfornewtransaction. if(definition.getTimeout()checkpropagationbehaviortofindouthowtoproceed. if(definition.getPropagationBehavior()==TransactionDefinition.PROPAGATION_MANDATORY){ thrownewIllegalTransactionStateException( "Noexistingtransactionfoundfortransactionmarkedwithpropagation'mandatory'"); } //第一次进来大部分会走这里 elseif(definition.getPropagationBehavior()==TransactionDefinition.PROPAGATION_REQUIRED|| definition.getPropagationBehavior()==TransactionDefinition.PROPAGATION_REQUIRES_NEW|| definition.getPropagationBehavior()==TransactionDefinition.PROPAGATION_NESTED){ //先挂起 SuspendedResourcesHoldersuspendedResources=suspend(null); if(debugEnabled){ logger.debug("Creatingnewtransactionwithname["+definition.getName()+"]:"+definition); } try{ booleannewSynchronization=(getTransactionSynchronization()!=SYNCHRONIZATION_NEVER); //创建事务状态对象,其实就是封装了事务对象的一些信息,记录事务状态的 DefaultTransactionStatusstatus=newTransactionStatus( definition,transaction,true,newSynchronization,debugEnabled,suspendedResources); //开启事务,重点看看DataSourceTransactionObject doBegin(transaction,definition); //开启事务后,改变事务状态 prepareSynchronization(status,definition); returnstatus; } catch(RuntimeException|Errorex){ resume(null,suspendedResources); throwex; } } else{ booleannewSynchronization=(getTransactionSynchronization()==SYNCHRONIZATION_ALWAYS); returnprepareTransactionStatus(definition,null,true,newSynchronization,debugEnabled,null); } }
这个方法流程比较长,一步步来看,先调用doGetTransaction方法获取一个DataSourceTransactionObject对象,这个类是JdbcTransactionObjectSupport的子类,在父类中持有了一个ConnectionHolder对象,见名知意,这个对象保存了当前的连接。
protectedObjectdoGetTransaction(){ //管理connection对象,创建回滚点,按照回滚点回滚,释放回滚点 DataSourceTransactionObjecttxObject=newDataSourceTransactionObject(); //DataSourceTransactionManager默认是允许嵌套事务的 txObject.setSavepointAllowed(isNestedTransactionAllowed()); //obtainDataSource()获取数据源对象,其实就是数据库连接块对象 ConnectionHolderconHolder= (ConnectionHolder)TransactionSynchronizationManager.getResource(obtainDataSource()); txObject.setConnectionHolder(conHolder,false); returntxObject; }
追溯getResource方法可以看到ConnectionHolder是从ThreadLocal里获取的,也就是当前线程,key是DataSource对象;但是仔细思考下我们是第一次进来,所以这里肯定获取不到的,反之,要从这里获取到值,那必然是同一个线程第二次及以后进入到这里,也就是在addA调用addB时,另外需要注意这里保存ConnectionHolder到DataSourceTransactionObject对象时是将newConnectionHolder属性设置为false了的。
继续往后,创建完transaction对象后,会调用isExistingTransaction判断是否已经存在一个事务,如果存在就会调用handleExistingTransaction方法,这个方法就是处理事务传播的核心方法,因为我们是第一次进来,肯定不存在事务,所以先跳过。
再往后,可以看到就是处理不同的传播属性,主要看到下面这个部分:
elseif(definition.getPropagationBehavior()==TransactionDefinition.PROPAGATION_REQUIRED|| definition.getPropagationBehavior()==TransactionDefinition.PROPAGATION_REQUIRES_NEW|| definition.getPropagationBehavior()==TransactionDefinition.PROPAGATION_NESTED){ //先挂起 SuspendedResourcesHoldersuspendedResources=suspend(null); if(debugEnabled){ logger.debug("Creatingnewtransactionwithname["+definition.getName()+"]:"+definition); } try{ booleannewSynchronization=(getTransactionSynchronization()!=SYNCHRONIZATION_NEVER); //创建事务状态对象,其实就是封装了事务对象的一些信息,记录事务状态的 DefaultTransactionStatusstatus=newTransactionStatus( definition,transaction,true,newSynchronization,debugEnabled,suspendedResources); //开启事务,重点看看DataSourceTransactionObject doBegin(transaction,definition); //开启事务后,改变事务状态 prepareSynchronization(status,definition); returnstatus; } catch(RuntimeException|Errorex){ resume(null,suspendedResources); throwex; } }
第一次进来时,PROPAGATION_REQUIRED、PROPAGATION_REQUIRES_NEW和PROPAGATION_NESTED都会进入到这里,首先会调用suspend挂起当前存在的事务,在这里没啥作用。接下来通过newTransactionStatus创建了DefaultTransactionStatus对象,这个对象主要就是存储当前事务的一些状态信息,需要特别注意newTransaction属性设置为了true,表示是一个新事务。
状态对象创建好之后就是通过doBegin开启事务,这是一个模板方法:
protectedvoiddoBegin(Objecttransaction,TransactionDefinitiondefinition){ DataSourceTransactionObjecttxObject=(DataSourceTransactionObject)transaction; Connectioncon=null; try{ //如果没有数据库连接 if(!txObject.hasConnectionHolder()|| txObject.getConnectionHolder().isSynchronizedWithTransaction()){ //从连接池里面获取连接 ConnectionnewCon=obtainDataSource().getConnection(); if(logger.isDebugEnabled()){ logger.debug("AcquiredConnection["+newCon+"]forJDBCtransaction"); } //把连接包装成ConnectionHolder,然后设置到事务对象中 txObject.setConnectionHolder(newConnectionHolder(newCon),true); } txObject.getConnectionHolder().setSynchronizedWithTransaction(true); con=txObject.getConnectionHolder().getConnection(); //从数据库连接中获取隔离级别 IntegerpreviousIsolationLevel=DataSourceUtils.prepareConnectionForTransaction(con,definition); txObject.setPreviousIsolationLevel(previousIsolationLevel); //Switchtomanualcommitifnecessary.ThisisveryexpensiveinsomeJDBCdrivers, //sowedon'twanttodoitunnecessarily(forexampleifwe'veexplicitly //configuredtheconnectionpooltosetitalready). if(con.getAutoCommit()){ txObject.setMustRestoreAutoCommit(true); if(logger.isDebugEnabled()){ logger.debug("SwitchingJDBCConnection["+con+"]tomanualcommit"); } //关闭连接的自动提交,其实这步就是开启了事务 con.setAutoCommit(false); } //设置只读事务从这一点设置的时间点开始(时间点a)到这个事务结束的过程中,其他事务所提交的数据,该事务将看不见! //设置只读事务就是告诉数据库,我这个事务内没有新增,修改,删除操作只有查询操作,不需要数据库锁等操作,减少数据库压力 prepareTransactionalConnection(con,definition); //自动提交关闭了,就说明已经开启事务了,事务是活动的 txObject.getConnectionHolder().setTransactionActive(true); inttimeout=determineTimeout(definition); if(timeout!=TransactionDefinition.TIMEOUT_DEFAULT){ txObject.getConnectionHolder().setTimeoutInSeconds(timeout); } //Bindtheconnectionholdertothethread. if(txObject.isNewConnectionHolder()){ //如果是新创建的事务,则建立当前线程和数据库连接的关系 TransactionSynchronizationManager.bindResource(obtainDataSource(),txObject.getConnectionHolder()); } } catch(Throwableex){ if(txObject.isNewConnectionHolder()){ DataSourceUtils.releaseConnection(con,obtainDataSource()); txObject.setConnectionHolder(null,false); } thrownewCannotCreateTransactionException("CouldnotopenJDBCConnectionfortransaction",ex); } }
这个方法里面主要做了六件事:
首先从连接池获取连接并保存到DataSourceTransactionObject对象中。
关闭数据库的自动提交,也就是开启事务。
获取数据库的隔离级别。
根据属性设置该事务是否为只读事务。
将该事务标识为活动事务(transactionActive=true)。
将ConnectionHolder对象与当前线程绑定。
完成之后通过prepareSynchronization将事务的属性和状态设置到TransactionSynchronizationManager对象中进行管理。最后返回到createTransactionIfNecessary方法中创建TransactionInfo对象与当前线程绑定并返回。
通过以上的步骤就开启了事务,接下来就是通过proceedWithInvocation调用其它切面,这里我们先假设没有其它切面了,那么就是直接调用到A类的addA方法,在这个方法中又调用了B类的addB方法,那么肯定也是调用到代理类的方法,因此又会进入到createTransactionIfNecessary方法中。但这次进来通过isExistingTransaction判断是存在事务的,因此会进入到handleExistingTransaction方法:
privateTransactionStatushandleExistingTransaction( TransactionDefinitiondefinition,Objecttransaction,booleandebugEnabled) throwsTransactionException{ //不允许有事务,直接异常 if(definition.getPropagationBehavior()==TransactionDefinition.PROPAGATION_NEVER){ thrownewIllegalTransactionStateException( "Existingtransactionfoundfortransactionmarkedwithpropagation'never'"); } //以非事务方式执行操作,如果当前存在事务,就把当前事务挂起 if(definition.getPropagationBehavior()==TransactionDefinition.PROPAGATION_NOT_SUPPORTED){ if(debugEnabled){ logger.debug("Suspendingcurrenttransaction"); } //挂起当前事务 ObjectsuspendedResources=suspend(transaction); booleannewSynchronization=(getTransactionSynchronization()==SYNCHRONIZATION_ALWAYS); //修改事务状态信息,把事务的一些信息存储到当前线程中,ThreadLocal中 returnprepareTransactionStatus( definition,null,false,newSynchronization,debugEnabled,suspendedResources); } if(definition.getPropagationBehavior()==TransactionDefinition.PROPAGATION_REQUIRES_NEW){ if(debugEnabled){ logger.debug("Suspendingcurrenttransaction,creatingnewtransactionwithname["+ definition.getName()+"]"); } //挂起 SuspendedResourcesHoldersuspendedResources=suspend(transaction); try{ booleannewSynchronization=(getTransactionSynchronization()!=SYNCHRONIZATION_NEVER); DefaultTransactionStatusstatus=newTransactionStatus( definition,transaction,true,newSynchronization,debugEnabled,suspendedResources); doBegin(transaction,definition); prepareSynchronization(status,definition); returnstatus; } catch(RuntimeException|ErrorbeginEx){ resumeAfterBeginException(transaction,suspendedResources,beginEx); throwbeginEx; } } if(definition.getPropagationBehavior()==TransactionDefinition.PROPAGATION_NESTED){ if(!isNestedTransactionAllowed()){ thrownewNestedTransactionNotSupportedException( "Transactionmanagerdoesnotallownestedtransactionsbydefault-"+ "specify'nestedTransactionAllowed'propertywithvalue'true'"); } if(debugEnabled){ logger.debug("Creatingnestedtransactionwithname["+definition.getName()+"]"); } //默认是可以嵌套事务的 if(useSavepointForNestedTransaction()){ //CreatesavepointwithinexistingSpring-managedtransaction, //throughtheSavepointManagerAPIimplementedbyTransactionStatus. //UsuallyusesJDBC3.0savepoints.NeveractivatesSpringsynchronization. DefaultTransactionStatusstatus= prepareTransactionStatus(definition,transaction,false,false,debugEnabled,null); //创建回滚点 status.createAndHoldSavepoint(); returnstatus; } else{ //Nestedtransactionthroughnestedbeginandcommit/rollbackcalls. //UsuallyonlyforJTA:Springsynchronizationmightgetactivatedhere //incaseofapre-existingJTAtransaction. booleannewSynchronization=(getTransactionSynchronization()!=SYNCHRONIZATION_NEVER); DefaultTransactionStatusstatus=newTransactionStatus( definition,transaction,true,newSynchronization,debugEnabled,null); doBegin(transaction,definition); prepareSynchronization(status,definition); returnstatus; } } //省略 booleannewSynchronization=(getTransactionSynchronization()!=SYNCHRONIZATION_NEVER); returnprepareTransactionStatus(definition,transaction,false,newSynchronization,debugEnabled,null); }
这里面也是对每个传播属性的判断,先看PROPAGATION_REQUIRES_NEW的处理,因为该属性要求每次调用都开启一个新的事务,所以首先会将当前事务挂起,怎么挂起呢?
protectedfinalSuspendedResourcesHoldersuspend(@NullableObjecttransaction)throwsTransactionException{ if(TransactionSynchronizationManager.isSynchronizationActive()){ ListsuspendedSynchronizations=doSuspendSynchronization(); try{ ObjectsuspendedResources=null; //第一次进来,肯定为null的 if(transaction!=null){ //吧connectionHolder设置为空 suspendedResources=doSuspend(transaction); } //做数据还原操作 Stringname=TransactionSynchronizationManager.getCurrentTransactionName(); TransactionSynchronizationManager.setCurrentTransactionName(null); booleanreadOnly=TransactionSynchronizationManager.isCurrentTransactionReadOnly(); TransactionSynchronizationManager.setCurrentTransactionReadOnly(false); IntegerisolationLevel=TransactionSynchronizationManager.getCurrentTransactionIsolationLevel(); TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null); booleanwasActive=TransactionSynchronizationManager.isActualTransactionActive(); TransactionSynchronizationManager.setActualTransactionActive(false); returnnewSuspendedResourcesHolder( suspendedResources,suspendedSynchronizations,name,readOnly,isolationLevel,wasActive); } catch(RuntimeException|Errorex){ //doSuspendfailed-originaltransactionisstillactive... doResumeSynchronization(suspendedSynchronizations); throwex; } } elseif(transaction!=null){ //Transactionactivebutnosynchronizationactive. ObjectsuspendedResources=doSuspend(transaction); returnnewSuspendedResourcesHolder(suspendedResources); } else{ //Neithertransactionnorsynchronizationactive. returnnull; } } protectedObjectdoSuspend(Objecttransaction){ DataSourceTransactionObjecttxObject=(DataSourceTransactionObject)transaction; txObject.setConnectionHolder(null); //解除绑定关系, returnTransactionSynchronizationManager.unbindResource(obtainDataSource()); }
这里明显是进入第一个if并且会调用到doSuspend方法,整体来说挂起事务很简单:首先将DataSourceTransactionObject的ConnectionHolder设置为空并解除与当前线程的绑定,之后将解除绑定的ConnectionHolder和其它属性(事务名称、隔离级别、只读属性)通通封装到SuspendedResourcesHolder对象,并将当前事务的活动状态设置为false。挂起事务之后又通过newTransactionStatus创建了一个新的事务状态并调用doBegin开启事务,这里不再重复分析。
接着来看PROPAGATION_NESTED:
if(definition.getPropagationBehavior()==TransactionDefinition.PROPAGATION_NESTED){ if(!isNestedTransactionAllowed()){ thrownewNestedTransactionNotSupportedException( "Transactionmanagerdoesnotallownestedtransactionsbydefault-"+ "specify'nestedTransactionAllowed'propertywithvalue'true'"); } if(debugEnabled){ logger.debug("Creatingnestedtransactionwithname["+definition.getName()+"]"); } //默认是可以嵌套事务的 if(useSavepointForNestedTransaction()){ //CreatesavepointwithinexistingSpring-managedtransaction, //throughtheSavepointManagerAPIimplementedbyTransactionStatus. //UsuallyusesJDBC3.0savepoints.NeveractivatesSpringsynchronization. DefaultTransactionStatusstatus= prepareTransactionStatus(definition,transaction,false,false,debugEnabled,null); //创建回滚点 status.createAndHoldSavepoint(); returnstatus; } else{ //Nestedtransactionthroughnestedbeginandcommit/rollbackcalls. //UsuallyonlyforJTA:Springsynchronizationmightgetactivatedhere //incaseofapre-existingJTAtransaction. booleannewSynchronization=(getTransactionSynchronization()!=SYNCHRONIZATION_NEVER); DefaultTransactionStatusstatus=newTransactionStatus( definition,transaction,true,newSynchronization,debugEnabled,null); doBegin(transaction,definition); prepareSynchronization(status,definition); returnstatus; } }
这里面可以看到如果允许嵌套事务,就会创建一个DefaultTransactionStatus对象(注意newTransaction是false,表明不是一个新事务)和回滚点;如果不允许嵌套,就会创建新事务并开启。
当上面的判断都不满足时,也就是传播属性为默认PROPAGATION_REQUIRED时,则只是创建了一个newTransaction为false的DefaultTransactionStatus返回。
完成之后又是调用proceedWithInvocation,那么就是执行B类的addB方法,假如没有发生异常,那么就会回到切面调用commitTransactionAfterReturning提交addB的事务:
protectedvoidcommitTransactionAfterReturning(@NullableTransactionInfotxInfo){ if(txInfo!=null&&txInfo.getTransactionStatus()!=null){ txInfo.getTransactionManager().commit(txInfo.getTransactionStatus()); } } publicfinalvoidcommit(TransactionStatusstatus)throwsTransactionException{ processCommit(defStatus); } privatevoidprocessCommit(DefaultTransactionStatusstatus)throwsTransactionException{ try{ booleanbeforeCompletionInvoked=false; try{ booleanunexpectedRollback=false; prepareForCommit(status); triggerBeforeCommit(status); triggerBeforeCompletion(status); beforeCompletionInvoked=true; if(status.hasSavepoint()){ if(status.isDebug()){ logger.debug("Releasingtransactionsavepoint"); } //如果是nested,没有提交,只是将savepoint清除掉了 unexpectedRollback=status.isGlobalRollbackOnly(); status.releaseHeldSavepoint(); } //如果都是PROPAGATION_REQUIRED,最外层的才会走进来统一提交,如果是PROPAGATION_REQUIRES_NEW,每一个事务都会进来 elseif(status.isNewTransaction()){ if(status.isDebug()){ logger.debug("Initiatingtransactioncommit"); } unexpectedRollback=status.isGlobalRollbackOnly(); doCommit(status); } elseif(isFailEarlyOnGlobalRollbackOnly()){ unexpectedRollback=status.isGlobalRollbackOnly(); } } } finally{ cleanupAfterCompletion(status); } }
主要逻辑在processCommit方法中。如果存在回滚点,可以看到并没有提交事务,只是将当前事务的回滚点清除了;而如果是新事务,就会调用doCommit提交事务,也就是只有PROPAGATION_REQUIRED属性下的最外层事务和PROPAGATION_REQUIRES_NEW属性下的事务能提交。事务提交完成后会调用cleanupAfterCompletion清除当前事务的状态,如果有挂起的事务还会通过resume恢复挂起的事务(将解绑的连接和当前线程绑定以及将之前保存的事务状态重新设置回去)。当前事务正常提交后,那么就会轮到addA方法提交,处理逻辑同上,不再赘述。
如果调用addB发生异常,就会通过completeTransactionAfterThrowing进行回滚:
protectedvoidcompleteTransactionAfterThrowing(@NullableTransactionInfotxInfo,Throwableex){ if(txInfo!=null&&txInfo.getTransactionStatus()!=null){ if(logger.isTraceEnabled()){ logger.trace("Completingtransactionfor["+txInfo.getJoinpointIdentification()+ "]afterexception:"+ex); } if(txInfo.transactionAttribute!=null&&txInfo.transactionAttribute.rollbackOn(ex)){ try{ txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus()); } } } } publicfinalvoidrollback(TransactionStatusstatus)throwsTransactionException{ DefaultTransactionStatusdefStatus=(DefaultTransactionStatus)status; processRollback(defStatus,false); } privatevoidprocessRollback(DefaultTransactionStatusstatus,booleanunexpected){ try{ booleanunexpectedRollback=unexpected; try{ triggerBeforeCompletion(status); //按照嵌套事务按照回滚点回滚 if(status.hasSavepoint()){ if(status.isDebug()){ logger.debug("Rollingbacktransactiontosavepoint"); } status.rollbackToHeldSavepoint(); } //都为PROPAGATION_REQUIRED最外层事务统一回滚 elseif(status.isNewTransaction()){ if(status.isDebug()){ logger.debug("Initiatingtransactionrollback"); } doRollback(status); } else{ //Participatinginlargertransaction if(status.hasTransaction()){ if(status.isLocalRollbackOnly()||isGlobalRollbackOnParticipationFailure()){ if(status.isDebug()){ logger.debug("Participatingtransactionfailed-markingexistingtransactionasrollback-only"); } doSetRollbackOnly(status); } else{ if(status.isDebug()){ logger.debug("Participatingtransactionfailed-lettingtransactionoriginatordecideonrollback"); } } } else{ logger.debug("Shouldrollbacktransactionbutcannot-notransactionavailable"); } //Unexpectedrollbackonlymattershereifwe'reaskedtofailearly if(!isFailEarlyOnGlobalRollbackOnly()){ unexpectedRollback=false; } } } catch(RuntimeException|Errorex){ triggerAfterCompletion(status,TransactionSynchronization.STATUS_UNKNOWN); throwex; } triggerAfterCompletion(status,TransactionSynchronization.STATUS_ROLLED_BACK); //RaiseUnexpectedRollbackExceptionifwehadaglobalrollback-onlymarker if(unexpectedRollback){ thrownewUnexpectedRollbackException( "Transactionrolledbackbecauseithasbeenmarkedasrollback-only"); } } finally{ cleanupAfterCompletion(status); } }
流程和提交是一样的,先是判断有没有回滚点,如果有就回到到回滚点并清除该回滚点;如果没有则判断是不是新事务(PROPAGATION_REQUIRED属性下的最外层事务和PROPAGATION_REQUIRES_NEW属性下的事务),满足则直接回滚当前事务。回滚完成后同样需要清除掉当前的事务状态并恢复挂起的连接。另外需要特别注意的是在catch里面调用完回滚逻辑后,还通过throw抛出了异常,这意味着什么?意味着即使是嵌套事务,内层事务的回滚也会导致外层事务的回滚,也就是addA的事务也会跟着回滚。
至此,事务的传播原理分析完毕,深入看每个方法的实现是很复杂的,但如果仅仅是分析各个传播属性对事务的影响,则有一个简单的方法。我们可以将内层事务切面等效替换掉invocation.proceedWithInvocation方法,比如上面两个类的调用可以看作是下面这样:
//addA的事务 TransactionInfotxInfo=createTransactionIfNecessary(tm,txAttr,joinpointIdentification); ObjectretVal=null; try{ //addB的事务 TransactionInfotxInfo=createTransactionIfNecessary(tm,txAttr,joinpointIdentification); ObjectretVal=null; try{ retVal=invocation.proceedWithInvocation(); } catch(Throwableex){ //targetinvocationexception //事务回滚 completeTransactionAfterThrowing(txInfo,ex); throwex; } finally{ cleanupTransactionInfo(txInfo); } //事务提交 commitTransactionAfterReturning(txInfo); } catch(Throwableex){ //事务回滚 completeTransactionAfterThrowing(txInfo,ex); throwex; } //事务提交 commitTransactionAfterReturning(txInfo);
这样看是不是很容易就能分析出事务之间的影响以及是提交还是回滚了?下面来看几个实例分析。
实例分析
我再添加一个C类,和addC的方法,然后在addA里面调用这个方法。
TransactionInfotxInfo=createTransactionIfNecessary(tm,txAttr,joinpointIdentification); ObjectretVal=null; try{ //addB的事务 TransactionInfotxInfo=createTransactionIfNecessary(tm,txAttr,joinpointIdentification); ObjectretVal=null; try{ b.addB(); } catch(Throwableex){ //targetinvocationexception //事务回滚 completeTransactionAfterThrowing(txInfo,ex); throwex; } //事务提交 commitTransactionAfterReturning(txInfo); //addC的事务 TransactionInfotxInfo=createTransactionIfNecessary(tm,txAttr,joinpointIdentification); ObjectretVal=null; try{ c.addC(); } catch(Throwableex){ //targetinvocationexception //事务回滚 completeTransactionAfterThrowing(txInfo,ex); throwex; } //事务提交 commitTransactionAfterReturning(txInfo); } catch(Throwableex){ //事务回滚 completeTransactionAfterThrowing(txInfo,ex); throwex; } //事务提交 commitTransactionAfterReturning(txInfo);
等效替换后就是上面这个代码,我们分别来分析。
都是PROPAGATION_REQUIRED属性:通过上面的分析,我们知道三个方法都是同一个连接和事务,那么任何一个出现异常则都会回滚。
addB为PROPAGATION_REQUIRES_NEW:
如果B中抛出异常,那么B中肯定会回滚,接着异常向上抛,导致A事务整体回滚;
如果C中抛出异常,不难看出C和A都会回滚,但B已经提交了,因此不会受影响。
addC为PROPAGATION_NESTED,addB为PROPAGATION_REQUIRES_NEW:
如果B中抛出异常,那么B回滚并抛出异常,A也回滚,C不会执行;
如果C中抛出异常,先是回滚到回滚点并抛出异常,所以A也回滚,但B此时已经提交,不受影响。
都是PROPAGATION_NESTED:虽然创建了回滚点,但是仍然是同一个连接,任何一个发生异常都会回滚,如果不想影响彼此,可以try-catch生吞子事务的异常实现。
还有其它很多情况,这里就不一一列举了,只要使用上面的分析方法都能够很轻松的分析出来。
总结
本篇详细分析了事务的传播原理,另外还有隔离级别,这在Spring中没有体现,需要我们自己结合数据库的知识进行分析设置。最后我们还需要考虑声明式事务和编程式事务的优缺点,声明式事务虽然简单,但不适合用在长事务中,会占用大量连接资源,这时就需要考虑利用编程式事务的灵活性了。
总而言之,事务的使用并不是一律默认就好,接口的一致性和吞吐量与事务有着直接关系,严重情况下可能会导致系统崩溃。
以上这篇这一次搞懂Spring事务是如何传播的就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持毛票票。
声明:本文内容来源于网络,版权归原作者所有,内容由互联网用户自发贡献自行上传,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任。如果您发现有涉嫌版权的内容,欢迎发送邮件至:czq8825#qq.com(发邮件时,请将#更换为@)进行举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。