python异步实现定时任务和周期任务的方法
一.如何调用
deff1(arg1,arg2):
print('f1',arg1,arg2)
deff2(arg1):
print('f2',arg1)
deff3():
print('f3')
deff4():
print('周期任务',int(time.time()))
timer=TaskTimer()
#把任务加入任务队列
timer.join_task(f1,[1,2],timing=15.5)#每天15:30执行
timer.join_task(f2,[3],timing=14)#每天14:00执行
timer.join_task(f3,[],timing=15)#每天15:00执行
timer.join_task(f4,[],interval=10)#每10秒执行1次
#开始执行(此时才会创建线程)
timer.start()
f1~f4是我们需要定时执行的函数。
首先创建TaskTimer对象(TaskTimer的代码在下面)。调用join_task函数,把需要执行的函数加入到任务队列。最后调用start,任务就开始执行了。
join_task参数:
fun:需要执行的函数
arg:fun的参数,如果没有就传一个空列表
interval:如果有此参数,说明任务是周期任务,单位为秒(注意interval最少5秒)
timing:如果有此参数,说明任务是定时任务,单位为时
注意:interval和timing只能选填1个
二.源码
importdatetime
importtime
fromthreadingimportThread
fromtimeimportsleep
classTaskTimer:
__instance=None
def__new__(cls,*args,**kwargs):
"""
单例模式
"""
ifnotcls.__instance:
cls.__instance=object.__new__(cls)
returncls.__instance
def__init__(self):
ifnothasattr(self,'task_queue'):
setattr(self,'task_queue',[])
ifnothasattr(self,'is_running'):
setattr(self,'is_running',False)
defwrite_log(self,level,msg):
cur_time=datetime.datetime.now()
withopen('./task.log',mode='a+',encoding='utf8')asfile:
s="["+str(cur_time)+"]["+level+"]"+msg
print(s)
file.write(s+"\n")
defwork(self):
"""
处理任务队列
"""
whileTrue:
fortaskinself.task_queue:
iftask['interval']:
self.cycle_task(task)
eliftask['timing']:
self.timing_task(task)
sleep(5)
defcycle_task(self,task):
"""
周期任务
"""
iftask['next_sec']<=int(time.time()):
try:
task['fun'](*task['arg'])
self.write_log("正常","周期任务:"+task['fun'].__name__+"已执行")
exceptExceptionase:
self.write_log("异常","周期任务:"+task['fun'].__name__+"函数内部异常:"+str(e))
finally:
task['next_sec']=int(time.time())+task['interval']
deftiming_task(self,task):
"""
定时任务
"""
#今天已过秒数
today_sec=self.get_today_until_now()
#到了第二天,就重置任务状态
iftask['today']!=self.get_today():
task['today']=self.get_today()
task['today_done']=False
#第一次执行
iftask['first_work']:
iftoday_sec>=task['task_sec']:
task['today_done']=True
task['first_work']=False
else:
task['first_work']=False
#今天还没有执行
ifnottask['today_done']:
iftoday_sec>=task['task_sec']:#到点了,开始执行任务
try:
task['fun'](*task['arg'])
self.write_log("正常","定时任务:"+task['fun'].__name__+"已执行")
exceptExceptionase:
self.write_log("异常","定时任务:"+task['fun'].__name__+"函数内部异常:"+str(e))
finally:
task['today_done']=True
iftask['first_work']:
task['first_work']=False
defget_today_until_now(self):
"""
获取今天凌晨到现在的秒数
"""
i=datetime.datetime.now()
returni.hour*3600+i.minute*60+i.second
defget_today(self):
"""
获取今天的日期
"""
i=datetime.datetime.now()
returni.day
defjoin_task(self,fun,arg,interval=None,timing=None):
"""
interval和timing只能存在1个
:paramfun:你要调用的任务
:paramarg:fun的参数
:paraminterval:周期任务,单位秒
:paramtiming:定时任务,取值:[0,24)
"""
#参数校验
if(interval!=Noneandtiming!=None)or(interval==Noneandtiming==None):
raiseException('interval和timing只能选填1个')
iftimingandnot0<=timing<24:
raiseException('timing的取值范围为[0,24)')
ifintervalandinterval<5:
raiseException('interval最少为5')
#封装一个task
task={
'fun':fun,
'arg':arg,
'interval':interval,
'timing':timing,
}
#封装周期或定时任务相应的参数
iftiming:
task['task_sec']=timing*3600
task['today_done']=False
task['first_work']=True
task['today']=self.get_today()
elifinterval:
task['next_sec']=int(time.time())+interval
#把task加入任务队列
self.task_queue.append(task)
self.write_log("正常","新增任务:"+fun.__name__)
defstart(self):
"""
开始执行任务
返回线程标识符
"""
ifnotself.is_running:
thread=Thread(target=self.work)
thread.start()
self.is_running=True
self.write_log("正常","TaskTimer已开始运行!")
returnthread.ident
self.write_log("警告","TaskTimer已运行,请勿重复启动!")
以上这篇python异步实现定时任务和周期任务的方法就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持毛票票。