Django中如何使用django-celery完成异步任务

本篇博文主要介绍在开发环境中的celery使用,请勿用于部署服务器.

许多Django应用需要执行异步任务, 以便不耽误http request的执行. 我们也可以选择许多方法来完成异步任务, 使用Celery是一个比较好的选择, 因为Celery有着大量的社区支持, 能够完美的扩展, 和Django结合的也很好. Celery不仅能在Django中使用, 还能在其他地方被大量的使用. 因此一旦学会使用Celery, 我们可以很方便的在其他项目中使用它.

1. Celery版本

本篇博文主要针对Celery 3.0.x. 早期版本的Celery可能有细微的差别.

2. Celery介绍

Celery的主要用处是执行异步任务, 可以选择延期或定时执行功能. 为什么需要执行异步任务呢?

第一, 假设用户正发起一个request, 并等待request完成后返回. 在这一request后面的view功能中, 我们可能需要执行一段花费很长时间的程序任务, 这一时间可能远远大于用户能忍受的范围. 当这一任务并不需要立刻执行时, 我们便可以使用Celery在后台执行, 而不影响用户浏览网页. 当有任务需要访问远程服务器完成时, 我们往往都无法确定需要花费的时间.

第二则是定期执行某些任务. 比如每小时需要检查一下天气预报, 然后将数据储存到数据库中. 我们可以编写这一任务, 然后让Celery每小时执行一次. 这样我们的web应用便能获取最新的天气预报信息.

我们这里所讲的任务task, 就是一个Python功能(function). 定期执行一个任务可以被认为是延时执行该功能. 我们可以使用Celery延迟5分钟调用function task1, 并传入参数(1, 2, 3). 或者我们也可以每天午夜运行该function.

我们偏向于将Celery放入项目中, 便于task访问统一数据库和Django设置.

当task准备运行时, Celery会将其放入列队queue中. queue中储存着可以运行的task的list. 我们可以使用多个queue, 但为了简单, 这里我们只使用一个.

将任务task放入queue就像加入todo list一样. 为了使task运行, 我们还需要在其他线程中运行的苦工worker. worker实时观察着代运行的task, 并逐一运行这些task. 你可以使用多个worker, 通常他们位于不同服务器上. 同样为了简单起见, 我们这只是用一个worker.

我们稍后会讨论queue, worker和另外一个十分重要的进程, 接下来我们来动动手:

3. 安装Celery

我们可以使用pip在vietualenv中安装:

    pip install django-celery

4. Django设置

我们暂时使用django runserver来启动celery. 而Celery代理人(broker), 我们使用Django database broker implementation. 现在我们只需要知道Celery需要broker, 使用django自身便可以充当broker. (但在部署时, 我们最好使用更稳定和高效的broker, 例如Redis.)

在settings.py中:

    import djcelery
    djcelery.setup_loader()
    BROKER_URL = ‘django://‘
    ...
    INSTALLED_APPS = (
       ...
       ‘djcelery‘,
       ‘kombu.transport.django‘,
       ...
    )

第一二项是必须的, 第三项则告诉Celery使用Django项目作为broker.

在INSTALLED_APPS中添加的djcelery是必须的. kombu.transport.django则是基于Django的broker

最后创建Celery所需的数据表, 如果使用South作为数据迁移工具, 则运行:

    python manage.py migrate

否则运行: (Django 1.6或Django 1.7都可以)

    python manage.py syncdb

5. 创建一个task

正如前面所说的, 一个task就是一个Pyhton function. 但Celery需要知道这一function是task, 因此我们可以使用celery自带的装饰器decorator: @task. 在django app目录中创建taske.py:

    from celery import task

    @task()
    def add(x, y):
        return x + y

当settings.py中的djcelery.setup_loader()运行时, Celery便会查看所有INSTALLED_APPS中app目录中的tasks.py文件, 找到标记为task的function, 并将它们注册为celery task.

将function标注为task并不会妨碍他们的正常执行. 你还是可以像平时那样调用它: z = add(1, 2).

6. 执行task

让我们以一个简单的例子作为开始. 例如我们希望在用户发出request后异步执行该task, 马上返回response, 从而不阻塞该request, 使用户有一个流畅的访问过程. 那么, 我们可以使用.delay, 例如在在views.py的一个view中:

    from myapp.tasks import add
    ...
        add.delay(2, 2)
    ...

