celery(一) application

Application

application

  1. celery在使用之前,必须首先实例化。e.g. app = Celery()
  2. app 是线程安全的,即:不同配置、组件和任务的多个app可以共存在同一个进程空间。

任务注册表(task-registry)

在Celery中发送一个task 消息,这个消息并不包含任何源代码(函数体)。而是只有你所期望执行的task的名字。每个worker有一个任务注册表(task-registry),它是task 名称与 task 源代码(函数)的映射。每当你定义一个task,这个task就会被注册到本地的注册表中

懒加载

Celery创建app实例是延迟的,只有在调用它的时候才会创建。也就是说,Celery()命令并没有立即创建app实例。

Celery实例化的过程中,做了如下操作:

  • 创建了一个逻辑时钟,用于events
  • 创建了一个任务注册表 task-registry
  • 设置它自己为当前的Celery实例(如果set_as_current 参数被置为disabled,那么就不会进行该操作)
  • 调用app.on_init() (默认情况下,啥都没做)

@app.task 装饰器也是延迟创建task的。task被定义的时候(在一个函数头上加装饰器的时候),task并没有立即创建。在task被使用的时候,或者是app finalized 的时候,task才会创建

finalized 做了如下操作:

  • task是在多app之间共享的,拷贝task。(shard参数可以取消共享,使task独属于其绑定的app)
  • 创建所有的task
  • 确保所有的task都绑定到当前的app(只有绑定app,task才能读取默认的配置)

Main name

task 默认的名称组成是 model.fun e.g. tasks.add

当model名获取不到的时候就会以 __main__ 作为model名。因此会出现 某task以 __main__.add 为名注册到 任务注册表之后,当某个task被引入到其他模块时,会以源模块.task作为任务名,这个时候,就会出现不一致的情况,所以,在Celery实例化的时候一定要指定app的名字。e.g. app=Celery(‘tasks’)

配置Celery

Celery的配置有如下几种:

  • 直接配置app属性
  • 使用配置文件

直接配置app属性

  • 单个配置

app.conf.enable_utc = True

  • 多个配置
app.conf.update(enable_utc=True, timezone=‘Asia/Shanghai‘)

使用配置文件

从配置文件加载配置,使用 app.config_from_object()app.config_from_envvar()方法。

config_from_object

app = Celery()
app.config_from_object(‘celeryconfig.py‘)
# 项目中要有 celeryconfig.py 文件
from xxxx import configmodel

app = Celery()
app.config_from_object(configmodel)
class Config:
    enable_utc = True
app = Celery()
app.config_from_object(Config)

config_from_envva

从环境变量加载指定模块

app.config_from_envvar(‘CELERY_CONFIG_MODEL‘)

Task

所有使用 @task() 装饰器定义的task,都继承自基类 Task 。你也可以指定自己的基类 Task

  1. 在装饰器中指定其他类
@app.task(base=OtherTask)
def function():
    ...
  1. 在配置中指定其他类
app = Celery()
app.Task = OtherTask

自定义 Task

所有的自定义 Task 都必须

继承自 Task

class MyTask(Task):
    ...

最佳实践是 app 作为参数传给需要它的地方

class SomeClass:
    def __init__(self, app):
        self.app = app
        ...

原文地址:https://www.cnblogs.com/jijizhazha/p/9866027.html

时间: 2024-08-29 23:36:42

celery(一) application的相关文章

【理论】python使用celery异步处理请求

