Celery—分布式的异步任务处理系统

Celery

1.什么是Clelery

Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统

专注于实时处理的异步任务队列

同时也支持任务调度

Celery架构

Celery的架构由三部分组成:

● 消息中间件(message broker)

● 任务执行单元(worker)

● 任务执行结果存储(task result store)

消息中间件

Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等

任务执行单元

Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。

任务结果存储

Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等

版本支持情况

Celery version 4.0 runs on
Python ?2.7, 3.4, 3.5?
PyPy ?5.4, 5.5?
This is the last version to support Python 2.7, and from the next version (Celery 5.x) Python 3.5 or newer is required.

If you’re running an older version of Python, you need to be running an older version of Celery:

Python 2.6: Celery series 3.1 or earlier.
Python 2.5: Celery series 3.0 or earlier.
Python 2.4 was Celery series 2.2 or earlier.

Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform.

2.使用场景

异步任务:将耗时操作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音视频处理等等

定时任务:定时执行某件事情,比如每天数据统计

3.Celery的安装配置

pip install celery

消息中间件:RabbitMQ/Redis

app=Celery(‘任务名‘,backend=‘xxx‘,broker=‘xxx‘)

4.Celery执行异步任务

基本使用

新建celery_task.py文件

# celery_task.py

from celery import Celery
# 不加密码
broker=‘redis://127.0.0.1:6379/0‘   # borker配置,任务队列
backend=‘redis://127.0.0.1:6379/1‘  # backend配置 执行结果存储
#加密码
# backend=‘redis://:[email protected]:6379/1‘
# broker=‘redis://:[email protected]:6379/2‘
#一定要指定一个名字

app=Celery(‘test‘,broker=broker,backend=backend)  # 创建任务异步处理器

#任务其实就是个函数
#需要用一个装饰器 *.task装饰,表示该任务是被 * celery管理的,并且可以用celery执行的
@app.task
def add(x,y):
    import time
    time.sleep(2)  # 模拟阻塞任务
    return x+y

这样我们就完成了上图流程中的三大配置,下面我们需要去提交任务并启动worker执行任务

另建add_task.py文件

#用于提交任务的py文件

import celery_task

#提交任务到消息队列中
#只是把任务提交到消息队列中,并没有执行需要启动worker才可以生效
ret=celery_task.add.delay(3,4)  # add 即celery_task中add函数
print(ret)
# ret=a5ea035f-0cc3-44ba-b334-f5d7c7ce681d  :任务的id号,待任务执行完毕,需要通过这个id去backend去取执行结果

#提交定时任务:于2019-07-20 11:13:56执行的任务,此任务只是在这里提交了,需要启动worker才可以生效
from datetime import datetime
v1 = datetime(2019, 7, 20, 11, 13, 56)
print(v1)  # 时间对象
v2 = datetime.utcfromtimestamp(v1.timestamp())
print(v2)  

# #取出要执行任务的时间对象,调用apply_async方法,args是参数,eta是执行的时间
result = celery_task.add.apply_async(args=[1, 3], eta=v2)
print(result.id)   # 任务的id号,待任务执行完毕,需要通过这个id去backend去取执行结果

#第二种获取时间的方法
ctime = datetime.now()
# 默认用utc时间
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
from datetime import timedelta
#取10s之后的时间对象
time_delay = timedelta(seconds=3)
task_time = utc_ctime + time_delay   # 时间对象的相加
result = celery_task.add.apply_async(args=[4, 3], eta=task_time)
print(result.id)

启动worker

启动worker的两种方法:

# 方法一:
from celery_task import app
if __name__ == ‘__main__‘:
    app.worker_main()
    # cel.worker_main(argv=[‘--loglevel=info‘)

# 方法二:
# 命令行启动—常用
# linux下: celery worker -A celery_task_s1 -l info
# windows下:celery worker -A celery_task_s1 -l info -P eventlet  #eventlet是个模块 需要pip装下

接下来就是查看任务执行结果了

另起result.py文件

# 创建py文件:result.py,查看任务执行结果

from celery.result import AsyncResult
from celery_task import app

async = AsyncResult(id="e919d97d-2938-4d0f-9265-fd8237dc2aa3", app=cel)

if async.successful():
    result = async.get()
    print(result)
    # result.forget() # 将结果删除
elif async.failed():
    print(‘执行失败‘)
elif async.status == ‘PENDING‘:
    print(‘任务等待中被执行‘)
elif async.status == ‘RETRY‘:
    print(‘任务异常后正在重试‘)
elif async.status == ‘STARTED‘:
    print(‘任务已经开始被执行‘)

总结下Celery使用步骤:

创建celery_app_task.py,配置borker\backend\worker和任务函数

执行 add_task.py,添加任务,并获取任务ID

执行命令启动worker:celery worker -A celery_app_task -l info

执行 result.py,检查任务状态并获取结果

下面我们进入重点:

原文地址:https://www.cnblogs.com/dongxixi/p/11176983.html

时间: 2024-08-29 11:55:32

Celery—分布式的异步任务处理系统的相关文章

在tornado中使用celery实现异步任务处理之一

