python使用threading.Condition交替打印两个字符
Python中使用threading.Condition交替打印两个字符的程序。
这个程序涉及到两个线程的的协调问题,两个线程为了能够相互协调运行,必须持有一个共同的状态,通过这个状态来维护两个线程的执行,通过使用threading.Condition对象就能够完成两个线程之间的这种协调工作。
threading.Condition默认情况下会通过持有一个ReentrantLock来协调线程之间的工作,所谓可重入锁,是只一个可以由一个线程递归获取的锁,此锁对象会维护当前锁的所有者(线程)和当前所有者递归获取锁的次数(本文在逻辑上和可重入锁没有任何关系,完全可以用一个普通锁替代)。
Python文档中给出的描述是:它是一个与某个锁相联系的变量。同时它实现了上下文管理协议。其对象中除了acquire和release方法之外,其它方法的调用的前提是,当前线程必须是这个锁的所有者。
通过代码和其中的注释,能够非常明白地弄清楚Condition的原理是怎样的:
importthreading importtime importfunctools defworker(cond,name): """workerrunningindifferentthread""" withcond:#通过__enter__方法,获取cond对象中的锁,默认是一个ReentrantLock对象 print('...{}-{}-{}'.format(name,threading.current_thread().getName(),cond._is_owned())) cond.wait()#创建一个新的锁NEWLOCK,调用acquire将NEWLOCK获取,然后将NEWLOCK放入等待列表中,\ #释放cond._lock锁(_release_save),最后再次调用acquire让NEWLOCK阻塞 print('waitreturnedin{}'.format(name)) if__name__=='__main__': condition=threading.Condition() t1=threading.Thread(target=functools.partial(worker,condition,'t1')) t2=threading.Thread(target=functools.partial(worker,condition,'t2')) t2.start()#启动线程2 t1.start()#启动线程1 time.sleep(2) withcondition: condition.notify(1)#按照FIFO顺序(wait调用顺序),释放一个锁,并将其从等待列表中删除 time.sleep(2) withcondition: condition.notify(1)#按照FIFO顺序(wait调用顺序),释放另一个锁,并将其从等待队列中删除 t1.join()#主线程等待子线程结束 t2.join()#主线程等待子线程结束 print('Alldone')
其输出为:
...t2-Thread-2-True ...t1-Thread-1-True waitreturnedint2 waitreturnedint1 Alldone
其中wait方法要求获取到threading.Condition对象中的锁(如果没有提供,默认使用一个可重入锁),然后自己创建一个新的普通锁(NEWLOCK),并获取这个NEWLOCK;之后调用_release_save方法释放threading.Condition对象中的锁,让其它线程能够获取到;最后再次调用NEWLOCK上的acquire方法,由于在创建时已经acquire过,所以此线程会阻塞在此。而wait想要继续执行,必须等待其它线程将产生阻塞的这个NEWLOCK给release掉,当然,这就是notify方法的责任了。
notify方法接收一个数字n,从等待列表中取出相应数量的等待对象(让wait方法阻塞的锁对象),调用其release方法,让对应的wait方法能够返回。而notify_all方法仅仅就是将n设置为等待列表的总长度而已。
在理解了threading.Condition对象中wait和notify的工作原理之后,我们就可以利用它们来实现两个线程交替打印字符的功能了:
importthreading importfunctools importtime defprint_a(state): whileTrue: ifstate.closed: print('Closea') return print('A') time.sleep(2) state.set_current_is_a(True) state.wait_for_b() defprint_b(state): whileTrue: ifstate.closed: print('Closeb') return state.wait_for_a() print('B') time.sleep(2) state.set_current_is_a(False) if__name__=='__main__': classState(object): """stateusedtocoordinatemultiple(twohere)threads""" def__init__(self): self.condition=threading.Condition() self.current_is_a=False self.closed=False defwait_for_a(self): withself.condition: whilenotself.current_is_a: self.condition.wait() defwait_for_b(self): withself.condition: whileself.current_is_a: self.condition.wait() defset_current_is_a(self,flag): self.current_is_a=flag withself.condition: self.condition.notify_all() state=State() t1=threading.Thread(target=functools.partial(print_a,state)) t2=threading.Thread(target=functools.partial(print_b,state)) try: t1.start() t2.start() t1.join() t2.join() exceptKeyboardInterrupt: state.closed=True print('Closed')
可以看到有两种类型的任务,一个用于打印字符A,一个用于打印字符B,我们的实现种让A先于B打印,所以在print_a中,先打印A,再设置当前字符状态并释放等待列表中的所有锁(set_current_is_a),如果没有这一步,current_is_a将一直是False,wait_for_b能够返回,而wait_for_a却永远不会返回,最终效果就是每隔两秒就打印一个字符A,而B永远不会打印。另一个副作用是如果wait_for_a永远不会返回,那print_b所在线程的关闭逻辑也就无法执行,最终会成为僵尸线程(这里的关闭逻辑只用作示例,生产环境需要更加完善的关闭机制)。
考虑另一种情况,print_a种将set_current_is_a和wait_for_b交换一下位置会怎么样。从观察到的输出我们看到,程序首先输出了一个字符A,以后,每隔2秒钟,就会同时输出A和B,而不是交替输出。原因在于,由于current_is_a还是False,我们先调用的wait_for_b其会立即返回,之后调用set_current_is_a,将current_is_a设置为True,并释放所有的阻塞wait的锁(notify_all),这个过程中没有阻塞,print_a紧接着进入了下一个打印循环;与此同时,print_b中的wait_for_a也返回了,进入到B的打印循环,故最终我们看到A和B总是一起打印。
可见对于threading.Condition的使用需要多加小心,要注意逻辑上的严谨性。
附一个队列版本:
importthreading importfunctools importtime fromqueueimportQueue defprint_a(q_a,q_b): whileTrue: char_a=q_a.get() ifchar_a=='closed': return print(char_a) time.sleep(2) q_b.put('B') defprint_b(q_a,q_b): whileTrue: char_b=q_b.get() ifchar_b=='closed': return print(char_b) time.sleep(2) q_a.put('A') if__name__=='__main__': q_a=Queue() q_b=Queue() t1=threading.Thread(target=functools.partial(print_a,q_a,q_b)) t2=threading.Thread(target=functools.partial(print_b,q_a,q_b)) try: t1.start() t2.start() q_a.put('A') t1.join() t2.join() exceptKeyboardInterrupt: q_a.put('closed') q_b.put('closed') print('Done')
队列版本逻辑更清晰,更不容易出错,实际应用中应该选用队列。
附一个协程版本(Python3.5+):
importtime importasyncio asyncdefprint_a(): whileTrue: print('a') time.sleep(2)#simulatetheCPUblocktime awaitasyncio.sleep(0)#releasecontroltoeventloop asyncdefprint_b(): whileTrue: print('b') time.sleep(2)#simulatetheCPUblocktime awaitasyncio.sleep(0)#releasecontroltoeventloop asyncdefmain(): awaitasyncio.wait([print_a(),print_b()]) if__name__=='__main__': loop=asyncio.get_event_loop() loop.run_until_complete(main())
协程的运行需要依附于一个事件循环(select/poll/epoll/kqueue),通过asyncdef将一个函数定义为协程,通过await主动让渡控制权,通过相互让渡控制权完成交替打印字符。整个程序运行于一个线程中,这样就没有线程间协调的工作,仅仅是控制权的让渡逻辑。对于IO密集型操作,而没有明显的CPU阻塞(计算复杂,以致出现明显的延时,比如复杂加解密算法)的情况下非常合适。
附一个Java版本:
PrintMain类,用于管理和协调打印A和打印B的两个线程:
packagecom.cuttyfox.tests.self.version1; importjava.util.concurrent.ExecutorService; importjava.util.concurrent.Executors; importjava.util.concurrent.TimeUnit; publicclassPrintMain{ privatebooleancurrentIsA=false; publicsynchronizedvoidwaitingForPrintingA()throwsInterruptedException{ while(this.currentIsA==false){ wait(); } } publicsynchronizedvoidwaitingForPrintingB()throwsInterruptedException{ while(this.currentIsA==true){ wait(); } } publicsynchronizedvoidsetCurrentIsA(booleanflag){ this.currentIsA=flag; notifyAll(); } publicstaticvoidmain(String[]args)throwsException{ PrintMainstate=newPrintMain(); ExecutorServiceexecutorService=Executors.newCachedThreadPool(); executorService.execute(newPrintB(state)); executorService.execute(newPrintA(state)); executorService.shutdown(); executorService.awaitTermination(10,TimeUnit.SECONDS); System.out.println("Done"); System.exit(0); } }
打印A的线程(首先打印A):
packagecom.cuttyfox.tests.self.version1; importjava.util.concurrent.TimeUnit; publicclassPrintAimplementsRunnable{ privatePrintMainstate; publicPrintA(PrintMainstate){ this.state=state; } publicvoidrun(){ try{ while(!Thread.interrupted()){ System.out.println("PrintA"); TimeUnit.SECONDS.sleep(1); this.state.setCurrentIsA(true); this.state.waitingForPrintingB(); } }catch(InterruptedExceptione){ System.out.println("ExitthroughInterrupting."); } } }
打印B的线程:
packagecom.cuttyfox.tests.self.version1; importjava.util.concurrent.TimeUnit; publicclassPrintBimplementsRunnable{ privatePrintMainstate; publicPrintB(PrintMainstate){ this.state=state; } publicvoidrun(){ try{ while(!Thread.interrupted()){ this.state.waitingForPrintingA(); System.out.println("PrintB"); TimeUnit.SECONDS.sleep(1); this.state.setCurrentIsA(false); } }catch(InterruptedExceptione){ System.out.println("ExitthroughInterrupting."); } } }
Java对象本身有对象锁,故这里没有像Python中那样需要显式通过创建一个Condition对象来得到一把锁。
使用Python实现交替打印abcdef的过程:
importthreading importtime importfunctools fromcollectionsimportdeque LETTERS=[chr(code)forcodeinrange(97,97+6)] LENGTH=len(LETTERS) classState(object): def__init__(self): self.condition=threading.Condition() self.index_value=0 defset_next_index(self,index): withself.condition: self.index_value=index self.condition.notify_all() defwait_for(self,index_value): withself.condition: whilenotself.index_value==index_value: self.condition.wait() defprint_letter(state:State,wait_ident:int): print('Got:{}!'.format(wait_ident)) whileTrue: state.wait_for(wait_ident) time.sleep(2) print(LETTERS[state.index_value]) print('PRINT:{}ANDSETNEXT:{}'.format(state.index_value, (state.index_value+1)%LENGTH )) state.set_next_index((state.index_value+1)%LENGTH) state=State() d=deque() d.extend(range(LENGTH)) d.rotate(1) print(d) threads=[] forwait_identind: t=threading.Thread(target=functools.partial(print_letter,state,wait_ident)) threads.append(t) forthreadinthreads: thread.start() forthreadinthreads: thread.join()
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。