Celery介绍

Celery

官方

Celery 官网:http://www.celeryproject.org/

Celery 官方文档英文版:http://docs.celeryproject.org/en/latest/index.html

Celery 官方文档中文版:http://docs.jinkan.org/docs/celery/

celery框架工作流程

1)创建Celery框架对象app,配置broker和backend,得到的app就是worker
2)给worker对应的app添加可处理的任务函数,用include配置给worker的app
3)完成提供的任务的定时配置app.conf.beat_schedule
4)启动celery服务,运行worker,执行任务(text_py)
    D:\mypy\luffy\scripts\异步任务>celery worker -A celery_task -l info -P eventlet
5)启动beat服务,运行beat,添加任务
6)如果需要设置,每周循环定时任务可以查看定时任务中的周时间任务源码

Celery架构

Celery的架构由三部分组成,消息中间件(message broker)、任务执行单元(worker)和 任务执行结果存储(task result store)组成。

消息中间件

Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等

任务执行单元

Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。

任务结果存储

Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP(高级消息队列协议), redis等

使用场景

异步执行:解决耗时任务

延迟执行:解决延迟任务

定时执行:解决周期(周期)任务

Celery的安装配置

pip install celery

消息中间件:RabbitMQ/Redis

app=Celery(‘任务名‘, broker=‘xxx‘, backend=‘xxx‘)

Celery执行异步任务

包架构封装

project
    ├── celery_task     # celery包
    │   ├── __init__.py # 包文件
    │   ├── celery.py   # celery连接和配置相关文件,且名字必须交celery.py
    │   └── tasks.py    # 所有任务函数
    ├── add_task.py     # 添加任务
    └── get_result.py   # 获取结果

基本使用

celery.py
# 1)创建app + 任务

# 2)启动celery(app)服务:
# 非windows
# 命令:celery worker -A celery_task -l info
# windows:
# pip3 install eventlet
# celery worker -A celery_task -l info -P eventlet

# 3)添加任务:手动添加,要自定义添加任务的脚本,右键执行脚本

# 4)获取结果:手动获取,要自定义获取任务的脚本,右键执行脚本

from celery import Celery
broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
app = Celery(broker=broker, backend=backend, include=['celery_task.tasks'])
tasks.py
from .celery import app
import time
@app.task
def add(n, m):
    print(n)
    print(m)
    time.sleep(10)
    print('n+m的结果:%s' % (n + m))
    return n + m

@app.task
def low(n, m):
    print(n)
    print(m)
    print('n-m的结果:%s' % (n - m))
    return n - m
add_task.py
from celery_task import tasks

# 添加立即执行任务
t1 = tasks.add.delay(10, 20)
t2 = tasks.low.delay(100, 50)
print(t1.id)

# 添加延迟任务
from datetime import datetime, timedelta
eta=datetime.utcnow() + timedelta(seconds=10)
tasks.low.apply_async(args=(200, 50), eta=eta)
get_result.py
from celery_task.celery import app

from celery.result import AsyncResult

id = '21325a40-9d32-44b5-a701-9a31cc3c74b5'
if __name__ == '__main__':
    async = AsyncResult(id=id, app=app)
    if async.successful():
        result = async.get()
        print(result)
    elif async.failed():
        print('任务失败')
    elif async.status == 'PENDING':
        print('任务等待中被执行')
    elif async.status == 'RETRY':
        print('任务异常后正在重试')
    elif async.status == 'STARTED':
        print('任务已经开始被执行')

高级使用

celery.py
# 1)创建app + 任务

# 2)启动celery(app)服务:
# 非windows
# 命令:celery worker -A celery_task -l info
# windows:
# pip3 install eventlet
# celery worker -A celery_task -l info -P eventlet

# 3)添加任务:自动添加任务,所以要启动一个添加任务的服务
# 命令:celery beat -A celery_task -l info

# 4)获取结果

from celery import Celery

broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
app = Celery(broker=broker, backend=backend, include=['celery_task.tasks'])

# 时区
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False

