如何在Django中设置定时任务的方法示例
Django作为后端Web开发框架,有时候我们需要用到定时任务来或者固定频次的任务来执行某段代码,这时我们就要用到Celery了。Django中有一个中间件:Django-celery
环境:
- Python3.6
- Django为小于1.8版本
- Celery为3.1版本
第一步安装:django-celery
pipinstalldjango-celery
第二步:配置celery和任务
创建测试django环境:
django-admin.pycreateprojecttest django-admin.pystartappdemo
创建好的项目布局如下:
-proj/ -manage.py -proj/ -__init__.py -celery.py -settings.py -urls.py -demo/ -migrations -__init__.py -admin.py -apps.py -models.py -tasks.py -tests.py -views.py
2.1配置celery.py文件
需要替换的内容,我都在对应的行后提示了,剩下的内容默认就好
创建test/test/celery.py文件,内容如下:
from__future__importabsolute_import,unicode_literals importos fromceleryimportCelery #setthedefaultDjangosettingsmoduleforthe'celery'program. os.environ.setdefault('DJANGO_SETTINGS_MODULE','proj.settings')#“proj.settings”替换为你的项目信息:test.settings app=Celery('proj')#这里的proj替换为你的项目名称:test #Usingastringheremeanstheworkerdoesn'thavetoserialize #theconfigurationobjecttochildprocesses. #-namespace='CELERY'meansallcelery-relatedconfigurationkeys #shouldhavea`CELERY_`prefix. app.config_from_object('django.conf:settings',namespace='CELERY') #LoadtaskmodulesfromallregisteredDjangoappconfigs. app.autodiscover_tasks() @app.task(bind=True) defdebug_task(self): print('Request:{0!r}'.format(self.request))
2.2配置项目的init.py中配置celery内容
打开test/test/__init_.py文件,添加内容:
from__future__importabsolute_import,unicode_literals #Thiswillmakesuretheappisalwaysimportedwhen #Djangostartssothatshared_taskwillusethisapp. from.celeryimportappascelery_app __all__=('celery_app',)
2.3在task.py中添加计划任务
编辑test/demo/task.py文件,添加计划任务,内容如下:
#Createyourtaskshere from__future__importabsolute_import,unicode_literals fromceleryimportshared_task @shared_task defadd(x,y): returnx+y @shared_task defmul(x,y): returnx*y @shared_task defxsum(numbers): returnsum(numbers)
第三步:任务执行
运行django项目:pythonmanage.pyrunserver
3.1后台添加计划任务
访问“http://localhost:8000/admin/”,在celery的管理页面里,选择Periodictasks,进行任务添加。选择对应的任务,设置定时或者周期时间
3.2启动定时的celery服务
注意:celery依赖redis服务,需要提前运行redis服务:`redis-server`
#以下两个命令在不同的shell窗口里执行,需要在django的目录下 pythonmanager.pycelerybeat-linfo#接收定时任务的命令 pythonmanager.pyceleryworker-linfo#执行定时任务的命令,此shell窗口会看到任务的输入信息
3.3启动单次的celery服务
注意:celery依赖redis服务,需要提前运行redis服务:`redis-server`
pythonmanager.pyshell#进到django的shell里 fromdemo.taskimportmul,xsum#导入task任务 a=mul() b=xsum() #执行a,b会输出信息 a(1,2) b(1)
PS:django-crontab实现Django定时任务
django-crontab安装:
pipinstalldjango-crontab
django-crontab加入:只需要将django-crontab加入到settings.py的INSTALLED_APPS即可。如下代码:
INSTALLED_APPS=( 'django_crontab', ... )
django-crontab配置:settings.py中加入django-crontab的命令即可:
CRONJOBS=[ ('4711***','django.core.management.call_command',['closepoll'],{},'>>/var/run.log'), ]
格式:
参数1:定时例如4711***表示每天的11时47分执行
参数2:方法的python模块路径,如果执行django-admin命令,则写django.core.management.call_command
参数3:方法的位置参数列表(默认值:[]),如果执行django-admin命令,则填写所需执行的命令,例如我们在polls中已经定义过的closepoll
参数4:方法的关键字参数的dict(默认值:{})
参数5:执行log存放位置(即重定向到文件,默认:'')
django-crontab任务加载:
django-crontab任务加载比较简单,只需要运行pythonmanage.pycrontabadd即可
查看已经激活的任务使用pythonmanage.pycrontabshow
删除已经有的任务使用pythonmanage.pycrontabremove
如果你修改了任务记得一定要使用pythonmanage.pycrontabadd这个会更新定时任务
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。