Flask中使用celery队列处理执行时间较长的请求. 一. 安装celery pip install celery flask redis 二. celery简介 Celery是个异步分布式任务队列 通过Celery在后台跑任务并不像线程那么简单,但是用Celery的话,能够是应用有较好的扩展性,因为Celery是个分布式架构,下面介绍Celery的三个核心组件: 1. 生产者(Celery client): 生产者发送消息,在Flask上工作时,生产者在Flask应用内运行 2. 消费者(

Celery(三)实例Application

Celery必须实例化后才可以使用,实例称之为application或者简称app.实例是线程安全的,多个Celery实例(不同的配置.部件和任务)都可以在一个进程空间中运行. 创建一个最简单的app: >>> from celery import Celery >>> app = Celery() >>> app <Celery __main__ at 0x7f6be52d0cd0> 上述的app是一个运行在__main__模块中的Cel

在tornado中使用celery实现异步任务处理之一

一.简介 tornado-celery是用于Tornado web框架的非阻塞 celery客户端. 通过tornado-celery可以将耗时任务加入到任务队列中处理, 在celery中创建任务,tornado中就可以像调用AsyncHttpClient一样调用这些任务. ? Celery中两个基本的概念:Broker.Backend Broker : 其实就是一开始说的 消息队列 ,用来发送和接受消息. Broker有几个方案可供选择:RabbitMQ,Redis,数据库等 Backend:

Python Celery队列

Celery队列简介: Celery 是一个 基于python开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理, 如果你的业务场景中需要用到异步任务,就可以考虑使用celery. 使用场景: 1.你想对100台机器执行一条批量命令,可能会花很长时间 ,但你不想让你的程序等着结果返回,而是给你返回 一个任务ID,你过一段时间只需要拿着这个任务id就可以拿到任务执行结果, 在任务执行ing进行时,你可以继续做其它的事情. 2.你想做一个定时任务,比如每天检测一下你们所有客户的资料,如

Celery 分布式任务队列快速入门

本节内容 Celery介绍和基本使用 启用多个workers Celery 定时任务 与django结合 通过django配置celery periodic task 一.Celery介绍和基本使用 Celery 是一个 基于python开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理, 如果你的业务场景中需要用到异步任务,就可以考虑使用celery, 举几个实例场景中可用的例子: 你想对100台机器执行一条批量命令,可能会花很长时间 ,但你不想让你的程序等着结果返回,而是给你返

利用celery+django 在admin后台设置定时任务

经常用python开发web应用时,会涉及到定时任务的脚本,以前用linux自带的crontab来操作,但是感觉不太接地气,后来发现用celery+django 可以方便的实现! 安装软件环境如下: python 2.7.5 Django==1.8.2 celery==3.1.18 celery-with-redis==3.0 django-celery==3.1.16 MySQL-python==1.2.3 supervisor==3.1.3 使用pip方式安装完以上软件,并且默认系统已经安装

python使用异步任务celery出现异常崩溃时retry重试

前言: python下的celery是啥东西大家应该有了解,是一个异步的任务框架 .话说,  我以前写过一个报警平台的项目,也需要任务的扩展成分布式,当时总是觉得 用celery不是那么太靠谱,所以就自己写了一个分布式的任务派发的系统. 今个和朋友聊起了分布式爬虫,这哥们说 任务有时候经常的崩溃,但是celery的retry的机制有些意思,最后看了下文档  ,又研究了下retry的参数,然后把自己的一些实战分享给大家. #xiaorui.cc @celery.task(bind=True,max

celery消息的编码和序列化(转)

add by zhj: 原文讲的是序列化时的安全问题,不过,我关心的是怎样可以看到消息队列中的数据.下面是在broker中看到的消息,body是先用 body_encoding编码,然后用content-type进行序列化后得到的,'application/x-python-serialize'指的是pickle,用 pickle.loads(body.decode('base64'))就可以看到原始的数据. { 'body': 'gAJ9cQEoVQdleHBpcmVzcQJOVQN1dGNx

celery实现任务统一收集、分发执行

首先解释下目标的概念:celery任务消息会由各种途径(比如手动通过python shell触发.通过tornado触发等)发往统一的一个celery broker,然后任务消息会由不同server上的worker去获取并执行.具体点说就是,借助celery消息路由机制,celery broker中开不同的消息队列来接收相应类型的任务消息,然后不同server上开启worker来处理目标消息队列里面的任务消息,即任务统一收集.分发到不同server上执行. 测试 项目架构如下:一个服务,一部分t