Celery初识及简单实例

Celery是一个“自带电池”的任务队列。易于使用,可以轻易入门,它遵照最佳实践设计,使产品可以扩展,或与其他语言集成,并且它自带了在生产环境中运行这样一个系统所需的工具和支持。本文介绍基础部分:

  • 选择和安装消息传输方式(中间人)。
  • 安装Celery并创建一个任务
  • 运行职程并调用任务
  • 追踪任务在不同状态间的迁移,并检视返回值

一、选择中间人

Celery需要一个发送和接收消息的解决方案,其通常以独立服务形式出现,称为消息中间人。

可行的选择包括:

RabbitMQ

RabbitMQ功能完备、稳定、耐用,并且安装简便,是生产环境的绝佳选择。

Redis

Redis也是功能完备的,但更容易受濡染终端或断电地阿莱数据丢失的影响。

使用数据库

不推荐把数据库用于消息队列,但对于很小的项目可能是合适的。

其他中间人

还有其他实验性传输实现:AmazonSQS、MongoDB和IronMQ。

二、安装Celery

Celery提交到Python Package Index上,可以使用Python标准工具pip进行安装:

$ pip install celery

三、应用

首先需要一个Celery实例:这个实例用于你想在celery中做一切事,它必须可以被其他模块导入。

我们简单的放在一个模块中,对于比较大的项目,可以创建一个独立模块。

创建tasks.py:

from celery import Celery

