python实现事件驱动
本文实例为大家分享了python实现事件驱动的具体代码,供大家参考,具体内容如下
EventManager事件管理类实现,大概就百来行代码左右。
#encoding:UTF-8
#系统模块
fromQueueimportQueue,Empty
fromthreadingimport*
#################################################
classEventManager:
#----------------------------------------------------------------------
def__init__(self):
"""初始化事件管理器"""
#事件对象列表
self.__eventQueue=Queue()
#事件管理器开关
self.__active=False
#事件处理线程
self.__thread=Thread(target=self.__Run)
#这里的__handlers是一个字典,用来保存对应的事件的响应函数
#其中每个键对应的值是一个列表,列表中保存了对该事件监听的响应函数,一对多
self.__handlers={}
#----------------------------------------------------------------------
def__Run(self):
"""引擎运行"""
whileself.__active==True:
try:
#获取事件的阻塞时间设为1秒
event=self.__eventQueue.get(block=True,timeout=1)
self.__EventProcess(event)
exceptEmpty:
pass
#----------------------------------------------------------------------
def__EventProcess(self,event):
"""处理事件"""
#检查是否存在对该事件进行监听的处理函数
ifevent.type_inself.__handlers:
#若存在,则按顺序将事件传递给处理函数执行
forhandlerinself.__handlers[event.type_]:
handler(event)
#----------------------------------------------------------------------
defStart(self):
"""启动"""
#将事件管理器设为启动
self.__active=True
#启动事件处理线程
self.__thread.start()
#----------------------------------------------------------------------
defStop(self):
"""停止"""
#将事件管理器设为停止
self.__active=False
#等待事件处理线程退出
self.__thread.join()
#----------------------------------------------------------------------
defAddEventListener(self,type_,handler):
"""绑定事件和监听器处理函数"""
#尝试获取该事件类型对应的处理函数列表,若无则创建
try:
handlerList=self.__handlers[type_]
exceptKeyError:
handlerList=[]
self.__handlers[type_]=handlerList
#若要注册的处理器不在该事件的处理器列表中,则注册该事件
ifhandlernotinhandlerList:
handlerList.append(handler)
#----------------------------------------------------------------------
defRemoveEventListener(self,type_,handler):
"""移除监听器的处理函数"""
#读者自己试着实现
#----------------------------------------------------------------------
defSendEvent(self,event):
"""发送事件,向事件队列中存入事件"""
self.__eventQueue.put(event)
########################################################################
"""事件对象"""
classEvent:
def__init__(self,type_=None):
self.type_=type_#事件类型
self.dict={}#字典用于保存具体的事件数据
测试代码
#-------------------------------------------------------------------
#encoding:UTF-8
importsys
fromdatetimeimportdatetime
fromthreadingimport*
fromEventManagerimport*
#事件名称新文章
EVENT_ARTICAL="Event_Artical"
#事件源公众号
classPublicAccounts:
def__init__(self,eventManager):
self.__eventManager=eventManager
defWriteNewArtical(self):
#事件对象,写了新文章
event=Event(type_=EVENT_ARTICAL)
event.dict["artical"]=u'如何写出更优雅的代码\n'
#发送事件
self.__eventManager.SendEvent(event)
printu'公众号发送新文章\n'
#监听器订阅者
classListener:
def__init__(self,username):
self.__username=username
#监听器的处理函数读文章
defReadArtical(self,event):
print(u'%s收到新文章'%self.__username)
print(u'正在阅读新文章内容:%s'%event.dict["artical"])
"""测试函数"""
#--------------------------------------------------------------------
deftest():
listner1=Listener("thinkroom")#订阅者1
listner2=Listener("steve")#订阅者2
eventManager=EventManager()
#绑定事件和监听器响应函数(新文章)
eventManager.AddEventListener(EVENT_ARTICAL,listner1.ReadArtical)
eventManager.AddEventListener(EVENT_ARTICAL,listner2.ReadArtical)
eventManager.Start()
publicAcc=PublicAccounts(eventManager)
timer=Timer(2,publicAcc.WriteNewArtical)
timer.start()
if__name__=='__main__':
test()
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。