#SORA#celery研究笔记

最近看到celery文档task部分,做一下小结

  1. 实际处理时,我们可以使用一个类似于logging的模块生成日志。
  2. 对于某些任务,你可以设置当满足某些条件时,重试任务、拒绝任务或忽略任务
  3. 在定义task时,@app.task(bind=True)中的bind参数可以让你在task中访问请求中的内容,比如id,group之类的信息
  4. @app.task(AGRS=VALUE),ARGS有好几个参数可以设置,比如name,有些和全局设置(CELERY_xxx_xxx之类的)是一样的配置内容
  5. 可以自定义任务状态(默认有pending,started,success,failure,retry,revoked)
  6. 当你使用pickle作为序列化工具时,你应该定义那些可以被pickle的异常(我用json,直接忽略)

实例化。你可以继承Task类定义新类,定义的__init__方法只会被调用一次,此后将持续存在。当你的task以此新类为基类,后面对此task的调用中,__init__的作用还会存在。(用途:自定义类中维持一个对数据库的连接,task可以不用每次都创建连接,而是对那个存在的属性进行操作)。

e.g:

from celery import Task

class DatabaseTask(Task):
    abstract = True
    _db = None

    @property
    def db(self):
        if self._db is None:
            self._db = Database.connect()
        return self._db
@app.task(base=DatabaseTask)
def process_rows():
    for row in process_rows.db.table.all():
        …

7.定义新类时,设置为抽象类,可以作为task的基类。其中又有四种handle方法(after_return,on_failure,on_retry,on_success)

e.g:

from celery import Task

class DebugTask(Task):
    abstract = True

    def after_return(self, *args, **kwargs):
        print(‘Task returned: {0!r}‘.format(self.request)

@app.task(base=DebugTask)
def add(x, y):
    return x + y

8.最佳实践:

*忽略不需要的结果,或者设置CELERY_TASK_RESULT_EXPIRES

*若不需要,关闭rate limits

*避免写出同步的task(阻塞),同时执行的同步的task,前面的会令后面的低效,应该写成异步的方法或使用callback

e.g:

#Bad:

@app.task
def update_page_info(url):
    page = fetch_page.delay(url).get()
    info = parse_page.delay(url, page).get()
    store_page_info.delay(url, info)

@app.task
def fetch_page(url):
    return myhttplib.get(url)

@app.task
def parse_page(url, page):
    return myparser.parse_document(page)

@app.task
def store_page_info(url, info):
    return PageInfo.objects.create(url, info)
    
    
    
    
    
#Good:

def update_page_info(url):
    # fetch_page -> parse_page -> store_page
    chain = fetch_page.s() | parse_page.s() | store_page_info.s(url)
    chain()

@app.task()
def fetch_page(url):
    return myhttplib.get(url)

@app.task()
def parse_page(page):
    return myparser.parse_document(page)

@app.task(ignore_result=True)
def store_page_info(info, url):
    PageInfo.objects.create(url=url, info=info)

*使用细粒度的任务,而不是又长又臭的long task

*尽量把需要操作的数据放在本地,或使worker靠近数据所在,降低因IO延迟的影响

暂时如此,以后有需要再补

时间: 2024-10-14 08:24:58

#SORA#celery研究笔记的相关文章

#SORA#celery原生配置文件研究

ps:百度是xxx的走狗 回到正题,今天研究了下用一个py文件作为celery的配置文件,所以,还是参考昨天的例子:http://my.oschina.net/hochikong/blog/396079 我们把celery.py的配置项拿出来,在proj目录中创建celeryconfig.py,内容如下: CELERY_TASK_RESULT_EXPIRES=3600 CELERY_TASK_SERIALIZER='json' CELERY_ACCEPT_CONTENT=['json'] CEL

#SORA#celery研究中的一个小问题

sora的rpc机制打算使用celery处理,celery+rabbitmq.最近开始研究它的文档,试着写了段代码; from celery import Celery app = Celery('cagent',backend='redis://localhost',broker='amqp://[email protected]//') #app.conf.update( #    CELERY_TASK_SERIALIZER='json', #    CELERY_ACCEPT_CONTE

#sora#celery worker guide abstract

celery worker guide abstract 启动worker: e.g. celery -A proj worker -l info celery -A proj worker --loglevel=INFO --concurrency=10 -n worker1.%h 备注: The hostname argument can expand the following variables: %h: Hostname including domain name. %n: Hostn

#SORA#celery实践1

这次研究celery的Next Step部分. 先创建一个python module: mkdir proj cd proj touch __init__.py 在proj目录中创建celery.py: from __future__ import absolute_import from celery import Celery app = Celery('proj',              broker='amqp://',              backend='amqp://',

#sora#celery笔记——call the task

基本的两种task调用方式: apply_async()和delay(),前者要把参数放在元组或字典中,后者直接使用参数 快速参考: T.delay(arg, kwarg=value) always a shortcut to .apply_async. T.apply_async((arg, ), {'kwarg': value}) T.apply_async(countdown=10) executes 10 seconds from now. T.apply_async(eta=now +

#Sora#peewee plus celery = ?

最近疯看全职猎人 初步学习了peewee,因为sqlalchemy实在是太重量级,文档也太恶心,peewee上手容易得多,也非常light 结合celery和peewee,写了点东西,查询年龄 myapp/db.py: from __future__ import absolute_import from myapp.celery import app from peewee import * db = SqliteDatabase('people.db') class Person(Model

#Sora#openstack基础库stevedore试用总结

什么是stevedore? stevedore是建立在setuptools的entry point的功能上的,用于python程序动态加载代码,在openstack中被多个组件使用:比如ceilometer,neutron的plugin.当然,你可以直接使用 python的某些黑魔法实现插件的加载,但太原始了.stevedore基于entry point提供了更高层次的封装. stevedore的官方文档在此:http://docs.openstack.org/developer/stevedo

分布式队列 Celery

详情参见: 分布式队列神器 Celery 个人学习总结后续更新……

异步任务利器Celery(一)介绍

django项目开发中遇到过一些问题,发送请求后服务器要进行一系列耗时非常长的操作,用户要等待很久的时间.可不可以立刻对用户返回响应,然后在后台运行那些操作呢? crontab定时任务很难达到这样的要求 ,异步任务是很好的解决方法,有一个使用python写的非常好用的异步任务工具Celery. broker.worker和backend Celery的架构由三部分组成,消息中间件(broker),任务执行单元(worker)和任务执行结果存储(result backends)组成. 应用程序调用