Python多进程multiprocessing用法实例分析
本文实例讲述了Python多进程multiprocessing用法。分享给大家供大家参考,具体如下:
mutilprocess简介
像线程一样管理进程,这个是mutilprocess的核心,他与threading很是相像,对多核CPU的利用率会比threading好的多。
简单的创建进程:
importmultiprocessing defworker(num): """threadworkerfunction""" print'Worker:',num return if__name__=='__main__': jobs=[] foriinrange(5): p=multiprocessing.Process(target=worker,args=(i,)) jobs.append(p) p.start()
确定当前的进程,即是给进程命名,方便标识区分,跟踪
importmultiprocessing importtime defworker(): name=multiprocessing.current_process().name printname,'Starting' time.sleep(2) printname,'Exiting' defmy_service(): name=multiprocessing.current_process().name printname,'Starting' time.sleep(3) printname,'Exiting' if__name__=='__main__': service=multiprocessing.Process(name='my_service', target=my_service) worker_1=multiprocessing.Process(name='worker1', target=worker) worker_2=multiprocessing.Process(target=worker)#defaultname worker_1.start() worker_2.start() service.start()
守护进程就是不阻挡主程序退出,自己干自己的mutilprocess.setDaemon(True)就这句等待守护进程退出,要加上join,join可以传入浮点数值,等待n久就不等了
守护进程:
importmultiprocessing importtime importsys defdaemon(): name=multiprocessing.current_process().name print'Starting:',name time.sleep(2) print'Exiting:',name defnon_daemon(): name=multiprocessing.current_process().name print'Starting:',name print'Exiting:',name if__name__=='__main__': d=multiprocessing.Process(name='daemon', target=daemon) d.daemon=True n=multiprocessing.Process(name='non-daemon', target=non_daemon) n.daemon=False d.start() n.start() d.join(1) print'd.is_alive()',d.is_alive() n.join()
最好使用poisonpill,强制的使用terminate()注意terminate之后要join,使其可以更新状态
终止进程:
importmultiprocessing importtime defslow_worker(): print'Startingworker' time.sleep(0.1) print'Finishedworker' if__name__=='__main__': p=multiprocessing.Process(target=slow_worker) print'BEFORE:',p,p.is_alive() p.start() print'DURING:',p,p.is_alive() p.terminate() print'TERMINATED:',p,p.is_alive() p.join() print'JOINED:',p,p.is_alive()
①.==0未生成任何错误
②.0进程有一个错误,并以该错误码退出
③.<0进程由一个-1*exitcode信号结束
进程的退出状态:
importmultiprocessing importsys importtime defexit_error(): sys.exit(1) defexit_ok(): return defreturn_value(): return1 defraises(): raiseRuntimeError('Therewasanerror!') defterminated(): time.sleep(3) if__name__=='__main__': jobs=[] forfin[exit_error,exit_ok,return_value,raises,terminated]: print'Startingprocessfor',f.func_name j=multiprocessing.Process(target=f,name=f.func_name) jobs.append(j) j.start() jobs[-1].terminate() forjinjobs: j.join() print'%15s.exitcode=%s'%(j.name,j.exitcode)
方便的调试,可以用logging
日志:
importmultiprocessing importlogging importsys defworker(): print'Doingsomework' sys.stdout.flush() if__name__=='__main__': multiprocessing.log_to_stderr() logger=multiprocessing.get_logger() logger.setLevel(logging.INFO) p=multiprocessing.Process(target=worker) p.start() p.join()
利用class来创建进程,定制子类
派生进程:
importmultiprocessing classWorker(multiprocessing.Process): defrun(self): print'In%s'%self.name return if__name__=='__main__': jobs=[] foriinrange(5): p=Worker() jobs.append(p) p.start() forjinjobs: j.join()
python进程间传递消息:
importmultiprocessing classMyFancyClass(object): def__init__(self,name): self.name=name defdo_something(self): proc_name=multiprocessing.current_process().name print'Doingsomethingfancyin%sfor%s!'%\ (proc_name,self.name) defworker(q): obj=q.get() obj.do_something() if__name__=='__main__': queue=multiprocessing.Queue() p=multiprocessing.Process(target=worker,args=(queue,)) p.start() queue.put(MyFancyClass('FancyDan')) #Waitfortheworkertofinish queue.close() queue.join_thread() p.join() importmultiprocessing importtime classConsumer(multiprocessing.Process): def__init__(self,task_queue,result_queue): multiprocessing.Process.__init__(self) self.task_queue=task_queue self.result_queue=result_queue defrun(self): proc_name=self.name whileTrue: next_task=self.task_queue.get() ifnext_taskisNone: #Poisonpillmeansshutdown print'%s:Exiting'%proc_name self.task_queue.task_done() break print'%s:%s'%(proc_name,next_task) answer=next_task() self.task_queue.task_done() self.result_queue.put(answer) return classTask(object): def__init__(self,a,b): self.a=a self.b=b def__call__(self): time.sleep(0.1)#pretendtotakesometimetodothework return'%s*%s=%s'%(self.a,self.b,self.a*self.b) def__str__(self): return'%s*%s'%(self.a,self.b) if__name__=='__main__': #Establishcommunicationqueues tasks=multiprocessing.JoinableQueue() results=multiprocessing.Queue() #Startconsumers num_consumers=multiprocessing.cpu_count()*2 print'Creating%dconsumers'%num_consumers consumers=[Consumer(tasks,results) foriinxrange(num_consumers)] forwinconsumers: w.start() #Enqueuejobs num_jobs=10 foriinxrange(num_jobs): tasks.put(Task(i,i)) #Addapoisonpillforeachconsumer foriinxrange(num_consumers): tasks.put(None) #Waitforallofthetaskstofinish tasks.join() #Startprintingresults whilenum_jobs: result=results.get() print'Result:',result num_jobs-=1
Event提供一种简单的方法,可以在进程间传递状态信息。事件可以切换设置和未设置状态。通过使用一个可选的超时值,时间对象的用户可以等待其状态从未设置变为设置。
进程间信号传递:
importmultiprocessing importtime defwait_for_event(e): """Waitfortheeventtobesetbeforedoinganything""" print'wait_for_event:starting' e.wait() print'wait_for_event:e.is_set()->',e.is_set() defwait_for_event_timeout(e,t): """Waittsecondsandthentimeout""" print'wait_for_event_timeout:starting' e.wait(t) print'wait_for_event_timeout:e.is_set()->',e.is_set() if__name__=='__main__': e=multiprocessing.Event() w1=multiprocessing.Process(name='block', target=wait_for_event, args=(e,)) w1.start() w2=multiprocessing.Process(name='nonblock', target=wait_for_event_timeout, args=(e,2)) w2.start() print'main:waitingbeforecallingEvent.set()' time.sleep(3) e.set() print'main:eventisset'
Python多进程,一般的情况是Queue来传递。
Queue:
frommultiprocessingimportProcess,Queue deff(q): q.put([42,None,'hello']) if__name__=='__main__': q=Queue() p=Process(target=f,args=(q,)) p.start() printq.get()#prints"[42,None,'hello']" p.join()
多线程优先队列Queue:
importQueue importthreading importtime exitFlag=0 classmyThread(threading.Thread): def__init__(self,threadID,name,q): threading.Thread.__init__(self) self.threadID=threadID self.name=name self.q=q defrun(self): print"Starting"+self.name process_data(self.name,self.q) print"Exiting"+self.name defprocess_data(threadName,q): whilenotexitFlag: queueLock.acquire() ifnotworkQueue.empty(): data=q.get() queueLock.release() print"%sprocessing%s"%(threadName,data) else: queueLock.release() time.sleep(1) threadList=["Thread-1","Thread-2","Thread-3"] nameList=["One","Two","Three","Four","Five"] queueLock=threading.Lock() workQueue=Queue.Queue(10) threads=[] threadID=1 #Createnewthreads fortNameinthreadList: thread=myThread(threadID,tName,workQueue) thread.start() threads.append(thread) threadID+=1 #Fillthequeue queueLock.acquire() forwordinnameList: workQueue.put(word) queueLock.release() #Waitforqueuetoempty whilenotworkQueue.empty(): pass #Notifythreadsit'stimetoexit exitFlag=1 #Waitforallthreadstocomplete fortinthreads: t.join() print"ExitingMainThread"
多进程使用Queue通信的例子
importtime frommultiprocessingimportProcess,Queue MSG_QUEUE=Queue(5) defstartA(msgQueue): whileTrue: ifmsgQueue.empty()>0: print('queueisempty%d'%(msgQueue.qsize())) else: msg=msgQueue.get() print('getmsg%s'%(msg,)) time.sleep(1) defstartB(msgQueue): whileTrue: msgQueue.put('helloworld') print('puthelloworldqueuesizeis%d'%(msgQueue.qsize(),)) time.sleep(3) if__name__=='__main__': processA=Process(target=startA,args=(MSG_QUEUE,)) processB=Process(target=startB,args=(MSG_QUEUE,)) processA.start() print('processAstart..')
主进程定义了一个Queue类型的变量,并作为Process的args参数传给子进程processA和processB,两个进程一个向队列中写数据,一个读数据。
更多关于Python相关内容感兴趣的读者可查看本站专题:《Python进程与线程操作技巧总结》、《PythonSocket编程技巧总结》、《Python数据结构与算法教程》、《Python函数使用技巧总结》、《Python字符串操作技巧汇总》、《Python入门与进阶经典教程》及《Python文件与目录操作技巧汇总》
希望本文所述对大家Python程序设计有所帮助。