python线程池如何使用
线程池的使用
线程池的基类是concurrent.futures模块中的Executor,Executor提供了两个子类,即ThreadPoolExecutor和ProcessPoolExecutor,其中ThreadPoolExecutor用于创建线程池,而ProcessPoolExecutor用于创建进程池。
如果使用线程池/进程池来管理并发编程,那么只要将相应的task函数提交给线程池/进程池,剩下的事情就由线程池/进程池来搞定。
Exectuor提供了如下常用方法:
- submit(fn,*args,**kwargs):将fn函数提交给线程池。*args代表传给fn函数的参数,*kwargs代表以关键字参数的形式为fn函数传入参数。
- map(func,*iterables,timeout=None,chunksize=1):该函数类似于全局函数map(func,*iterables),只是该函数将会启动多个线程,以异步方式立即对iterables执行map处理。
- shutdown(wait=True):关闭线程池。
程序将task函数提交(submit)给线程池后,submit方法会返回一个Future对象,Future类主要用于获取线程任务函数的返回值。由于线程任务会在新线程中以异步方式执行,因此,线程执行的函数相当于一个“将来完成”的任务,所以Python使用Future来代表。
实际上,在Java的多线程编程中同样有Future,此处的Future与Java的Future大同小异。
Future提供了如下方法:
- cancel():取消该Future代表的线程任务。如果该任务正在执行,不可取消,则该方法返回False;否则,程序会取消该任务,并返回True。
- cancelled():返回Future代表的线程任务是否被成功取消。
- running():如果该Future代表的线程任务正在执行、不可被取消,该方法返回True。
- done():如果该Funture代表的线程任务被成功取消或执行完成,则该方法返回True。
- result(timeout=None):获取该Future代表的线程任务最后返回的结果。如果Future代表的线程任务还未完成,该方法将会阻塞当前线程,其中timeout参数指定最多阻塞多少秒。
- exception(timeout=None):获取该Future代表的线程任务所引发的异常。如果该任务成功完成,没有异常,则该方法返回None。
- add_done_callback(fn):为该Future代表的线程任务注册一个“回调函数”,当该任务成功完成时,程序会自动触发该fn函数。
在用完一个线程池后,应该调用该线程池的shutdown()方法,该方法将启动线程池的关闭序列。调用shutdown()方法后的线程池不再接收新任务,但会将以前所有的已提交任务执行完成。当线程池中的所有任务都执行完成后,该线程池中的所有线程都会死亡。
使用线程池来执行线程任务的步骤如下:
a、调用ThreadPoolExecutor类的构造器创建一个线程池。
b、定义一个普通函数作为线程任务。
c、调用ThreadPoolExecutor对象的submit()方法来提交线程任务。
d、当不想提交任何任务时,调用ThreadPoolExecutor对象的shutdown()方法来关闭线程池。
下面程序示范了如何使用线程池来执行线程任务:
fromconcurrent.futuresimportThreadPoolExecutor importthreading importtime #定义一个准备作为线程任务的函数 defaction(max): my_sum=0 foriinrange(max): print(threading.current_thread().name+''+str(i)) my_sum+=i returnmy_sum #创建一个包含2条线程的线程池 pool=ThreadPoolExecutor(max_workers=2) #向线程池提交一个task,50会作为action()函数的参数 future1=pool.submit(action,50) #向线程池再提交一个task,100会作为action()函数的参数 future2=pool.submit(action,100) #判断future1代表的任务是否结束 print(future1.done()) time.sleep(3) #判断future2代表的任务是否结束 print(future2.done()) #查看future1代表的任务返回的结果 print(future1.result()) #查看future2代表的任务返回的结果 print(future2.result()) #关闭线程池 pool.shutdown()
上面程序中,第13行代码创建了一个包含两个线程的线程池,接下来的两行代码只要将action()函数提交(submit)给线程池,该线程池就会负责启动线程来执行action()函数。这种启动线程的方法既优雅,又具有更高的效率。
当程序把action()函数提交给线程池时,submit()方法会返回该任务所对应的Future对象,程序立即判断futurel的done()方法,该方法将会返回False(表明此时该任务还未完成)。接下来主程序暂停3秒,然后判断future2的done()方法,如果此时该任务已经完成,那么该方法将会返回True。
程序最后通过Future的result()方法来获取两个异步任务返回的结果。
读者可以自己运行此代码查看运行结果,这里不再演示。
当程序使用Future的result()方法来获取结果时,该方法会阻塞当前线程,如果没有指定timeout参数,当前线程将一直处于阻塞状态,直到Future代表的任务返回。
获取执行结果
前面程序调用了Future的result()方法来获取线程任务的运回值,但该方法会阻塞当前主线程,只有等到钱程任务完成后,result()方法的阻塞才会被解除。
如果程序不希望直接调用result()方法阻塞线程,则可通过Future的add_done_callback()方法来添加回调函数,该回调函数形如fn(future)。当线程任务完成后,程序会自动触发该回调函数,并将对应的Future对象作为参数传给该回调函数。
下面程序使用add_done_callback()方法来获取线程任务的返回值:
fromconcurrent.futuresimportThreadPoolExecutor importthreading importtime #定义一个准备作为线程任务的函数 defaction(max): my_sum=0 foriinrange(max): print(threading.current_thread().name+''+str(i)) my_sum+=i returnmy_sum #创建一个包含2条线程的线程池 withThreadPoolExecutor(max_workers=2)aspool: #向线程池提交一个task,50会作为action()函数的参数 future1=pool.submit(action,50) #向线程池再提交一个task,100会作为action()函数的参数 future2=pool.submit(action,100) defget_result(future): print(future.result()) #为future1添加线程完成的回调函数 future1.add_done_callback(get_result) #为future2添加线程完成的回调函数 future2.add_done_callback(get_result) print('--------------')
上面主程序分别为future1、future2添加了同一个回调函数,该回调函数会在线程任务结束时获取其返回值。
主程序的最后一行代码打印了一条横线。由于程序并未直接调用future1、future2的result()方法,因此主线程不会被阻塞,可以立即看到输出主线程打印出的横线。接下来将会看到两个新线程并发执行,当线程任务执行完成后,get_result()函数被触发,输出线程任务的返回值。
另外,由于线程池实现了上下文管理协议(ContextManageProtocol),因此,程序可以使用with语句来管理线程池,这样即可避免手动关闭线程池,如上面的程序所示。
此外,Exectuor还提供了一个map(func,*iterables,timeout=None,chunksize=1)方法,该方法的功能类似于全局函数map(),区别在于线程池的map()方法会为iterables的每个元素启动一个线程,以并发方式来执行func函数。这种方式相当于启动len(iterables)个线程,井收集每个线程的执行结果。
例如,如下程序使用Executor的map()方法来启动线程,并收集线程任务的返回值:
fromconcurrent.futuresimportThreadPoolExecutor importthreading importtime #定义一个准备作为线程任务的函数 defaction(max): my_sum=0 foriinrange(max): print(threading.current_thread().name+''+str(i)) my_sum+=i returnmy_sum #创建一个包含4条线程的线程池 withThreadPoolExecutor(max_workers=4)aspool: #使用线程执行map计算 #后面元组有3个元素,因此程序启动3条线程来执行action函数 results=pool.map(action,(50,100,150)) print('--------------') forrinresults: print(r)
上面程序使用map()方法来启动3个线程(该程序的线程池包含4个线程,如果继续使用只包含两个线程的线程池,此时将有一个任务处于等待状态,必须等其中一个任务完成,线程空闲出来才会获得执行的机会),map()方法的返回值将会收集每个线程任务的返回结果。
运行上面程序,同样可以看到3个线程并发执行的结果,最后通过results可以看到3个线程任务的返回结果。
通过上面程序可以看出,使用map()方法来启动线程,并收集线程的执行结果,不仅具有代码简单的优点,而且虽然程序会以并发方式来执行action()函数,但最后收集的action()函数的执行结果,依然与传入参数的结果保持一致。也就是说,上面results的第一个元素是action(50)的结果,第二个元素是action(100)的结果,第三个元素是action(150)的结果。
实例扩展:
#coding:utf-8 importQueue importthreading importsys importtime importmath classWorkThread(threading.Thread): def__init__(self,task_queue): threading.Thread.__init__(self) self.setDaemon(True) self.task_queue=task_queue self.start() self.idle=True defrun(self): sleep_time=0.01#第1次无任务可做时休息10毫秒 multiply=0 whileTrue: try: #从队列中取一个任务 func,args,kwargs=self.task_queue.get(block=False) self.idle=False multiply=0 #执行之 func(*args,**kwargs) exceptQueue.Empty: time.sleep(sleep_time*math.pow(2,multiply)) self.idle=True multiply+=1 continue except: printsys.exc_info() raise classThreadPool: def__init__(self,thread_num=10,max_queue_len=1000): self.max_queue_len=max_queue_len self.task_queue=Queue.Queue(max_queue_len)#任务等待队列 self.threads=[] self.__create_pool(thread_num) def__create_pool(self,thread_num): foriinxrange(thread_num): thread=WorkThread(self.task_queue) self.threads.append(thread) defadd_task(self,func,*args,**kwargs): '''添加一个任务,返回任务等待队列的长度 调用该方法前最后先调用isSafe()判断一下等待的任务是不是很多,以防止提交的任务被拒绝 ''' try: self.task_queue.put((func,args,kwargs)) exceptQueue.Full: raise#队列已满时直接抛出异常,不给执行 returnself.task_queue.qsize() defisSafe(self): '''等待的任务数量离警界线还比较远 ''' returnself.task_queue.qsize()<0.9*self.max_queue_len defwait_for_complete(self): '''等待提交到线程池的所有任务都执行完毕 ''' #首先任务等待队列要变成空 whilenotself.task_queue.empty(): time.sleep(1) #其次,所以计算线程要变成idle状态 whileTrue: all_idle=True forthinself.threads: ifnotth.idle: all_idle=False break ifall_idle: break else: time.sleep(1) if__name__=='__main__': deffoo(a,b): printa+b time.sleep(0.01) thread_pool=ThreadPool(10,100) '''在Windows上测试不通过,Windows上Queue.Queue不是线程安全的''' size=0 foriinxrange(10000): try: size=thread_pool.add_task(foo,i,2*i) exceptQueue.Full: print'queuefull,queuesizeis',size time.sleep(2)
到此这篇关于python线程池如何使用的文章就介绍到这了,更多相关python中的线程池详解内容请搜索毛票票以前的文章或继续浏览下面的相关文章希望大家以后多多支持毛票票!