使用Celery的第一步是有意最小化的介绍Celery。本节教程将为你展示Celery提供的更多细节,包括怎样为你的程序和库添加Celery支持。
本节教程不会介绍Celery的所有特性和最佳实践,所以建议你也阅读一下用户指引 User Guide。
在你的程序中使用Celery
我们工程的结构如下:
布局:
proj/__init__.py /celery.py /tasks.py
proj/celery.py
from __future__ import absolute_import, unicode_literals from celery import Celery app = Celery(‘proj‘, broker=‘amqp://‘, backend=‘amqp://‘, include=[‘proj.tasks‘]) # Optional configuration, see the application user guide. app.conf.update( result_expires=3600, ) if __name__ == ‘__main__‘: app.start()
在celery.py模块中,我们创建了Celery实例(有时被引用为app)。为了在你的工程中使用Celery,你可以简单的导入这个实例。
a.broker关键字参数指定了中间人所使用的URL
查看Choosing
a Broker 获得更多信息
b.backend关键字参数指定了要使用的结果后台
用它来跟踪任务状态和结果。尽管默认情况下,结果选项被禁用。我这里还指明了backend是因为稍后我将展示怎样检索结果,你可能会在你的程序里使用其它的backend。
它们有各自的优势和劣势。如果你不需要结果你最好禁用它们。结果也可以被个别的任务禁用,通过在task装饰器中指定ignore_result选项( @task(ignore_result=True))
查看Keeping
Results 获得更多的信息
c.include关键字参数是一个在worker启动时要导入的模块列表。在这里需要添加我们的tasks模块,这样worker启动时才能找到我们的task.
proj/tasks.py
from __future__ import absolute_import, unicode_literals from proj.celery import app @app.task def add(x, y): return x + y @app.task def mul(x, y): return x * y @app.task def xsum(numbers): return sum(numbers)
启动worker
可以用celery命令启动worker(你需要在proj的父目录执行以下命令):
$ celery -A proj worker -l info
当worker启动时你应该可以看到如下信息:
When the worker starts you should see a banner and some messages:
-------------- celery@halcyon.local v4.0 (0today8) ---- **** ----- --- * *** * -- [Configuration] -- * - **** --- . broker: amqp://guest@localhost:5672// - ** ---------- . app: __main__:0x1012d8590 - ** ---------- . concurrency: 8 (processes) - ** ---------- . events: OFF (enable -E to monitor this worker) - ** ---------- - *** --- * --- [Queues] -- ******* ---- . celery: exchange:celery(direct) binding:celery --- ***** ----- [2012-06-08 16:23:51,078: WARNING/MainProcess] celery@halcyon.local has
--broker URL(transport)是我们在celery模块中指定的broker关键字参数,你也可以通过-b选项在命令行指定一个不同的broker.
--concurrency 是用来执行任务而prefork的worker进程,如果所有的worker都在执行任务,那么新添加的任务必须要等待有一个正在执行的任务完成后才能被执行。
默认的concurrency数量是机器上CPU的数量,你可以指定一个数量通过-c选项(
celery worker -c
)。在这里没有推荐值,因为最优值依懒许多因素,但是如果你的任务大部分都是I/O相关的,你可以尝试增加这个值,实验证明增加到2倍以上于CPU数的值对性能提高微乎其微,相反还会降低性能。
除了默认的进程池,Celery还支持使用Eventlet,Gevent和线程(查看 Concurrency)
--Events是一个选项,它可以用来使Celery在worker中有事件发生时发送监控消息(events)。这些信息可以被一些监控程序使用,比如celery events和Flower---一个实时的Celery
monitor,更多详细介绍可以查看Monitoring
and Management guide.
--Queuens是一个队列列表,workers将会从中消费任务。可以告诉worker一次性地从多个队列中消费任务,这可以用来路由消息给指定的worker。这对于构建高质量的服务,关系的分离和提供优先级都有意义。具体的描述参考Routing
Guide.
你可以获得一个完整的命令行列表通过--help参数。
$ celery worker --help
更多关于选项的说明请查看Workers
Guide
停止worker
可以简单的通过crtl+c来终止worker。worker可以接收的信号列表可以查看Workers
Guide.
在后台运行
在生产环境中,你可能希望在后台运行worker.下面的文档中有详细的介绍:daemonization
tutorial.
下面的脚本使用celery multi命令在后台启动一个或多个worker.
$ celery multi start w1 -A proj -l info celery multi v4.0.0 (0today8) > Starting nodes... > w1.halcyon.local: OK
你也可以重新启动:
$ celery multi restart w1 -A proj -l info celery multi v4.0.0 (0today8) > Stopping nodes... > w1.halcyon.local: TERM -> 64024 > Waiting for 1 node..... > w1.halcyon.local: OK > Restarting node w1.halcyon.local: OK celery multi v4.0.0 (0today8) > Stopping nodes... > w1.halcyon.local: TERM -> 64052
或者停止它:
$ celery multi stop w1 -A proj -l info
stop命令是异步的,它不会等待所有的worker真正关闭。你可能需要使用stopwait命令,这个命令会保证当前所有的任务都执行完毕。
$ celery multi stopwait w1 -A proj -l info
注:celery multi命令不会保存workers的信息,所以当重新启动时你需要使用相同的命令行参数。当停止时,只有相同的pidfile和logfile参数是必须的。
默认情况下,celery会在当前目录下创建pidfile和logfile.为了防止多个worker在启动时相互影响,你可以指定一个特定的目录。
$ mkdir -p /var/run/celery $ mkdir -p /var/log/celery $ celery multi start w1 -A proj -l info --pidfile=/var/run/celery/%n.pid \ --logfile=/var/log/celery/%n%I.log
通过multi命令你可以启动多个workers,另外这个命令还支持强大的命令行语法来为不同的workers指定不同的参数。
$ celery multi start 10 -A proj -l info -Q:1-3 images,video -Q:4,5 data \ -Q default -L:4,5 debug
更多关闭multi的举例请参考multi模块的API手册。
关于 -app参数:
-app参数将会指定将要使用的Celery app实例。它的形式必须是这样的:module.path:attribute
但是也支持一种简写形式,如只给出了一个包名称。Celery将会按以下顺序搜寻app实例:
with --app=proj:
1.名为proj.app的属性
2.名为proj.celery的属性
3.proj模块中任何值为celery application的属性
如果以上都没有找到,将会在子模块proj.celery中寻找:
4.proj.celery.app
5.proj.celery.celery
6.proj.celery模块中任何值为celer application的属性
本文档中的实例说明了这种规则,一般在简单模块中使用proj:app;在大型工程中使用proj.celery:app。
调用任务
你可以使用delay()方法来执行一个任务:
>>> add.delay(2, 2)
这个方法实际上是另外一个方法apply_async()的简单写法:
>>> add.apply_async((2, 2))
apply_async允许你指定一些执行选项,如何时开始执行任务,任务应该被发送给哪个队列等等。
>>> add.apply_async((2, 2), queue=‘lopri‘, countdown=10)
上面的举例中,任务将会被发送给‘lopri‘队列,任务最早将会在消息发送10s之后执行。
直接使用任务将会在当前进程中执行,因此没有任何消息被发送:
>>> add(2, 2) 4
delay(),apply_async(),__call__()三个方法,代表了Celery的调用API,这3个方法也用于方法签名。
更多关于调用API的细节请参考:Calling
User Guide.
每个任务调用都会被赋予一个唯一的标识符(一个UUID),这就是任务id.
delay(),apply_async()方法会返回一个AsyncResult实例,通过这个实例可以跟踪任务状态轨迹。要使用此功能,需要提供结果后台(result backend),这样才有地方存储任务状态等信息。
结果跟踪功能默认情况下是关闭的,因为没有对任何应用程序都适用的结果后台。你需要考虑每种结果后台的优缺点然后选择一个合适的。对于很多任务来说,保存结果并不十分有用,所以默认开启结果后台是不适合的。另外,你需要明白结果后台不是用来监控任务和workers的,Celery有专用的事件消息来实现此功能(Monitoring
and Management Guide)。
如果你配置了一个结果后台,你可以检索任务的结果:
>>> res = add.delay(2, 2) >>> res.get(timeout=1) 4
你也可以通过id属性来查看任务的id:
>>> res.id d6b3aea2-fb9b-4ebc-8da4-848818db9114
你还可以检查异常和调用栈如果任务发生了异常,事实上,默认情况下,result.get()方法会传播任何错误
>>> res = add.delay(2) >>> res.get(timeout=1)
Traceback (most recent call last): File "<stdin>", line 1, in <module> File "/opt/devel/celery/celery/result.py", line 113, in get interval=interval) File "/opt/devel/celery/celery/backends/amqp.py", line 138, in wait_for raise meta[‘result‘] TypeError: add() takes exactly 2 arguments (1 given)
如果你不希望传递任何错误,你可以禁用它通过传递propagate参数:
>>> res.get(propagate=False) TypeError(‘add() takes exactly 2 arguments (1 given)‘,)
这样,get会返回异常实例而不会直接将其抛出。所以,你需要使用result的相关方法来检查任务执行成功还是失败。
>>> res.failed() True >>> res.successful() False
你还可以查看任务的状态:
>>> res.state ‘FAILURE‘
一个任务只能处于某一个状态,但是它可以在不同的状态间转化,一个典型的任务状态过程如下:
PENDING -> STARTED -> SUCCESS
"STARTED"状态是一个特殊状态,仅仅会在如下情况下出现:task_trace_started配置被设置或者@task(track_started=True)
选项被设置。
“PENDING”状态实际上并不是一个记录状态,它是任何未知id的任务的默认状态,下面的例子中会出现这一状态:
>>> from proj.celery import app >>> res = app.AsyncResult(‘this-id-does-not-exist‘) >>> res.state ‘PENDING‘
如果你一个任务被重新尝试执行过,状态会变得更加复杂。比如,一个任务被尝试执行了2次,状态转换会如下所示:
PENDING -> STARTED -> RETRY -> STARTED -> RETRY -> STARTED -> SUCCESS
要了解更多关于任务状态的信息,请查看:States
另外,更多关于执行任务的信息,请查看:Calling
Guide.
Canvas:设计工作流
你刚刚学习到了如何使用任务的delay方法来调用一个任务,通常这对你已经足够了。但是有时候,你希望将一个任务调用的签名传递给其它进程或者作为一个参数传递给其它函数,Celery使用一种名为签名的机制实现这种需求。
一个签名包装了一个任务调用的参数和执行参数,这样它就可以被传递给函数甚至被序列化和在网络上传递。
你可以为一个使用参数(2,2)和countdown=10的add任务创建一个签名:
>>> add.signature((2, 2), countdown=10) tasks.add(2, 2)
还有一种简写形式:
>>> add.s(2, 2) tasks.add(2, 2)
再谈调用任务API
签名实例也支持调用API,这意味着我们也可以调用delay,apply_async方法。
但是有些许不同,签名在创建时可能已经指定了参数。add任务需要2个参数,上面的签名在创建时已经指定了2个参数,所以你只需向下面这样:
>>> s1 = add.s(2, 2) >>> res = s1.delay() >>> res.get() 4
你也可以创建指定部分参数的签名实例:
# incomplete partial: add(?, 2) >>> s2 = add.s(2)
s2现在是一个部分签名,你可以在调用签名时指定另外一个参数:
# resolves the partial: add(8, 2) >>> res = s2.delay(8) >>> res.get() 10
上面添加的参数8,将会放在已存在的参数2之前。构成完整的参数(8,2)
关键字参数也可以在调用签名时添加,新添加的参数会与原参数合并,但是调用时的参数优先级高:
>>> s3 = add.s(2, 2, debug=True) >>> s3.delay(debug=False) # debug is now False.
如上所述,签名支持调用API。这意味着:
a. sig.apply_async(args=(), kwargs={}, **options)
调用签名支持部分参数和部分关键字参数,也支持部分执行参数。
b. sig.delay(*args, **kwargs)
任何参数都会被放在签名参数之前,任何关键字参数都会与原参数合并。
上面所述的这些看起来很有用,但是你可能不知道在实际中这些有何用。要了解这些的用处,我必须向你介绍canvas原语。
原语
这些原语本身就是签名对象。所以它们可以被用任意方式组合成复杂的工作流。
注:下面的这些例子检索了结果,所以你需要配置一个结果后台(result backend)。上面的例子proj已经指定了结果后台(查看Celery的backend参数Celery
)。
下面是一些例子
Groups
group并行地执行一个任务列表,返回一个特殊的结果实例,让你可以以group的方式观察结果并按顺序检索返回值。
>>> from celery import group >>> from proj.tasks import add >>> group(add.s(i, i) for i in xrange(10))().get() [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
部分group(上面已经说明了,原语本身就是签名对象)
>>> g = group(add.s(i) for i in xrange(10)) >>> g(10).get() [10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
Chains
通过Chains,任务可以像使用管道一样连接在一起,前一个执行完成后会执行下一个:
>>> from celery import chain >>> from proj.tasks import add, mul # (4 + 4) * 8 >>> chain(add.s(4, 4) | mul.s(8))().get() 64
部分Chains
>>> # (? + 4) * 8 >>> g = chain(add.s(4) | mul.s(8)) >>> g(4).get() 64
Chains还可以像下面这样:
>>> (add.s(4, 4) | mul.s(8))().get() 64
Chords
chord是一个带回调函数的group
>>> from celery import chord >>> from proj.tasks import add, xsum >>> chord((add.s(i, i) for i in xrange(10)), xsum.s())().get() 90
一个group与另外一个任务通过chains(管道"|")连接将会自动地转换为chord:
>>> (group(add.s(i, i) for i in xrange(10)) | xsum.s())().get() 90
因为这些原语全部都是签名对象,所以你可以按照自己的需要随意组合它们:
upload_document.s(file) | group(apply_filter.s() for filter in filters)
了解更多关于工作流的说明:Canvas
路由
Celery支持AMQP提供的所有路由机制,但是也支持将消息发送到命名队列的简单路由。
task_routes配置允许你通过名字来路由任务,还可以将所有的东西进行集中控制。
app.conf.update( task_routes = { ‘proj.tasks.add‘: {‘queue‘: ‘hipri‘}, }, )
你也可以在运行时通过queue关键字参数来为apply_async函数指定队列。
>>> from proj.tasks import add >>> add.apply_async((2, 2), queue=‘hipri‘)
你可以通过指定celery worker -Q选项来让一个worker从队列中执行任务。
$ celery -A proj worker -Q hipri
你可以通过逗号分隔符来指定多个队列。比如,你可以指定worker从默认队列和“hipri”队列中执行任务,默认的工作队列由于历史原因名为‘celery‘。
$ celery -A proj worker -Q hipri,celery
worker将会同等对待指定的所有队列,队列指定的先后顺序没有关系。
要了解所有关于路由的的内容,包括如何利用AMQP的强大路由功能,请查看:Routing
Guide.
远程控制
如果你正在使用RabbitMQ(AMQP),Redis或者MongoDB作为中间人,你可以在运行时控制和监视worker。
比如,你可以查看哪个任务正在被worker执行:
$ celery -A proj inspect active
这是通过广播消息实现的,所以所有的远程控制命令会被集群中的所有worker收到。
你还可以通过使用--destination选项来指定需要发送监控请求的workers所有的节点,所有的节点以逗号分隔。
$ celery -A proj inspect active --destination=[email protected]
如果不指定destination,则所有的worker都会响应和回应广播请求。
celery inspect的相关所有命令并不会改变worker中的任何东西,它仅仅是请求关于正在运行的worker的信息和数据。要查看所有相关的命令,你可以执行:
$ celery -A proj inspect --help
还有一个celery control命令,它可以在运行时改变worker的相关内容:
$ celery -A proj control --help
比如,你可以强制worker使能事件消息(用于监控task和workers)
$ celery -A proj control enable_events
在事件机制开启的情况下,你可以开启事件dumper来观察workers正在干什么:
$ celery -A proj events --dump
或者你也开启curses接口:
$ celery -A proj events
当监控完成后,你可以去使能事件机制:
$ celery -A proj control disable_events
celery status命令也是使用远程控制命令,它会显示集群中在线的workers:
$ celery -A proj status
要了解更多关于celery的命令和监控相关的内容,请查看:Monitoring
Guide.
时区
所有的时间和日期,内部使用的和消息中的全都使用UTC时区。
当一个worker收到一个消息,比如当countdown设置时,会将UTC时间转换为本地时间。如果你希望采用一个和系统时区不同的时区,你必须用timezone选项来进行配置:
app.conf.timezone = ‘Europe/London‘
优化
默认的配置没有为吞吐量进行优化,celery假设有很多短的任务和较少长的任务,采取了一个在公平调度和吞吐量之间折衷的方式。
如果你对公平调度有严格的要求,或者需要为吞吐量进行优化,请查看: Optimizing
Guide.
如果你正在使用rabbitmq,你需要安装librabbitmq,这是一个C实现的AMQP客户端的python模块。
$ pip install librabbitmq
下一步干什么?
学习完前2章之后,你可以继续学习用户指引:User
Guide.
或者你可以先看一下API API
reference