django-celery动态添加定时任务

为了使用celery替代crontab并做到实时添加定时任务的效果,需要使用django-celery,效果如下图,

来自:https://www.caktusgroup.com/blog/2014/06/23/scheduling-tasks-celery/

要使用django-celery,需要安装python的以下包:django,celery,django-celery。其中django安装比较麻烦,首先它和python版本相关,django1.7.9和1.8.3都是支持python 2.7,所以打算安装这俩版本中其一;其次,django需要sqlite2或sqlite3的支持,而python 2.7中sqlite是已经包含在内、却没有编译的,因此先要安装sqlite、然后重新安装python
2.7。整个开发环境安装和配置如下

一. 环境配置

首先安装sqlite3。先手动从http://www.sqlite.org/download.html下载软件包sqlite-autoconf-3081002.tar.gz(也可直接使用wget),

tar -xzvf sqlite-autoconf-3081002.tar.gz

./configure --prefix=/home/panxiaofeng/install  #指定安装目录

make

make install

接着重新安装python 2.7,首先,

tar -zxf Python-2.7.9.tgz

cd ~/tools/Python-2.7.9

修改setup.py,

sqlite_inc_paths = [ ‘/usr/include‘,

‘/usr/include/sqlite‘,

‘/usr/include/sqlite3‘,

‘/usr/local/include‘,

‘/usr/local/include/sqlite‘,

‘/usr/local/include/sqlite3‘,

‘~/install/include‘,

‘~/install/include/sqlite3‘,

]

也就是加上sqlite3的路径,然后,

./configure --prefix=/home/panxiaofeng/install/ && make -j4 && make install  #4核并行编译

ln -s ~/install/bin/python ~/bin/python

完成。

测试安装成功否,

~/bin/python

>>>import sqlite3

>>>

没报错即可。

配置python开发环境,不仅要安装python 2.7,还需要安装一些工具,

unzip setuptools-12.0.3.zip 2>/dev/null 1>/dev/null

cd ~/tools/setuptools-12.0.3

~/bin/python setup.py build install  #用哪个python来安装包,包就会安装到哪个python的目录下,其他版本的python无法使用该包

tar -zxf pip-6.0.6.tar.gz

cd ~/tools/pip-6.0.6

~/bin/python setup.py build install

ln -s ~/install/bin/pip ~/bin/pip

其中pip工具用来安装python包很方便,但是默认的源下载很慢经常超时,需要采用一些方法加速,具体可以参考http://blog.csdn.net/sasoritattoo/article/details/10020547 。本文安装加速的手段采用了最简单的换源:

sudo ~/bin/pip install -i http://pypi.douban.com/simple django,用豆瓣的源,暴快!注意,用哪个python安装的pip工具来安装包,包就会安装到哪个python的目录下,其他版本的python无法使用该包。然后,celery和django-celery也采用该源来安装,快得很,不像之前用默认的源,很卡很慢还经常失败。

记录一下,之前也手动安装过Django,手动安装唯一需要注意的一点就是sudo python setup.py时搞清楚你用的是哪个版本的python,不要搞错版本了,如果默认的python不能用,就自己指定,比如,sudo /usr/local/bin/python2.7
setup.py install。还有手动安装比较利于卸载包:

setup.py 帮助你纪录安装细节方便你卸载

python setup.py install --record log

这时所有的安装细节都写到 log 里了

想要卸载的时候

cat log | xargs rm -rf

就可以干净卸载了

最后记录下解决pip下载失败的一些可能有用的方法:

出现Bad md5 hash for package https://here/package/path错误时,尝试,

1. You can use wheels for installing python packages: pip install wheel. Then try to pip the package you want again.

2. pip失败后,会看到你需要的包信息,那么直接下载安装,如下面的例子,

[[email protected] conn]# pip install chardet==2.2.1

Collecting chardet==2.2.1

/usr/lib/python2.6/site-packages/pip/_vendor/requests/packages/urllib3/util/ssl_.py:79: InsecurePlatformWarning: A true SSLContext object is not available. This prevents urllib3 from configuring SSL appropriately and may cause certain SSL connections to fail.
For more information, see https://urllib3.readthedocs.org/en/latest/security.html#insecureplatformwarning.

InsecurePlatformWarning

Downloading chardet-2.2.1-py2.py3-none-any.whl (180kB)