Celery会将task加入到queue中, 并马上返回. 而在一旁待命的worker看到该task后, 便会按照设定执行它, 并将他从queue中移除. 而worker则会执行以下代码:

    import myapp.tasks.add

    myapp.tasks.add(2, 2)

7. 关于import

这里需要注意的是, 在impprt task时, 需要保持一致. 因为在执行djcelery.setup_loader()时, task是以INSTALLED_APPS中的app名, 加.tasks.function_name注册的, 如果我们由于python path不同而使用不同的引用方式时(例如在tasks.py中使用from myproject.myapp.tasks import add形式), Celery将无法得知这是同一task,
因此可能会引起奇怪的bug.

8. 测试

a. 启动worker

正如之前说到的, 我们需要worker来执行task. 以下是在开发环境中的如何启动worker:

首先启动terminal, 如同开发django项目一样, 激活virtualenv, 切换到django项目目录. 然后启动django自带web服务器: python manage.py runserver.

然后启动worker:

    python manage.py celery worker --loglevel=info

此时, worker将会在该terminal中运行, 并显示输出结果.

b. 启动task

打开新的terminal, 激活virtualenv, 并切换到django项目目录:

    $ python manage.py shell
    >>> from myapp.tasks import add
    >>> add.delay(2, 2)

此时, 你可以在worker窗口中看到worker执行该task:

    [2014-10-07 08:47:08,076: INFO/MainProcess] Got task from broker: myapp.tasks.add[e080e047-b2a2-43a7-af74-d7d9d98b02fc]
    [2014-10-07 08:47:08,299: INFO/MainProcess] Task myapp.tasks.add[e080e047-b2a2-43a7-af74-d7d9d98b02fc] succeeded in 0.183349132538s: 4

9. 另一个例子

下面我们来看一个更为真实的例子, 在views.py和tasks.py中:

    # views.py
    from myapp.tasks import do_something_with_form_data

    def view(request):
        form = SomeForm(request.POST)
        if form.is_valid():
            data = form.cleaned_data
            # Schedule a task to process the data later
            do_something_with_form_data.delay(data)
        return render_to_response(...)
    # tasks.py

    @task
    def do_something_with_form_data(data):
        call_slow_web_service(data[‘user‘], data[‘text‘], ...)

10. 调试

由于Celery的运行需要启动多个部件, 我们可能会漏掉一两个. 所以我们建议:

  • 使用最简单的设置
  • 使用python debug和logging功能显示当前的进程

11. Eager模式

如果在settings.py设置:

    CELERY_ALWAYS_EAGER = True

那么Celery便以eager模式运行, 则task便不需要加delay运行:

    # 若启用eager模式, 则以下两行代码相同
    add.delay(2, 2)
    add(2, 2)

12. 查看queue

因为我们使用了django作为broker, queue储存在django的数据库中. 这就意味着我们可以通过django admin查看该queue:

    # admin.py
    from django.contrib import admin
    from kombu.transport.django import models as kombu_models

    admin.site.register(kombu_models.Message)

13. 检查结果

每次运行异步task后, Celery都会返回AsyncResult对象作为结果. 你可以将其保存, 然后在将来查看该task是否运行成功和返回结果:

    # views.py

    result = add.delay(2, 2)
    ...
    if result.ready():
        print "Task has run"
        if result.successful():
            print "Result was: %s" % result.result
        else:
            if isinstance(result.result, Exception):
                print "Task failed due to raising an exception"
                raise result.result
            else:
                print "Task failed without raising exception"
     else:
         print "Task has not yet run"

14. 定期任务

还有一种Celery的常用模式便是执行定期任务. 执行定期任务时, Celery会通过celerybeat进程来完成. Celerybeat会保持运行, 一旦到了某一定期任务需要执行时, Celerybeat便将其加入到queue中. 不像worker进程, Celerybeat只有需要一个即可.

启动Celerybeat:

    python manage.py celery beat

使Celery运行定期任务的方式有很多种, 我们先看第一种, 将定期任务储存在django数据库中. 即使是在django和celery都运行的状态, 这一方式也可以让我们方便的修改定期任务. 我们只需要设置settings.py中的一项便能开启这一方式:

    # settings.py
    CELERYBEAT_SCHEDULER = ‘djcelery.schedulers.DatabaseScheduler‘

