Python多进程通信Queue、Pipe、Value、Array实例
queue和pipe的区别:pipe用来在两个进程间通信。queue用来在多个进程间实现通信。此两种方法为所有系统多进程通信的基本方法,几乎所有的语言都支持此两种方法。
1)Queue&JoinableQueue
queue用来在进程间传递消息,任何可以pickle-able的对象都可以在加入到queue。
multiprocessing.JoinableQueue是Queue的子类,增加了task_done()和join()方法。
task_done()用来告诉queue一个task完成。一般地在调用get()获得一个task,在task结束后调用task_done()来通知Queue当前task完成。
join()阻塞直到queue中的所有的task都被处理(即task_done方法被调用)。
代码:
importmultiprocessing importtime
classConsumer(multiprocessing.Process): def__init__(self,task_queue,result_queue): multiprocessing.Process.__init__(self) self.task_queue=task_queue self.result_queue=result_queue
defrun(self): proc_name=self.name whileTrue: next_task=self.task_queue.get() ifnext_taskisNone: #Poisonpillmeansshutdown print('%s:Exiting'%proc_name) self.task_queue.task_done() break print('%s:%s'%(proc_name,next_task)) answer=next_task()#__call__() self.task_queue.task_done() self.result_queue.put(answer) return
classTask(object): def__init__(self,a,b): self.a=a self.b=b def__call__(self): time.sleep(0.1)#pretendtotakesometimetodothework return'%s*%s=%s'%(self.a,self.b,self.a*self.b) def__str__(self): return'%s*%s'%(self.a,self.b)
if__name__=='__main__': #Establishcommunicationqueues tasks=multiprocessing.JoinableQueue() results=multiprocessing.Queue() #Startconsumers num_consumers=multiprocessing.cpu_count() print('Creating%dconsumers'%num_consumers) consumers=[Consumer(tasks,results) foriinrange(num_consumers)] forwinconsumers: w.start() #Enqueuejobs num_jobs=10 foriinrange(num_jobs): tasks.put(Task(i,i)) #Addapoisonpillforeachconsumer foriinrange(num_consumers): tasks.put(None)
#Waitforallofthetaskstofinish tasks.join() #Startprintingresults whilenum_jobs: result=results.get() print('Result:',result) num_jobs-=1