72% |███████████████████████▎        | 131kB 357bytes/s eta 0:02:19

Hash of the package https://pypi.python.org/packages/2.7/c/chardet/chardet-2.2.1-py2.py3-none-any.whl#md5=556de73cc5d2d14233b3512798423da1 (from https://pypi.python.org/simple/chardet/) (be001cd2dbe90bb1f1dd4ab4b008c169) doesn‘t match the expected hash 556de73cc5d2d14233b3512798423da1!

Bad md5 hash for package https://pypi.python.org/packages/2.7/c/chardet/chardet-2.2.1-py2.py3-none-any.whl#md5=556de73cc5d2d14233b3512798423da1 (from https://pypi.python.org/simple/chardet/)

解决方案:

wget https://pypi.python.org/packages/2.7/c/chardet/chardet-2.2.1-py2.py3-none-any.whl --no-check-certificate

md5sum chardet-2.2.1-py2.py3-none-any.whl

pip install chardet-2.2.1-py2.py3-none-any.whl

二. 简单测试django

~/bin/python ~/install/bin/django-admin.py startproject djtest   #使用上面安装了相关包的python

cd djtest

~/bin/python manage.py runserver 0.0.0.0:10501

然后最便打开一个浏览器输入:10.121.84.90:10501/,得到响应的页面就成功了。10.121.84.90是执行上面命令的主机ip,即django server的主机ip。

~/bin/python manage.py createsuperuser来创建admin账户。

三. django-celery替代crontab

测试:

测试djcelery的动态添加任务函数并crontab部署的功能。目录结构,

djtest

├── apps

│   ├── app1

│   │   ├── __init__.py

│   │   └── tasks.py

│   ├── app2

│   │   ├── __init__.py

│   │   └── tasks.py

│   ├── __init__.py

│   ├── tasks.py

├── djtest

│   ├── celery.py

│   ├── __init__.py

│   ├── settings.py

│   ├── urls.py

│   ├── wsgi.py

└── manage.py

其中manage.py、__init__.py、settings.py、urls.py、wsgi.py是~/bin/python
~/install/bin/django-admin.py startproject djtest命令自动生成的,但部分文件需要修改。首先修改settings.py,在里面添加,


import djcelery  ###

djcelery.setup_loader()  ###

CELERY_TIMEZONE=‘Asia/Shanghai‘  #并没有北京时区,与下面TIME_ZONE应该一致

BROKER_URL=‘redis://10.121.84.90:16379/8‘  #任何可用的redis都可以,不一定要在django server运行的主机上

CELERYBEAT_SCHEDULER = ‘djcelery.schedulers.DatabaseScheduler‘  ###

# Application definition

 

INSTALLED_APPS = (

    ‘django.contrib.admin‘,

    ‘django.contrib.auth‘,

    ‘django.contrib.contenttypes‘,

    ‘django.contrib.sessions‘,

    ‘django.contrib.messages‘,

    ‘django.contrib.staticfiles‘,

    ‘djcelery‘,    ###

    ‘apps‘,     ###

)




TIME_ZONE=‘Asia/Shanghai‘  ###

上面代码首先导出djcelery模块,并调用setup_loader方法加载有关配置;注意配置时区,不然默认使用UTC时间会比东八区慢8个小时。其中INSTALLED_APPS末尾添加两项,分别表示添加celery服务和自己定义的apps服务。接下来编写celery.py文件,


#!/bin/python

from __future__ import absolute_import

 

import os

 

from celery import Celery

 

os.environ.setdefault(‘DJANGO_SETTINGS_MODULE‘, ‘djtest.settings‘)

#Specifying the settings here means the celery command line program will know where your Django project is. 

#This statement must always appear before the app instance is created, which is what we do next: 

from django.conf import settings

 

app = Celery(‘djtest‘)

 

app.config_from_object(‘django.conf:settings‘)

#This means that you don’t have to use multiple configuration files, and instead configure Celery directly from the Django settings.


#You can pass the object directly here, but using a string is better since then the worker doesn’t have to serialize the object.


app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

#With the line above Celery will automatically discover tasks in reusable apps if you define all tasks in a separate tasks.py module.

#The tasks.py should be in dir which is added to INSTALLED_APP in settings.py. 

#So you do not have to manually add the individual modules to the CELERY_IMPORT in settings.py.

 

@app.task(bind=True)

