详解python调度框架APScheduler使用
最近在研究 Python 调度框架 APScheduler 的使用,今天把学习过程整理成一篇笔记。先看第一个例子:非阻塞调度,每隔 3 秒执行一次。
# coding=utf-8
"""
Demonstrates how to use the background scheduler to schedule a job that executes on 3 second
intervals.
"""
from datetime import datetime
import os
import time

from apscheduler.schedulers.background import BackgroundScheduler
def tick():
    """Print the current local time; this is the job handed to the scheduler."""
    print('Tick! The time is: %s' % datetime.now())
if __name__ == '__main__':
    scheduler = BackgroundScheduler()
    # Run tick() once every 3 seconds.
    scheduler.add_job(tick, 'interval', seconds=3)
    # start() returns immediately; the scheduled job runs in a separate thread.
    scheduler.start()
    print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))

    try:
        # This is here to simulate application activity (which keeps the main thread alive).
        while True:
            # The main thread does its own work independently of the scheduler thread.
            time.sleep(2)
            print('sleep!')
    except (KeyboardInterrupt, SystemExit):
        # Not strictly necessary if daemonic mode is enabled but should be done if possible
        scheduler.shutdown()
        print('Exit The Job!')
非阻塞调度,在指定的时间执行一次
# coding=utf-8
"""
Demonstrates how to use the background scheduler to schedule a job that executes once at a
fixed date and time.
"""
from datetime import datetime
import os
import time

from apscheduler.schedulers.background import BackgroundScheduler
def tick():
    """Print the current local time; this is the job handed to the scheduler."""
    print('Tick! The time is: %s' % datetime.now())
if __name__ == '__main__':
    scheduler = BackgroundScheduler()
    # scheduler.add_job(tick, 'interval', seconds=3)
    # Run tick() exactly once, at the given date and time. The date and the time must be
    # separated by a space (or 'T'); '2016-02-1415:01:05' is not a valid date string.
    # NOTE(review): replace with a future time when trying this out -- a run_date in the
    # past is presumably reported as a missed run instead of being executed; confirm.
    scheduler.add_job(tick, 'date', run_date='2016-02-14 15:01:05')
    # start() returns immediately; the scheduled job runs in a separate thread.
    scheduler.start()
    print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))

    try:
        # This is here to simulate application activity (which keeps the main thread alive).
        while True:
            # The main thread does its own work independently of the scheduler thread.
            time.sleep(2)
            print('sleep!')
    except (KeyboardInterrupt, SystemExit):
        # Not strictly necessary if daemonic mode is enabled but should be done if possible
        scheduler.shutdown()
        print('Exit The Job!')
非阻塞的方式,采用cron的方式执行
# coding=utf-8
"""
Demonstrates how to use the background scheduler to schedule a job with a cron-style
trigger.
"""
from datetime import datetime
import os
import time

from apscheduler.schedulers.background import BackgroundScheduler
def tick():
    """Print the current local time; this is the job handed to the scheduler."""
    print('Tick! The time is: %s' % datetime.now())
if __name__ == '__main__':
    scheduler = BackgroundScheduler()
    # scheduler.add_job(tick, 'interval', seconds=3)
    # scheduler.add_job(tick, 'date', run_date='2016-02-14 15:01:05')
    # Cron trigger: weekday number 6, on every 5th second (see the reference below).
    scheduler.add_job(tick, 'cron', day_of_week='6', second='*/5')
    # Cron trigger parameters:
    #   year (int|str)        - 4-digit year
    #   month (int|str)       - month (1-12)
    #   day (int|str)         - day of the month (1-31)
    #   week (int|str)        - ISO week (1-53)
    #   day_of_week (int|str) - number or name of weekday
    #                           (0-6 or mon,tue,wed,thu,fri,sat,sun)
    #   hour (int|str)        - hour (0-23)
    #   minute (int|str)      - minute (0-59)
    #   second (int|str)      - second (0-59)
    #   start_date (datetime|str) - earliest possible date/time to trigger on (inclusive)
    #   end_date (datetime|str)   - latest possible date/time to trigger on (inclusive)
    #   timezone (datetime.tzinfo|str) - time zone to use for the date/time calculations
    #                                    (defaults to scheduler timezone)
    #
    # Expression  Field  Description
    #   *         any    Fire on every value
    #   */a       any    Fire every a values, starting from the minimum
    #   a-b       any    Fire on any value within the a-b range (a must be smaller than b)
    #   a-b/c     any    Fire every c values within the a-b range
    #   xth y     day    Fire on the x-th occurrence of weekday y within the month
    #   last x    day    Fire on the last occurrence of weekday x within the month
    #   last      day    Fire on the last day within the month
    #   x,y,z     any    Fire on any matching expression; can combine any number of any
    #                    of the above expressions
    # start() returns immediately; the scheduled job runs in a separate thread.
    scheduler.start()
    print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))

    try:
        # This is here to simulate application activity (which keeps the main thread alive).
        while True:
            # The main thread does its own work independently of the scheduler thread.
            time.sleep(2)
            print('sleep!')
    except (KeyboardInterrupt, SystemExit):
        # Not strictly necessary if daemonic mode is enabled but should be done if possible
        scheduler.shutdown()
        print('Exit The Job!')
