Celery(二)-----------------使用Celery的第二步

使用Celery的第一步是有意最小化的介绍Celery。本节教程将为你展示Celery提供的更多细节,包括怎样为你的程序和库添加Celery支持。

本节教程不会介绍Celery的所有特性和最佳实践,所以建议你也阅读一下用户指引 User Guide

在你的程序中使用Celery

我们工程的结构如下:

布局:

proj/__init__.py
    /celery.py
    /tasks.py

proj/celery.py

from __future__ import absolute_import, unicode_literals

from celery import Celery

app = Celery(‘proj‘,
             broker=‘amqp://‘,
             backend=‘amqp://‘,
             include=[‘proj.tasks‘])

# Optional configuration, see the application user guide.
app.conf.update(
    result_expires=3600,
)

if __name__ == ‘__main__‘:
    app.start()

在celery.py模块中,我们创建了Celery实例(有时被引用为app)。为了在你的工程中使用Celery,你可以简单的导入这个实例。

a.broker关键字参数指定了中间人所使用的URL

查看Choosing
a Broker
 获得更多信息

b.backend关键字参数指定了要使用的结果后台

用它来跟踪任务状态和结果。尽管默认情况下,结果选项被禁用。我这里还指明了backend是因为稍后我将展示怎样检索结果,你可能会在你的程序里使用其它的backend。

它们有各自的优势和劣势。如果你不需要结果你最好禁用它们。结果也可以被个别的任务禁用,通过在task装饰器中指定ignore_result选项( @task(ignore_result=True))

查看Keeping
Results
 获得更多的信息

c.include关键字参数是一个在worker启动时要导入的模块列表。在这里需要添加我们的tasks模块,这样worker启动时才能找到我们的task.

proj/tasks.py

from __future__ import absolute_import, unicode_literals

from proj.celery import app

@app.task
def add(x, y):
    return x + y

@app.task
def mul(x, y):
    return x * y

@app.task
def xsum(numbers):
    return sum(numbers)

启动worker

可以用celery命令启动worker(你需要在proj的父目录执行以下命令):

$ celery -A proj worker -l info

当worker启动时你应该可以看到如下信息:

When the worker starts you should see a banner and some messages:

-------------- celery@halcyon.local v4.0 (0today8)
---- **** -----
--- * ***  * -- [Configuration]
-- * - **** --- . broker:      amqp://guest@localhost:5672//
- ** ---------- . app:         __main__:0x1012d8590
- ** ---------- . concurrency: 8 (processes)
- ** ---------- . events:      OFF (enable -E to monitor this worker)
- ** ----------
- *** --- * --- [Queues]
-- ******* ---- . celery:      exchange:celery(direct) binding:celery
--- ***** -----

[2012-06-08 16:23:51,078: WARNING/MainProcess] celery@halcyon.local has 

--broker URL(transport)是我们在celery模块中指定的broker关键字参数,你也可以通过-b选项在命令行指定一个不同的broker.

--concurrency 是用来执行任务而prefork的worker进程,如果所有的worker都在执行任务,那么新添加的任务必须要等待有一个正在执行的任务完成后才能被执行。

默认的concurrency数量是机器上CPU的数量,你可以指定一个数量通过-c选项(celery worker -c )。在这里没有推荐值,因为最优值依懒许多因素,但是如果你的任务大部分都是I/O相关的,你可以尝试增加这个值,实验证明增加到2倍以上于CPU数的值对性能提高微乎其微,相反还会降低性能。

