Django实现celery定时任务过程解析
1.首先在项目同名目录下建一个celery.py
from__future__importabsolute_import importos fromceleryimportCelery fromdatetimeimporttimedelta fromkombuimportQueue #setthedefaultDjangosettingsmoduleforthe'celery'program. os.environ.setdefault('DJANGO_SETTINGS_MODULE','OpsManage.settings') fromdjango.confimportsettings app=Celery('OpsManage') #Usingastringheremeanstheworkerwillnothaveto #pickletheobjectwhenusingWindows. #配置celery classConfig: BROKER_URL='amqp://guest:guest@localhost:5672//' CELERY_RESULT_BACKEND='redis://localhost:6379' CELERY_ACCEPT_CONTENT=['application/json'] CELERY_TASK_SERIALIZER='json' CELERY_RESULT_SERIALIZER='json' CELERY_TASK_RESULT_EXPIRES=60*60 CELERY_TIMEZONE='Asia/Shanghai' CELERY_ENABLE_UTC=True CELERY_ANNOTATIONS={'*':{'rate_limit':'500/s'}} CELERYBEAT_SCHEDULER='djcelery.schedulers.DatabaseScheduler' app.config_from_object(Config) #到各个APP里自动发现tasks.py文件 app.autodiscover_tasks() #crontabconfig app.conf.update( CELERYBEAT_SCHEDULE={ #每隔30s执行一次函数 'every-30-min-add':{ 'task':'apps.tasks.celery_assets.push_host_by_salt_tasks', 'schedule':timedelta(seconds=30) ##每天凌晨12点 #'schedule':crontab(minute=0,hour=0) }, }, ) #kombu:Celery自带的用来收发消息的库,提供了符合Python语言习惯的,使用AMQP协议的高级接口 Queue('transient',routing_key='transient',delivery_mode=1)
2.在settings.py里配置celery
INSTALLED_APPS=[ ...... 'django_celery_beat', 'django_celery_results', ]
3.在项目同名目录下的__init__.py文件里申明celery任务,记得要去检测呀
#coding:utf-8 from__future__importabsolute_import,unicode_literals #Thiswillmakesuretheappisalwaysimportedwhen #Djangostartssothatshared_taskwillusethisapp. fromceleryimportappascelery_app __all__=['celery_app'] importpymysql pymysql.install_as_MySQLdb()
4.在task.py里执行任务的函数上加@
fromceleryimporttask #定时任务 @task defpush_host_by_salt_tasks(): “”“balabala”“” return'这里是定时任务'
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。