Python 使用生成器代替线程的方法
问题
你想使用生成器(协程)替代系统线程来实现并发。这个有时又被称为用户级线程或绿色线程。
解决方案
要使用生成器实现自己的并发,你首先要对生成器函数和yield语句有深刻理解。yield语句会让一个生成器挂起它的执行,这样就可以编写一个调度器,将生成器当做某种“任务”并使用任务协作切换来替换它们的执行。要演示这种思想,考虑下面两个使用简单的yield语句的生成器函数:
#Twosimplegeneratorfunctions
defcountdown(n):
whilen>0:
print('T-minus',n)
yield
n-=1
print('Blastoff!')
defcountup(n):
x=0
whilex
这些函数在内部使用yield语句,下面是一个实现了简单任务调度器的代码:
fromcollectionsimportdeque
classTaskScheduler:
def__init__(self):
self._task_queue=deque()
defnew_task(self,task):
'''
Admitanewlystartedtasktothescheduler
'''
self._task_queue.append(task)
defrun(self):
'''
Rununtiltherearenomoretasks
'''
whileself._task_queue:
task=self._task_queue.popleft()
try:
#Rununtilthenextyieldstatement
next(task)
self._task_queue.append(task)
exceptStopIteration:
#Generatorisnolongerexecuting
pass
#Exampleuse
sched=TaskScheduler()
sched.new_task(countdown(10))
sched.new_task(countdown(5))
sched.new_task(countup(15))
sched.run()
TaskScheduler类在一个循环中运行生成器集合——每个都运行到碰到yield语句为止。运行这个例子,输出如下:
T-minus10
T-minus5
Countingup0
T-minus9
T-minus4
Countingup1
T-minus8
T-minus3
Countingup2
T-minus7
T-minus2
...
到此为止,我们实际上已经实现了一个“操作系统”的最小核心部分。生成器函数就是任务,而yield语句是任务挂起的信号。调度器循环检查任务列表直到没有任务要执行为止。
实际上,你可能想要使用生成器来实现简单的并发。那么,在实现actor或网络服务器的时候你可以使用生成器来替代线程的使用。
下面的代码演示了使用生成器来实现一个不依赖线程的actor:
fromcollectionsimportdeque
classActorScheduler:
def__init__(self):
self._actors={}#Mappingofnamestoactors
self._msg_queue=deque()#Messagequeue
defnew_actor(self,name,actor):
'''
Admitanewlystartedactortotheschedulerandgiveitaname
'''
self._msg_queue.append((actor,None))
self._actors[name]=actor
defsend(self,name,msg):
'''
Sendamessagetoanamedactor
'''
actor=self._actors.get(name)
ifactor:
self._msg_queue.append((actor,msg))
defrun(self):
'''
Runaslongastherearependingmessages.
'''
whileself._msg_queue:
actor,msg=self._msg_queue.popleft()
try:
actor.send(msg)
exceptStopIteration:
pass
#Exampleuse
if__name__=='__main__':
defprinter():
whileTrue:
msg=yield
print('Got:',msg)
defcounter(sched):
whileTrue:
#Receivethecurrentcount
n=yield
ifn==0:
break
#Sendtotheprintertask
sched.send('printer',n)
#Sendthenextcounttothecountertask(recursive)
sched.send('counter',n-1)
sched=ActorScheduler()
#Createtheinitialactors
sched.new_actor('printer',printer())
sched.new_actor('counter',counter(sched))
#Sendaninitialmessagetothecountertoinitiate
sched.send('counter',10000)
sched.run()
完全弄懂这段代码需要更深入的学习,但是关键点在于收集消息的队列。本质上,调度器在有需要发送的消息时会一直运行着。计数生成器会给自己发送消息并在一个递归循环中结束。
下面是一个更加高级的例子,演示了使用生成器来实现一个并发网络应用程序:
fromcollectionsimportdeque
fromselectimportselect
#Thisclassrepresentsagenericyieldeventinthescheduler
classYieldEvent:
defhandle_yield(self,sched,task):
pass
defhandle_resume(self,sched,task):
pass
#TaskScheduler
classScheduler:
def__init__(self):
self._numtasks=0#Totalnumoftasks
self._ready=deque()#Tasksreadytorun
self._read_waiting={}#Taskswaitingtoread
self._write_waiting={}#Taskswaitingtowrite
#PollforI/Oeventsandrestartwaitingtasks
def_iopoll(self):
rset,wset,eset=select(self._read_waiting,
self._write_waiting,[])
forrinrset:
evt,task=self._read_waiting.pop(r)
evt.handle_resume(self,task)
forwinwset:
evt,task=self._write_waiting.pop(w)
evt.handle_resume(self,task)
defnew(self,task):
'''
Addanewlystartedtasktothescheduler
'''
self._ready.append((task,None))
self._numtasks+=1
defadd_ready(self,task,msg=None):
'''
Appendanalreadystartedtasktothereadyqueue.
msgiswhattosendintothetaskwhenitresumes.
'''
self._ready.append((task,msg))
#Addatasktothereadingset
def_read_wait(self,fileno,evt,task):
self._read_waiting[fileno]=(evt,task)
#Addatasktothewriteset
def_write_wait(self,fileno,evt,task):
self._write_waiting[fileno]=(evt,task)
defrun(self):
'''
Runthetaskscheduleruntiltherearenotasks
'''
whileself._numtasks:
ifnotself._ready:
self._iopoll()
task,msg=self._ready.popleft()
try:
#Runthecoroutinetothenextyield
r=task.send(msg)
ifisinstance(r,YieldEvent):
r.handle_yield(self,task)
else:
raiseRuntimeError('unrecognizedyieldevent')
exceptStopIteration:
self._numtasks-=1
#Exampleimplementationofcoroutine-basedsocketI/O
classReadSocket(YieldEvent):
def__init__(self,sock,nbytes):
self.sock=sock
self.nbytes=nbytes
defhandle_yield(self,sched,task):
sched._read_wait(self.sock.fileno(),self,task)
defhandle_resume(self,sched,task):
data=self.sock.recv(self.nbytes)
sched.add_ready(task,data)
classWriteSocket(YieldEvent):
def__init__(self,sock,data):
self.sock=sock
self.data=data
defhandle_yield(self,sched,task):
sched._write_wait(self.sock.fileno(),self,task)
defhandle_resume(self,sched,task):
nsent=self.sock.send(self.data)
sched.add_ready(task,nsent)
classAcceptSocket(YieldEvent):
def__init__(self,sock):
self.sock=sock
defhandle_yield(self,sched,task):
sched._read_wait(self.sock.fileno(),self,task)
defhandle_resume(self,sched,task):
r=self.sock.accept()
sched.add_ready(task,r)
#Wrapperaroundasocketobjectforusewithyield
classSocket(object):
def__init__(self,sock):
self._sock=sock
defrecv(self,maxbytes):
returnReadSocket(self._sock,maxbytes)
defsend(self,data):
returnWriteSocket(self._sock,data)
defaccept(self):
returnAcceptSocket(self._sock)
def__getattr__(self,name):
returngetattr(self._sock,name)
if__name__=='__main__':
fromsocketimportsocket,AF_INET,SOCK_STREAM
importtime
#Exampleofafunctioninvolvinggenerators.Thisshould
#becalledusingline=yieldfromreadline(sock)
defreadline(sock):
chars=[]
whileTrue:
c=yieldsock.recv(1)
ifnotc:
break
chars.append(c)
ifc==b'\n':
break
returnb''.join(chars)
#Echoserverusinggenerators
classEchoServer:
def__init__(self,addr,sched):
self.sched=sched
sched.new(self.server_loop(addr))
defserver_loop(self,addr):
s=Socket(socket(AF_INET,SOCK_STREAM))
s.bind(addr)
s.listen(5)
whileTrue:
c,a=yields.accept()
print('Gotconnectionfrom',a)
self.sched.new(self.client_handler(Socket(c)))
defclient_handler(self,client):
whileTrue:
line=yieldfromreadline(client)
ifnotline:
break
line=b'GOT:'+line
whileline:
nsent=yieldclient.send(line)
line=line[nsent:]
client.close()
print('Clientclosed')
sched=Scheduler()
EchoServer(('',16000),sched)
sched.run()
这段代码有点复杂。不过,它实现了一个小型的操作系统。有一个就绪的任务队列,并且还有因I/O休眠的任务等待区域。还有很多调度器负责在就绪队列和I/O等待区域之间移动任务。
讨论
在构建基于生成器的并发框架时,通常会使用更常见的yield形式:
defsome_generator():
...
result=yielddata
...
使用这种形式的yield语句的函数通常被称为“协程”。通过调度器,yield语句在一个循环中被处理,如下:
f=some_generator()
#Initialresult.IsNonetostartsincenothinghasbeencomputed
result=None
whileTrue:
try:
data=f.send(result)
result=...dosomecalculation...
exceptStopIteration:
break
这里的逻辑稍微有点复杂。不过,被传给send()的值定义了在yield语句醒来时的返回值。因此,如果一个yield准备在对之前yield数据的回应中返回结果时,会在下一次send()操作返回。如果一个生成器函数刚开始运行,发送一个None值会让它排在第一个yield语句前面。
除了发送值外,还可以在一个生成器上面执行一个close()方法。它会导致在执行yield语句时抛出一个GeneratorExit异常,从而终止执行。如果进一步设计,一个生成器可以捕获这个异常并执行清理操作。同样还可以使用生成器的throw()方法在yield语句执行时生成一个任意的执行指令。一个任务调度器可利用它来在运行的生成器中处理错误。
最后一个例子中使用的yieldfrom语句被用来实现协程,可以被其它生成器作为子程序或过程来调用。本质上就是将控制权透明的传输给新的函数。不像普通的生成器,一个使用yieldfrom被调用的函数可以返回一个作为yieldfrom语句结果的值。关于yieldfrom的更多信息可以在PEP380中找到。
最后,如果使用生成器编程,要提醒你的是它还是有很多缺点的。特别是,你得不到任何线程可以提供的好处。例如,如果你执行CPU依赖或I/O阻塞程序,它会将整个任务挂起直到操作完成。为了解决这个问题,你只能选择将操作委派给另外一个可以独立运行的线程或进程。另外一个限制是大部分Python库并不能很好的兼容基于生成器的线程。如果你选择这个方案,你会发现你需要自己改写很多标准库函数。作为本节提到的协程和相关技术的一个基础背景,可以查看PEP342和“协程和并发的一门有趣课程”
PEP3156同样有一个关于使用协程的异步I/O模型。特别的,你不可能自己去实现一个底层的协程调度器。不过,关于协程的思想是很多流行库的基础,包括gevent,greenlet,StacklessPython以及其他类似工程。
以上就是Python使用生成器代替线程的方法的详细内容,更多关于Python生成器代替线程的资料请关注毛票票其它相关文章!
声明:本文内容来源于网络,版权归原作者所有,内容由互联网用户自发贡献自行上传,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任。如果您发现有涉嫌版权的内容,欢迎发送邮件至:czq8825#qq.com(发邮件时,请将#更换为@)进行举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。