然后我们便可以通过django admin的/admin/djcelery/periodictask/添加定期任务了.

  • Name: 这一定期任务的注册名
  • Task (registered): 可以选择所有已经注册的task之一, 例如前面的add function
  • Task (custom): task的全名, 例如myapp.tasks.add, 但最好还是用以上项
  • Enabled: 是否开启这一定期任务
  • Interval: 定期任务的间隔时间, 例如每隔5分钟
  • Crontab: 如果希望task在某一特定时间运行, 则使用Unix中的Crontab代替interval
  • Arguments: 用于传参数到task中
  • Execution Options: 更高级的设置, 在此不详细说明, 请查看celery官方文档

15. 注意

本篇博文中所描述的方法只适用于开发环境, 而不应当应用于部署环境.

如果希望在部署环境中使用, 最重要的便是使用更稳定和可扩展的broker, 而不是使用kombu.transport.django.

1. 简单设置

一个简单的Celery堆有一个queue和一个worker进程组成. 使用以下命令启动worker:

    python manage.py celery worker -B

以上命令是基于django-celery, 当然你也可以celery自身启动worker. 通常我们使用supervisord管理celery worker的启动和重启, 而不是使用手动的方式. supervisord的介绍我们会在今后的文章中作详细介绍. 现在我们只需要知道它是一款进程管理程序即可. 当然, 你也可以选择类似的系统, 例如init.d, upstart, runit或god等.

"-B"参数告诉celery在启动worker时同时启动celery beat, 并使用统一进程, 以便执行定期任务.

在部署服务器上, 我们使用Redis或RabbitMQ作为broker. 而在这一简单的celery堆中, 我们用django数据库储存执行结果, 或干脆忽略结果都可.

2. 完整设置

如果简单设置无法满足我们的需要的话, 我们只需要做一些简单的改变就能完整设置Celery异步任务. 完整设置中, 我们使用多个queue来区分任务优先级. 每个queue我们配置一个不同concurrency设置的worker. beat进程也与worker进程分离出来.

    # 默认 queue
    python manage.py celery worker -Q celery
    # 高优先级 queue. 10个 workers
    python manage.py celery worker -Q high -c 10
    # 低优先级 queue. 2个 workers
    python manage.py celery worker -Q low -c 2
    # Beat 进程
    python manage.py celery beat

注意, 其中high和low只是queue的名字, 并没有其他特殊意义. 我们通过为高优先级的queue配置高concurrency的worker, 使高优先级queue能够使用更多的资源.

同样的, 这些启动命令通过supervisor管理. 至于broker, 我们还是使用Redis或RabbitMQ. 而任务结果则可以储存在Redis或Memcached这些拥有高写入速度的系统中. 如果有必要, 这些worker进程可以移到其他服务器中, 但最好共享一个broker和结果储存系统.

3. 扩展性

我们不能一味的依靠增加额外的worker来提高性能, 因为每个worker都会占用一定的资源. 默认的concurrency设置是, 都多少CPU便创建多少worker, 并为每个worker创建一个新的进程. 将concurrency设置的太高则会很快的榨干服务器的CPU和内存资源.

对于I/O资源需求较大的任务, 我们则可以指定worker使用gevent或eventlet池, 而不是使用更多进程. 这一配置使用的内存资源会大大降低, 同时提升concurrency的性能. 需要注意的是, 但如果我们涉及到的library没有为greenlet打过补丁的话, 很有可能会阻塞所有的任务!

4. 注意

还有需要注意的是django的transaction. transaction根据django的版本和是否已web request形式传入有所不同, 所以你需要自己查阅相关的文档.

时间: 2024-10-06 18:25:27

Django中如何使用django-celery完成异步任务的相关文章

Celery在Django中的使用介绍

Celery在Django中的使用介绍 Celery简介 celery是一个简单.灵活且可靠的,处理大量消息的分布式系统,并且提供维护这样一个系统的必须工具. 它是一个专注于实时处理的任务队列,同时也支持任务调度. 何为任务队列 任务队列:是一种在线程和机器间分发任务的机制. celery的三大组成部分 worker 任务执行单元-->Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中. broker(存tasks的仓库) 消息中间件--> Celery

Django中使用Celery实现定时任务(用djcelery)

[TOC] 一.引言 Django是python语言下的一个比较热门的Web框架,越来越多的企业和开发者使用Django实现自己的Web服务器.在Web服务器开发过程中,有时候我们不仅仅是要实现Web服务器端和用户端的简单逻辑交互,还要实现一些定时任务.举出以下的例子: 定期删除或缓存Redis数据库的记录 为了追求更高的数据库访问性能,我把Redis作为MySql数据库的缓存.把常访问的数据放在Redis中,然后定时存储到Mysql中.并且把过期的Redis数据删掉.那么这个时候,就需要定时去