def debug_task(self):

    print(‘Request: {0!r}‘.format(self.request))  #dumps its own request information

然后修改djtest/__init__.py,


#!/bin/python

from __future__ import absolute_import

 

# This will make sure the app is always imported when

# Django starts so that shared_task will use this app.

from .celery import app as celery_app

接下来编写你希望django去完成的app,本文中要编写的就是在INSTALLED_APPS中注册的apps。在celery.py中设定了对settings.py中INSTALLED_APPS做autodiscover_tasks,本文希望apps中能够接受这样的目录组织:所有的app都可以放到apps下面,而且每个app都有独立的目录,就和上面的app1、app2一样,每个app各自有各自的__init__.py和tasks.py(注意,每个app都需要__init__.py文件,可以是空白的)。但是这样的结构组织在启动时会报错说module
apps找不到。然后在apps下增加了一个__init__.py文件,这时报错没了,但是apps下每个app的tasks.py中的任务函数还是无法被django和celery worker找到。

然后尝试了在apps下面写一个__init__.py(空白)和task.py,所有的task function都写到tasks.py中,如下,


from __future__ import absolute_import

 

from celery import task

 

from celery import shared_task

 

#from celery.task import tasks 

#from celery.task import Task 

 

@task()

#@shared_task

def add(x, y):

    print "%d + %d = %d"%(x,y,x+y)

    return x+y

#class AddClass(Task):

#    def run(x,y):

#        print "%d + %d = %d"%(x,y,x+y)

#        return x+y

#tasks.register(AddClass)

 

@shared_task

def mul(x, y):

    print "%d * %d = %d"%(x,y,x*y)

    return x*y

 

 

@shared_task

def sub(x, y):

    print "%d - %d = %d"%(x,y,x-y)

    return x-y

上面代码中,decorator可以用@task()也可以用@shared_task。然后,同步django数据库,

~/bin/python manage.py makemigrations

~/bin/python manage.py migrate

运行django web server和celery beat、celery worker,

~/bin/python manage.py createsuperuser   #如果没管理员账号,先建一个

~/bin/python manage.py runserver 0.0.0.0:10501    #重新指定server使用的端口,0.0.0.0即运行该命令的主机ip: 10.121.84.90

~/bin/python manage.py celery beat

~/bin/python manage.py celery worker -c 6 -l debug   #正式上线-l改成info

~/bin/celery flower --port=10201 --broker=redis://10.121.84.90:16379/8      #如果必要开个flower,从而通过网页10.121.84.90:10201来监控任务情况

这时候,在浏览器输入10.121.84.90:10501/admin/ 可以登录管理员界面,

在该界面可以点Periodic tasks来添加crontab任务,

点击右上角的Add periodic task,

在该页面上可以通过上图中的Task(registered)或者Task(custom)来增加crontab任务,前者会在下拉列表中显示apps/tasks.py里定义的task function。除了动态增加、删除任务,上面的Enabled选项可以用来暂停任务。这时,在django
web server和celery beat、celery worker都启动着的情况下,修改apps/tasks.py文件,往其中增减task function,会实时反映在上图中Task(registered)的下拉列表上,验证了django的autodiscover_tasks功能的确实有效的。但是在不重启celery
worker的情况下,添加该task function的crontab任务,celery worker无法调用该task function,从而报错,

也就是说django的确可以autodiscover_tasks,并且celery beat也会准确无误的发布动态增加的任务消息,只是celery worker无法动态调用新的task function(这是不是说django的autodiscover_tasks功能只管django而不管celery
worker,从而只能看到而无法执行,其实并没有什么卵用?)。。。然后验证Task(custom)选项的作用:我自己写一个app3,放在apps同级目录和apps的子目录各自试验,不过不往settings.py中INSTALLED_TASKS中注册子目录(注册了不就是Task(registered)了吗),然后Add
periodic task时在Task(custom)填写"apps.app3.tasks.function"或者"app3.tasks.function"或者"目录的绝对路径.tasks.function",celery
beat会发布相关的任务消息,但是celery worker无法调用function(这就表示,想绕开INSTALLED_APPS和celery_worker是不行的,Task(custom)似乎没有什么卵用)。。。总的来说,就是Task(registered)或者Task(custom)并不能做到动态添加task
function并部署该task function的相关任务,虽然官方文档似乎表示可以。

