Python定义一个Actor任务
问题
你想定义跟actor模式中类似“actors”角色的任务
解决方案
actor模式是一种最古老的也是最简单的并行和分布式计算解决方案。事实上,它天生的简单性是它如此受欢迎的重要原因之一。简单来讲,一个actor就是一个并发执行的任务,只是简单的执行发送给它的消息任务。响应这些消息时,它可能还会给其他actor发送更进一步的消息。actor之间的通信是单向和异步的。因此,消息发送者不知道消息是什么时候被发送,也不会接收到一个消息已被处理的回应或通知。
结合使用一个线程和一个队列可以很容易的定义actor,例如:
fromqueueimportQueue fromthreadingimportThread,Event #Sentinelusedforshutdown classActorExit(Exception): pass classActor: def__init__(self): self._mailbox=Queue() defsend(self,msg): ''' Sendamessagetotheactor ''' self._mailbox.put(msg) defrecv(self): ''' Receiveanincomingmessage ''' msg=self._mailbox.get() ifmsgisActorExit: raiseActorExit() returnmsg defclose(self): ''' Closetheactor,thusshuttingitdown ''' self.send(ActorExit) defstart(self): ''' Startconcurrentexecution ''' self._terminated=Event() t=Thread(target=self._bootstrap) t.daemon=True t.start() def_bootstrap(self): try: self.run() exceptActorExit: pass finally: self._terminated.set() defjoin(self): self._terminated.wait() defrun(self): ''' Runmethodtobeimplementedbytheuser ''' whileTrue: msg=self.recv() #SampleActorTask classPrintActor(Actor): defrun(self): whileTrue: msg=self.recv() print('Got:',msg) #Sampleuse p=PrintActor() p.start() p.send('Hello') p.send('World') p.close() p.join()
这个例子中,你使用actor实例的send()方法发送消息给它们。其机制是,这个方法会将消息放入一个队里中,然后将其转交给处理被接受消息的一个内部线程。close()方法通过在队列中放入一个特殊的哨兵值(ActorExit)来关闭这个actor。用户可以通过继承Actor并定义实现自己处理逻辑run()方法来定义新的actor。ActorExit异常的使用就是用户自定义代码可以在需要的时候来捕获终止请求(异常被get()方法抛出并传播出去)。
如果你放宽对于同步和异步消息发送的要求,类actor对象还可以通过生成器来简化定义。例如:
defprint_actor(): whileTrue: try: msg=yield#Getamessage print('Got:',msg) exceptGeneratorExit: print('Actorterminating') #Sampleuse p=print_actor() next(p)#Advancetotheyield(readytoreceive) p.send('Hello') p.send('World') p.close()
讨论
actor模式的魅力就在于它的简单性。实际上,这里仅仅只有一个核心操作send().甚至,对于在基于actor系统中的“消息”的泛化概念可以已多种方式被扩展。例如,你可以以元组形式传递标签消息,让actor执行不同的操作,如下:
classTaggedActor(Actor): defrun(self): whileTrue: tag,*payload=self.recv() getattr(self,'do_'+tag)(*payload) #Methodscorrepondingtodifferentmessagetags defdo_A(self,x): print('RunningA',x) defdo_B(self,x,y): print('RunningB',x,y) #Example a=TaggedActor() a.start() a.send(('A',1))#Invokesdo_A(1) a.send(('B',2,3))#Invokesdo_B(2,3) a.close() a.join()
作为另外一个例子,下面的actor允许在一个工作者中运行任意的函数,并且通过一个特殊的Result对象返回结果:
fromthreadingimportEvent classResult: def__init__(self): self._evt=Event() self._result=None defset_result(self,value): self._result=value self._evt.set() defresult(self): self._evt.wait() returnself._result classWorker(Actor): defsubmit(self,func,*args,**kwargs): r=Result() self.send((func,args,kwargs,r)) returnr defrun(self): whileTrue: func,args,kwargs,r=self.recv() r.set_result(func(*args,**kwargs)) #Exampleuse worker=Worker() worker.start() r=worker.submit(pow,2,3) worker.close() worker.join() print(r.result())
最后,“发送”一个任务消息的概念可以被扩展到多进程甚至是大型分布式系统中去。例如,一个类actor对象的send()方法可以被编程让它能在一个套接字连接上传输数据或通过某些消息中间件(比如AMQP、ZMQ等)来发送。
以上就是Python定义一个Actor任务的详细内容,更多关于Pythonactor任务的资料请关注毛票票其它相关文章!