除了默认的进程池,Celery还支持使用Eventlet,Gevent和线程(查看 Concurrency

--Events是一个选项,它可以用来使Celery在worker中有事件发生时发送监控消息(events)。这些信息可以被一些监控程序使用,比如celery events和Flower---一个实时的Celery
monitor,更多详细介绍可以查看Monitoring
and Management guide
.

--Queuens是一个队列列表,workers将会从中消费任务。可以告诉worker一次性地从多个队列中消费任务,这可以用来路由消息给指定的worker。这对于构建高质量的服务,关系的分离和提供优先级都有意义。具体的描述参考Routing
Guide
.

你可以获得一个完整的命令行列表通过--help参数。

$ celery worker --help

更多关于选项的说明请查看Workers
Guide

停止worker

可以简单的通过crtl+c来终止worker。worker可以接收的信号列表可以查看Workers
Guide
.

在后台运行

在生产环境中,你可能希望在后台运行worker.下面的文档中有详细的介绍:daemonization
tutorial
.

下面的脚本使用celery multi命令在后台启动一个或多个worker.

$ celery multi start w1 -A proj -l info
celery multi v4.0.0 (0today8)
> Starting nodes...
    > w1.halcyon.local: OK

你也可以重新启动:

$ celery  multi restart w1 -A proj -l info
celery multi v4.0.0 (0today8)
> Stopping nodes...
    > w1.halcyon.local: TERM -> 64024
> Waiting for 1 node.....
    > w1.halcyon.local: OK
> Restarting node w1.halcyon.local: OK
celery multi v4.0.0 (0today8)
> Stopping nodes...
    > w1.halcyon.local: TERM -> 64052

或者停止它:

$ celery multi stop w1 -A proj -l info

stop命令是异步的,它不会等待所有的worker真正关闭。你可能需要使用stopwait命令,这个命令会保证当前所有的任务都执行完毕。

$ celery multi stopwait w1 -A proj -l info

注:celery multi命令不会保存workers的信息,所以当重新启动时你需要使用相同的命令行参数。当停止时,只有相同的pidfile和logfile参数是必须的。

默认情况下,celery会在当前目录下创建pidfile和logfile.为了防止多个worker在启动时相互影响,你可以指定一个特定的目录。

$ mkdir -p /var/run/celery
$ mkdir -p /var/log/celery
$ celery multi start w1 -A proj -l info --pidfile=/var/run/celery/%n.pid \
                                        --logfile=/var/log/celery/%n%I.log

通过multi命令你可以启动多个workers,另外这个命令还支持强大的命令行语法来为不同的workers指定不同的参数。

$ celery multi start 10 -A proj -l info -Q:1-3 images,video -Q:4,5 data \
    -Q default -L:4,5 debug

更多关闭multi的举例请参考multi模块的API手册。

关于 -app参数:

-app参数将会指定将要使用的Celery app实例。它的形式必须是这样的:module.path:attribute

但是也支持一种简写形式,如只给出了一个包名称。Celery将会按以下顺序搜寻app实例:

with --app=proj:

1.名为proj.app的属性

2.名为proj.celery的属性

3.proj模块中任何值为celery application的属性

如果以上都没有找到,将会在子模块proj.celery中寻找:

4.proj.celery.app

5.proj.celery.celery

6.proj.celery模块中任何值为celer application的属性

本文档中的实例说明了这种规则,一般在简单模块中使用proj:app;在大型工程中使用proj.celery:app。

调用任务

你可以使用delay()方法来执行一个任务:

>>> add.delay(2, 2)

这个方法实际上是另外一个方法apply_async()的简单写法:

>>> add.apply_async((2, 2))

apply_async允许你指定一些执行选项,如何时开始执行任务,任务应该被发送给哪个队列等等。

>>> add.apply_async((2, 2), queue=‘lopri‘, countdown=10)

上面的举例中,任务将会被发送给‘lopri‘队列,任务最早将会在消息发送10s之后执行。

直接使用任务将会在当前进程中执行,因此没有任何消息被发送:

>>> add(2, 2)
4

delay(),apply_async(),__call__()三个方法,代表了Celery的调用API,这3个方法也用于方法签名。

更多关于调用API的细节请参考:Calling
User Guide
.

每个任务调用都会被赋予一个唯一的标识符(一个UUID),这就是任务id.

delay(),apply_async()方法会返回一个AsyncResult实例,通过这个实例可以跟踪任务状态轨迹。要使用此功能,需要提供结果后台(result backend),这样才有地方存储任务状态等信息。

结果跟踪功能默认情况下是关闭的,因为没有对任何应用程序都适用的结果后台。你需要考虑每种结果后台的优缺点然后选择一个合适的。对于很多任务来说,保存结果并不十分有用,所以默认开启结果后台是不适合的。另外,你需要明白结果后台不是用来监控任务和workers的,Celery有专用的事件消息来实现此功能(Monitoring
and Management Guide
)。

如果你配置了一个结果后台,你可以检索任务的结果:

>>> res = add.delay(2, 2)
>>> res.get(timeout=1)
4

你也可以通过id属性来查看任务的id:

>>> res.id
d6b3aea2-fb9b-4ebc-8da4-848818db9114

你还可以检查异常和调用栈如果任务发生了异常,事实上,默认情况下,result.get()方法会传播任何错误

>>> res = add.delay(2)
>>> res.get(timeout=1)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/opt/devel/celery/celery/result.py", line 113, in get
    interval=interval)
File "/opt/devel/celery/celery/backends/amqp.py", line 138, in wait_for
    raise meta[‘result‘]
