#SORA#celery实践1

这次研究celery的Next Step部分。

先创建一个python module:

mkdir proj
cd proj
touch __init__.py

在proj目录中创建celery.py:

from __future__ import absolute_import
from celery import Celery
app = Celery(‘proj‘,
             broker=‘amqp://‘,
             backend=‘amqp://‘,
             include=[‘proj.tasks‘])
# Optional configuration, see the application user guide.
app.conf.update(
    CELERY_TASK_RESULT_EXPIRES=3600,
    CELERY_TASK_SERIALIZER=‘json‘,
    CELERY_ACCEPT_CONTENT=[‘json‘],
    CELERY_RESULT_SERIALIZER=‘json‘
)
if __name__ == ‘__main__‘:
    app.start()

解析:

  1. app=Celery(‘proj‘),命名这个模块为‘proj‘,详细可参考User Guide的Main Name部分
  2. broker=‘amqp://‘,指定broker,这里用的是rabbitmq。因为rabbitmq默认的用户为guest(密码为guest),你也可以这样写:amqp://[email protected]//
  3. backend=‘amqp://‘,指定一个backend,若需要检查worker执行任务完成后的返回内容,你必须设置一个backend
  4. app.conf.update(....),在该程序中修改相关配置,最佳实践是把配置放到一个独立的文件中。修改的内容是当使用amqp为backend时,result保存的时间。这里好像用的是秒为单位
  5. include[]是为了指定要导入的文件