现在能做到的就是,需要的task function写好,然后通过上面的例子可以动态的部署写好的task function的crontab任务(只可以动态传参数),似乎有点寒碜,不过勉强算是够用了,只要把task function写得通用一点,比如说开一个进程去调用用户自己编写的脚本。上面提到的失败的尝试,今后有空再去探索吧。

后续:

(一)

经过实验,celery worker使用--autoreload选项启动可以解决上面apps/tasks.py中新增、减函数无法被worker检测到的问题,具体见http://docs.celeryproject.org/en/latest/userguide/workers.html#autoreloading。不过该功能属于实验阶段,官方不推荐使用,而且,

When running the worker with the --autoreload option,
it correctly reloads if the tasks.py module
is changed. But if I change a module that is imported and used by the tasks module, nothing is reloaded and the worker still uses the old code. Currently
autoreloader doesn‘t monitor the dependency tree. 来源: <https://github.com/celery/celery/issues/1025>

不过,用来实现apps/tasks.py里面增减函数、自动被worker检测,是足够了的。在原生的celery中,--autoreload要搭配CELERY_IMPORT或include设定来用(autoreload这两项设定中的module文件),不过在django中,INSTALLED_APPS设定可以用来替代CELERY_IMPORT,所以django中可以使celery
worker autoreload INSTALLED_APPS设定目录下的tasks.py文件。最终,django的autodiscover_tasks用来使django
admin页面动态检测apps/tasks.py里task function的变化,celery的autoreload用来使celery worker动态检测apps/tasks.py里task function的变化,从而实现动态增减task
function并正确执行的功能。注意,动态新增的task function的decorator使用@tasks会报"NotRegistered: u‘apps.tasks.funcName‘"的错误,用@shared_task才可以正确被worker执行。不少问题出在celery
worker上,建议看看Workers Guide: http://docs.celeryproject.org/en/latest/userguide/workers.html#autoreloading

之前的尝试中,apps下每个app一个子目录,如果只在INSTALLED_APPS中添加"apps"这一项而不注册子目录,django无法autodiscover_tasks到子目录(google出来的例子是每个子目录都注册),看来autodiscover_tasks只支持注册目录下tasks.py文件的监测、而不支持注册目录下子目录的tasks.py文件的监测(如果支持,那么celery
worker的--autoreload也可以检测到子目录的tasks.py文件,那么apps子目录支持的问题就解决了)。不过,要实现apps下每个app一个子目录,可以考虑换种途径:添加新的子目录时动态修改settings.py中的INSTALLED_APPS,因为django似乎有支持这种操作的函数(参考笔记./【转】python
- dynamically loading django apps at runtime
,每增加一次调用一次用来增加的代码)。实验等有空再做。

综上所述,现在动态修改apps/tasks.py、添加apps目录下新的任务子目录(子目录目录结构和最上面app1、app2一样)的功能都可以了。唯一没有起作用的就是django
admin页面上的Task(custom)选项,该选项设计的初衷想必是:你写一个类似app1的app,直接通过该选项就可以调用。想法很好,只是目前来看,这个设计绕过了INSTALLED_APPS设定,似乎并不能被django和celery
worker检测到新增的app,起不了作用。

(二)