TypeError: add() takes exactly 2 arguments (1 given)

如果你不希望传递任何错误,你可以禁用它通过传递propagate参数:

>>> res.get(propagate=False)
TypeError(‘add() takes exactly 2 arguments (1 given)‘,)

这样,get会返回异常实例而不会直接将其抛出。所以,你需要使用result的相关方法来检查任务执行成功还是失败。

>>> res.failed()
True

>>> res.successful()
False

你还可以查看任务的状态:

>>> res.state
‘FAILURE‘

一个任务只能处于某一个状态,但是它可以在不同的状态间转化,一个典型的任务状态过程如下:

PENDING -> STARTED -> SUCCESS

"STARTED"状态是一个特殊状态,仅仅会在如下情况下出现:task_trace_started配置被设置或者@task(track_started=True) 选项被设置。

“PENDING”状态实际上并不是一个记录状态,它是任何未知id的任务的默认状态,下面的例子中会出现这一状态:

>>> from proj.celery import app

>>> res = app.AsyncResult(‘this-id-does-not-exist‘)
>>> res.state
‘PENDING‘

如果你一个任务被重新尝试执行过,状态会变得更加复杂。比如,一个任务被尝试执行了2次,状态转换会如下所示:

PENDING -> STARTED -> RETRY -> STARTED -> RETRY -> STARTED -> SUCCESS

要了解更多关于任务状态的信息,请查看:States

另外,更多关于执行任务的信息,请查看:Calling
Guide
.

Canvas:设计工作流

你刚刚学习到了如何使用任务的delay方法来调用一个任务,通常这对你已经足够了。但是有时候,你希望将一个任务调用的签名传递给其它进程或者作为一个参数传递给其它函数,Celery使用一种名为签名的机制实现这种需求。

一个签名包装了一个任务调用的参数和执行参数,这样它就可以被传递给函数甚至被序列化和在网络上传递。

你可以为一个使用参数(2,2)和countdown=10的add任务创建一个签名:

>>> add.signature((2, 2), countdown=10)
tasks.add(2, 2)

还有一种简写形式:

>>> add.s(2, 2)
tasks.add(2, 2)

再谈调用任务API

签名实例也支持调用API,这意味着我们也可以调用delay,apply_async方法。

但是有些许不同,签名在创建时可能已经指定了参数。add任务需要2个参数,上面的签名在创建时已经指定了2个参数,所以你只需向下面这样:

>>> s1 = add.s(2, 2)
>>> res = s1.delay()
>>> res.get()
4

你也可以创建指定部分参数的签名实例:

# incomplete partial: add(?, 2)
>>> s2 = add.s(2)

s2现在是一个部分签名,你可以在调用签名时指定另外一个参数:

# resolves the partial: add(8, 2)
>>> res = s2.delay(8)
>>> res.get()
10

上面添加的参数8,将会放在已存在的参数2之前。构成完整的参数(8,2)

关键字参数也可以在调用签名时添加,新添加的参数会与原参数合并,但是调用时的参数优先级高:

>>> s3 = add.s(2, 2, debug=True)
>>> s3.delay(debug=False)   # debug is now False.

如上所述,签名支持调用API。这意味着:

a. sig.apply_async(args=(), kwargs={}, **options)

调用签名支持部分参数和部分关键字参数,也支持部分执行参数。

b. sig.delay(*args, **kwargs)

任何参数都会被放在签名参数之前,任何关键字参数都会与原参数合并。

上面所述的这些看起来很有用,但是你可能不知道在实际中这些有何用。要了解这些的用处,我必须向你介绍canvas原语。

原语

这些原语本身就是签名对象。所以它们可以被用任意方式组合成复杂的工作流。

注:下面的这些例子检索了结果,所以你需要配置一个结果后台(result backend)。上面的例子proj已经指定了结果后台(查看Celery的backend参数Celery)。

下面是一些例子

Groups

group并行地执行一个任务列表,返回一个特殊的结果实例,让你可以以group的方式观察结果并按顺序检索返回值。

>>> from celery import group
>>> from proj.tasks import add

>>> group(add.s(i, i) for i in xrange(10))().get()
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

部分group(上面已经说明了,原语本身就是签名对象)

>>> g = group(add.s(i) for i in xrange(10))
>>> g(10).get()
[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]

Chains

通过Chains,任务可以像使用管道一样连接在一起,前一个执行完成后会执行下一个:

>>> from celery import chain
>>> from proj.tasks import add, mul

# (4 + 4) * 8
>>> chain(add.s(4, 4) | mul.s(8))().get()
64

