Java高并发BlockingQueue重要的实现类详解
ArrayBlockingQueue
有界的阻塞队列,内部是一个数组,有边界的意思是:容量是有限的,必须进行初始化,指定它的容量大小,以先进先出的方式存储数据,最新插入的在对尾,最先移除的对象在头部。
publicclassArrayBlockingQueueextendsAbstractQueue implementsBlockingQueue ,java.io.Serializable{ /**队列元素*/ finalObject[]items; /**下一次读取操作的位置,poll,peekorremove*/ inttakeIndex; /**下一次写入操作的位置,offer,oradd*/ intputIndex; /**元素数量*/ intcount; /* *Concurrencycontrolusestheclassictwo-conditionalgorithm *foundinanytextbook. *它采用一个ReentrantLock和相应的两个Condition来实现。 */ /**Mainlockguardingallaccess*/ finalReentrantLocklock; /**Conditionforwaitingtakes*/ privatefinalConditionnotEmpty; /**Conditionforwaitingputs*/ privatefinalConditionnotFull; /**指定大小*/ publicArrayBlockingQueue(intcapacity){ this(capacity,false); } /** *指定容量大小与指定访问策略 *@paramfair指定独占锁是公平锁还是非公平锁。非公平锁的吞吐量比较高,公平锁可以保证每次都是等待最久的线程获取到锁; */ publicArrayBlockingQueue(intcapacity,booleanfair){} /** *指定容量大小、指定访问策略与最初包含给定集合中的元素 *@paramc将此集合中的元素在构造方法期间就先添加到队列中 */ publicArrayBlockingQueue(intcapacity,booleanfair, Collectionc){} }
- ArrayBlockingQueue在生产者放入数据和消费者获取数据,都是共用一个锁对象,由此也意味着两者无法真正并行运行。按照实现原理来分析,ArrayBlockingQueue完全可以采用分离锁,从而实现生产者和消费者操作的完全并行运行。然而事实上并没有如此,因为ArrayBlockingQueue的数据写入已经足够轻巧,以至于引入独立的锁机制,除了给代码带来额外的复杂性外,其在性能上完全占不到任何便宜。
- 通过构造函数得知,参数fair控制对象内部是否采用公平锁,默认采用非公平锁。
- items、takeIndex、putIndex、count等属性并没有使用volatile修饰,这是因为访问这些变量(通过方法获取)使用都在锁内,并不存在可见性问题,如size()。
- 另外有个独占锁lock用来对出入对操作加锁,这导致同时只有一个线程可以访问入队出队。
Put源码分析
/**进行入队操作*/ publicvoidput(Ee)throwsInterruptedException{ //e为null,则抛出NullPointerException异常 checkNotNull(e); //获取独占锁 finalReentrantLocklock=this.lock; /** *lockInterruptibly() *获取锁定,除非当前线程为interrupted *如果锁没有被另一个线程占用并且立即返回,则将锁定计数设置为1。 *如果当前线程已经保存此锁,则保持计数将递增1,该方法立即返回。 *如果锁被另一个线程保持,则当前线程将被禁用以进行线程调度,并且处于休眠状态 * */ lock.lockInterruptibly(); try{ //空队列 while(count==items.length) //进行条件等待处理 notFull.await(); //入队操作 enqueue(e); }finally{ //释放锁 lock.unlock(); } } /**真正的入队*/ privatevoidenqueue(Ex){ //assertlock.getHoldCount()==1; //assertitems[putIndex]==null; //获取当前元素 finalObject[]items=this.items; //按下一个插入索引进行元素添加 items[putIndex]=x; //计算下一个元素应该存放的下标,可以理解为循环队列 if(++putIndex==items.length) putIndex=0; count++; //唤起消费者 notEmpty.signal(); }
这里由于在操作共享变量前加了锁,所以不存在内存不可见问题,加锁后获取的共享变量都是从主内存中获取的,而不是在CPU缓存或者寄存器里面的值,释放锁后修改的共享变量值会刷新到主内存。
另外这个队列使用循环数组实现,所以在计算下一个元素存放下标时候有些特殊。另外insert后调用notEmpty.signal();是为了激活调用notEmpty.await();阻塞后放入notEmpty条件队列的线程。
Take源码分析
publicEtake()throwsInterruptedException{ finalReentrantLocklock=this.lock; lock.lockInterruptibly(); try{ while(count==0) notEmpty.await(); returndequeue(); }finally{ lock.unlock(); } } privateEdequeue(){ //assertlock.getHoldCount()==1; //assertitems[takeIndex]!=null; finalObject[]items=this.items; @SuppressWarnings("unchecked") Ex=(E)items[takeIndex]; items[takeIndex]=null; if(++takeIndex==items.length) takeIndex=0; count--; //这里有些特殊 if(itrs!=null) //保持队列中的元素和迭代器的元素一致 itrs.elementDequeued(); notFull.signal(); returnx; }
Take操作和Put操作很类似
//该类的迭代器,所有的迭代器共享数据,队列改变会影响所有的迭代器 transientItrsitrs=null;//其存放了目前所创建的所有迭代器。 /** *迭代器和它们的队列之间的共享数据,允许队列元素被删除时更新迭代器的修改。 */ classItrs{ voidelementDequeued(){ //assertlock.getHoldCount()==1; if(count==0) //队列中数量为0的时候,队列就是空的,会将所有迭代器进行清理并移除 queueIsEmpty(); //takeIndex的下标是0,意味着队列从尾中取完了,又回到头部获取 elseif(takeIndex==0) takeIndexWrapped(); } /** *当队列为空的时候做的事情 *1.通知所有迭代器队列已经为空 *2.清空所有的弱引用,并且将迭代器置空 */ voidqueueIsEmpty(){} /** *将takeIndex包装成0 *并且通知所有的迭代器,并且删除已经过期的任何对象(个人理解是置空对象) *也直接的说就是在Blocking队列进行出队的时候,进行迭代器中的数据同步,保持队列中的元素和迭代器的元素是一致的。 */ voidtakeIndexWrapped(){} }
Itrs迭代器创建的时机
//从这里知道,在ArrayBlockingQueue对象中调用此方法,才会生成这个对象 //那么就可以理解为,只要并未调用此方法,则ArrayBlockingQueue对象中的Itrs对象则为空 publicIteratoriterator(){ returnnewItr(); } privateclassItrimplementsIterator { Itr(){ //这里就是生产它的地方 //count等于0的时候,创建的这个迭代器是个无用的迭代器,可以直接移除,进入detach模式。 //否则就把当前队列的读取位置给迭代器当做下一个元素,cursor存储下个元素的位置。 if(count==0){ //assertitrs==null; cursor=NONE; nextIndex=NONE; prevTakeIndex=DETACHED; }else{ finalinttakeIndex=ArrayBlockingQueue.this.takeIndex; prevTakeIndex=takeIndex; nextItem=itemAt(nextIndex=takeIndex); cursor=incCursor(takeIndex); if(itrs==null){ itrs=newItrs(this); }else{ itrs.register(this);//inthisorder itrs.doSomeSweeping(false); } prevCycles=itrs.cycles; //asserttakeIndex>=0; //assertprevTakeIndex==takeIndex; //assertnextIndex>=0; //assertnextItem!=null; } } }
代码演示
packagecom.rumenz.task; importjava.util.concurrent.ArrayBlockingQueue; importjava.util.concurrent.BlockingQueue; importjava.util.concurrent.ExecutorService; importjava.util.concurrent.Executors; /** *@className:BlockingQuqueExample *@description:TODO类描述 *@author:mac *@date:2021/1/20 **/ publicclassBlockingQueueExample{ privatestaticvolatileBooleanflag=false; publicstaticvoidmain(String[]args){ BlockingQueueblockingQueue=newArrayBlockingQueue(1024); ExecutorServiceexecutorService=Executors.newFixedThreadPool(2); executorService.execute(()->{ try{ blockingQueue.put(1); Thread.sleep(2000); blockingQueue.put(3); flag=true; }catch(Exceptione){ e.printStackTrace(); } }); executorService.execute(()->{ try{ while(!flag){ Integeri=(Integer)blockingQueue.take(); System.out.println(i); } }catch(Exceptione){ e.printStackTrace(); } }); executorService.shutdown(); } }
LinkedBlockingQueue
基于链表的阻塞队列,通ArrayBlockingQueue类似,其内部也维护这一个数据缓冲队列(该队列由一个链表构成),当生产者往队列放入一个数据时,队列会从生产者手上获取数据,并缓存在队列的内部,而生产者立即返回,只有当队列缓冲区到达最大值容量时(LinkedBlockingQueue可以通过构造函数指定该值),才会阻塞队列,直到消费者从队列中消费掉一份数据,生产者会被唤醒,反之对于消费者这端的处理也基于同样的原理。
LinkedBlockingQueue之所以能够高效的处理并发数据,还因为其对于生产者和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行的操作队列中的数据,以调高整个队列的并发能力。
如果构造一个LinkedBlockingQueue对象,而没有指定容量大小,LinkedBlockingQueue会默认一个类似无限大小的容量Integer.MAX_VALUE,这样的话,如果生产者的速度一旦大于消费者的速度,也许还没有等到队列满阻塞产生,系统内存就有可能已经被消耗殆尽了。
LinkedBlockingQueue是一个使用链表完成队列操作的阻塞队列。链表是单向链表,而不是双向链表。
publicclassLinkedBlockingQueueextendsAbstractQueue implementsBlockingQueue ,java.io.Serializable{ //队列的容量,指定大小或为默认值Integer.MAX_VALUE privatefinalintcapacity; //元素的数量 privatefinalAtomicIntegercount=newAtomicInteger(); //队列头节点,始终满足head.item==null transientNode head; //队列的尾节点,始终满足last.next==null privatetransientNode last; /**Lockheldbytake,poll,etc*/ //出队的锁:take,poll,peek等读操作的方法需要获取到这个锁 privatefinalReentrantLocktakeLock=newReentrantLock(); /**Waitqueueforwaitingtakes*/ //当队列为空时,保存执行出队的线程:如果读操作的时候队列是空的,那么等待notEmpty条件 privatefinalConditionnotEmpty=takeLock.newCondition(); /**Lockheldbyput,offer,etc*/ //入队的锁:put,offer等写操作的方法需要获取到这个锁 privatefinalReentrantLockputLock=newReentrantLock(); /**Waitqueueforwaitingputs*/ //当队列满时,保存执行入队的线程:如果写操作的时候队列是满的,那么等待notFull条件 privatefinalConditionnotFull=putLock.newCondition(); //传说中的无界队列 publicLinkedBlockingQueue(){} //传说中的有界队列 publicLinkedBlockingQueue(intcapacity){ if(capacity<=0)thrownewIllegalArgumentException(); this.capacity=capacity; last=head=newNode (null); } //传说中的无界队列 publicLinkedBlockingQueue(Collectionc){} /** *链表节点类 */ staticclassNode { Eitem; /** *Oneof: *-真正的继任者节点 *-这个节点,意味着继任者是head.next *-空,意味着没有后继者(这是最后一个节点) */ Node next; Node(Ex){item=x;} } }
通过其构造函数,得知其可以当做无界队列也可以当做有界队列来使用。
这里用了两把锁分别是takeLock和putLock,而Condition分别是notEmpty和notFull,它们是这样搭配的。
takeLock
putLock
从上面的构造函数中可以看到,这里会初始化一个空的头结点,那么第一个元素入队的时候,队列中就会有两个元素。读取元素时,也是获取头结点后面的一个元素。count的计数值不包含这个头结点。
Put源码分析
publicclassLinkedBlockingQueueextendsAbstractQueue implementsBlockingQueue ,java.io.Serializable{ /** *将指定元素插入到此队列的尾部,如有必要,则等待空间变得可用。 */ publicvoidput(Ee)throwsInterruptedException{ if(e==null)thrownewNullPointerException(); //如果你纠结这里为什么是-1,可以看看offer方法。这就是个标识成功、失败的标志而已。 intc=-1; //包装成node节点 Node node=newNode (e); finalReentrantLockputLock=this.putLock; finalAtomicIntegercount=this.count; //获取锁定 putLock.lockInterruptibly(); try{ /**如果队列满,等待notFull的条件满足。*/ while(count.get()==capacity){ notFull.await(); } //入队 enqueue(node); //原子性自增 c=count.getAndIncrement(); //如果这个元素入队后,还有至少一个槽可以使用,调用notFull.signal()唤醒等待线程。 //哪些线程会等待在notFull这个Condition上呢? if(c+1 node){ //assertputLock.isHeldByCurrentThread(); //assertlast.next==null; //入队的代码非常简单,就是将last属性指向这个新元素,并且让原队尾的next指向这个元素 //last.next=node; //last=node; //这里入队没有并发问题,因为只有获取到putLock独占锁以后,才可以进行此操作 last=last.next=node; } /** *等待PUT信号 *仅在take/poll中调用 *也就是说:元素入队后,如果需要,则会调用这个方法唤醒读线程来读 */ privatevoidsignalNotFull(){ finalReentrantLockputLock=this.putLock; putLock.lock(); try{ notFull.signal();//唤醒 }finally{ putLock.unlock(); } } }
Take源码分析
publicclassLinkedBlockingQueueextendsAbstractQueue implementsBlockingQueue ,java.io.Serializable{ publicEtake()throwsInterruptedException{ Ex; intc=-1; finalAtomicIntegercount=this.count; finalReentrantLocktakeLock=this.takeLock; //首先,需要获取到takeLock才能进行出队操作 takeLock.lockInterruptibly(); try{ //如果队列为空,等待notEmpty这个条件满足再继续执行 while(count.get()==0){ notEmpty.await(); } ////出队 x=dequeue(); //count进行原子减1 c=count.getAndDecrement(); //如果这次出队后,队列中至少还有一个元素,那么调用notEmpty.signal()唤醒其他的读线程 if(c>1) notEmpty.signal(); }finally{ takeLock.unlock(); } if(c==capacity) signalNotFull(); returnx; } /** *出队 */ privateEdequeue(){ //asserttakeLock.isHeldByCurrentThread(); //asserthead.item==null; Node h=head; Node first=h.next; h.next=h;//helpGC head=first; Ex=first.item; first.item=null; returnx; } /** *Signalsawaitingput.Calledonlyfromtake/poll. */ privatevoidsignalNotFull(){ finalReentrantLockputLock=this.putLock; putLock.lock(); try{ notFull.signal(); }finally{ putLock.unlock(); } } }
与ArrayBlockingQueue对比
ArrayBlockingQueue和LinkedBlockingQueue间还有一个明显的不同之处在于,前者在插入或删除元素时不会产生或销毁任何额外的对象实例,而后者则会生成一个额外的Node对象。这在长时间内需要高效并发地处理大批量数据的系统中,其对于GC的影响还是存在一定的区别。
LinkedBlockingQueue实现一个线程添加文件对象,四个线程读取文件对象
packageconcurrent; importjava.io.File; importjava.io.FileFilter; importjava.util.concurrent.BlockingQueue; importjava.util.concurrent.ExecutorService; importjava.util.concurrent.Executors; importjava.util.concurrent.LinkedBlockingQueue; importjava.util.concurrent.atomic.AtomicInteger; publicclassTestBlockingQueue{ staticlongrandomTime(){ return(long)(Math.random()*1000); } publicstaticvoidmain(String[]args){ //能容纳100个文件 finalBlockingQueuequeue=newLinkedBlockingQueue (100); //线程池 finalExecutorServiceexec=Executors.newFixedThreadPool(5); finalFileroot=newFile("F:\\JavaLib"); //完成标志 finalFileexitFile=newFile(""); //读个数 finalAtomicIntegerrc=newAtomicInteger(); //写个数 finalAtomicIntegerwc=newAtomicInteger(); //读线程 Runnableread=newRunnable(){ publicvoidrun(){ scanFile(root); scanFile(exitFile); } publicvoidscanFile(Filefile){ if(file.isDirectory()){ File[]files=file.listFiles(newFileFilter(){ publicbooleanaccept(Filepathname){ returnpathname.isDirectory() ||pathname.getPath().endsWith(".java"); } }); for(Fileone:files) scanFile(one); }else{ try{ intindex=rc.incrementAndGet(); System.out.println("Read0:"+index+"" +file.getPath()); queue.put(file); }catch(InterruptedExceptione){ } } } }; exec.submit(read); //四个写线程 for(intindex=0;index<4;index++){ //writethread finalintNO=index; Runnablewrite=newRunnable(){ StringthreadName="Write"+NO; publicvoidrun(){ while(true){ try{ Thread.sleep(randomTime()); intindex=wc.incrementAndGet(); Filefile=queue.take(); //队列已经无对象 if(file==exitFile){ //再次添加"标志",以让其他线程正常退出 queue.put(exitFile); break; } System.out.println(threadName+":"+index+"" +file.getPath()); }catch(InterruptedExceptione){ } } } }; exec.submit(write); } exec.shutdown(); } }
总结
到此这篇关于Java高并发BlockingQueue重要实现类的文章就介绍到这了,更多相关Java高并发BlockingQueue实现类内容请搜索毛票票以前的文章或继续浏览下面的相关文章希望大家以后多多支持毛票票!
声明:本文内容来源于网络,版权归原作者所有,内容由互联网用户自发贡献自行上传,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任。如果您发现有涉嫌版权的内容,欢迎发送邮件至:czq8825#qq.com(发邮件时,请将#更换为@)进行举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。