A Detailed Look at Python's Queue Module
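Queues are mostly used in multithreaded programs, where several threads need to access shared data. For that purpose, Queue is thread-safe. Its implementation shows how: it uses one mutex (threading.Lock()) and three condition variables (threading.Condition()) to guarantee thread safety.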
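For background on the mutex and condition variables that Queue relies on, see the companion article "Synchronization Locks in Python Threads".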
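Basic usage looks like this:
import Queue  # Python 2 module name; in Python 3 the module is named "queue"

a = [1, 2, 3]
device_que = Queue.Queue()   # unbounded FIFO queue
device_que.put(a)            # enqueue the list as a single item
device = device_que.get()    # dequeue it again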
First, look at its initializer, __init__(self, maxsize=0):
def __init__(self, maxsize=0):
    self.maxsize = maxsize
    self._init(maxsize)
    # mutex must be held whenever the queue is mutating.  All methods
    # that acquire mutex must release it before returning.  mutex
    # is shared between the three conditions, so acquiring and
    # releasing the conditions also acquires and releases mutex.
    self.mutex = _threading.Lock()
    # Notify not_empty whenever an item is added to the queue; a
    # thread waiting to get is notified then.
    self.not_empty = _threading.Condition(self.mutex)
    # Notify not_full whenever an item is removed from the queue;
    # a thread waiting to put is notified then.
    self.not_full = _threading.Condition(self.mutex)
    # Notify all_tasks_done whenever the number of unfinished tasks
    # drops to zero; thread waiting to join() is notified to resume
    self.all_tasks_done = _threading.Condition(self.mutex)
    self.unfinished_tasks = 0
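The constructor takes one parameter, maxsize, which defaults to 0. If maxsize is left at 0 (or is negative), the queue is unbounded. If it is greater than 0, the queue holds at most maxsize items.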
self._init(maxsize): stores the elements in a double-ended queue, collections.deque, from the standard library.
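self.mutex (mutex): any operation that reads the queue's state (empty(), qsize(), and so on) or modifies its contents (get(), put(), and so on) must hold this mutex. It supports two operations: acquire() takes the lock and release() gives it up. The mutex is shared by the three condition variables, so acquiring or releasing any of those conditions acquires or releases this same mutex.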
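self.not_full (condition variable): after a thread removes an item from the queue, it calls self.not_full.notify() to wake one thread that is blocked waiting to add an item. The woken thread must re-acquire the mutex before it can continue.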
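self.not_empty (condition variable): after a thread adds an item to the queue, it calls self.not_empty.notify() to wake one thread that is blocked waiting to read. The woken thread re-acquires the mutex and then reads from the queue.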
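self.all_tasks_done (condition variable): a consumer thread gets a task from the queue, processes it, and calls task_done(). Once every task that was put into the queue has been processed, any thread blocked in queue.join() returns, which signals that all the work in the queue is finished.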
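The interplay between get(), task_done(), and join() can be tried out with a short script. This is only a sketch under a few assumptions: it uses the Python 3 module name queue (the listings in this article come from Python 2's Queue), and the worker function and the None sentinel are hypothetical conventions chosen for the example, not part of the module.

```python
import queue      # Python 3 name; this article's listings are from Python 2's "Queue"
import threading


def worker(tasks, results):
    """Consume tasks until a None sentinel arrives (the sentinel is our own convention)."""
    while True:
        item = tasks.get()          # blocks while the queue is empty
        try:
            if item is None:
                return
            results.append(item * item)
        finally:
            tasks.task_done()       # exactly one task_done() per successful get()


tasks = queue.Queue()
results = []
t = threading.Thread(target=worker, args=(tasks, results))
t.start()

for n in range(5):
    tasks.put(n)                    # each put() increments unfinished_tasks

tasks.join()                        # returns once every put() has a matching task_done()
squares = sorted(results)

tasks.put(None)                     # ask the worker to exit
t.join()
```

Calling task_done() in a finally block keeps the unfinished-task count accurate even if processing an item raises, so join() cannot hang on a task that failed.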
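queue.put(self, item, block=True, timeout=None):
put() first acquires the mutex. If the queue is not full, it adds the item and calls notify() to wake one blocked thread, which then waits to acquire the mutex. If the queue is full, it calls wait() until a slot frees up. The mutex is released when the call finishes. The method also handles the blocking, non-blocking, and timeout cases, which are worth reading through: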
def put(self, item, block=True, timeout=None):
    """Put an item into the queue.

    If optional args 'block' is true and 'timeout' is None (the default),
    block if necessary until a free slot is available. If 'timeout' is
    a non-negative number, it blocks at most 'timeout' seconds and raises
    the Full exception if no free slot was available within that time.
    Otherwise ('block' is false), put an item on the queue if a free slot
    is immediately available, else raise the Full exception ('timeout'
    is ignored in that case).
    """
    self.not_full.acquire()
    try:
        if self.maxsize > 0:
            if not block:
                if self._qsize() == self.maxsize:
                    raise Full
            elif timeout is None:
                while self._qsize() == self.maxsize:
                    self.not_full.wait()
            elif timeout < 0:
                raise ValueError("'timeout' must be a non-negative number")
            else:
                endtime = _time() + timeout
                while self._qsize() == self.maxsize:
                    remaining = endtime - _time()
                    if remaining <= 0.0:
                        raise Full
                    self.not_full.wait(remaining)
        self._put(item)
        self.unfinished_tasks += 1
        self.not_empty.notify()
    finally:
        self.not_full.release()
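The three branches of put() are easiest to see on a queue that is already full. The following sketch assumes Python 3 (module name queue, exception queue.Full); the variable names are made up for the example.

```python
import queue
import time

q = queue.Queue(maxsize=1)
q.put("first")                      # fits: the queue had one free slot

# Non-blocking put on a full queue fails immediately.
try:
    q.put("second", block=False)
    nonblocking_result = "stored"
except queue.Full:
    nonblocking_result = "Full"

# Blocking put with a timeout waits on not_full, then gives up.
start = time.monotonic()
try:
    q.put("second", timeout=0.2)
    timed_result = "stored"
except queue.Full:
    timed_result = "Full"
waited = time.monotonic() - start

# A negative timeout is rejected before any waiting happens.
try:
    q.put("second", timeout=-1)
    negative_result = "stored"
except ValueError:
    negative_result = "ValueError"
```

Because no consumer ever removes "first", both the non-blocking and the timed put end in Full, and the timed one only does so after roughly the requested 0.2 seconds.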
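queue.get(self, block=True, timeout=None):
get() removes a task from the queue and returns it. It first acquires the mutex. If the queue is empty at that point, it calls wait() until a producer thread adds data. Once it has an item, it calls self.not_full.notify() to tell a producer thread that the queue has room again. Finally, it releases the mutex.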
def get(self, block=True, timeout=None):
    """Remove and return an item from the queue.

    If optional args 'block' is true and 'timeout' is None (the default),
    block if necessary until an item is available. If 'timeout' is
    a non-negative number, it blocks at most 'timeout' seconds and raises
    the Empty exception if no item was available within that time.
    Otherwise ('block' is false), return an item if one is immediately
    available, else raise the Empty exception ('timeout' is ignored
    in that case).
    """
    self.not_empty.acquire()
    try:
        if not block:
            if not self._qsize():
                raise Empty
        elif timeout is None:
            while not self._qsize():
                self.not_empty.wait()
        elif timeout < 0:
            raise ValueError("'timeout' must be a non-negative number")
        else:
            endtime = _time() + timeout
            while not self._qsize():
                remaining = endtime - _time()
                if remaining <= 0.0:
                    raise Empty
                self.not_empty.wait(remaining)
        item = self._get()
        self.not_full.notify()
        return item
    finally:
        self.not_empty.release()
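put() and get() are mirror images: each waits on one condition and notifies the other, and both conditions share a single lock. A stripped-down sketch of that handshake is shown here. TinyQueue is a hypothetical teaching class written for this example (Python 3), not the library's implementation; it leaves out timeouts, non-blocking mode, and task tracking.

```python
import threading
from collections import deque


class TinyQueue:
    """Hypothetical cut-down bounded queue illustrating the lock/condition pattern."""

    def __init__(self, maxsize):
        self.maxsize = maxsize
        self.items = deque()
        self.mutex = threading.Lock()
        # Both conditions wrap the same lock, as in Queue.__init__.
        self.not_empty = threading.Condition(self.mutex)
        self.not_full = threading.Condition(self.mutex)

    def put(self, item):
        with self.not_full:                     # acquires self.mutex
            while len(self.items) >= self.maxsize:
                self.not_full.wait()            # releases the mutex while sleeping
            self.items.append(item)
            self.not_empty.notify()             # wake one waiting consumer

    def get(self):
        with self.not_empty:                    # acquires the same mutex
            while not self.items:
                self.not_empty.wait()
            item = self.items.popleft()
            self.not_full.notify()              # wake one waiting producer
            return item


buf = TinyQueue(maxsize=2)
received = []


def consumer():
    for _ in range(5):
        received.append(buf.get())


t = threading.Thread(target=consumer)
t.start()
for n in range(5):
    buf.put(n)          # blocks whenever two items are already buffered
t.join()
```

The while loops around wait() matter: a woken thread has to re-check the queue state after it re-acquires the mutex, because another thread may have changed it in the meantime.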
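queue.put_nowait(): adds a task to the queue without blocking. If the queue is full, it does not wait but raises the Full exception immediately. The key point is that it passes block=False: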
def put_nowait(self, item):
    """Put an item into the queue without blocking.

    Only enqueue the item if a free slot is immediately available.
    Otherwise raise the Full exception.
    """
    return self.put(item, False)
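queue.get_nowait(): removes a task from the queue without blocking. If the queue is empty, it does not wait but raises the Empty exception immediately. Again, the key point is that it passes block=False: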
def get_nowait(self):
    """Remove and return an item from the queue without blocking.

    Only get an item if one is immediately available. Otherwise
    raise the Empty exception.
    """
    return self.get(False)
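A common use of the non-blocking variants is draining whatever is currently queued. The sketch below assumes Python 3 (queue.Empty) and a single thread; with other producers still running, hitting Empty only means the queue was empty at that instant.

```python
import queue

q = queue.Queue()
for name in ("a", "b", "c"):
    q.put_nowait(name)      # unbounded queue, so this never raises Full

drained = []
while True:
    try:
        drained.append(q.get_nowait())
    except queue.Empty:
        break               # nothing left right now
```

Catching Empty is generally safer than checking empty() first, since the state can change between the check and the get().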
queue.qsize(), queue.empty(), and queue.full() return the queue's current size, whether it is empty, and whether it is full, respectively:
def qsize(self):
    """Return the approximate size of the queue (not reliable!)."""
    self.mutex.acquire()
    n = self._qsize()
    self.mutex.release()
    return n

def empty(self):
    """Return True if the queue is empty, False otherwise (not reliable!)."""
    self.mutex.acquire()
    n = not self._qsize()
    self.mutex.release()
    return n

def full(self):
    """Return True if the queue is full, False otherwise (not reliable!)."""
    self.mutex.acquire()
    n = 0 < self.maxsize == self._qsize()
    self.mutex.release()
    return n

queue.join() blocks until every task in the queue has been processed. It must be used together with queue.task_done():
def task_done(self):
    """Indicate that a formerly enqueued task is complete.

    Used by Queue consumer threads.  For each get() used to fetch a task,
    a subsequent call to task_done() tells the queue that the processing
    on the task is complete.

    If a join() is currently blocking, it will resume when all items
    have been processed (meaning that a task_done() call was received
    for every item that had been put() into the queue).

    Raises a ValueError if called more times than there were items
    placed in the queue.
    """
    self.all_tasks_done.acquire()
    try:
        unfinished = self.unfinished_tasks - 1
        if unfinished <= 0:
            if unfinished < 0:
                raise ValueError('task_done() called too many times')
            self.all_tasks_done.notify_all()
        self.unfinished_tasks = unfinished
    finally:
        self.all_tasks_done.release()

def join(self):
    """Blocks until all items in the Queue have been gotten and processed.

    The count of unfinished tasks goes up whenever an item is added to the
    queue. The count goes down whenever a consumer thread calls task_done()
    to indicate the item was retrieved and all work on it is complete.

    When the count of unfinished tasks drops to zero, join() unblocks.
    """
    self.all_tasks_done.acquire()
    try:
        while self.unfinished_tasks:
            self.all_tasks_done.wait()
    finally:
        self.all_tasks_done.release()

Besides the thread-safe FIFO (first in, first out) Queue, the Queue module also provides two variants. LifoQueue is a last-in, first-out queue, so the most recently added item is the first one returned by get(). PriorityQueue is a queue ordered by priority, whose elements are typically tuples of the form (priority number, data).
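The ordering difference between the three queue classes can be checked by feeding them the same items. This is a small sketch assuming Python 3 (module name queue); the drain helper and the sample items are invented for the example.

```python
import queue


def drain(q):
    """Return everything currently in q, in the order get() yields it."""
    out = []
    while not q.empty():    # fine here: only one thread touches q
        out.append(q.get())
    return out


fifo, lifo, prio = queue.Queue(), queue.LifoQueue(), queue.PriorityQueue()
for item in [(2, "write"), (1, "read"), (3, "close")]:
    fifo.put(item)
    lifo.put(item)
    prio.put(item)

fifo_order = drain(fifo)    # insertion order
lifo_order = drain(lifo)    # reverse insertion order
prio_order = drain(prio)    # lowest priority number first
```

All three share the same put() and get() logic and differ only in how the underlying container stores and pops items.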
class PriorityQueue(Queue):
    '''Variant of Queue that retrieves open entries in priority order (lowest first).

    Entries are typically tuples of the form:  (priority number, data).
    '''

    def _init(self, maxsize):
        self.queue = []

    def _qsize(self, len=len):
        return len(self.queue)

    def _put(self, item, heappush=heapq.heappush):
        heappush(self.queue, item)

    def _get(self, heappop=heapq.heappop):
        return heappop(self.queue)


class LifoQueue(Queue):
    '''Variant of Queue that retrieves most recently added entries first.'''

    def _init(self, maxsize):
        self.queue = []

    def _qsize(self, len=len):
        return len(self.queue)

    def _put(self, item):
        self.queue.append(item)

    def _get(self):
        return self.queue.pop()

That concludes the walk-through of the Queue module. The key point is to understand how the mutex and the condition variables work together to keep the queue thread-safe.
The complete source of the Queue class follows. The names _threading, _time, deque, Full, and Empty are defined at the top of the module and are not shown here:
class Queue:
    """Create a queue object with a given maximum size.

    If maxsize is <= 0, the queue size is infinite.
    """
    def __init__(self, maxsize=0):
        self.maxsize = maxsize
        self._init(maxsize)
        # mutex must be held whenever the queue is mutating.  All methods
        # that acquire mutex must release it before returning.  mutex
        # is shared between the three conditions, so acquiring and
        # releasing the conditions also acquires and releases mutex.
        self.mutex = _threading.Lock()
        # Notify not_empty whenever an item is added to the queue; a
        # thread waiting to get is notified then.
        self.not_empty = _threading.Condition(self.mutex)
        # Notify not_full whenever an item is removed from the queue;
        # a thread waiting to put is notified then.
        self.not_full = _threading.Condition(self.mutex)
        # Notify all_tasks_done whenever the number of unfinished tasks
        # drops to zero; thread waiting to join() is notified to resume
        self.all_tasks_done = _threading.Condition(self.mutex)
        self.unfinished_tasks = 0

    def task_done(self):
        """Indicate that a formerly enqueued task is complete.

        Used by Queue consumer threads.  For each get() used to fetch a task,
        a subsequent call to task_done() tells the queue that the processing
        on the task is complete.

        If a join() is currently blocking, it will resume when all items
        have been processed (meaning that a task_done() call was received
        for every item that had been put() into the queue).

        Raises a ValueError if called more times than there were items
        placed in the queue.
        """
        self.all_tasks_done.acquire()
        try:
            unfinished = self.unfinished_tasks - 1
            if unfinished <= 0:
                if unfinished < 0:
                    raise ValueError('task_done() called too many times')
                self.all_tasks_done.notify_all()
            self.unfinished_tasks = unfinished
        finally:
            self.all_tasks_done.release()

    def join(self):
        """Blocks until all items in the Queue have been gotten and processed.

        The count of unfinished tasks goes up whenever an item is added to the
        queue. The count goes down whenever a consumer thread calls task_done()
        to indicate the item was retrieved and all work on it is complete.

        When the count of unfinished tasks drops to zero, join() unblocks.
        """
        self.all_tasks_done.acquire()
        try:
            while self.unfinished_tasks:
                self.all_tasks_done.wait()
        finally:
            self.all_tasks_done.release()

    def qsize(self):
        """Return the approximate size of the queue (not reliable!)."""
        self.mutex.acquire()
        n = self._qsize()
        self.mutex.release()
        return n

    def empty(self):
        """Return True if the queue is empty, False otherwise (not reliable!)."""
        self.mutex.acquire()
        n = not self._qsize()
        self.mutex.release()
        return n

    def full(self):
        """Return True if the queue is full, False otherwise (not reliable!)."""
        self.mutex.acquire()
        n = 0 < self.maxsize == self._qsize()
        self.mutex.release()
        return n

    def put(self, item, block=True, timeout=None):
        """Put an item into the queue.

        If optional args 'block' is true and 'timeout' is None (the default),
        block if necessary until a free slot is available. If 'timeout' is
        a non-negative number, it blocks at most 'timeout' seconds and raises
        the Full exception if no free slot was available within that time.
        Otherwise ('block' is false), put an item on the queue if a free slot
        is immediately available, else raise the Full exception ('timeout'
        is ignored in that case).
        """
        self.not_full.acquire()
        try:
            if self.maxsize > 0:
                if not block:
                    if self._qsize() == self.maxsize:
                        raise Full
                elif timeout is None:
                    while self._qsize() == self.maxsize:
                        self.not_full.wait()
                elif timeout < 0:
                    raise ValueError("'timeout' must be a non-negative number")
                else:
                    endtime = _time() + timeout
                    while self._qsize() == self.maxsize:
                        remaining = endtime - _time()
                        if remaining <= 0.0:
                            raise Full
                        self.not_full.wait(remaining)
            self._put(item)
            self.unfinished_tasks += 1
            self.not_empty.notify()
        finally:
            self.not_full.release()

    def put_nowait(self, item):
        """Put an item into the queue without blocking.

        Only enqueue the item if a free slot is immediately available.
        Otherwise raise the Full exception.
        """
        return self.put(item, False)

    def get(self, block=True, timeout=None):
        """Remove and return an item from the queue.

        If optional args 'block' is true and 'timeout' is None (the default),
        block if necessary until an item is available. If 'timeout' is
        a non-negative number, it blocks at most 'timeout' seconds and raises
        the Empty exception if no item was available within that time.
        Otherwise ('block' is false), return an item if one is immediately
        available, else raise the Empty exception ('timeout' is ignored
        in that case).
        """
        self.not_empty.acquire()
        try:
            if not block:
                if not self._qsize():
                    raise Empty
            elif timeout is None:
                while not self._qsize():
                    self.not_empty.wait()
            elif timeout < 0:
                raise ValueError("'timeout' must be a non-negative number")
            else:
                endtime = _time() + timeout
                while not self._qsize():
                    remaining = endtime - _time()
                    if remaining <= 0.0:
                        raise Empty
                    self.not_empty.wait(remaining)
            item = self._get()
            self.not_full.notify()
            return item
        finally:
            self.not_empty.release()

    def get_nowait(self):
        """Remove and return an item from the queue without blocking.

        Only get an item if one is immediately available. Otherwise
        raise the Empty exception.
        """
        return self.get(False)

    # Override these methods to implement other queue organizations
    # (e.g. stack or priority queue).
    # These will only be called with appropriate locks held

    # Initialize the queue representation
    def _init(self, maxsize):
        self.queue = deque()

    def _qsize(self, len=len):
        return len(self.queue)

    # Put a new item in the queue
    def _put(self, item):
        self.queue.append(item)

    # Get an item from the queue
    def _get(self):
        return self.queue.popleft()