python多线程抽象编程模型详解
最近需要完成一个多线程下载的工具,对其中的多线程下载进行了一个抽象,可以对所有需要使用到多线程编程的地方统一使用这个模型来进行编写。
主要结构:
1、基于Queue标准库实现了一个类似线程池的工具,用户指定提交任务线程submitter与工作线程worker数目,所有线程分别设置为后台运行,提供等待线程运行完成的接口。
2、所有需要完成的任务抽象成task,提供单独的无参数调用方式,供worker线程调用;task以生成器的方式作为参数提供,供submitter调用。
3、所有需要进行线程交互的信息放在context类中。
主要实现代码如下:
#Submitter线程类实现,主要是`task_generator`调用
classSubmitterThread(threading.Thread):
_DEFAULT_WAIT_TIMEOUT=2#seconds
def__init__(self,queue,task_gen,timeout=2):
super(SubmitterThread,self).__init__()
self.queue=queue
ifnotisinstance(timeout,int):
_logger.error('Threadwaittimeoutvalueerror:%s,'
'usedefaultinstead.'%timeout)
self.timeout=self._DEFAULT_WAIT_TIMEOUT
self.timeout=timeout
self.task_generator=task_gen
defrun(self):
whileTrue:
try:
task=self.task_generator.next()
self.queue.put(task,True,self.timeout)
exceptQueue.Full:
_logger.debug('Taskqueueisfull.%swait%dsecond%stimeout'%
(self.name,self.timeout,'s'if(self.timeout>1)else''))
break
except(StopIteration,ValueError)ase:
_logger.debug('Taskfinished')
break
#Worker线程实现,主要就是try块内的func调用
classWorkerThread(threading.Thread):
_DEFAULT_WAIT_TIMEOUT=2#seconds
def__init__(self,queue,timeout=2):
super(WorkerThread,self).__init__()
self.queue=queue
ifnotisinstance(timeout,int):
_logger.error('Threadwaittimeoutvalueerror:%s,'
'usedefaultinstead.'%timeout)
self.timeout=self._DEFAULT_WAIT_TIMEOUT
self.timeout=timeout
defrun(self):
whileTrue:
try:
func=self.queue.get(True,self.timeout)
exceptQueue.Empty:
_logger.debug('Taskqueueisempty.%swait%dsecond%stimeout'%
(self.name,self.timeout,'s'if(self.timeout>1)else''))
break
ifnotcallable(func):
time.sleep(1)
try:
func()
exceptExceptionase:
_logger.error('Thread%srunningoccurserror:%s'%
(self.name,e))
print('Threadrunningerror:%s'%e)
classExecutor(object):
"""
Thereallyplacetoexecuteexecutor
"""
thread_list=[]
submitters=0
workers=0
queue=None
task_generator=None
timeout=0
def__init__(self,task_gen,submitters=1,workers=1,timeout=2):
iflen(self.thread_list)!=0:
raiseRuntimeError('Executorcanonlyinstanceonce.')
self.queue=Queue.Queue(maxsize=submitters*2+workers*2)
self.submitters=submitters
self.workers=workers
self.task_generator=task_gen
self.timeout=timeout
defstart(self):
foriinrange(self.submitters):
submitter=SubmitterThread(self.queue,self.task_generator,self.timeout)
self.thread_list.append(submitter)
submitter.setName('Submitter-%d'%i)
submitter.setDaemon(True)
submitter.start()
foriinrange(self.workers):
worker=WorkerThread(self.queue,self.timeout)
self.thread_list.append(worker)
worker.setName('Worker-%d'%i)
worker.setDaemon(True)
worker.start()
defis_alive(self):
alive=False
fortinself.thread_list:
ift.isAlive():
alive=True
break
returnalive
defwait_to_shutdown(self):
_logger.debug('Starttowaittoshutdown')
fortinself.thread_list:
t.join()
_logger.debug('Shutdownthread:%s'%t.name)
Executor类保存了线程池,提供相应接口。有了这个抽象之后,只需要实例化Executor类的对象,然后调用start方法进行多线程任务的运行。并可以用is_alive等接口再主线程内进行其他处理。
后续再使用这个抽象进行实际多线程任务的实现。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。