Python并发之多进程的方法实例代码
一,进程的理论基础
一个应用程序,归根结底是一堆代码,是静态的,而进程才是执行中的程序,在一个程序运行的时候会有多个进程并发执行。
进程和线程的区别:
- 进程是系统资源分配的基本单位。
- 一个进程内可以包含多个线程,属于一对多的关系,进程内的资源,被其内的线程共享
- 线程是进程运行的最小单位,如果说进程是完成一个功能,那么其线程就是完成这个功能的基本单位
- 进程间资源不共享,多进程切换资源开销,难度大,同一进程内的线程资源共享,多线程切换资源开销,难度小
进程与线程的共同点:
都是为了提高程序运行效率,都有执行的优先权
二,Python的多进程(multiprocessing模块)
创建一个进程(和创建线程类似)
方法一:创建Process对象,通过对象调用start()方法启动进程
frommultiprocessingimportProcess deffoo(name): print('hello,%s'%name) if__name__=='__main__': p1=Process(target=foo,args=('world',)) p2=Process(target=foo,args=('China',)) p1.start() p2.start() print('=====主进程=====') #=====主进程===== #hello,world #hello,China #主进程和子进程并发执行
注意:Process对象只能在在if__name__=='__main__':下创建,不然会报错。
方法二:自定义一个类继承Process类,并重写run()方法,将执行代码放在其内
frommultiprocessingimportProcess classMyProcess(Process): def__init__(self,name): super().__init__() self.name=name defrun(self): print('hello,%s'%self.name) if__name__=='__main__': myprocess1=MyProcess('world') myprocess2=MyProcess('world') myprocess1.start() myprocess2.start()
Process内置方法
实例方法:
p.start():启动进程,并调用该子进程中的p.run()
p.run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法
p.terminate():强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁
p.is_alive():如果p仍然运行,返回True
p.join([timeout]):主线程等待p终止。timeout是可选的超时时间
Process属性
p.daemon:默认值为False,如果设为True,代表p为后台运行的守护进程,当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程,必须在p.start()之前设置
p.name:进程的名称
p.pid:进程的pid
p.exitcode:进程在运行时为None、如果为–N,表示被信号N结束(了解即可)
守护进程
类似于守护线程,只不过守护线程是对象的一个方法,而守护进程封装成对象的属性。
frommultiprocessingimportProcess importtime classMyProcess(Process): def__init__(self,name): super().__init__() self.name=name defrun(self): time.sleep(3) print('hello,%s'%self.name) if__name__=='__main__': myprocess1=MyProcess('world') myprocess1.daemon=True myprocess1.start() print('结束') #不会输出‘helloworld',因为设置为守护进程,主进程不会等待
也可以使用join方法,使主进程等待
frommultiprocessingimportProcess importtime classMyProcess(Process): def__init__(self,name): super().__init__() self.name=name defrun(self): time.sleep(3) print('hello,%s'%self.name) if__name__=='__main__': myprocess1=MyProcess('world') myprocess1.daemon=True myprocess1.start() myprocess1.join()#程序阻塞 print('结束') join()
进程同步和锁
进程虽然不像线程共享资源,但是这并不意味着进程间不需要加锁,比如不同进程会共享同一个终端(屏幕),或者操作同一个文件,数据库,那么数据安全还是很有必要的,因此我们可以加锁,
frommultiprocessingimportProcess,Lock importtime defa_print(l):#需要传入对象,因为信息不共享 l.acquire() print('我要打印信息') time.sleep(1) print('我打印完了') l.release() if__name__=='__main__': l=Lock() foriinrange(20): p=Process(target=a_print,args=(l,)) p.start()
信号量(Semaphore)
能够并发执行的进程数,超出的进程阻塞,直到有进程运行完成。
Semaphore管理一个内置的计数器,
每当调用acquire()时内置计数器-1;
调用release()时内置计数器+1;
计数器不能小于0;当计数器为0时,acquire()将阻塞进程直到其他进程调用release()。
frommultiprocessingimportProcess,Queue,Semaphore importtime,random defseat(s,n): s.acquire() print('学生%d坐下了'%n) time.sleep(random.randint(1,2)) s.release() if__name__=='__main__': s=Semaphore(5) foriinrange(20): p=Process(target=seat,args=(s,i)) p.start() print('-----主进程-------')
注意:其实信号量和锁类似,只是限制进程运行某个代码块的数量(锁为1个),并不是能限制并发的进程,如上述代码,一次性还是创建了20个进程
事件(Event)
frommultiprocessingimportProcess,Event importtime,random defeating(event): event.wait() print('去吃饭的路上...') defmakeing(event): print('做饭中') time.sleep(random.randint(1,2)) print('做好了,快来...') event.set() if__name__=='__main__': event=Event() t1=Process(target=eating,args=(event,)) t2=Process(target=makeing,args=(event,)) t1.start() t2.start() #做饭中 #做好了,快来... #去吃饭的路上...
和线程事件几乎一致
进程队列(Queue)
进程队列是进程通讯的方式之一。使用multiprocessing下的Queue
frommultiprocessingimportProcess,Queue importtime deffunc1(queue): whileTrue: info=queue.get() ifinfo==None: return print(info) deffunc2(queue): foriinrange(10): time.sleep(1) queue.put('is%d'%i) queue.put(None)#结束的标志 if__name__=='__main__': q=Queue() p1=Process(target=func1,args=(q,)) p2=Process(target=func2,args=(q,)) p1.start() p2.start() Queue类的方法,源码如下: classQueue(object): def__init__(self,maxsize=-1):#可以传参设置队列最大容量 self._maxsize=maxsize defqsize(self):#返回当前时刻队列中的个数 return0 defempty(self):#是否为空 returnFalse deffull(self):是否满了 returnFalse defput(self,obj,block=True,timeout=None):#放值,blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常 pass defput_nowait(self,obj):#=put(False) pass defget(self,block=True,timeout=None):获取值,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常. pass defget_nowait(self):#=get(False) pass defclose(self):#将队列关闭 pass defjoin_thread(self):#略,几乎不用 pass defcancel_join_thread(self): pass
进程队列源码注释
进程池
进程的消耗是很大的,因此我们不能无节制的开启新进程,因此我们可以通过维护一个进程池来控制进程的数量。这就不同于信号量,进程池可以从源头控制进程数量。在Python中可以通过如下方法使用
同步调用
frommultiprocessingimportPool importtime,random,os deffunc(n): pid=os.getpid() print('进程%s正在处理第%d个任务'%(pid,n),'时间%s'%time.strftime('%H-%M-%S')) time.sleep(2) res='处理%s'%random.choice(['成功','失败']) returnres if__name__=='__main__': p=Pool(4)#创建4个进程, li=[] foriinrange(10): res=p.apply(func,args=(i,))交给进程池处理,处理完成才返回值,会阻塞,即使池内还有空余进程,相当于顺序执行 li.append(res) foriinli: print(i)
#进程1916正在处理第0个任务时间21-02-53
#进程1240正在处理第1个任务时间21-02-55
#进程3484正在处理第2个任务时间21-02-57
#进程7512正在处理第3个任务时间21-02-59
#进程1916正在处理第4个任务时间21-03-01
#进程1240正在处理第5个任务时间21-03-03
#进程3484正在处理第6个任务时间21-03-05
#进程7512正在处理第7个任务时间21-03-07
#进程1916正在处理第8个任务时间21-03-09
#进程1240正在处理第9个任务时间21-03-11
从结果可以发现两点:
- 不是并发处理
- 一直都只有四个进程,串行执行
因此进程池提供了异步处理的方式
frommultiprocessingimportPool importtime,random,os deffunc(n): pid=os.getpid() print('进程%s正在处理第%d个任务'%(pid,n),'时间%s'%time.strftime('%H-%M-%S')) time.sleep(2) res='处理%s'%random.choice(['成功','失败']) returnres if__name__=='__main__': p=Pool(4) li=[] foriinrange(10): res=p.apply_async(func,args=(i,))结果不会立刻返回,遇到阻塞,开启下一个进程,在这,相当于几乎同时出现四个打印结果(一个线程处理一个任务,处理完下个任务才能进来) li.append(res) p.close()#join之前需要关闭进程池 p.join()#因为异步,所以需要等待池内进程工作结束再继续 foriinli: print(i.get())#i是一个对象,通过get方法获取返回值,而同步则没有该方法
关于回调函数
frommultiprocessingimportPool importtime,random,os deffunc(n): pid=os.getpid() print('进程%s正在处理第%d个任务'%(pid,n),'时间%s'%time.strftime('%H-%M-%S')) time.sleep(2) res='处理%s'%random.choice(['成功','失败']) returnres deffoo(info): print(info)#传入值为进程执行结果 if__name__=='__main__': p=Pool(4) li=[] foriinrange(10): res=p.apply_async(func,args=(i,),callback=foo)callback()回调函数会在进程执行完之后调用(主进程调用) li.append(res) p.close() p.join() foriinli: print(i.get())
有回调函数
总结
以上所述是小编给大家介绍的Python并发之多进程的方法实例代码,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对毛票票网站的支持!