部分Chains

>>> # (? + 4) * 8
>>> g = chain(add.s(4) | mul.s(8))
>>> g(4).get()
64

Chains还可以像下面这样:

>>> (add.s(4, 4) | mul.s(8))().get()
64

Chords

chord是一个带回调函数的group

>>> from celery import chord
>>> from proj.tasks import add, xsum

>>> chord((add.s(i, i) for i in xrange(10)), xsum.s())().get()
90

一个group与另外一个任务通过chains(管道"|")连接将会自动地转换为chord:

>>> (group(add.s(i, i) for i in xrange(10)) | xsum.s())().get()
90

因为这些原语全部都是签名对象,所以你可以按照自己的需要随意组合它们:

upload_document.s(file) | group(apply_filter.s() for filter in filters)

了解更多关于工作流的说明:Canvas

路由

Celery支持AMQP提供的所有路由机制,但是也支持将消息发送到命名队列的简单路由。

task_routes配置允许你通过名字来路由任务,还可以将所有的东西进行集中控制。

app.conf.update(
    task_routes = {
        ‘proj.tasks.add‘: {‘queue‘: ‘hipri‘},
    },
)

你也可以在运行时通过queue关键字参数来为apply_async函数指定队列。

>>> from proj.tasks import add
>>> add.apply_async((2, 2), queue=‘hipri‘)

你可以通过指定celery worker -Q选项来让一个worker从队列中执行任务。

$ celery -A proj worker -Q hipri

你可以通过逗号分隔符来指定多个队列。比如,你可以指定worker从默认队列和“hipri”队列中执行任务,默认的工作队列由于历史原因名为‘celery‘。

$ celery -A proj worker -Q hipri,celery

worker将会同等对待指定的所有队列,队列指定的先后顺序没有关系。

要了解所有关于路由的的内容,包括如何利用AMQP的强大路由功能,请查看:Routing
Guide
.

远程控制

如果你正在使用RabbitMQ(AMQP),Redis或者MongoDB作为中间人,你可以在运行时控制和监视worker。

比如,你可以查看哪个任务正在被worker执行:

$ celery -A proj inspect active

这是通过广播消息实现的,所以所有的远程控制命令会被集群中的所有worker收到。

你还可以通过使用--destination选项来指定需要发送监控请求的workers所有的节点,所有的节点以逗号分隔。

$ celery -A proj inspect active --destination=[email protected]

如果不指定destination,则所有的worker都会响应和回应广播请求。

celery inspect的相关所有命令并不会改变worker中的任何东西,它仅仅是请求关于正在运行的worker的信息和数据。要查看所有相关的命令,你可以执行:

$ celery -A proj inspect --help

还有一个celery control命令,它可以在运行时改变worker的相关内容:

$ celery -A proj control --help

比如,你可以强制worker使能事件消息(用于监控task和workers)

$ celery -A proj control enable_events

在事件机制开启的情况下,你可以开启事件dumper来观察workers正在干什么:

$ celery -A proj events --dump

或者你也开启curses接口:

$ celery -A proj events

当监控完成后,你可以去使能事件机制:

$ celery -A proj control disable_events

celery status命令也是使用远程控制命令,它会显示集群中在线的workers:

$ celery -A proj status

要了解更多关于celery的命令和监控相关的内容,请查看:Monitoring
Guide
.

时区

所有的时间和日期,内部使用的和消息中的全都使用UTC时区。

当一个worker收到一个消息,比如当countdown设置时,会将UTC时间转换为本地时间。如果你希望采用一个和系统时区不同的时区,你必须用timezone选项来进行配置:

app.conf.timezone = ‘Europe/London‘

优化

默认的配置没有为吞吐量进行优化,celery假设有很多短的任务和较少长的任务,采取了一个在公平调度和吞吐量之间折衷的方式。

如果你对公平调度有严格的要求,或者需要为吞吐量进行优化,请查看: Optimizing
Guide
.

如果你正在使用rabbitmq,你需要安装librabbitmq,这是一个C实现的AMQP客户端的python模块。

$ pip install librabbitmq

下一步干什么?

学习完前2章之后,你可以继续学习用户指引:User
Guide
.

或者你可以先看一下API API
reference

时间: 2024-11-08 05:58:10

Celery(二)-----------------使用Celery的第二步的相关文章

day43——celery简介、celery小例子