django正式上线后,需要将settings.py中的"DEBUG = True"注释掉免得内存泄漏,同时在下面加上"ALLOWED_HOSTS = [‘10.121.84.90‘]",使得admin网页可用(10.121.84.90是django
server运行的主机ip)。这时候admin网页会比较丑陋而且有些功能不正常,因为非debug模式下django
server不会帮忙处理静态文件,解决方案是在启动django server的时候添加--insecure选项(http://stackoverflow.com/questions/5836674/why-does-debug-false-setting-make-my-django-static-files-access-fail ),使用该选项要确定settings.py的INSTALLED_APPS中有‘django.contrib.staticfiles‘。

参考:

First steps with Django:http://celery.readthedocs.org/en/latest/django/first-steps-with-django.html

Django+Celery实现周期任务:http://m.blog.csdn.net/blog/zhenxuhit/44302809

djcelery入门:实现运行定时任务!!!:http://my.oschina.net/kinegratii/blog/292395

Django中如何使用django-celery完成异步任务 (1):http://www.weiguda.com/blog/73/

Configuring the webserver and worker:http://www.syncano.com/configuring-running-django-celery-docker-containers-pt-1/

版权声明:本文为博主原创文章,未经博主允许不得转载。

时间: 2024-09-29 20:08:24

django-celery动态添加定时任务的相关文章

django数据库动态添加列

方法一: python manage.py migrate 方法二: python manage.py migrate 效果: ---〉

django+celery配置(定时任务)

下面介绍一下django+celery的配置做定时任务 1.首先介绍一下环境和版本 python==2.7 django == 1.8.1 celery == 3.1.23 django-celery == 3.1.17 2.celery的安装   sudo pip install celery==3.1.23 sudo pip install django-celery==3.1.17 3.新建一个项目 (1)django-admin startproject django_celery_de

任务调度开源框架Quartz动态添加、修改和删除定时任务

Quartz 是个开源的作业调度框架,为在 Java 应用程序中进行作业调度提供了简单却强大的机制.Quartz框架包含了调度器监听.作业和触发器监听.你可以配置作业和触发器监听为全局监听或者是特定于作业和触发器的监听.Quartz 允许开发人员根据时间间隔(或天)来调度作业.它实现了作业和触发器的多对多关系,还能把多个作业与不同的触发器关联.整合了 Quartz 的应用程序可以重用来自不同事件的作业,还可以为一个事件组合多个作业.并且还能和Spring配置整合使用.Quartz在功能上远远超越

Quartz 2.2 动态添加、修改和删除定时任务

QuartzManager.Java 动态添加.修改和删除定时任务管理类 import org.quartz.CronScheduleBuilder; import org.quartz.CronTrigger; import org.quartz.Job; import org.quartz.JobBuilder; import org.quartz.JobDetail; import org.quartz.JobKey; import org.quartz.Scheduler; import

Quartz动态添加、修改和删除定时任务

任务调度开源框架Quartz动态添加.修改和删除定时任务 Quartz 是个开源的作业调度框架,为在 Java 应用程序中进行作业调度提供了简单却强大的机制.Quartz框架包含了调度器监听.作业和触发器监听.你可以配置作业和触发器监听为全局监听或者是特定于作业和触发器的监听.Quartz 允许开发人员根据时间间隔(或天)来调度作业.它实现了作业和触发器的多对多关系,还能把多个作业与不同的触发器关联.整合了 Quartz 的应用程序可以重用来自不同事件的作业,还可以为一个事件组合多个作业.并且还

结合Django+celery二次开发定时周期任务

需求: 前端时间由于开发新上线一大批系统,上完之后没有配套的报表系统.监控,于是乎开发.测试.产品.运营.业务部.财务等等各个部门就跟那饥渴的饿狼一样需要 各种各样的系统数据满足他们.刚开始一天一个还能满足他们,优化脚本之后只要开发提供查询数据的SQL.收件人.执行时间等等参数就可以几分钟写完一个定时任务脚本 ,到后面不知道是不是吃药了一天三四个定时任务,不到半个月手里一下就20多个定时任务了,渐渐感到力不从心了,而且天天还要给他们修改定时任务的SQL.收件人.执 行时间等等,天天写定时任务脚本

django+celery+redis环境搭建

初次尝试搭建django+celery+redis环境,记录下来,慢慢学习~ 1.安装apache 下载httpd-2.0.63.tar.gz,解压tar zxvf httpd-2.0.63.tar.gz,cd httpd-2.0.63, ./configure --prefix=/usr/local/apache --enable-mods=all --enable-cache --enable-mem-cache --enable-file-cache --enable-rewrite(这一

一个Spring Scheduler (Quartz) 动态添加,删除,修改任务的例子

要求 根据任务清单执行不定数量,不定频率的定时任务.并且需要动态添加,删除,修改任务. 代码 public class JobExample {      public static void main(String[] args) throws Exception {         // Job的配置信息,可以从数据库或配置文件中获取         List<JobParameter> list = new ArrayList<JobParameter>();        

Quartz任务调度[Spring+Quartz结合]_实现任务的动态添加、修改和删除

项目框架图 下面开始贴代码了,不过先贴数据库^^ -- Create table Oracle数据库 create table QUARTZ_SCHEDULEJOB ( id VARCHAR2(32), job_name VARCHAR2(32) not null, job_status NVARCHAR2(3) default 0 not null, cron_expression NVARCHAR2(32) not null, concurrent NVARCHAR2(3) default