一文了解Python并发编程的工程实现方法
上一篇文章介绍了线程的使用。然而Python中由于GlobalInterpreterLock(全局解释锁GIL)的存在,每个线程在在执行时需要获取到这个GIL,在同一时刻中只有一个线程得到解释锁的执行,Python中的线程并没有真正意义上的并发执行,多线程的执行效率也不一定比单线程的效率更高。如果要充分利用现代多核CPU的并发能力,就要使用multipleprocessing模块了。
0x01multipleprocessing
与使用线程的threading模块类似,multipleprocessing模块提供许多高级API。最常见的是Pool对象了,使用它的接口能很方便地写出并发执行的代码。
frommultiprocessingimportPool deff(x): returnx*x if__name__=='__main__': withPool(5)asp: #map方法的作用是将f()方法并发地映射到列表中的每个元素 print(p.map(f,[1,2,3])) #执行结果 #[1,4,9]
关于Pool下文中还会提到,这里我们先来看Process。
Process
要创建一个进程可以使用Process类,使用start()方法启动进程。
frommultiprocessingimportProcess importos defecho(text): #父进程ID print("ProcessParentID:",os.getppid()) #进程ID print("ProcessPID:",os.getpid()) print('echo:',text) if__name__=='__main__': p=Process(target=echo,args=('helloprocess',)) p.start() p.join() #执行结果 #ProcessParentID:27382 #ProcessPID:27383 #echo:helloprocess
进程池
正如开篇提到的multiprocessing模块提供了Pool类可以很方便地实现一些简单多进程场景。它主要有以下接口
- apply(func[,args[,kwds]])
- 执行func(args,kwds)方法,在方法结束返回前会阻塞。
- apply_async(func[,args[,kwds[,callback[,error_callback]]]])
- 异步执行func(args,kwds),会立即返回一个result对象,如果指定了callback参数,结果会通过回调方法返回,还可以指定执行出错的回调方法error_callback()
- map(func,iterable[,chunksize])
- 类似内置函数map(),可以并发执行func,是同步方法
- map_async(func,iterable[,chunksize[,callback[,error_callback]]])
- 异步版本的map
- close()
- 关闭进程池。当池中的所有工作进程都执行完毕时,进程会退出。
- terminate()
- 终止进程池
- join()
- 等待工作进程执行完,必需先调用close()或者terminate()
frommultiprocessingimportPool deff(x): returnx*x if__name__=='__main__': withPool(5)asp: #map方法的作用是将f()方法并发地映射到列表中的每个元素 a=p.map(f,[1,2,3]) print(a) #异步执行map b=p.map_async(f,[3,5,7,11]) #b是一个result对象,代表方法的执行结果 print(b) #为了拿到结果,使用join方法等待池中工作进程退出 p.close() #调用join方法前,需先执行close或terminate方法 p.join() #获取执行结果 print(b.get()) #执行结果 #[1,4,9] ##[9,25,49,121]
map_async()和apply_async()执行后会返回一个classmultiprocessing.pool.AsyncResult对象,通过它的get()可以获取到执行结果,ready()可以判断AsyncResult的结果是否准备好。
进程间数据的传输
multiprocessing模块提供了两种方式用于进程间的数据共享:队列(Queue)和管道(Pipe)
Queue是线程安全,也是进程安全的。使用Queue可以实现进程间的数据共享,例如下面的demo中子进程put一个对象,在主进程中就能get到这个对象。任何可以序列化的对象都可以通过Queue来传输。
frommultiprocessingimportProcess,Queue deff(q): q.put([42,None,'hello']) if__name__=='__main__': #使用Queue进行数据通信 q=Queue() p=Process(target=f,args=(q,)) p.start() #主进程取得子进程中的数据 print(q.get())#prints"[42,None,'hello']" p.join() #执行结果 #[42,None,'hello']
Pipe()返回一对通过管道连接的Connection对象。这两个对象可以理解为管道的两端,它们通过send()和recv()发送和接收数据。
frommultiprocessingimportProcess,Pipe defwrite(conn): #子进程中发送一个对象 conn.send([42,None,'hello']) conn.close() defread(conn): #在读的进程中通过recv接收对象 data=conn.recv() print(data) if__name__=='__main__': #Pipe()方法返回一对连接对象 w_conn,r_conn=Pipe() wp=Process(target=write,args=(w_conn,)) rp=Process(target=read,args=(r_conn,)) wp.start() rp.start() #执行结果 #[42,None,'hello']
需要注意的是,两个进程不能同时对一个连接对象进行send或recv操作。
同步
我们知道线程间的同步是通过锁机制来实现的,进程也一样。
frommultiprocessingimportProcess,Lock importtime defprint_with_lock(l,i): l.acquire() try: time.sleep(1) print('helloworld',i) finally: l.release() defprint_without_lock(i): time.sleep(1) print('helloworld',i) if__name__=='__main__': lock=Lock() #先执行有锁的 fornuminrange(5): Process(target=print_with_lock,args=(lock,num)).start() #再执行无锁的 #fornuminrange(5): #Process(target=print_without_lock,args=(num,)).start()
有锁的代码将每秒依次打印
helloworld0
helloworld1
helloworld2
helloworld3
helloworld4
如果执行无锁的代码,则在我的电脑上执行结果是这样的
helloworldhelloworld 0
1
helloworld2
helloworld3
helloworld4
除了Lock,还包括RLock、Condition、Semaphore和Event等进程间的同步原语。其用法也与线程间的同步原语很类似。API使用可以参考文末中引用的文档链接。
在工程中实现进程间的数据共享应当优先使用队列或管道。
0x02总结
本文对multiprocessing模块中常见的API作了简单的介绍。讲述了Process和Pool的常见用法,同时介绍了进程间的数据方式:队列和管道。最后简单了解了进程间的同步原语。