一.Celery介绍和基本使用 Celery 是一个 基于python开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理, 如果你的业务场景中需要用到异步任务,就可以考虑使用celery, 举几个实例场景中可用的例子: 你想对100台机器执行一条批量命令,可能会花很长时间 ,但你不想让你的程序等着结果返回,而是给你返回 一个任务ID,你过一段时间只需要拿着这个任务id就可以拿到任务执行结果, 在任务执行ing进行时,你可以继续做其它的事情. 你想做一个定时任务,比如每天检测一下你们

结合Django+celery二次开发定时周期任务

需求: 前端时间由于开发新上线一大批系统,上完之后没有配套的报表系统.监控,于是乎开发.测试.产品.运营.业务部.财务等等各个部门就跟那饥渴的饿狼一样需要 各种各样的系统数据满足他们.刚开始一天一个还能满足他们,优化脚本之后只要开发提供查询数据的SQL.收件人.执行时间等等参数就可以几分钟写完一个定时任务脚本 ,到后面不知道是不是吃药了一天三四个定时任务,不到半个月手里一下就20多个定时任务了,渐渐感到力不从心了,而且天天还要给他们修改定时任务的SQL.收件人.执 行时间等等,天天写定时任务脚本

异步任务利器Celery(二)在django项目中使用Celery

Celery 4.0支持django1.8及以上的版本,低于1.8的项目使用Celery 3.1. 一个django项目的组织如下: - proj/ - manage.py - proj/ - __init__.py - settings.py - urls.py 首先建立proj/proj/celery.py文件: from __future__ import absolute_import, unicode_literals import os from celery import Cele

使用celery之了解celery(转)

原文  http://www.dongwm.com/archives/shi-yong-celeryzhi-liao-jie-celery/ 前言 我想很多做开发和运维的都会涉及一件事:crontab, 也就是在服务器上设定定时任务,按期执行一些任务.但是假如你有上千台的服务器, 你有上千种任务,那么对于这个定时任务的管理恐怕是一件很头疼的事情.哪怕你只是几十个任务分配的不同的机器上怎么样合理的管理和实现以下功能呢: 查看定时任务的执行情况.比如执行是否成功,当前状态,执行花费的时间. 一个友好

使用celery之深入celery配置(转)

原文:http://www.dongwm.com/archives/shi-yong-celeryzhi-shen-ru-celerypei-zhi/ 前言 celery的官方文档其实相对还是写的很不错的.但是在一些深层次的使用上面却显得杂乱甚至就没有某些方面的介绍, 通过我的一个测试环境的settings.py来说明一些使用celery的技巧和解决办法 amqp交换类型 其实一共有4种交换类型,还有默认类型和自定义类型. 但是对我们配置队列只会用到其中之三,我来一个个说明,英语好的话可以直接去

celery expires 让celery任务具有时效性

起因:有的时候,我们希望任务具有时效性,比如定时每5分钟去抓取某个状态,由于celery队列中的任务可能很多,等到这个任务被执行时,已经超过了5分钟,那么这个任务的执行已经没有意义,因为下一次抓取已经执行了. 可以进行如下设定: @task(ignore_result=True, expires=900) def nupdate_influence_by_15min(uid, today=None, if_whole=False): ... ... expires – Either a int,

python之celery使用详解(二)

前言 前面我们了解了celery的基本使用后,现在对其常用的对象和方法进行分析. Celery对象 核心的对象就是Celery了,初始化方法: class Celery(object): def __init__(self, main=None, loader=None, backend=None, amqp=None, events=None, log=None, control=None, set_as_current=True, accept_magic_kwargs=False, tas

Celery 分布式任务队列入门

一.Celery介绍和基本使用 Celery 是一个 基于python开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理, 如果你的业务场景中需要用到异步任务,就可以考虑使用celery, 举几个实例场景中可用的例子: 你想对100台机器执行一条批量命令,可能会花很长时间 ,但你不想让你的程序等着结果返回,而是给你返回 一个任务ID,你过一段时间只需要拿着这个任务id就可以拿到任务执行结果, 在任务执行ing进行时,你可以继续做其它的事情. 你想做一个定时任务,比如每天检测一下你们

在tornado中使用celery实现异步任务处理之一

一.简介 tornado-celery是用于Tornado web框架的非阻塞 celery客户端. 通过tornado-celery可以将耗时任务加入到任务队列中处理, 在celery中创建任务,tornado中就可以像调用AsyncHttpClient一样调用这些任务. ? Celery中两个基本的概念:Broker.Backend Broker : 其实就是一开始说的 消息队列 ,用来发送和接受消息. Broker有几个方案可供选择:RabbitMQ,Redis,数据库等 Backend: