浅谈python 线程池threadpool之实现
首先介绍一下自己使用到的名词:
工作线程(worker):创建线程池时,按照指定的线程数量,创建工作线程,等待从任务队列中get任务;
任务(requests):即工作线程处理的任务,任务可能成千上万个,但是工作线程只有少数。任务通过 makeRequests来创建
任务队列(request_queue):存放任务的队列,使用了queue实现的。工作线程从任务队列中get任务进行处理;
任务处理函数(callable):工作线程get到任务后,通过调用任务的任务处理函数即(request.callable_)具体 的 处理任务,并返回处理结果;
任务结果队列(result_queue):任务处理完成后,将返回的处理结果,放入到任务结果队列中(包括异常);
任务异常处理函数或回调(exc_callback):从任务结果队列中get结果,如果设置了异常,则需要调用异常回调处理异常;
任务结果回调(callback):从任务结果队列中get结果,对result进行进一步处理;
上一节介绍了线程池threadpool的安装和使用,本节将主要介绍线程池工作的主要流程:
(1)线程池的创建
(2)工作线程的启动
(3)任务的创建
(4)任务的推送到线程池
(5)线程处理任务
(6)任务结束处理
(7)工作线程的退出
下面是threadpool的定义:
classThreadPool: """Athreadpool,distributingworkrequestsandcollectingresults. Seethemoduledocstringformoreinformation. """ def__init__(self,num_workers,q_size=0,resq_size=0,poll_timeout=5): pass defcreateWorkers(self,num_workers,poll_timeout=5): pass defdismissWorkers(self,num_workers,do_join=False): pass defjoinAllDismissedWorkers(self): pass defputRequest(self,request,block=True,timeout=None): pass defpoll(self,block=False): pass defwait(self): pass
1、线程池的创建(ThreadPool(args))
task_pool=threadpool.ThreadPool(num_works)
task_pool=threadpool.ThreadPool(num_works) def__init__(self,num_workers,q_size=0,resq_size=0,poll_timeout=5): """Setupthethreadpoolandstartnum_workersworkerthreads. ``num_workers``isthenumberofworkerthreadstostartinitially. If``q_size>0``thesizeofthework*requestqueue*islimitedand thethreadpoolblockswhenthequeueisfullandittriestoput moreworkrequestsinit(see``putRequest``method),unlessyoualso useapositive``timeout``valuefor``putRequest``. If``resq_size>0``thesizeofthe*resultsqueue*islimitedandthe workerthreadswillblockwhenthequeueisfullandtheytrytoput newresultsinit. ..warning: Ifyousetboth``q_size``and``resq_size``to``!=0``thereis thepossibiltyofadeadlock,whentheresultsqueueisnotpulled regularlyandtoomanyjobsareputintheworkrequestsqueue. Topreventthis,alwaysset``timeout>0``whencalling ``ThreadPool.putRequest()``andcatch``Queue.Full``exceptions. """ self._requests_queue=Queue.Queue(q_size)#任务队列,通过threadpool.makeReuests(args)创建的任务都会放到此队列中 self._results_queue=Queue.Queue(resq_size)#字典,任务对应的任务执行结果 self.workers=[]#工作线程list,通过self.createWorkers()函数内创建的工作线程会放到此工作线程list中 self.dismissedWorkers=[]#被设置线程事件并且没有被join的工作线程 self.workRequests={}#字典,记录任务被分配到哪个工作线程中 self.createWorkers(num_workers,poll_timeout)
其中,初始化参数为:
num_works:线程池中线程个数
q_size:任务队列的长度限制,如果限制了队列的长度,那么当调用putRequest()添加任务时,到达限制长度后,那么putRequest将会不断尝试添加任务,除非在putRequest()设置了超时或者阻塞;
esq_size: 任务结果队列的长度;
pool_timeout:工作线程如果从request队列中,读取不到request,则会阻塞pool_timeout,如果仍没request则直接返回;
其中,成员变量:
self._requests_queue: 任务队列,通过threadpool.makeReuests(args)创建的任务都会放到此队列中;
self._results_queue: 字典,任务对应的任务执行
self.workers: 工作线程list,通过self.createWorkers()函数内创建的工作线程会放到此工作线程list中;
self.dismisssedWorkers: 被设置线程事件,并且没有被join的工作线程
self.workRequests: 字典,记录推送到线程池的任务,结构为requestID:request。其中requestID是任务的唯一标识,会在后面作介绍。
2、工作线程的启动(self.createWorks(args))
函数定义:
defcreateWorkers(self,num_workers,poll_timeout=5): """Addnum_workersworkerthreadstothepool. ``poll_timout``setstheintervalinseconds(intorfloat)forhow oftethreadsshouldcheckwhethertheyaredismissed,whilewaitingfor requests. """ foriinrange(num_workers): self.workers.append(WorkerThread(self._requests_queue, self._results_queue,poll_timeout=poll_timeout))
其中WorkerThread()继承自thread,即python内置的线程类,将创建的WorkerThread对象放入到self.workers队列中。下面看一下WorkerThread类的定义:
从self.__init__(args)可看出:
classWorkerThread(threading.Thread): """Backgroundthreadconnectedtotherequests/resultsqueues. Aworkerthreadsitsinthebackgroundandpicksupworkrequestsfrom onequeueandputstheresultsinanotheruntilitisdismissed. """ def__init__(self,requests_queue,results_queue,poll_timeout=5,**kwds): """Setupthreadindaemonicmodeandstartitimmediatedly. ``requests_queue``and``results_queue``areinstancesof ``Queue.Queue``passedbythe``ThreadPool``classwhenitcreatesa newworkerthread. """ threading.Thread.__init__(self,**kwds) self.setDaemon(1)# self._requests_queue=requests_queue#任务队列 self._results_queue=results_queue#任务结果队列 self._poll_timeout=poll_timeout#run函数中从任务队列中get任务时的超时时间,如果超时则继续while(true); self._dismissed=threading.Event()#线程事件,如果set线程事件则run会执行break,直接退出工作线程; self.start() defrun(self): """Repeatedlyprocessthejobqueueuntiltoldtoexit.""" whileTrue: ifself._dismissed.isSet():#如果设置了self._dismissed则退出工作线程 #wearedismissed,breakoutofloop break #getnextworkrequest.Ifwedon'tgetanewrequestfromthe #queueafterself._poll_timoutseconds,wejumptothestartof #thewhileloopagain,togivethethreadachancetoexit. try: request=self._requests_queue.get(True,self._poll_timeout) exceptQueue.Empty:#尝从任务队列self._requests_queue中get任务,如果队列为空,则continue continue else: ifself._dismissed.isSet():#检测此工作线程事件是否被set,如果被设置,意味着要结束此工作线程,那么就需要将取到的任务返回到任务队列中,并且退出线程 #wearedismissed,putbackrequestinqueueandexitloop self._requests_queue.put(request) break try:"""Setsaflagtotellthethreadtoexitwhendonewithcurrentjob. """ self._dismissed.set()
初始化中变量:
self._request_queue:任务队列;
self._resutls_queuqe,:任务结果队列;
self._pool_timeout:run函数中从任务队列中get任务时的超时时间,如果超时则继续while(true);
self._dismissed:线程事件,如果set线程事件则run会执行break,直接退出工作线程;
最后调用self.start()启动线程,run函数定义见上面:
从上面run函数while执行步骤如下:
(1)如果设置了self._dismissed则退出工作线程,否则执行第2步
(2)尝从任务队列self._requests_queue中get任务,如果队列为空,则continue执行下一次while循环,否则执行第3步
(3)检测此工作线程事件是否被set,如果被设置,意味着要结束此工作线程,那么就需要将取到的任务返回到任务队列中,并且退出线程。如果线程事件没有被设置,那么执行任务处理函数request.callable,并将返回的result,压入到任务结果队列中,如果任务处理函数出现异常,则将异常压入到队列中。最后跳转第4步
(4)继续循环,返回1
到此工作线程创建完毕,根据设置的线程池线程数量,创建工作线程,工作线程从任务队列中get任务,进行任务处理,并将任务处理结果压入到任务结果队列中。
3、任务的创建(makeRequests)
任务的创建函数为threadpool.makeRequests(callable_,args_list,callback=None):
#utilityfunctions defmakeRequests(callable_,args_list,callback=None, exc_callback=_handle_thread_exception): """Createseveralworkrequestsforsamecallablewithdifferentarguments. Conveniencefunctionforcreatingseveralworkrequestsforthesame callablewhereeachinvocationofthecallablereceivesdifferentvalues foritsarguments. ``args_list``containstheparametersforeachinvocationofcallable. Eachitemin``args_list``shouldbeeithera2-itemtupleofthelistof positionalargumentsandadictionaryofkeywordargumentsorasingle, non-tupleargument. Seedocstringfor``WorkRequest``forinfoon``callback``and ``exc_callback``. """ requests=[] foriteminargs_list: ifisinstance(item,tuple): requests.append( WorkRequest(callable_,item[0],item[1],callback=callback, exc_callback=exc_callback) ) else: requests.append( WorkRequest(callable_,[item],None,callback=callback, exc_callback=exc_callback) ) returnrequests
其中创建任务的函数参数具体意义为下:
callable_:注册的任务处理函数,当任务被放到任务队列后,工作线程中获取到该任务的线程,会执行此callable_
args_list:首先args_list是列表,列表元素类型为元组,元组中有两个元素item[0],item[1],item[0]为位置参数,item[1]为字典类型关键字参数。列表中元组的个数,代表启动的任务个数,在使用的时候一般都为单个元组,即一个makerequest()创建一个任务。
callback:回调函数,在poll函数中调用(后面讲解此函数),callable_调用结束后,会就任务结果放入到任务结果队列中(self._resutls_queue),在poll函数中,当从self._resutls_queue队列中get某个结果后,会执行此callback(request,result),其中result是request任务返回的结果。
exc_callback:异常回调函数,在poll函数中,如果某个request对应有执行异常,那么会调用此异常回调。
创建完成任务后,返回创建的任务。
外层记录此任务,放入到任务列表中。
上面是创建任务的函数,下面讲解任务对象的结构:
classWorkRequest: """Arequesttoexecuteacallableforputtingintherequestqueuelater. Seethemodulefunction``makeRequests``forthecommoncase whereyouwanttobuildseveral``WorkRequest``objectsforthesame callablebutwithdifferentargumentsforeachcall. """ def__init__(self,callable_,args=None,kwds=None,requestID=None, callback=None,exc_callback=_handle_thread_exception): """Createaworkrequestforacallableandattachcallbacks. Aworkrequestconsistsoftheacallabletobeexecutedbya workerthread,alistofpositionalarguments,adictionary ofkeywordarguments. A``callback``functioncanbespecified,thatiscalledwhenthe resultsoftherequestarepickedupfromtheresultqueue.Itmust accepttwoanonymousarguments,the``WorkRequest``objectandthe resultsofthecallable,inthatorder.Ifyouwanttopassadditional informationtothecallback,juststickitontherequestobject. Youcanalsogivecustomcallbackforwhenanexceptionoccurswith the``exc_callback``keywordparameter.Itshouldalsoaccepttwo anonymousarguments,the``WorkRequest``andatuplewiththeexception detailsasreturnedby``sys.exc_info()``.Thedefaultimplementation ofthiscallbackjustprintstheexceptioninfovia ``traceback.print_exception``.Ifyouwantnoexceptionhandler callback,justpassin``None``. ``requestID``,ifgiven,mustbehashablesinceitisusedby ``ThreadPool``objecttostoretheresultsofthatworkrequestina dictionary.Itdefaultstothereturnvalueof``id(self)``. """ ifrequestIDisNone: self.requestID=id(self) else: try: self.requestID=hash(requestID) exceptTypeError: raiseTypeError("requestIDmustbehashable.") self.exception=False self.callback=callback self.exc_callback=exc_callback self.callable=callable_ self.args=argsor[] self.kwds=kwdsor{} def__str__(self): return""%\ (self.requestID,self.args,self.kwds,self.exception)
上面self.callback以及self.exc_callback,和self.callable_,args,dwds都已经讲解,就不在啰嗦了。
其中有一个任务的全局唯一标识,即self.requestID,通过获取自身内存首地址作为自己的唯一标识id(self)
self.exception初始化为False,如果执行self.callable()过程中出现异常,那么此变量会标设置为True。
至此,任务创建完毕,调用makeRequests()的上层记录有任务列表request_list.
4、任务的推送到线程池(putRequest)
上面小节中介绍了任务的创建,任务的个数可以成千上百,但是处理任务的线程数量只有我们在创建线程池的时候制定的线程数量来处理,指定的线程数量往往比任务的数量小得多,因此,每个线程必须处理多个任务。
本节介绍如何将创建的任务推送的线程池中,以让线程池由阻塞状态,获取任务,然后去处理任务。
任务的推送使用ThreadPool线程池类中的putRequest(self,request,block,timeout)来创建:
defputRequest(self,request,block=True,timeout=None): """Putworkrequestintoworkqueueandsaveitsidforlater.""" assertisinstance(request,WorkRequest) #don'treuseoldworkrequests assertnotgetattr(request,'exception',None) self._requests_queue.put(request,block,timeout) self.workRequests[request.requestID]=request
函数的主要作用就是将request任务,也就是上一小节中创建的任务,put到线程池的任务队列中(self._request_queue)。然后记录已经推送到线程池的任务,通过线程池的self.workReuests字典来存储,结构为request.requestID:request。
至此,任务创建完成,并且已经将任务推送到线程池中。
5、线程处理任务
通过上一小节,任务已经推送到了线程中。在任务没有被推送到线程池中时,线程池中的线程都处在处在阻塞状态中,即在线程的self.run()函数中,一直处于一下状态:
try: request=self._requests_queue.get(True,self._poll_timeout) exceptQueue.Empty:#尝从任务队列self._requests_queue中get任务,如果队列为空,则continue continue
现在任务已经推送到线程池中,那么get任务将会正常返回,会执行下面的步骤:
defrun(self): """Repeatedlyprocessthejobqueueuntiltoldtoexit.""" whileTrue: ifself._dismissed.isSet():#如果设置了self._dismissed则退出工作线程 #wearedismissed,breakoutofloop break #getnextworkrequest.Ifwedon'tgetanewrequestfromthe #queueafterself._poll_timoutseconds,wejumptothestartof #thewhileloopagain,togivethethreadachancetoexit. try: request=self._requests_queue.get(True,self._poll_timeout) exceptQueue.Empty:#尝从任务队列self._requests_queue中get任务,如果队列为空,则continue continue else: ifself._dismissed.isSet():#检测此工作线程事件是否被set,如果被设置,意味着要结束此工作线程,那么就需要将取到的任务返回到任务队列中,并且退出线程 #wearedismissed,putbackrequestinqueueandexitloop self._requests_queue.put(request) break try:#如果线程事件没有被设置,那么执行任务处理函数request.callable,并将返回的result,压入到任务结果队列中 result=request.callable(*request.args,**request.kwds) self._results_queue.put((request,result)) except: request.exception=True self._results_queue.put((request,sys.exc_info()))#如果任务处理函数出现异常,则将异常压入到队列中
获取任务--->调用任务的处理函数callable()处理任务--->将任务request以及任务返回的结果压入到self.results_queue队列中------>如果任务处理函数异常,那么将任务异常标识设置为True,并将任务request以及任务异常压入到self.results_queue队列中---->再次返回获取任务
如果,在while循环过程中,外部设置了线程事件,即self._dismissed.isSet为True,那么意味着此线程将会结束处理任务,那么会将get到的任务返回的任务队列中,并且退出线程。
6、任务结束处理
上面小节中,介绍了线程池不断的get任务,并且不断的处理任务。那么每个任务结束之后我们该怎么处理呢,线程池提供了wait()以及poll()函数。
当我们把任务提交个线程池之后,我们会调用wait()来等待任务处理结束,结束后wait()将会返回,返回后我们可以进行下一步操作,例如重新创建任务,将任务继续推送到线程池中,或者结束线程池。结束线程池会在下一小节介绍,这一小节主要介绍wait()和poll()操作。
先来看看wait()操作:
defwait(self): """Waitforresults,blockinguntilallhavearrived.""" while1: try: self.poll(True) exceptNoResultsPending: break
等待任务处理结束,在所有任务处理完成之前一直处于block阶段,如果self.poll()返回异常NoResultsPending异常,然后wait返回,任务处理结束。
下面看看poll函数:
defpoll(self,block=False): """Processanynewresultsinthequeue.""" whileTrue: #stillresultspending? ifnotself.workRequests: raiseNoResultsPending #aretherestillworkerstoprocessremainingrequests? elifblockandnotself.workers: raiseNoWorkersAvailable try: #getbacknextresults request,result=self._results_queue.get(block=block) #hasanexceptionoccured? ifrequest.exceptionandrequest.exc_callback: request.exc_callback(request,result) #handresultstocallback,ifany ifrequest.callbackandnot\ (request.exceptionandrequest.exc_callback): request.callback(request,result) delself.workRequests[request.requestID] exceptQueue.Empty: break
(1)首先,检测任务字典({request.requestID:request})是否为空,如果为空则抛出异常NoResultPending结束,否则到第2步;
(2)检测工作线程是否为空(如果某个线程的线程事件被设置,那么工作线程退出,并从self.workers中pop出),如果为空则抛出NoWorkerAvailable异常结束,否则进入第3步;
(3)从任务结果队列中get任务结果,如果抛出队列为空,那么break,返回,否则进入第4步;
(4)如果任务处理过程中出现异常,即设置了request.exception,并且设置了异常处理回调即request.exc_callback则执行异常回调,再回调中处理异常,返回后将任务从任务列表self.workRequests中移除,继续get任务,返回第1步。否则进入第5步;
(5)如果设置了任务结果回调即request.callback不为空,则执行任务结果回调即request.callbacl(request,result),并
将任务从任务列表self.workRequests中移除,继续get任务,返回第1步。
(6)重复进行上面的步骤直到抛出异常,或者任务队列为空,则poll返会;
至此抛出NoResultPendingwait操作接受此异常后,至此wait()返回。
7、工作线程的退出
threadpool提供的工作线程退出的的操作有dismissWorkers()和joinAllDismissedWorker()操作:
defdismissWorkers(self,num_workers,do_join=False): """Tellnum_workersworkerthreadstoquitaftertheircurrenttask.""" dismiss_list=[] foriinrange(min(num_workers,len(self.workers))): worker=self.workers.pop() worker.dismiss() dismiss_list.append(worker) ifdo_join: forworkerindismiss_list: worker.join() else: self.dismissedWorkers.extend(dismiss_list) defjoinAllDismissedWorkers(self): """PerformThread.join()onallworkerthreadsthathavebeendismissed. """ forworkerinself.dismissedWorkers: worker.join() self.dismissedWorkers=[]
从dismissWorkers可看出,主要工作是从self.workers工作线程中pop出指定的线程数量,并且设置此线程的线程事件,设置线程事件后,此线程self.run()函数,则会检测到此设置,并结束线程。
如果设置了在do_join,即设置了在此函数中join退出的线程,那么对退出的线程执行join操作。否则将pop出的线程放入到self.dismissedWorkers中,以等待joinAllDismissedWorkers操作去处理join线程。
8、总结
到此为止,threadpool线程池中所有的操作介绍完毕,其实现也做了具体的介绍。从上面可看出,线程池并没有那么复杂,只有几个简单的操作,主要是了解整个处理流程即可。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。