app = Celery(‘tasks‘, broker=‘redis://127.0.0.1:6379/3‘)

@app.taskdef add(x, y):    return x + y

创建APP的时候,Celery方法的第一个参数是我们当前py文件的名字。broker是我们选择的中间人。上面的代码使用的是redis。

接下来我们可以先运行我们的工作任务。

$ celery -A tasks worker --loglevel=info

使用celery命令,第二个参数‘tasks‘还是我们的py文件名字,后面是指定日志级别。这条命令必须能找到我们的tasks文件才能运行起来。

接下来调用我们的任务:

>>> from tasks import add
>>> add.delay(1, 3)
<AsyncResult: 7217dee2-3869-4f5e-8ccc-3a3b3dd6a9d7>

# 这里返回的是任务ID,而不是结果。

这样我们就实现了一个最简单的celery应用。

其他更多的参数命令,你可以通过下面的方式了解:

$ celery worker --help

$ celery help

关于调用我们的任务方式:

  • delay()
  • apply_async()
# delay()这个方法使用起来更方便一些,它能像调用普通函数一样调用我们的任务
task.delay(arg1, arg2, kwarg1=‘x‘, kwarg2=‘y‘)

# 使用apply_async你还需要将参数进行整合
task.apply_async(args=[arg1, arg2], kwargs={‘kwarg1‘: ‘x‘, ‘kwarg2‘: ‘y‘})

  

接下来说一下,在我们的应用中如何使用celery

项目布局:

proj/__init__.py
    /celery.py
    /tasks.py

项目中的celery.py

celery.py

from __future__ import absolute_import
from celery import Celery

app = Celery(‘mysite‘,  # 项目名称
             broker=‘redis://127.0.0.1:6379/3‘,  # 制定我们用的中间人
             backend=‘redis://127.0.0.1:6379/5‘,  # 定义回调中间人
             include=[‘proj.tasks‘],) 

app.conf.update(
    result_expires=3600  # 设置过期时间
)

if __name__ == ‘__main__‘:
    app.start()

  

task.py

from __future__ import absolute_import
from .celery import app
import time

@app.task(bind=True)  # 指定bind=True,当前task函数第一个参数为task本身。
def add(self, x, y):
    time.sleep(1)
    self.update_state(state="PROGRESS", meta={‘progress‘: 50})
    # task使用update_state修改当前状态可以触发on_message回调函数
    time.sleep(3)
    self.update_state(state="PROGRESS", meta={‘progress‘: 90})
    time.sleep(1)
    print(self.request.chain)
    print(123456)
    self.request.chain = None  # 当使用chain同时调用多个task函数的时候,如果中间需要停止,可以使用self.request.chain = None
    self.update_state(state="PROGRESS", meta={‘result‘: x + y})
    return x + y

@app.task(bind=True)
def mul(self, x, y):
    return x * y

@app.task
def div(x, y):
    return x / y

  

mysite/views.py

from celery import chain

from django.shortcuts import render, HttpResponse

from celery_work.tasks import add
from celery_work.tasks import mul
from celery_work.tasks import div
from celery_work.tasks import error_handler
# from celery_work.tasks import error_handler

def test(request):
    def on_raw_message(body):
        print(body)
    res = add.apply_async((1, 2))

    print(res.get(on_message=on_raw_message, propagate=False))
    # 注意这里使用了res.get(),所以并不是异步返回的,而是等拿到结果之后返回的
    return HttpResponse(‘ok‘)

# 异步方式处理函数,在add这个task函数中是有睡眠时间的,但是我们在发布任务之后就立即将任务ID返回给客户端了
def test0(request):
    res = add.apply_async((1, 2))
    return HttpResponse(‘async: %s‘ % res)

# 没有中断的chain任务
def test1(request):
    res = (mul.s(1, 2) | mul.s(3) | mul.s(3))()
    # 另一种写法  chain(mul.s(1, 2), mul.s(3), mul.s(3))()
    return HttpResponse(res.get())

# 有中断的chain任务,这里需要注意一下,我们在add这个函数中将chain终止了,res永远无法get到返回值
# 因为res是要取chain任务最后一个task函数的返回值,这样就要使用parent从最后一个task函数往前找,
# 直到找到终止chain任务的那个task函数,进行get
def test2(request):
    res = (add.s(1, 2) | mul.s(3) | mul.s(3))()
    print(res.parent.parent.get())
    print(res.parent.parent.successful())
    return HttpResponse(‘mul‘)

  

  

时间: 2024-08-24 14:34:57

Celery初识及简单实例的相关文章

【MySQL】存储过程、游标、循环简单实例

有时候仅凭 sql 语句可能达不到想要的数据操作目的,有可能需要写一些方法体,通过循环判断等操作最终达到目的.那么在数据库里实现这种方法体就需要存储过程了,个人觉得一个带注释的简单实例可以简单粗暴地解决大部分问题,当然要深入学习了解的话还是要看教程文档了,话不多说,上码: [sql] view plain copy create procedure my_procedure() -- 创建存储过程 begin -- 开始存储过程 declare my_id varchar(32); -- 自定义

session 对象的简单实例

一个session对象的简单实例: 1.登录界面:使用简单的html表单提交界面. <%@ page language="java" contentType="text/html; charset=GB18030"    pageEncoding="GB18030"%><!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "ht

javamail发送邮件的简单实例(转)

javamail发送邮件的简单实例 今天学习了一下JavaMail,javamail发送邮件确实是一个比较麻烦的问题.为了以后使用方便,自己写了段代码,打成jar包,以方便以后使用.呵呵 以下三段代码是我的全部代码,朋友们如果想用,直接复制即可. 第一个类:MailSenderInfo.java package com.util.mail;    /**    * 发送邮件需要使用的基本信息  *author by wangfun http://www.5a520.cn 小说520   */  

AJAX学习整理二之简单实例

做了几个简单的实例,加载txt文本内容.加载xml文件内容,把xml文本内容转换成html表格显示.废话不多说,直接贴代码: <!DOCTYPE html> <html xmlns="http://www.w3.org/1999/html"> <head>     <title>通过ajax获取文本内容</title>     <meta charset="utf-8">     <scr

Spring+Struts 2 简单实例报空指针异常

空指针出现于Action注入位置..如果一般错误请检查配置文件. 我出的错误.在于拷贝了之前做的实例中的lib文件夹到这个工程中. 其中有个包为struts2-convention-plugin-2.3.16.3.jar 造成了包识别异常.出现空指针.有类似经历的可以查看,也给大家提个醒.不要一气呵成的导入所有包.容易出现混乱.也不利于大家清楚的认识包和代码的联系. Spring+Struts 2 简单实例报空指针异常,布布扣,bubuko.com

mvc area区域和异步表单,bootstrap简单实例

码农最怕眼高手低 今天来练习mvc Area技术和bootstrap以及异步表单的C#代码实现. 1.area区域架构对于建立复杂业务逻辑很有帮助,由  AreaRegistration.RegisterAllAreas()方法遍历路由表,获得所有注册的路由.参见 建立类库Common,下设一个文件夹BookStore 在其中建立model和controller.(注意引用System.Web.Mvc这个dll) 项目结构如图: 其中book.cs为model模型 namespace Commo

DataGridView重绘painting简单实例

private void dataGridViewX1_CellPainting(object sender, DataGridViewCellPaintingEventArgs e) { if (e.RowIndex >= 0 && e.ColumnIndex>=0) { Rectangle newRect = new Rectangle(e.CellBounds.X, e.CellBounds.Y, e.CellBounds.Width - 1, e.CellBounds.

jQuery Datatable 实用简单实例

目标: 使用jQuery Datatable构造数据列表,并且增加或者隐藏相应的列,已达到数据显示要求.同时,jQuery Datatable强大的功能支持:排序,分页,搜索等. Query Datatable能良好支持数据完全加载到本地后构建数据列表,排序.分页.搜索等功能就会自带,不需要我们去关心,在此主要说明通过后台动态的加载数据,已达到在大数据面前提高效率的效果. 1. 通过后台进行分页 2. 通过后台进行排序 3. 通过后台进行搜索 具体使用方法: 1. 首先构建我们需要的数据列表,以

Hadoop初学指南(6)--MapReduce的简单实例及分析

本文在上一节的基础上通过一个简单的MR示例对MapReduce的运行流程进行分析. 假设有两行数据,分别是hello you,hello me,我们要统计其中出现的单词以及每个单词出现的次数. 所得的结果为 hello   2 you     1 me      1 (1)大致运行流畅 1.解析成2个<k,v>,分别是<0, hello you><10, hello me>.调用2次map函数. 2.执行map任务 3.map输出后的数据是:<hello,1>