CountDownLatch源码解析之countDown()
CountDownLatch源码解析——countDown()
上一篇文章从源码层面说了一下CountDownLatch中await()的原理。这篇文章说一下countDown()。
publicvoidcountDown(){//CountDownLatch
sync.releaseShared(1);
}
↓
publicfinalbooleanreleaseShared(intarg){//AQS
if(tryReleaseShared(arg)){
doReleaseShared();
returntrue;
}
returnfalse;
}
↓
protectedbooleantryReleaseShared(intreleases){//CountDownLatch.Sync
//Decrementcount;signalwhentransitiontozero
for(;;){
intc=getState();
if(c==0)
returnfalse;
intnextc=c-1;
if(compareAndSetState(c,nextc))
returnnextc==0;
}
}
通过构造器CountDownLatchend=newCountDownLatch(2); state被设置为2,所以c==2,nextc=2-1,
然后通过下面这个CAS操作将state设置为1。
protectedfinalbooleancompareAndSetState(intexpect,intupdate){
//Seebelowforintrinsicssetuptosupportthis
returnunsafe.compareAndSwapInt(this,stateOffset,expect,update);
}
此时nextc还不为0,返回false。一直等到countDown() 方法被调用两次,state==0,nextc==0,此时返回true。
进入doReleaseShared()方法。
doReleaseShared();
↓
privatevoiddoReleaseShared(){
/*
*Ensurethatareleasepropagates,evenifthereareother
*in-progressacquires/releases.Thisproceedsintheusual
*wayoftryingtounparkSuccessorofheadifitneeds
*signal.Butifitdoesnot,statusissettoPROPAGATEto
*ensurethatuponrelease,propagationcontinues.
*Additionally,wemustloopincaseanewnodeisadded
*whilewearedoingthis.Also,unlikeotherusesof
*unparkSuccessor,weneedtoknowifCAStoresetstatus
*fails,ifsorechecking.
*/
for(;;){
Nodeh=head;
if(h!=null&&h!=tail){
intws=h.waitStatus;
if(ws==Node.SIGNAL){
if(!compareAndSetWaitStatus(h,Node.SIGNAL,0))
continue;//looptorecheckcases
unparkSuccessor(h);
}
elseif(ws==0&&
!compareAndSetWaitStatus(h,0,Node.PROPAGATE))
continue;//looponfailedCAS
}
if(h==head)//loopifheadchanged
break;
}
}
回顾一下此时的等待队列模型。
+--------------------------+prev+------------------+ head|waitStatus=Node.SIGNAL|<----node(tail)|currentThread| +--------------------------++------------------+
此时head不为null,也不为tail,waitStatus==Node.SIGNAL,所以进入if(!compareAndSetWaitStatus(h,Node.SIGNAL,0))这个判断。
if(!compareAndSetWaitStatus(h,Node.SIGNAL,0))
↓
/**
*CASwaitStatusfieldofanode.
*/
privatestaticfinalbooleancompareAndSetWaitStatus(Nodenode,
intexpect,
intupdate){
returnunsafe.compareAndSwapInt(node,waitStatusOffset,
expect,update);
}
这个CAS操作将state设置为0,也就是说此时Head中的waitStatus是0.此时队列模型如下所示
+----------------+prev+------------------+ head|waitStatus=0|<----node(tail)|currentThread| +----------------++------------------+
该方法返回true。进入unparkSuccessor(h);
unparkSuccessor(h);
↓
privatevoidunparkSuccessor(Nodenode){
/*
*Ifstatusisnegative(i.e.,possiblyneedingsignal)try
*toclearinanticipationofsignalling.ItisOKifthis
*failsorifstatusischangedbywaitingthread.
*/
intws=node.waitStatus;
if(ws<0)
compareAndSetWaitStatus(node,ws,0);
/*
*Threadtounparkisheldinsuccessor,whichisnormally
*justthenextnode.Butifcancelledorapparentlynull,
*traversebackwardsfromtailtofindtheactual
*non-cancelledsuccessor.
*/
Nodes=node.next;
if(s==null||s.waitStatus>0){
s=null;
for(Nodet=tail;t!=null&&t!=node;t=t.prev)
if(t.waitStatus<=0)
s=t;
}
if(s!=null)
LockSupport.unpark(s.thread);
}
s就是head的后继结点,也就是装有当前线程的结点。s!=null,并且s.waitStatus==0,所以进入LockSupport.unpark(s.thread);
publicstaticvoidunpark(Threadthread){
if(thread!=null)
UNSAFE.unpark(thread);
}
也就是unlock被阻塞的线程。裁判被允许吹哨了!
countDown()的原理就此就非常清晰了。
每执行一次countDown()方法,state就是减1,直到state==0,则开始释放被阻塞在队列中的线程,根据前驱结点中waitStatus的状态,释放后续结点中的线程。
OK,回到上一篇文章的问题,什么时候跳出下面这个循环(await方法中的循环)
for(;;){
finalNodep=node.predecessor();
if(p==head){
intr=tryAcquireShared(arg);
if(r>=0){
setHeadAndPropagate(node,r);
p.next=null;//helpGC
failed=false;
return;
}
}
if(shouldParkAfterFailedAcquire(p,node)&&
parkAndCheckInterrupt())
thrownewInterruptedException();
}
此时state==0,所以进入setHeadAndPropagate方法。
setHeadAndPropagate(node,r);
↓
privatevoidsetHeadAndPropagate(Nodenode,intpropagate){
Nodeh=head;//Recordoldheadforcheckbelow
setHead(node);
/*
*Trytosignalnextqueuednodeif:
*Propagationwasindicatedbycaller,
*orwasrecorded(ash.waitStatuseitherbefore
*oraftersetHead)byapreviousoperation
*(note:thisusessign-checkofwaitStatusbecause
*PROPAGATEstatusmaytransitiontoSIGNAL.)
*and
*Thenextnodeiswaitinginsharedmode,
*orwedon'tknow,becauseitappearsnull
*
*Theconservatisminbothofthesechecksmaycause
*unnecessarywake-ups,butonlywhentherearemultiple
*racingacquires/releases,somostneedsignalsnoworsoon
*anyway.
*/
if(propagate>0||h==null||h.waitStatus<0||
(h=head)==null||h.waitStatus<0){
Nodes=node.next;
if(s==null||s.isShared())
doReleaseShared();
}
}
↓
privatevoidsetHead(Nodenode){
head=node;
node.thread=null;
node.prev=null;
}
这个方法将head的后继结点变为head。该方法过后,又将node的next结点设置为null,模型变成下图
prev+---------+next null<----node(tail/head)|null|---->null +---------+
也就是nodeheadtail什么的都被置为null,等待GC回收了,这个时候return,跳出了for循环,队列被清空。
下面演示一下整个过程
setHeadAndPropagate(node,r); +----------------+ head(tail)|waitStatus=0| |thread=null| +----------------+ ↓ +----------------++----------------+ |waitStatus=0|prev|waitStatus=0| head(tail)|thread=null|<----node|currentThread| +----------------++----------------+ ↓ +----------------++----------------+ |waitStatus=0|prev|waitStatus=0| head|thread=null|<----node(tail)|currentThread| +----------------++----------------+ ↓ +----------------++----------------+ |Node.SIGNAL|prev|waitStatus=0| head|thread=null|<----node(tail)|currentThread| +----------------++----------------+ ↓ +----------------++----------------+ |waitStatus=0|prev|waitStatus=0| head|thread=null|<----node(tail)|currentThread| +----------------++----------------+ ↓ +----------------+ prev|waitStatus=0|next null<----node(tail/head)|null|---->null +----------------+
CountDownLatch的核心就是一个阻塞线程队列,这是由链表构造而成的队列,里面包含thread和waitStatus,其中waitStatus说明了后继结点线程状态。
state是一个非常重要的标志,构造时,设置为对应的n值,如果n!=0,阻塞队列将一直阻塞,除非中断线程。
每次调用countDown() 方法,就是将state-1,而调用await()方法就是将调用该方法的线程加入到阻塞队列,直到state==0,才能释放线程。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。