详解java CountDownLatch和CyclicBarrier在内部实现和场景上的区别
前言
CountDownLatch和CyclicBarrier两个同为java并发编程的重要工具类,它们在诸多多线程并发或并行场景中得到了广泛的应用。但两者就其内部实现和使用场景而言是各有所侧重的。
内部实现差异
前者更多依赖经典的AQS机制和CAS机制来控制器内部状态的更迭和计数器本身的变化,而后者更多依靠可重入Lock等机制来控制其内部并发安全性和一致性。
publicclass{
//SynchronizationcontrolForCountDownLatch.
//UsesAQSstatetorepresentcount.
privatestaticfinalclassSyncextendsAbstractQueuedSynchronizer{
privatestaticfinallongserialVersionUID=4982264981922014374L;
Sync(intcount){
setState(count);
}
intgetCount(){
returngetState();
}
protectedinttryAcquireShared(intacquires){
return(getState()==0)?1:-1;
}
protectedbooleantryReleaseShared(intreleases){
//Decrementcount;signalwhentransitiontozero
for(;;){
intc=getState();
if(c==0)
returnfalse;
intnextc=c-1;
if(compareAndSetState(c,nextc))
returnnextc==0;
}
}
}
privatefinalSyncsync;
......//
}
publicclassCyclicBarrier{
/**
*Eachuseofthebarrierisrepresentedasagenerationinstance.
*Thegenerationchangeswheneverthebarrieristripped,or
*isreset.Therecanbemanygenerationsassociatedwiththreads
*usingthebarrier-duetothenon-deterministicwaythelock
*maybeallocatedtowaitingthreads-butonlyoneofthese
*canbeactiveatatime(theonetowhich{@codecount}applies)
*andalltherestareeitherbrokenortripped.
*Thereneednotbeanactivegenerationiftherehasbeenabreak
*butnosubsequentreset.
*/
privatestaticclassGeneration{
booleanbroken=false;
}
/**Thelockforguardingbarrierentry*/
privatefinalReentrantLocklock=newReentrantLock();
/**Conditiontowaitonuntiltripped*/
privatefinalConditiontrip=lock.newCondition();
/**Thenumberofparties*/
privatefinalintparties;
/*Thecommandtorunwhentripped*/
privatefinalRunnablebarrierCommand;
/**Thecurrentgeneration*/
privateGenerationgeneration=newGeneration();
/**
*Numberofpartiesstillwaiting.Countsdownfrompartiesto0
*oneachgeneration.Itisresettopartiesoneachnew
*generationorwhenbroken.
*/
privateintcount;
/**
*Updatesstateonbarriertripandwakesupeveryone.
*Calledonlywhileholdinglock.
*/
privatevoidnextGeneration(){
//signalcompletionoflastgeneration
trip.signalAll();
//setupnextgeneration
count=parties;
generation=newGeneration();
}
/**
*Setscurrentbarriergenerationasbrokenandwakesupeveryone.
*Calledonlywhileholdinglock.
*/
privatevoidbreakBarrier(){
generation.broken=true;
count=parties;
trip.signalAll();
}
/**
*Mainbarriercode,coveringthevariouspolicies.
*/
privateintdowait(booleantimed,longnanos)
throwsInterruptedException,BrokenBarrierException,
TimeoutException{
finalReentrantLocklock=this.lock;
lock.lock();
try{
finalGenerationg=generation;
if(g.broken)
thrownewBrokenBarrierException();
if(Thread.interrupted()){
breakBarrier();
thrownewInterruptedException();
}
intindex=--count;
if(index==0){//tripped
booleanranAction=false;
try{
finalRunnablecommand=barrierCommand;
if(command!=null)
command.run();
ranAction=true;
nextGeneration();
return0;
}finally{
if(!ranAction)
breakBarrier();
}
}
//loopuntiltripped,broken,interrupted,ortimedout
for(;;){
try{
if(!timed)
trip.await();
elseif(nanos>0L)
nanos=trip.awaitNanos(nanos);
}catch(InterruptedExceptionie){
if(g==generation&&!g.broken){
breakBarrier();
throwie;
}else{
//We'reabouttofinishwaitingevenifwehadnot
//beeninterrupted,sothisinterruptisdeemedto
//"belong"tosubsequentexecution.
Thread.currentThread().interrupt();
}
}
if(g.broken)
thrownewBrokenBarrierException();
if(g!=generation)
returnindex;
if(timed&&nanos<=0L){
breakBarrier();
thrownewTimeoutException();
}
}
}finally{
lock.unlock();
}
}
......//
}
实战-展示各自的使用场景
/**
*类说明:共5个初始化子线程,6个闭锁扣除点,扣除完毕后,主线程和业务线程才能继续执行
*/
publicclassUseCountDownLatch{
staticCountDownLatchlatch=newCountDownLatch(6);
/*初始化线程*/
privatestaticclassInitThreadimplementsRunnable{
publicvoidrun(){
System.out.println("Thread_"+Thread.currentThread().getId()
+"readyinitwork......");
latch.countDown();
for(inti=0;i<2;i++){
System.out.println("Thread_"+Thread.currentThread().getId()
+"........continuedoitswork");
}
}
}
/*业务线程等待latch的计数器为0完成*/
privatestaticclassBusiThreadimplementsRunnable{
publicvoidrun(){
try{
latch.await();
}catch(InterruptedExceptione){
e.printStackTrace();
}
for(inti=0;i<3;i++){
System.out.println("BusiThread_"+Thread.currentThread().getId()
+"dobusiness-----");
}
}
}
publicstaticvoidmain(String[]args)throwsInterruptedException{
newThread(newRunnable(){
publicvoidrun(){
SleepTools.ms(1);
System.out.println("Thread_"+Thread.currentThread().getId()
+"readyinitworkstep1st......");
latch.countDown();
System.out.println("beginstep2nd.......");
SleepTools.ms(1);
System.out.println("Thread_"+Thread.currentThread().getId()
+"readyinitworkstep2nd......");
latch.countDown();
}
}).start();
newThread(newBusiThread()).start();
for(inti=0;i<=3;i++){
Threadthread=newThread(newInitThread());
thread.start();
}
latch.await();
System.out.println("Maindoiteswork........");
}
}
/**
*类说明:共4个子线程,他们全部完成工作后,交出自己结果,
*再被统一释放去做自己的事情,而交出的结果被另外的线程拿来拼接字符串
*/
classUseCyclicBarrier{
privatestaticCyclicBarrierbarrier
=newCyclicBarrier(4,newCollectThread());
//存放子线程工作结果的容器
privatestaticConcurrentHashMapresultMap
=newConcurrentHashMap();
publicstaticvoidmain(String[]args){
for(inti=0;i<4;i++){
Threadthread=newThread(newSubThread());
thread.start();
}
}
/*汇总的任务*/
privatestaticclassCollectThreadimplementsRunnable{
@Override
publicvoidrun(){
StringBuilderresult=newStringBuilder();
for(Map.EntryworkResult:resultMap.entrySet()){
result.append("["+workResult.getValue()+"]");
}
System.out.println("theresult="+result);
System.out.println("dootherbusiness........");
}
}
/*相互等待的子线程*/
privatestaticclassSubThreadimplementsRunnable{
@Override
publicvoidrun(){
longid=Thread.currentThread().getId();
resultMap.put(Thread.currentThread().getId()+"",id);
try{
Thread.sleep(1000+id);
System.out.println("Thread_"+id+"....dosomething");
barrier.await();
Thread.sleep(1000+id);
System.out.println("Thread_"+id+"....doitsbusiness");
barrier.await();
}catch(Exceptione){
e.printStackTrace();
}
}
}
}
两者总结
1.Cyclicbarrier结果汇总的Runable线程可以重复被执行,通过多次触发await()方法,countdownlatch可以调用await()方法多次;cyclicbarrier若没有结果汇总,则调用一次await()就够了;
2.Newcyclicbarrier(threadCount)的线程数必须与实际的用户线程数一致;
3.协调线程同时运行:countDownLatch协调工作线程执行,是由外面线程协调;cyclicbarrier是由工作线程之间相互协调运行;
4.从构造函数上看出:countDownlatch控制运行的计数器数量和线程数没有关系;cyclicbarrier构造中传入的线程数等于实际执行线程数;
5.countDownLatch在不能基于执行子线程的运行结果做处理,而cyclicbarrier可以;
6.就使用场景而言,countdownlatch更适用于框架加载前的一系列初始化工作等场景;cyclicbarrier更适用于需要多个用户线程执行后,将运行结果汇总再计算等典型场景;
到此这篇关于详解javaCountDownLatch和CyclicBarrier在内部实现和场景上的区别的文章就介绍到这了,更多相关javaCountDownLatch和CyclicBarrier区别内容请搜索毛票票以前的文章或继续浏览下面的相关文章希望大家以后多多支持毛票票!