Django中ORM介绍和字段及其参数

ORM介绍 ORM概念 对象关系映射(Object Relational Mapping,简称ORM)模式是一种为了解决面向对象与关系数据库存在的互不匹配的现象的技术. 简单的说,ORM是通过使用描述对象和数据库之间映射的元数据,将程序中的对象自动持久化到关系数据库中. ORM在业务逻辑层和数据库层之间充当了桥梁的作用. ORM的由来 字母‘O’起源于“对象”(Object),'R'代表“关系”(Relational). 几乎所有的软件开发过程中都会涉及到对象和关系数据库.在用户层面和业务逻辑层

django中怎么使用mysql数据库的事务

Mysql数据库事务: 在进行后端业务开始操作修改数据库时,可能会涉及到多张表的数据修改,对这些数据的修改应该是一个整体事务,即要么一起成功,要么一起失败. Django中对于数据库的事务,默认每执行一句数据库操作,便会自动提交.我们需要在保存数据库操作中自己控制数据库事务的执行流程. 在Django中可以通过django.db.transaction模块提供的atomic来定义一个事务,atomic提供两种用法: 装饰器用法 from django.db import transaction

Django中的app及mysql数据库篇(ORM操作)

Django常见命令 在Django的使用过程中需要使用命令让Django进行一些操作,例如创建Django项目.启动Django程序.创建新的APP.数据库迁移等. 创建Django项目 一把我们都新建一个文件夹来存放项目文件,切换到这个目录下,启动命令行工具.创建一个名为mysite的Django项目: django-admin startproject mysite 创建好项目之后,可以查看当前目录下多出一个名为mysite的文件夹,mysite的文件夹目录结构如下: mysite/ ma

Django中ORM

Object Relational Mapping(ORM) ORM概念 对象关系映射(Object Relational Mapping,简称ORM)模式是一种为了解决面向对象与关系数据库存在的互不匹配的现象的技术. 简单的说,ORM是通过使用描述对象和数据库之间映射的元数据,将程序中的对象自动持久化到关系数据库中. ORM在业务逻辑层和数据库层之间充当了桥梁的作用. ORM由来 让我们从O/R开始.字母O起源于"对象"(Object),而R则来自于"关系"(Re

Django 中事务的使用

在 Django 中可以通过django.db.transaction 模块提供的atomic来定义一个事务 atomic提供两种方案实现事务 装饰器用法: from django.db import transaction @transaction.atomic def viewfunc(request): # 这些代码会在一个事务中执行 ...... 装饰器用法:整个视图中所有 MySQL 数据库的操作都看做一个事务,范围太大,不够灵活.而且无法直接作用于类视图 with 语句用法: fro

1205 CSRF跨站请求与django中的auth模块使用

目录 今日内容 昨日回顾 基于配置文件的编程思想 importlib模块 简单代码实现 跨站请求伪造csrf 1. 钓鱼网站 如何实现 模拟该现象的产生 2. 解决问题 解决 {% csrf_token %} 3. ajax如何解决 方式1 方式2 方式3 4. csrf相关的两个装饰器 1. 使用 2. 两个装饰器在CBV上的异同 django里settings源码剖析 django有两个配置文件 django auth模块 1. 是什么 2. 常用方法 2.1 创建用户 create_use

Django中ORM介绍和字段及字段参数

https://www.cnblogs.com/liwenzhou/p/8688919.html Django中ORM介绍和字段及字段参数 Object Relational Mapping(ORM) ORM介绍 ORM概念 对象关系映射(Object Relational Mapping,简称ORM)模式是一种为了解决面向对象与关系数据库存在的互不匹配的现象的技术. 简单的说,ORM是通过使用描述对象和数据库之间映射的元数据,将程序中的对象自动持久化到关系数据库中. ORM在业务逻辑层和数据库

异步任务队列Celery在Django中的使用

前段时间在Django Web平台开发中,碰到一些请求执行的任务时间较长(几分钟),为了加快用户的响应时间,因此决定采用异步任务的方式在后台执行这些任务.在同事的指引下接触了Celery这个异步任务队列框架,鉴于网上关于Celery和Django结合的文档较少,大部分也只是粗粗介绍了大概的流程,在实践过程中还是遇到了不少坑,希望记录下来帮助有需要的朋友. 一.Django中的异步请求 Django Web中从一个http请求发起,到获得响应返回html页面的流程大致如下:http请求发起 --