阻塞的方式,间隔3秒执行一次
# coding=utf-8
"""
Demonstrates how to use the blocking scheduler to schedule a job that executes on 3 second
intervals.
"""
from datetime import datetime
import os

from apscheduler.schedulers.blocking import BlockingScheduler
def tick():
    """Print the current local time; this is the job handed to the scheduler."""
    print('Tick! The time is: %s' % datetime.now())
if __name__ == '__main__':
    scheduler = BlockingScheduler()
    # Run tick() once every 3 seconds.
    scheduler.add_job(tick, 'interval', seconds=3)
    print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))

    try:
        # Blocking mode: start() does not return, so this thread is dedicated to scheduling.
        scheduler.start()
    except (KeyboardInterrupt, SystemExit):
        # Not strictly necessary if daemonic mode is enabled but should be done if possible
        scheduler.shutdown()
        print('Exit The Job!')
采用阻塞的方法,只执行一次
# coding=utf-8
"""
Demonstrates how to use the blocking scheduler to schedule a job that executes once at a
fixed date and time.
"""
from datetime import datetime
import os

from apscheduler.schedulers.blocking import BlockingScheduler
def tick():
    """Print the current local time; this is the job handed to the scheduler."""
    print('Tick! The time is: %s' % datetime.now())
if __name__ == '__main__':
    scheduler = BlockingScheduler()
    # Run tick() exactly once, at the given date and time. The date and the time must be
    # separated by a space (or 'T'); '2016-02-1415:23:05' is not a valid date string.
    # NOTE(review): replace with a future time when trying this out -- a run_date in the
    # past is presumably reported as a missed run instead of being executed; confirm.
    scheduler.add_job(tick, 'date', run_date='2016-02-14 15:23:05')
    print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))

    try:
        # Blocking mode: start() does not return, so this thread is dedicated to scheduling.
        scheduler.start()
    except (KeyboardInterrupt, SystemExit):
        # Not strictly necessary if daemonic mode is enabled but should be done if possible
        scheduler.shutdown()
        print('Exit The Job!')
采用阻塞的方式,使用cron的调度方法
# coding=utf-8
"""
Demonstrates how to use the blocking scheduler to schedule a job with a cron-style trigger.
"""
from datetime import datetime
import os

from apscheduler.schedulers.blocking import BlockingScheduler
def tick():
    """Print the current local time; this is the job handed to the scheduler."""
    print('Tick! The time is: %s' % datetime.now())
if __name__ == '__main__':
    scheduler = BlockingScheduler()
    # Cron trigger: weekday number 6, on every 5th second (see the reference below).
    scheduler.add_job(tick, 'cron', day_of_week='6', second='*/5')
    # Cron trigger parameters:
    #   year (int|str)        - 4-digit year
    #   month (int|str)       - month (1-12)
    #   day (int|str)         - day of the month (1-31)
    #   week (int|str)        - ISO week (1-53)
    #   day_of_week (int|str) - number or name of weekday
    #                           (0-6 or mon,tue,wed,thu,fri,sat,sun)
    #   hour (int|str)        - hour (0-23)
    #   minute (int|str)      - minute (0-59)
    #   second (int|str)      - second (0-59)
    #   start_date (datetime|str) - earliest possible date/time to trigger on (inclusive)
    #   end_date (datetime|str)   - latest possible date/time to trigger on (inclusive)
    #   timezone (datetime.tzinfo|str) - time zone to use for the date/time calculations
    #                                    (defaults to scheduler timezone)
    #
    # Expression  Field  Description
    #   *         any    Fire on every value
    #   */a       any    Fire every a values, starting from the minimum
    #   a-b       any    Fire on any value within the a-b range (a must be smaller than b)
    #   a-b/c     any    Fire every c values within the a-b range
    #   xth y     day    Fire on the x-th occurrence of weekday y within the month
    #   last x    day    Fire on the last occurrence of weekday x within the month
    #   last      day    Fire on the last day within the month
    #   x,y,z     any    Fire on any matching expression; can combine any number of any
    #                    of the above expressions
    print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))

    try:
        # Blocking mode: start() does not return, so this thread is dedicated to scheduling.
        scheduler.start()
    except (KeyboardInterrupt, SystemExit):
        # Not strictly necessary if daemonic mode is enabled but should be done if possible
        scheduler.shutdown()
        print('Exit The Job!')
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。