一.简介 tornado-celery是用于Tornado web框架的非阻塞 celery客户端. 通过tornado-celery可以将耗时任务加入到任务队列中处理, 在celery中创建任务,tornado中就可以像调用AsyncHttpClient一样调用这些任务. ? Celery中两个基本的概念:Broker.Backend Broker : 其实就是一开始说的 消息队列 ,用来发送和接受消息. Broker有几个方案可供选择:RabbitMQ,Redis,数据库等 Backend:

使用消息队列异步化系统

使用消息队列异步化系统 基于Spring与ActiveMQ的配置实现方案 前言 前期为了快速开发,项目结构较为混乱,代码维护与功能扩展都比较困难,为了方便后续功能开发,最近对项目进行的重构,顺便在重构的过程中将之前的部分操作进行了异步处理,也第一次实际接触了JMS与消息队列.项目中采用的消息中间件为ActiveMQ. 什么是JMS Java消息服务(Java Message Service,JMS)应用程序接口是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分

分布式发布订阅消息系统 Kafka 架构设计[转]

分布式发布订阅消息系统 Kafka 架构设计 转自:http://www.oschina.net/translate/kafka-design 我们为什么要搭建该系统 Kafka是一个消息系统,原本开发自LinkedIn,用作LinkedIn的活动流(activity stream)和运营数据处理管道(pipeline)的基础.现在它已为多家不同类型的公司 作为多种类型的数据管道(data pipeline)和消息系统使用. 活动流数据是所有站点在对其网站使用情况做报表时要用到的数据中最常规的部

Celery 分布式任务队列入门

一.Celery介绍和基本使用 Celery 是一个 基于python开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理, 如果你的业务场景中需要用到异步任务,就可以考虑使用celery, 举几个实例场景中可用的例子: 你想对100台机器执行一条批量命令,可能会花很长时间 ,但你不想让你的程序等着结果返回,而是给你返回 一个任务ID,你过一段时间只需要拿着这个任务id就可以拿到任务执行结果, 在任务执行ing进行时,你可以继续做其它的事情. 你想做一个定时任务,比如每天检测一下你们

Android异步任务处理框架AsyncTask源码分析

[转载请注明出处:http://blog.csdn.net/feiduclear_up CSDN 废墟的树] 引言 在平时项目开发中难免会遇到异步耗时的任务(比如最常见的网络请求).遇到这种问题,我们可以自己通过Handler+Message+Thread/ThreadPool来构造一个异步耗时任务框架.当你下次项目中又遇到一个网络请求,你又不得不重写异步耗时任务处理框架.出于避免开发者重复搬砖工作,Google工程师给开发者搭建了一个通用的异步耗时任务处理框架--AsyncTask. Asyn

Celery学习---Celery 分布式队列介绍及安装

Celery介绍和基本使用 Celery 是一个 基于python开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理, 如果你的业务场景中需要用到异步任务,就可以考虑使用celery, 举几个实例场景中可用的例子: 1. 你想对100台机器执行一条批量命令,可能会花很长时间 ,但你不想让你的程序等着结果返回,而是给你返回 一个任务ID,你过一段时间只需要拿着这个任务id就可以拿到任务执行结果, 在任务执行ing进行时,你可以继续做其它的事情. 2. 你想做一个定时任务,比如每天检测

Celery - 一个懂得 异步任务 , 定时任务 , 周期任务 的芹菜

1.什么是Celery?Celery 是芹菜Celery 是基于Python实现的模块, 用于执行异步定时周期任务的其结构的组成是由    1.用户任务 app    2.管道 broker 用于存储任务 官方推荐 redis rabbitMQ  / backend 用于存储任务执行结果的    3.员工 worker 2.Celery的简单实例 1 from celery import Celery 2 import time 3 4 #创建一个Celery实例,这就是我们用户的应用app 5

Sprint Boot如何基于Redis发布订阅实现异步消息系统的同步调用?

前言 在很多互联网应用系统中,请求处理异步化是提升系统性能一种常用的手段,而基于消息系统的异步处理由于具备高可靠性.高吞吐量的特点,因而在并发请求量比较高的互联网系统中被广泛应用.与此同时,这种方案也带来了调用链路处理上的问题,因为大部分应用请求都会要求同步响应实时处理结果,而由于请求的处理过程已经通过消息异步解耦,所以整个调用链路就变成了异步链路,此时请求链路的发起者如何同步拿到响应结果,就需要进行额外的系统设计考虑. 为了更清晰地理解这个问题,小码哥以最近正在做的共享单车的IOT系统为例,给

高性能的分布式内存对象缓存系统Memcached

Memcached 是一个高性能的分布式内存对象缓存系统,用于动态Web应用以减轻数据库负载.它通过在内存中缓存数据和对象来减少读取数据库的次数,从而提高动态.数据库驱动网站的速度.Memcached基于一个存储键/值对的hashmap.其守护进程(daemon )是用C写的,但是客户端可以用任何语言来编写,并通过memcached协议与守护进程通信. 外文名 memcached 所    属 缓存系统 编写语言 不限 通信手段 memcached协议 目录 1功能 2特征 ? 协议 ? 事件处