#sora#celery笔记——call the task

基本的两种task调用方式:

apply_async()和delay(),前者要把参数放在元组或字典中,后者直接使用参数

快速参考:

T.delay(arg, kwarg=value)

always a shortcut to .apply_async.

T.apply_async((arg, ), {‘kwarg‘: value})

T.apply_async(countdown=10)

executes 10 seconds from now.

T.apply_async(eta=now + timedelta(seconds=10))

executes 10 seconds from now, specifed using eta

T.apply_async(countdown=60, expires=120)

executes in one minute from now, but expires after 2 minutes.

T.apply_async(expires=now + timedelta(days=2))

expires in 2 days, set using datetime.

例子:

task.delay(arg1, arg2, kwarg1=‘x‘, kwarg2=‘y‘)

task.apply_async(args=[arg1, arg2], kwargs={‘kwarg1‘: ‘x‘, ‘kwarg2‘: ‘y‘})

其他的特性:

##任务延时(设置countdown参数)##

>>> result = add.apply_async((2, 2), countdown=3)

>>> result.get()    # this takes at least 3 seconds to return

20

##任务过期(设置expires参数)##

>>> # Task expires after one minute from now.

>>> add.apply_async((10, 10), expires=60)

When a worker receives an expired task it will mark the task as REVOKED

##任务重试(设置retry参数)##

add.apply_async((2, 2), retry=True, retry_policy={

‘max_retries‘: 3,

‘interval_start‘: 0,

‘interval_step‘: 0.2,

‘interval_max‘: 0.2,

})

。Retry Policy

A retry policy is a mapping that controls how retries behave, and can contain the following keys:

。max_retries

Maximum number of retries before giving up, in this case the exception that caused the retry to fail will be raised.

A value of 0 or None means it will retry forever.

The default is to retry 3 times.

。interval_start

Defines the number of seconds (float or integer) to wait between retries. Default is 0, which means the first retry will be instantaneous.

。interval_step

On each consecutive retry this number will be added to the retry delay (float or integer). Default is 0.2.

。interval_max

Maximum number of seconds (float or integer) to wait between retries. Default is 0.2.

##序列化工具(设置serializer参数)##

有几种可用的序列化工具:JSON,pickle,YAML,msgpack

JSON虽然不错,但是对于传输二进制数据有点无力,因为基于Base64编码,数据体积变大,而且可用的数据格式有限

pickle适用于无需支持其他语言开发的组件的情况,传输二进制文件更轻量级和快速

YAML对跨语言的兼容性不错,支持更多的数据格式。但是处理它的python模块比较慢

msgpack是一种新的交换格式,功能类似JSON,不过还是too young

序列化工具的配置优先级如下

1、The serializer execution option.

2、The Task.serializer attribute

3、The CELERY_TASK_SERIALIZER setting.

e.g >>> add.apply_async((10, 10), serializer=‘json‘)

##压缩工具(设置compression)##

如果发送的消息比较大,可以考虑压缩消息

e.g >>> add.apply_async((2, 2), compression=‘zlib‘)

##不同的消息队列#

e.g add.apply_async(queue=‘priority.high‘)

e.g $ celery -A proj worker -l info -Q celery,priority.high

高级参数:

。exchange

Name of exchange (or a kombu.entity.Exchange) to send the message to.

。routing_key

Routing key used to determine.

。priority

A number between 0 and 9, where 0 is the highest priority.

Supported by: redis, beanstalk

时间: 2024-11-08 05:58:01

#sora#celery笔记——call the task的相关文章

#SORA#celery实践1

