详解Java回环屏障CyclicBarrier
上一篇说的CountDownLatch是一个计数器,类似线程的join方法,但是有一个缺陷,就是当计数器的值到达0之后,再调用CountDownLatch的await和countDown方法就会立刻返回,就没有作用了,那么反正是一个计数器,为什么不能重复使用呢?于是就出现了这篇说的CyclicBarrier,它的状态可以被重用;
一.简单例子
用法其实和CountDownLatch差不多,也就是一个计数器,当计数器的值变为0之后,就会把阻塞的线程唤醒:
packagecom.example.demo.study;
importjava.util.concurrent.CyclicBarrier;
importjava.util.concurrent.ExecutorService;
importjava.util.concurrent.Executors;
publicclassStudy0216{
//注意这里的构造器,第一个参数表示计数器初始值
//第二个参数表示当计数器的值变为0的时候就触发的任务
staticCyclicBarriercyclicBarrier=newCyclicBarrier(2,()->{
System.out.println("cyclicBarriertask");
});
publicstaticvoidmain(String[]args){
//新建两个线程的线程池
ExecutorServicepool=Executors.newFixedThreadPool(2);
//线程1放入线程池中
pool.submit(()->{
try{
System.out.println("Thread1----await-begin");
cyclicBarrier.await();
System.out.println("Thread1----await-end");
}catch(Exceptione){
e.printStackTrace();
}
});
//线程2放到线程池中
pool.submit(()->{
try{
System.out.println("Thread2----await-begin");
cyclicBarrier.await();
System.out.println("Thread2----await-end");
}catch(Exceptione){
e.printStackTrace();
}
});
//关闭线程池,此时还在执行的任务会继续执行
pool.shutdown();
}
}
我们再看看CyclicBarrier的复用性,这里比如有一个任务,有三部分组成,分别是A,B,C,然后创建两个线程去执行这个任务,必须要等到两个线程都执行完成A部分,然后才能开始执行B,只有两个线程都执行完成B部分,才能执行C:
packagecom.example.demo.study;
importjava.util.concurrent.CyclicBarrier;
importjava.util.concurrent.ExecutorService;
importjava.util.concurrent.Executors;
publicclassStudy0216{
//这里的构造器,只有一个参数,表示计数器初始值
staticCyclicBarriercyclicBarrier=newCyclicBarrier(2);
publicstaticvoidmain(String[]args){
//新建两个线程的线程池
ExecutorServicepool=Executors.newFixedThreadPool(2);
//线程1放入线程池中
pool.submit(()->{
try{
System.out.println("Thread1----stepA-start");
cyclicBarrier.await();
System.out.println("Thread1----stepB-start");
cyclicBarrier.await();
System.out.println("Thread1----stepC-start");
}catch(Exceptione){
e.printStackTrace();
}
});
//线程2放到线程池中
pool.submit(()->{
try{
System.out.println("Thread2----stepA-start");
cyclicBarrier.await();
System.out.println("Thread2----stepB-start");
cyclicBarrier.await();
System.out.println("Thread2----stepC-start");
}catch(Exceptione){
e.printStackTrace();
}
});
//关闭线程池,此时还在执行的任务会继续执行
pool.shutdown();
}
}
二.基本原理
我们看看一些重要属性:
publicclassCyclicBarrier{
//这个内部类只有一个boolean值
privatestaticclassGeneration{
booleanbroken=false;
}
//独占锁
privatefinalReentrantLocklock=newReentrantLock();
//条件变量
privatefinalConditiontrip=lock.newCondition();
//保存线程的总数
privatefinalintparties;
//这是一个任务,通过构造器传递一个任务,当计数器变为0之后,就可以执行这个任务
privatefinalRunnablebarrierCommand;
//这类内部之后一个boolean的值,表示屏障是否被打破
privateGenerationgeneration=newGeneration();
//计数器
privateintcount;
}
构造器:
//我们的构造器初始值设置的是parties
publicCyclicBarrier(intparties){
this(parties,null);
}
//注意,这里开始的时候是count等于parties
//为什么要有两个变量呢?我们每次调用await方法的时候count减一,当count的值变为0之后,怎么又还原成初始值呢?
//直接就把parties的值赋值给count就行了呀,简单吧!
publicCyclicBarrier(intparties,RunnablebarrierAction){
if(parties<=0)thrownewIllegalArgumentException();
this.parties=parties;
this.count=parties;
this.barrierCommand=barrierAction;
}
然后再看看await方法:
publicintawait()throwsInterruptedException,BrokenBarrierException{
try{
//调用的是dowait方法
returndowait(false,0L);
}catch(TimeoutExceptiontoe){
thrownewError(toe);//cannothappen
}
}
//假设count等于3,有三个线程都在调用这个方法,默认超时时间为0,那么首每次都只有一个线程可以获取锁,将count减一,不为0
//就会到下面的for循环中扔到条件队列中挂起;直到第三个线程调用这个dowait方法,count减一等于0,那么当前线程执行任务之后,
//就会唤醒条件变量中阻塞的线程,并重置count为初始值3
privateintdowait(booleantimed,longnanos)throwsInterruptedException,BrokenBarrierException,TimeoutException{
//获取锁
finalReentrantLocklock=this.lock;
lock.lock();
try{
//g中只有一个boolean值
finalGenerationg=generation;
//如果g中的值为true的时候,抛错
if(g.broken)
thrownewBrokenBarrierException();
//如果当前线程中断,就抛错
if(Thread.interrupted()){
breakBarrier();
thrownewInterruptedException();
}
//count减一,再赋值给index
intindex=--count;
//如果index等于0的时候,说明所有的线程已经到屏障点了,就可以
if(index==0){//tripped
booleanranAction=false;
try{
//执行当前线程的任务
finalRunnablecommand=barrierCommand;
if(command!=null)
command.run();
ranAction=true;
//唤醒其他因为调用了await方法阻塞的线程
nextGeneration();
return0;
}finally{
if(!ranAction)
breakBarrier();
}
}
//能到这里来,说明是count不等于0,也就是还有的线程没有到屏障点
for(;;){
try{
//wait方法有两种情况,一种是设置超时时间,一种是不设置超时时间
//这里就是对超时时间进行的一个判断,如果设置的超时时间为0,则会在条件队列中无限的等待下去,直到被唤醒
//设置了超时时间,那就等待该时间
if(!timed)
trip.await();
elseif(nanos>0L)
nanos=trip.awaitNanos(nanos);
}catch(InterruptedExceptionie){
if(g==generation&&!g.broken){
breakBarrier();
throwie;
}else{
Thread.currentThread().interrupt();
}
}
if(g.broken)
thrownewBrokenBarrierException();
if(g!=generation)
returnindex;
if(timed&&nanos<=0L){
breakBarrier();
thrownewTimeoutException();
}
}
}finally{
//释放锁
lock.unlock();
}
}
//唤醒其他因为调用了await方法阻塞的线程
privatevoidnextGeneration(){
//唤醒条件变量中所有线程
trip.signalAll();
//重置count的值
count=parties;
generation=newGeneration();
}
privatevoidbreakBarrier(){
generation.broken=true;
//重置count为初始值parties
count=parties;
//唤醒条件队列中的所有线程
trip.signalAll();
}
以上就是详解Java回环屏障CyclicBarrier的详细内容,更多关于JavaCyclicBarrier的资料请关注毛票票其它相关文章!
声明:本文内容来源于网络,版权归原作者所有,内容由互联网用户自发贡献自行上传,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任。如果您发现有涉嫌版权的内容,欢迎发送邮件至:czq8825#qq.com(发邮件时,请将#更换为@)进行举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。