CountDownLatch源码解析之countDown()
CountDownLatch源码解析——countDown()
上一篇文章从源码层面说了一下CountDownLatch中await()的原理。这篇文章说一下countDown()。
publicvoidcountDown(){//CountDownLatch sync.releaseShared(1); } ↓ publicfinalbooleanreleaseShared(intarg){//AQS if(tryReleaseShared(arg)){ doReleaseShared(); returntrue; } returnfalse; } ↓ protectedbooleantryReleaseShared(intreleases){//CountDownLatch.Sync //Decrementcount;signalwhentransitiontozero for(;;){ intc=getState(); if(c==0) returnfalse; intnextc=c-1; if(compareAndSetState(c,nextc)) returnnextc==0; } }
通过构造器CountDownLatchend=newCountDownLatch(2); state被设置为2,所以c==2,nextc=2-1,
然后通过下面这个CAS操作将state设置为1。
protectedfinalbooleancompareAndSetState(intexpect,intupdate){ //Seebelowforintrinsicssetuptosupportthis returnunsafe.compareAndSwapInt(this,stateOffset,expect,update); }
此时nextc还不为0,返回false。一直等到countDown() 方法被调用两次,state==0,nextc==0,此时返回true。
进入doReleaseShared()方法。
doReleaseShared(); ↓ privatevoiddoReleaseShared(){ /* *Ensurethatareleasepropagates,evenifthereareother *in-progressacquires/releases.Thisproceedsintheusual *wayoftryingtounparkSuccessorofheadifitneeds *signal.Butifitdoesnot,statusissettoPROPAGATEto *ensurethatuponrelease,propagationcontinues. *Additionally,wemustloopincaseanewnodeisadded *whilewearedoingthis.Also,unlikeotherusesof *unparkSuccessor,weneedtoknowifCAStoresetstatus *fails,ifsorechecking. */ for(;;){ Nodeh=head; if(h!=null&&h!=tail){ intws=h.waitStatus; if(ws==Node.SIGNAL){ if(!compareAndSetWaitStatus(h,Node.SIGNAL,0)) continue;//looptorecheckcases unparkSuccessor(h); } elseif(ws==0&& !compareAndSetWaitStatus(h,0,Node.PROPAGATE)) continue;//looponfailedCAS } if(h==head)//loopifheadchanged break; } }
回顾一下此时的等待队列模型。
+--------------------------+prev+------------------+ head|waitStatus=Node.SIGNAL|<----node(tail)|currentThread| +--------------------------++------------------+
此时head不为null,也不为tail,waitStatus==Node.SIGNAL,所以进入if(!compareAndSetWaitStatus(h,Node.SIGNAL,0))这个判断。
if(!compareAndSetWaitStatus(h,Node.SIGNAL,0)) ↓ /** *CASwaitStatusfieldofanode. */ privatestaticfinalbooleancompareAndSetWaitStatus(Nodenode, intexpect, intupdate){ returnunsafe.compareAndSwapInt(node,waitStatusOffset, expect,update); }
这个CAS操作将state设置为0,也就是说此时Head中的waitStatus是0.此时队列模型如下所示
+----------------+prev+------------------+ head|waitStatus=0|<----node(tail)|currentThread| +----------------++------------------+
该方法返回true。进入unparkSuccessor(h);
unparkSuccessor(h); ↓ privatevoidunparkSuccessor(Nodenode){ /* *Ifstatusisnegative(i.e.,possiblyneedingsignal)try *toclearinanticipationofsignalling.ItisOKifthis *failsorifstatusischangedbywaitingthread. */ intws=node.waitStatus; if(ws<0) compareAndSetWaitStatus(node,ws,0); /* *Threadtounparkisheldinsuccessor,whichisnormally *justthenextnode.Butifcancelledorapparentlynull, *traversebackwardsfromtailtofindtheactual *non-cancelledsuccessor. */ Nodes=node.next; if(s==null||s.waitStatus>0){ s=null; for(Nodet=tail;t!=null&&t!=node;t=t.prev) if(t.waitStatus<=0) s=t; } if(s!=null) LockSupport.unpark(s.thread); }
s就是head的后继结点,也就是装有当前线程的结点。s!=null,并且s.waitStatus==0,所以进入LockSupport.unpark(s.thread);
publicstaticvoidunpark(Threadthread){ if(thread!=null) UNSAFE.unpark(thread); }
也就是unlock被阻塞的线程。裁判被允许吹哨了!
countDown()的原理就此就非常清晰了。
每执行一次countDown()方法,state就是减1,直到state==0,则开始释放被阻塞在队列中的线程,根据前驱结点中waitStatus的状态,释放后续结点中的线程。
OK,回到上一篇文章的问题,什么时候跳出下面这个循环(await方法中的循环)
for(;;){ finalNodep=node.predecessor(); if(p==head){ intr=tryAcquireShared(arg); if(r>=0){ setHeadAndPropagate(node,r); p.next=null;//helpGC failed=false; return; } } if(shouldParkAfterFailedAcquire(p,node)&& parkAndCheckInterrupt()) thrownewInterruptedException(); }
此时state==0,所以进入setHeadAndPropagate方法。
setHeadAndPropagate(node,r); ↓ privatevoidsetHeadAndPropagate(Nodenode,intpropagate){ Nodeh=head;//Recordoldheadforcheckbelow setHead(node); /* *Trytosignalnextqueuednodeif: *Propagationwasindicatedbycaller, *orwasrecorded(ash.waitStatuseitherbefore *oraftersetHead)byapreviousoperation *(note:thisusessign-checkofwaitStatusbecause *PROPAGATEstatusmaytransitiontoSIGNAL.) *and *Thenextnodeiswaitinginsharedmode, *orwedon'tknow,becauseitappearsnull * *Theconservatisminbothofthesechecksmaycause *unnecessarywake-ups,butonlywhentherearemultiple *racingacquires/releases,somostneedsignalsnoworsoon *anyway. */ if(propagate>0||h==null||h.waitStatus<0|| (h=head)==null||h.waitStatus<0){ Nodes=node.next; if(s==null||s.isShared()) doReleaseShared(); } } ↓ privatevoidsetHead(Nodenode){ head=node; node.thread=null; node.prev=null; }
这个方法将head的后继结点变为head。该方法过后,又将node的next结点设置为null,模型变成下图
prev+---------+next null<----node(tail/head)|null|---->null +---------+
也就是nodeheadtail什么的都被置为null,等待GC回收了,这个时候return,跳出了for循环,队列被清空。
下面演示一下整个过程
setHeadAndPropagate(node,r); +----------------+ head(tail)|waitStatus=0| |thread=null| +----------------+ ↓ +----------------++----------------+ |waitStatus=0|prev|waitStatus=0| head(tail)|thread=null|<----node|currentThread| +----------------++----------------+ ↓ +----------------++----------------+ |waitStatus=0|prev|waitStatus=0| head|thread=null|<----node(tail)|currentThread| +----------------++----------------+ ↓ +----------------++----------------+ |Node.SIGNAL|prev|waitStatus=0| head|thread=null|<----node(tail)|currentThread| +----------------++----------------+ ↓ +----------------++----------------+ |waitStatus=0|prev|waitStatus=0| head|thread=null|<----node(tail)|currentThread| +----------------++----------------+ ↓ +----------------+ prev|waitStatus=0|next null<----node(tail/head)|null|---->null +----------------+
CountDownLatch的核心就是一个阻塞线程队列,这是由链表构造而成的队列,里面包含thread和waitStatus,其中waitStatus说明了后继结点线程状态。
state是一个非常重要的标志,构造时,设置为对应的n值,如果n!=0,阻塞队列将一直阻塞,除非中断线程。
每次调用countDown() 方法,就是将state-1,而调用await()方法就是将调用该方法的线程加入到阻塞队列,直到state==0,才能释放线程。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。