这次研究celery的Next Step部分. 先创建一个python module: mkdir proj cd proj touch __init__.py 在proj目录中创建celery.py: from __future__ import absolute_import from celery import Celery app = Celery('proj',              broker='amqp://',              backend='amqp://',

#SORA#celery研究中的一个小问题

sora的rpc机制打算使用celery处理,celery+rabbitmq.最近开始研究它的文档,试着写了段代码; from celery import Celery app = Celery('cagent',backend='redis://localhost',broker='amqp://[email protected]//') #app.conf.update( #    CELERY_TASK_SERIALIZER='json', #    CELERY_ACCEPT_CONTE

#sora#celery worker guide abstract

celery worker guide abstract 启动worker: e.g. celery -A proj worker -l info celery -A proj worker --loglevel=INFO --concurrency=10 -n worker1.%h 备注: The hostname argument can expand the following variables: %h: Hostname including domain name. %n: Hostn

#SORA#celery原生配置文件研究

ps:百度是xxx的走狗 回到正题,今天研究了下用一个py文件作为celery的配置文件,所以,还是参考昨天的例子:http://my.oschina.net/hochikong/blog/396079 我们把celery.py的配置项拿出来,在proj目录中创建celeryconfig.py,内容如下: CELERY_TASK_RESULT_EXPIRES=3600 CELERY_TASK_SERIALIZER='json' CELERY_ACCEPT_CONTENT=['json'] CEL

#SORA#celery研究笔记

最近看到celery文档task部分,做一下小结 实际处理时,我们可以使用一个类似于logging的模块生成日志. 对于某些任务,你可以设置当满足某些条件时,重试任务.拒绝任务或忽略任务 在定义task时,@app.task(bind=True)中的bind参数可以让你在task中访问请求中的内容,比如id,group之类的信息 @app.task(AGRS=VALUE),ARGS有好几个参数可以设置,比如name,有些和全局设置(CELERY_xxx_xxx之类的)是一样的配置内容 可以自定义

celery (二) task调用

调用 TASK 基础 task 的调用方式有三种: 类似普通函数的调用方式, 通过 __calling__ 调用 ,类似 function() 通过 apply_async() 调用,能接受较多的参数 通过 delay() 调用 ,是apply_async 方法的快捷方法,可接受的参数较少 task.delay(arg1, arg2, kwarg1=1, kwarg2=2) 等同于 task.apply_async(args=[arg1, arg2], kwargs={'kwarg':1, 'k

ansible学习笔记4-playbooks之task

Task列表 每个play包含了一个task列表(任务列表)一个task在其所在对应的所有主机 上(通过host pattern匹配的所有主机)执行完毕之后,下一个task才会执行 有一点需要明白的是,在一个play之中,所有hosts会获取相同的任务指令, 这是play的一个目的所在,也就是将一组选出的hosts映射到task 在运行playbook时,如果一个host执行task是吧,这个host将会从整个playbook 的rotation中移除,如果发生执行失败的情况,请修正playbo

异步任务神器 Celery 简明笔记

转自:http://www.jianshu.com/p/1840035cb510 异步任务 异步任务是web开发中一个很常见的方法.对于一些耗时耗资源的操作,往往从主应用中隔离,通过异步的方式执行.简而言之,做一个注册的功能,在用户使用邮箱注册成功之后,需要给该邮箱发送一封激活邮件.如果直接放在应用中,则调用发邮件的过程会遇到网络IO的阻塞,比好优雅的方式则是使用异步任务,应用在业务逻辑中触发一个异步任务. 实现异步任务的工具有很多,其原理都是使用一个任务队列,比如使用redis生产消费模型或者

celery学习笔记2

1.定义: Celery是一个异步的任务队列(也叫做分布式任务队列) 2.工作结构 Celery分为3个部分 (1)worker部分负责任务的处理,即工作进程(我的理解工作进程就是你写的python代码,当然还包括python调用系统工具功能) (2)broker部分负责任务消息的分发以及任务结果的存储,这部分任务主要由中间数据存储系统完成,比如消息队列服务器RabbitMQ.redis.Amazon SQS.MongoDB.IronMQ等或者关系型数据库,使用关系型数据库依赖sqlalchem