详细分析Java并发集合LinkedBlockingQueue的用法
在上一章我们讲解了ArrayBlockingQueue,用数组形式实现的阻塞队列。
数组的长度在创建时就必须确定,如果数组长度小了,那么ArrayBlockingQueue队列很容易就被阻塞,如果数组长度大了,就容易浪费内存。
而队列这个数据结构天然适合用链表这个形式,而LinkedBlockingQueue就是使用链表方式实现的阻塞队列。
一.链表实现
1.1Node内部类
/** *链表的节点,同时也是通过它来实现一个单向链表 */ staticclassNode{ Eitem; //指向链表的下一个节点 Node next; Node(Ex){item=x;} }
有一个变量e储存数据,有一个变量next指向下一个节点的引用。它可以实现最简单地单向列表。
1.2怎样实现链表
/** *它的next指向队列头,这个节点不存储数据 */ transientNodehead; /** *队列尾节点 */ privatetransientNode last;
要实现链表,必须有两个变量,一个表示链表头head,一个表示链表尾last。head和last都会在LinkedBlockingQueue对象创建的时候被初始化。
last=head=newNode(null);
注意这里用了一个小技巧,链表头head节点并没有存放数据,它指向的下一个节点,才真正存储了链表中第一个数据。而链表尾last的确储存了链表最后一个数据。
1.3插入和删除节点
/** *向队列尾插入节点 */ privatevoidenqueue(Nodenode){ //assertputLock.isHeldByCurrentThread();//当前线程肯定获取了putLock锁 //将原队列尾节点的next引用指向新节点node,然后再将node节点设置成队列尾节点last //这样就实现了向队列尾插入节点 last=last.next=node; }
在链表尾插入节点很简单,将原队列尾last的下一个节点next指向新节点node,再将新节点node赋值给队列尾last节点。这样就实现了插入一个新节点。
//移除队列头节点,并返回被删除的节点数据
privateEdequeue(){
//asserttakeLock.isHeldByCurrentThread();//当前线程肯定获取了takeLock锁
//asserthead.item==null;
Nodeh=head;
//first节点中才存储了队列中第一个元素的数据
Nodefirst=h.next;
h.next=h;//helpGC
//设置新的head值,相当于删除了first节点。因为head节点本身不储存数据
head=first;
//队列头的数据
Ex=first.item;
//移除原先的数据
first.item=null;
returnx;
}
要注意head并不是链表头,它的next才是指向链表头,所以删除链表头也很简单,就是将head.next赋值给head,然后返回原先head.next节点的数据。
删除的时候,就要注意链表为空的情况。head.next的值使用enqueue方法添加的。当head.next==last的时候,表示已经删除到最后一个元素了,当head.next==null的时候,就不能删除了,因为链表已经为空了。这里没有做判断,是因为在调用dequeue方法的地方已经做过判断了。
二.同步锁ReentrantLock和条件Condition
因为阻塞队列在队列为空和队列已满的情况下,都必须阻塞等待,那么就天然需要两个条件。而为了保证多线程并发安全,又需要一个同步锁。这个在ArrayBlockingQueue中已经说过了。
这里我们来说说LinkedBlockingQueue不一样的地方。
/**独占锁,用于处理插入队列操作的并发问题,即put与offer操作*/ privatefinalReentrantLockputLock=newReentrantLock(); /**队列不满的条件Condition,它是由putLock锁生成的*/ privatefinalConditionnotFull=putLock.newCondition(); /**独占锁,用于处理删除队列头操作的并发问题,即take与poll操作*/ privatefinalReentrantLocktakeLock=newReentrantLock(); /**队列不为空的条件Condition,它使用takeLock锁生成的*/ privatefinalConditionnotEmpty=takeLock.newCondition();
2.1putLock与takeLock
我们发现使用了两把锁:
- putLock同步所有插入元素的操作,即put与offer系列方法的操作。
- takeLock同步删除和获取元素的操作,即take与poll系列方法操作。
按照上面的说法,可能会出现同时插入元素和删除元素的操作,那么就不会出现问题么?
我们来具体分析一个下,对于队列来说操作分为三种:
- 在队列尾插入元素。即put与offer系列方法,它们会涉及两个变量,队列中元素个数count,和队列尾节点last。(不会涉及head节点)
- 移除队列头元素。即take与poll系列方法,它们会涉及两个变量,队列中元素个数count,和head节点。(不会涉及队列尾节点last)
- 查看队列头元素。即element()与peek()方法,它们会涉及两个变量,队列中元素个数count,和head节点。(不会涉及队列尾节点last)
因此使用putLock锁来保持last变量的安全,使用takeLock锁来保持head变量的安全。
对于都涉及了队列中元素个数count变量,所以使用AtomicInteger来保证并发安全问题。
/**队列中元素的个数,这里使用AtomicInteger变量,保证并发安全问题*/ privatefinalAtomicIntegercount=newAtomicInteger();
2.2notFull与notEmpty
- notFull是由putLock锁生成的,因为当插入元素时,我们要判断队列是不是已满。
- notEmpty是由takeLock锁生成的,因为当删除元素时,我们要判断队列是不是为空。
2.3控制流程
当插入元素时:
- 先使用putLock.lockInterruptibly()保证只有一个线程进行插入操作.
- 然后利用count变量,判断队列是否已满.
- 满了就调用notFull.await()方法,让当前线程等待。因为notFull是由putLock产生的,这里已经获取到锁了,所以可以调用await方法。
- 没满就调用enqueue方法,向队列尾插入新元素。
- 调用count.getAndIncrement()方法,将队列中元素个数加一,并保证多线程并发安全。
- 调用signalNotEmpty方法,唤醒正在等待获取元素的线程。
当删除元素时:
- 先使用takeLock.lockInterruptibly()保证只有一个线程进行删除操作.
- 然后利用count变量,判断队列是否为空.
- 队列为空就调用notEmpty.await()方法,让当前线程等待。因为notEmpty是由takeLock产生的,这里已经获取到锁了,所以可以调用await方法。
- 没满就调用dequeue方法,删除队列头元素。
- 调用count.getAndDecrement()方法,将队列中元素个数减一,并保证多线程并发安全。
- 调用signalNotFull方法,唤醒正在等待插入元素的线程。
还要注意一下,Condition的signal和await方法必须在获取锁的情况下调用。因此就有了signalNotEmpty和signalNotFull方法:
/**
*唤醒在notEmpty条件下等待的线程,即移除队列头时,发现队列为空而被迫等待的线程。
*注意,因为要调用Condition的signal方法,必须获取对应的锁,所以这里调用了takeLock.lock()方法。
*当队列中插入元素(即put或offer操作),那么队列肯定不为空,就会调用这个方法。
*/
privatevoidsignalNotEmpty(){
finalReentrantLocktakeLock=this.takeLock;
takeLock.lock();
try{
notEmpty.signal();
}finally{
takeLock.unlock();
}
}
/**
*唤醒在notFull条件下等待的线程,即队列尾添加元素时,发现队列已满而被迫等待的线程。
*注意,因为要调用Condition的signal方法,必须获取对应的锁,所以这里调用了putLock.lock()方法
*当队列中删除元素(即take或poll操作),那么队列肯定不满,就会调用这个方法。
*/
privatevoidsignalNotFull(){
finalReentrantLockputLock=this.putLock;
putLock.lock();
try{
notFull.signal();
}finally{
putLock.unlock();
}
}
三.插入元素方法
publicvoidput(Ee)throwsInterruptedException{
if(e==null)thrownewNullPointerException();
//记录插入操作前元素的个数
intc=-1;
//创建新节点node
Nodenode=newNode(e);
finalReentrantLockputLock=this.putLock;
finalAtomicIntegercount=this.count;
putLock.lockInterruptibly();
try{
//表示队列已满,那么就要调用notFull.await方法,让当前线程等待
while(count.get()==capacity){
notFull.await();
}
//向队列尾插入新元素
enqueue(node);
//将当前队列元素个数加1,并返回加1之前的元素个数。
c=count.getAndIncrement();
//c+1
以put方法为例,大体流程与我们前面介绍一样,这里有一个非常怪异的代码,当插入完元素时,如果发现队列未满,那么调用notFull.signal()唤醒等待插入的线程。
大家就很疑惑了,一般来说,这个方法应该放在删除元素(take系列的方法里),因为当我们删除一个元素,那么队列肯定是未满的,那么调用notFull.signal()方法,唤醒等待插入的线程。
这里这么做主要是因为调用signal方法,必须先获取对应的锁,而在take系列的方法里使用的锁是takeLock,那么想调用notFull.signal()方法,必须先获取putLock锁,这样的话会性能就会下降,所以用了另一种方式。
- 首先我们应该知道signal方法,当有线程在这个条件下等待时,才会唤醒其中一个线程,当没有线程等待时,这个方法相当于什么都没做。所以这个方法的意义是可能会唤醒等待的一个线程。
- 当队列未满时,我们都调用notFull.signal()尝试去唤醒一个等待插入线程。而且这里已经获取putLock锁了,所以不耗时。
- 但是有一个问题,当队列已满的时候,所有插入操作的线程,都会等待,就没有机会调用notFull.signal()方法,那么唤醒这些等待线程呢?
- 唤醒这些线程的启动条件,必须是由删除元素操作触发的,因为只有删除队列才会不满。因为在take方法中if(c==capacity)signalNotFull();
四.删除队列头元素
publicEtake()throwsInterruptedException{
Ex;
intc=-1;
finalAtomicIntegercount=this.count;
finalReentrantLocktakeLock=this.takeLock;
takeLock.lockInterruptibly();
try{
//表示队列为空,那么就要调用notEmpty.await方法,让当前线程等待
while(count.get()==0){
notEmpty.await();
}
//删除队列头元素,并返回它
x=dequeue();
//返回当前队列个数,然后将队列个数减一
c=count.getAndDecrement();
//c>1表示队列不为空,就唤醒可能等待删除操作的线程
if(c>1)
notEmpty.signal();
}finally{
takeLock.unlock();
}
/**
*c==capacity表示删除操作之前,队列是满的。只有从满队列中删除一个元素时,
*才唤醒等待插入的线程
*防止频繁获取putLock锁,消耗性能
*/
if(c==capacity)
signalNotFull();
returnx;
}
为什么调用notEmpty.signal()方法原因,对比一下我们在插入元素方法中的解释。
五.查看队列头元素
//查看队列头元素
publicEpeek(){
//队列为空,返回null
if(count.get()==0)
returnnull;
finalReentrantLocktakeLock=this.takeLock;
takeLock.lock();
try{
//获取队列头节点first
Nodefirst=head.next;
//first==null表示队列为空,返回null
if(first==null)
returnnull;
else
//返回队列头元素
returnfirst.item;
}finally{
takeLock.unlock();
}
}
查看队列头元素,涉及到head节点,所以必须使用takeLock锁。
六.其他重要方法
6.1remove(Objecto)方法
//从队列中删除指定元素o
publicbooleanremove(Objecto){
if(o==null)returnfalse;
//因为不是删除列表头元素,所以就涉及到head和last两个变量,
//putLock与takeLock都要加锁
fullyLock();
try{
//遍历整个队列,p表示当前节点,trail表示当前节点的前一个节点
//因为是单向链表,所以需要记录两个节点
for(Nodetrail=head,p=trail.next;
p!=null;
trail=p,p=p.next){
//如果找到了指定元素,那么删除节点p
if(o.equals(p.item)){
unlink(p,trail);
returntrue;
}
}
returnfalse;
}finally{
fullyUnlock();
}
}
从列表中删除指定元素,因为删除的元素不一定在列表头,所以可能会head和last两个变量,所以必须同时使用putLock与takeLock两把锁。因为是单向链表,需要一个辅助变量trail来记录前一个节点,这样才能删除当前节点p。
6.2unlink(Nodep,Nodetrail)方法
//删除当前节点p,trail代表p的前一个节点
voidunlink(Nodep,Nodetrail){
//将当前节点的数据设置为null
p.item=null;
//这样就在链表中删除了节点p
trail.next=p.next;
//如果节点p是队列尾last,而它被删除了,那么就要将trail设置为last
if(last==p)
last=trail;
//将元素个数减一,如果原队列是满的,那么就调用notFull.signal()方法
//其实这个不用判断直接调用的,因为这里肯定获取了putLock锁
if(count.getAndDecrement()==capacity)
notFull.signal();
}
要在链表中删除一个节点p,只需要将p的前一个节点trail的next指向节点p的下一个节点next。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。
声明:本文内容来源于网络,版权归原作者所有,内容由互联网用户自发贡献自行上传,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任。如果您发现有涉嫌版权的内容,欢迎发送邮件至:czq8825#qq.com(发邮件时,请将#更换为@)进行举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。