# 任务的定时配置
from datetime import timedelta
from celery.schedules import crontab
app.conf.beat_schedule = {
    'low-task': {
        'task': 'celery_task.tasks.low',
        'schedule': timedelta(seconds=3),
        # 'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点
        'args': (300, 150),
    }
}
tasks.py
from .celery import app

import time
@app.task
def add(n, m):
    print(n)
    print(m)
    time.sleep(10)
    print('n+m的结果:%s' % (n + m))
    return n + m

@app.task
def low(n, m):
    print(n)
    print(m)
    print('n-m的结果:%s' % (n - m))
    return n - m
get_result.py
from celery_task.celery import app

from celery.result import AsyncResult

id = '21325a40-9d32-44b5-a701-9a31cc3c74b5'
if __name__ == '__main__':
    async = AsyncResult(id=id, app=app)
    if async.successful():
        result = async.get()
        print(result)
    elif async.failed():
        print('任务失败')
    elif async.status == 'PENDING':
        print('任务等待中被执行')
    elif async.status == 'RETRY':
        print('任务异常后正在重试')
    elif async.status == 'STARTED':
        print('任务已经开始被执行')

大致使用方式==>伪代码

在项目的celery_task文件中的tasks编写一下代码
# 伪代码:立即和延迟任务使用
@app.task
def send_email(user, content):
    result = print('对user发送content邮件内容')
    if not result:
        print('短信推送用户,邮件发送失败')
        return False
    return True

在Django项目中的APP中写入下面的代码:
# 伪代码:celery异步或延迟任务
from celery_task.tasks import send_email
class EMailAPIView(APIView):
    def post(self, request, *args, **kwargs):
        user = request.data.get('user')
        content = request.data.get('content')
        eta = request.data.get('eta')
        if not eta:
            # 添加立即的异步任务
            send_email.delay(user, content)
        else:
            # 延迟任务
            send_email.apply_aysnc(args=(user, content), eta=eta)
        return APIResponse()

django中使用

celery.py
"""
celery框架django项目工作流程
1)加载django配置环境
2)创建Celery框架对象app,配置broker和backend,得到的app就是worker
3)给worker对应的app添加可处理的任务函数,用include配置给worker的app
4)完成提供的任务的定时配置app.conf.beat_schedule
5)启动celery服务,运行worker,执行任务
6)启动beat服务,运行beat,添加任务

重点:由于采用了django的反射机制,使用celery.py所在的celery_task包必须放置项目的根目录下
"""

# 一、加载django配置环境
import os
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "luffyapi.settings.dev")

# 二、加载celery配置环境
from celery import Celery
# broker
broker = 'redis://127.0.0.1:6379/0'
# backend
backend = 'redis://127.0.0.1:6379/1'
# worker
app = Celery(broker=broker, backend=backend, include=['celery_task.tasks'])

# 时区
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False

# 任务的定时配置
from datetime import timedelta
from celery.schedules import crontab
app.conf.beat_schedule = {
    'django-task': {
        'task': 'celery_task.tasks.test_django_celery',
        'schedule': timedelta(seconds=3),
        'args': (),
    }
}
tasks.py
from .celery import app
# 获取项目中的模型类
from api.models import Banner
@app.task
def test_django_celery():
    banner_query = Banner.objects.filter(is_delete=False).all()
    print(banner_query)

原文地址:https://www.cnblogs.com/mqhpy/p/12181341.html

时间: 2024-08-07 05:20:03

Celery介绍的相关文章

python celery介绍和基本使用

08 python celery介绍和基本使用 celery分布式任务队列 RPC远程,当执行一条命令,等待远程执行结果返回客户端. 在Linux上可以在后台执行,不影响其他任务执行.(涉及到异步) 1.分布式任务运算celery 参考:https://www.cnblogs.com/alex3714/p/6351797.html 任务计划:https://www.cnblogs.com/peida/archive/2013/01/08/2850483.html Crontab操作系统本身任务计

Celery介绍和基本使用

Celery介绍和基本使用 Celery是一个基于python开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理,如果你的业务场景中需要用到异步任务,就可以考虑使用celery,举几个实例场景中可用的例子: 你想对100台机器执行一条批量命令,可能会花很长时间,但你不想让你的程序等着结果返回,而是给你返回一个任务ID,你过段时间只需要拿着这个任务ID就可以拿到任务执行结果,在任务执行ing进行时,你可以继续做其他的事情. 2.你想做一个定时任务,比如每天检测一下你们所有客户到资料,