备注:默认情况下,celery使用pickle作为payload,我测试时用的是root用户,会提示有安全问题,而不予执行。因而我需要配置CELERY_TASK_SERIALIZER,CELERY_ACCEPT_CONTENT,CELERY_RESULT_SERIALIZER为json。(另外请参考我的另一篇blog:http://my.oschina.net/hochikong/blog/393270

同一目录中,创建tasks.py:

from __future__ import absolute_import
from proj.celery import app
@app.task
def add(x, y):
    return x + y
@app.task
def mul(x, y):
    return x * y
@app.task
def xsum(numbers):
    return sum(numbers)

启动worker:

celery -A proj worker -l info

提示如下:

可以看到:

RuntimeWarning: You are running the worker with superuser privileges, which is
absolutely not recommended!

其他相关解析请参考celery文档。

在本机传递任务给worker(新开一个终端):

[email protected]:~/celeryapp# ls
cagent.py  cagent.pyc  config.py  config.pyc  proj  test.py

可以看到proj这个目录

进入python解释器:

[email protected]:~/celeryapp# python
Python 2.7.6 (default, Mar 22 2014, 22:59:56) 
[GCC 4.8.2] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> from proj.agent import add             #我的tasks.py重命名为agent.py
>>> res = add.delay(2,5)
>>>

注意import的形式:因为具体的函数保存在tasks.py(我的是agent.py),所以import的方法应为:module.path:attribute(具体见celery对于import的解释)

我们调用delay(2,5)执行add函数(也可以使用apply_async(),但参数需要放在元组里传入),传入的参数分别是2和5,返回一个对象res

>>> res
<AsyncResult: c08c72ed-8566-4025-b7f5-6ea5a9137966>

调用get()方法获取运算结果:

>>> res.get()
7

注意,如果任务执行的时间很久,get()需要设置timeout,例如:get(timeout=1)

获取运算任务的信息:

>>> res.state
u‘SUCCESS‘

返回一个unicode字符串SUCCESS

我们看看另一个终端,即启动worker那个:

[2015-04-04 14:41:29,772: INFO/MainProcess] Received task: proj.agent.add[60eea8f6-0b6a-4bb4-909f-60a377936dcc]
[2015-04-04 14:41:29,798: INFO/MainProcess] Task proj.agent.add[60eea8f6-0b6a-4bb4-909f-60[2015-04-04 15:05:38,847: INFO/MainProcess] Received task: proj.agent.add[c08c72ed-8566-4025-b7f5-6ea5a9137966]
[2015-04-04 15:05:38,867: INFO/MainProcess] Task proj.agent.add[c08c72ed-8566-4025-b7f5-6ea5a9137966] succeeded in 0.0136815000001s: 7

可以看到任务执行的信息。

要关闭worker,直接按Ctrl+C即可

时间: 2024-12-16 19:07:40

#SORA#celery实践1的相关文章

#SORA#celery原生配置文件研究

ps:百度是xxx的走狗 回到正题,今天研究了下用一个py文件作为celery的配置文件,所以,还是参考昨天的例子:http://my.oschina.net/hochikong/blog/396079 我们把celery.py的配置项拿出来,在proj目录中创建celeryconfig.py,内容如下: CELERY_TASK_RESULT_EXPIRES=3600 CELERY_TASK_SERIALIZER='json' CELERY_ACCEPT_CONTENT=['json'] CEL

#SORA#celery研究中的一个小问题

sora的rpc机制打算使用celery处理,celery+rabbitmq.最近开始研究它的文档,试着写了段代码; from celery import Celery app = Celery('cagent',backend='redis://localhost',broker='amqp://[email protected]//') #app.conf.update( #    CELERY_TASK_SERIALIZER='json', #    CELERY_ACCEPT_CONTE

#SORA#celery研究笔记

最近看到celery文档task部分,做一下小结 实际处理时,我们可以使用一个类似于logging的模块生成日志. 对于某些任务,你可以设置当满足某些条件时,重试任务.拒绝任务或忽略任务 在定义task时,@app.task(bind=True)中的bind参数可以让你在task中访问请求中的内容,比如id,group之类的信息 @app.task(AGRS=VALUE),ARGS有好几个参数可以设置,比如name,有些和全局设置(CELERY_xxx_xxx之类的)是一样的配置内容 可以自定义

#sora#celery worker guide abstract

celery worker guide abstract 启动worker: e.g. celery -A proj worker -l info celery -A proj worker --loglevel=INFO --concurrency=10 -n worker1.%h 备注: The hostname argument can expand the following variables: %h: Hostname including domain name. %n: Hostn

#sora#celery笔记——call the task

基本的两种task调用方式: apply_async()和delay(),前者要把参数放在元组或字典中,后者直接使用参数 快速参考: T.delay(arg, kwarg=value) always a shortcut to .apply_async. T.apply_async((arg, ), {'kwarg': value}) T.apply_async(countdown=10) executes 10 seconds from now. T.apply_async(eta=now +

Celery的实践指南

celery原理: celery实际上是实现了一个典型的生产者-消费者模型的消息处理/任务调度统,消费者(worker)和生产者(client)都可以有任意个,他们通过消息系统(broker)来通信. 典型的场景为: 客户端启动一个进程(生产者),当用户的某些操作耗时较长或者比较频繁时,考虑接入本消息系统,发送一个task任务给broker. 后台启动一个worker进程(消费者),当发现broker中保存有某个任务到了该执行的时间,他就会拿过来,根据task类型和参数执行. 实践中的典型场景:

[Celery]Celery 最佳实践

orangleliu 翻译 原文点击查看 如果你的工作和 Django 相关, 并且有时候需要执行一些长时间的后台任务.可能你已经使用了某种任务队列,Celery就是Python(和Django)世界中时下解决类似问题最受欢迎的项目. 当在某些项目使用Celery作为任务队列之后,我总结了一些最佳实践,决定把它们些下来.然而,这里也有一些对自己应该做的却没做的反思,还有一些celery提供但是没有充分利用的功能. No.1 不要使用关系型数据库来作为AMQP的代理 让我来解释下我为什么觉得这是错

Celery最佳实践(转)

原文  http://www.cnblogs.com/ajianbeyourself/p/3889017.html 作为一个Celery使用重度用户,看到 Celery Best Practices 这篇文章,干脆翻译出来,同时也会加入我们项目中celery的实战经验. 通常在使用Django的时候,你可能需要执行一些长时间的后台任务,没准你可能需要使用一些能排序的任务队列,那么Celery将会是一个非常好的选择. 当把Celery作为一个任务队列用于很多项目中后,作者积累了一些最佳实践方式,譬

celery最佳实践

作为一个Celery使用重度用户,看到Celery Best Practices这篇文章,不由得菊花一紧.干脆翻译出来,同时也会加入我们项目中celery的实战经验. 至于Celery为何物,看这里Celery. 通常在使用Django的时候,你可能需要执行一些长时间的后台任务,没准你可能需要使用一些能排序的任务队列,那么Celery将会是一个非常好的选择. 当把Celery作为一个任务队列用于很多项目中后,作者积累了一些最佳实践方式,譬如如何用合适的方式使用Celery,以及一些Celery提