Celery学习---Celery 分布式队列介绍及安装

Celery介绍和基本使用 Celery 是一个 基于python开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理, 如果你的业务场景中需要用到异步任务,就可以考虑使用celery, 举几个实例场景中可用的例子: 1. 你想对100台机器执行一条批量命令,可能会花很长时间 ,但你不想让你的程序等着结果返回,而是给你返回 一个任务ID,你过一段时间只需要拿着这个任务id就可以拿到任务执行结果, 在任务执行ing进行时,你可以继续做其它的事情. 2. 你想做一个定时任务,比如每天检测

Celery 分布式任务队列快速入门

本节内容 Celery介绍和基本使用 启用多个workers Celery 定时任务 与django结合 通过django配置celery periodic task 一.Celery介绍和基本使用 Celery 是一个 基于python开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理, 如果你的业务场景中需要用到异步任务,就可以考虑使用celery, 举几个实例场景中可用的例子: 你想对100台机器执行一条批量命令,可能会花很长时间 ,但你不想让你的程序等着结果返回,而是给你返

Celery分布式任务队列快速入门

Celery介绍 Celery是基于Python开发的分布式任务队列.它支持使用任务队列的方式在分布的机器/进程/线程上执行任务调度. 如果业务场景中需要用到异步任务,就可以考虑使用celery, 举几个实例场景中可用的例子: 你想对100台机器执行一条批量命令,可能会花很长时间 ,但你不想让你的程序等着结果返回,而是给你返回 一个任务ID,你过一段时间只需要拿着这个任务id就可以拿到任务执行结果, 在任务执行ing进行时,你可以继续做其它的事情. 你想做一个定时任务,比如每天检测一下你们所有客

Celery异步任务队列/周期任务+ RabbitMQ + Django

一.Celery介绍和基本使用  Celery 是一个 基于python开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理, 如果你的业务场景中需要用到异步任务,就可以考虑使用celery, 举几个实例场景中可用的例子: 你想对100台机器执行一条批量命令,可能会花很长时间 ,但你不想让你的程序等着结果返回,而是给你返回 一个任务ID,你过一段时间只需要拿着这个任务id就可以拿到任务执行结果, 在任务执行ing进行时,你可以继续做其它的事情. 你想做一个定时任务,比如每天检测一下你

Celery 分布式任务队列入门

一.Celery介绍和基本使用 Celery 是一个 基于python开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理, 如果你的业务场景中需要用到异步任务,就可以考虑使用celery, 举几个实例场景中可用的例子: 你想对100台机器执行一条批量命令,可能会花很长时间 ,但你不想让你的程序等着结果返回,而是给你返回 一个任务ID,你过一段时间只需要拿着这个任务id就可以拿到任务执行结果, 在任务执行ing进行时,你可以继续做其它的事情. 你想做一个定时任务,比如每天检测一下你们

Django中Celery的实现

Celery官网http://www.celeryproject.org/ 学习资料:http://docs.jinkan.org/docs/celery/ Celery介绍 Celery是基于Python开发的一个分布式任务队列框架,支持使用任务队列的方式在分布的机器/进程/线程上执行任务调度. 上图展示的是Celery的架构,它采用典型的生产者-消费者模式,主要由三部分组成:broker(消息队列).workers(消费者:处理任务).backend(存储结果). 消息中间件:Celery本

day43——celery简介、celery小例子

一.Celery介绍和基本使用 Celery 是一个 基于python开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理, 如果你的业务场景中需要用到异步任务,就可以考虑使用celery, 举几个实例场景中可用的例子: 你想对100台机器执行一条批量命令,可能会花很长时间 ,但你不想让你的程序等着结果返回,而是给你返回 一个任务ID,你过一段时间只需要拿着这个任务id就可以拿到任务执行结果, 在任务执行ing进行时,你可以继续做其它的事情. 你想做一个定时任务,比如每天检测一下你们