Celery异步任务框架

目录

  • Celery架构

    • 消息中间件
    • 任务执行单元
    • 任务结果存储
  • Celery的安装配置
  • Celery执行异步任务
    • 使用步骤
    • 立即任务和延时任务
    • 定时任务
  • 使用场景
  • django项目中使用
    • celery.py(celery服务)
    • tasks.py(任务)
  • 坑点:

Celery 官网:http://www.celeryproject.org/

Celery 官方文档英文版:http://docs.celeryproject.org/en/latest/index.html

Celery 官方文档中文版:http://docs.jinkan.org/docs/celery/

Celery架构

Celery的架构由三部分组成,消息中间件(message broker)、任务执行单元(worker)和 任务执行结果存储(task result store)组成。

消息中间件

Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等

任务执行单元

Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。

任务结果存储

Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等

Celery的安装配置

安装依赖:pip install celery

消息中间件:RabbitMQ/Redis

app=Celery(‘任务名‘, broker=‘xxx‘, backend=‘xxx‘)

Celery执行异步任务

包架构封装

project
    ├── celery_task     # celery包
    │   ├── __init__.py # 包文件
    │   ├── celery.py   # celery连接和配置相关文件,且名字必须交celery.py
    │   └── tasks.py    # 所有任务函数
    ├── add_task.py     # 添加任务
    └── get_result.py   # 获取结果

使用步骤

'''
1)创建app + 任务

2)启动celery(app)服务:
    非windows
    cd 到celery_task所在文件夹下
    命令:celery worker -A celery_task -l info

    windows:
    安装依赖:pip3 install eventlet
    cd 到celery_task所在文件夹下
    命令:celery worker -A celery_task -l info -P eventlet

3)手动添加任务:手动添加,要自定义添加任务的脚本,右键执行脚本
   自动任务:自动添加任务,所以要启动一个添加任务的服务
    # 命令:celery beat -A celery_task -l info

4)获取结果:手动获取,要自定义获取任务的脚本,右键执行脚本
'''

立即任务和延时任务

celery_task文件夹-> celery.py (celery服务 文件名不可改)

from celery import Celery

# broker:任务仓库
broker = 'redis://127.0.0.1:6379/1'
# backend:任务结果仓库
backend = 'redis://127.0.0.1:6379/2'
# include:任务(函数)所在文件
app = Celery(broker=broker, backend=backend, include=['celery_task.tasks'])

celery_task文件夹-> tasks.py(celery任务 文件名可以改)

from .celery import app
import time
@app.task
def add(n, m):
    print(n)
    print(m)
    time.sleep(10)
    print('n+m的结果:%s' % (n + m))
    return n + m

@app.task
def low(n, m):
    print(n)
    print(m)
    print('n-m的结果:%s' % (n - m))
    return n - m

add_task.py(添加任务)

from celery_task import tasks

# 往celery的Broker中添加立即任务
t1 = tasks.add.delay(10, 20)
t2 = tasks.low.delay(100, 50)
print(t1.id)

# 添加延迟任务

from datetime import datetime, timedelta
def eta_second(second):
    ctime = datetime.now()
    utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
    time_delay = timedelta(seconds=second)
    return utc_ctime + time_delay

# apply_async就是添加延迟任务
# args是jump任务需要的参数,没有就设置为空()
# eta是该任务执行的UTC格式的时间
tasks.low.apply_async(args=(200, 50), eta=eta_second(10))

get_result.py (获取任务执行结果)

from celery_task.celery import app

from celery.result import AsyncResult

# 任务id
id = '21325a40-9d32-44b5-a701-9a31cc3c74b5'
if __name__ == '__main__':
    async = AsyncResult(id=id, app=app)
    if async.successful():
        result = async.get()
        print(result)
    elif async.failed():
        print('任务失败')
    elif async.status == 'PENDING':
        print('任务等待中被执行')
    elif async.status == 'RETRY':
        print('任务异常后正在重试')
    elif async.status == 'STARTED':
        print('任务已经开始被执行')

定时任务

celery_task文件夹-> celery.py (celery服务 文件名不可改)

from celery import Celery

broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
app = Celery(broker=broker, backend=backend, include=['celery_task.tasks'])

# 时区
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False

# 任务的定时配置
from datetime import timedelta
from celery.schedules import crontab
app.conf.beat_schedule = {
    # 定时任务:任务名自定义
    'low-task': {
        'task': 'celery_task.tasks.low',    # 任务源
        'args': (300, 150), # 任务参数
        'schedule': timedelta(seconds=3),   # 定时添加任务的时间
        # 'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点
    }
}

celery_task文件夹-> tasks.py(celery任务 文件名可以改)

from .celery import app

import time
@app.task
def add(n, m):
    print(n)
    print(m)
    time.sleep(10)
    print('n+m的结果:%s' % (n + m))
    return n + m

@app.task
def low(n, m):
    print(n)
    print(m)
    print('n-m的结果:%s' % (n - m))
    return n - m

get_result.py

from celery_task.celery import app

from celery.result import AsyncResult

id = '21325a40-9d32-44b5-a701-9a31cc3c74b5'
if __name__ == '__main__':
    async = AsyncResult(id=id, app=app)
    if async.successful():
        result = async.get()
        print(result)
    elif async.failed():
        print('任务失败')
    elif async.status == 'PENDING':
        print('任务等待中被执行')
    elif async.status == 'RETRY':
        print('任务异常后正在重试')
    elif async.status == 'STARTED':
        print('任务已经开始被执行')

启动定时任务:

'''
进入celery_tash的父级目录

启动celery(app)服务:
 非windows
  命令:celery worker -A celery_task -l info
 windows:
  pip3 install eventlet
  celery worker -A celery_task -l info -P eventlet

添加任务:自动添加任务,所以要启动一个添加任务的服务
  命令:celery beat -A celery_task -l info
'''

使用场景

立即任务:将耗时操作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音视频处理等等

延时任务:需要延时处理的任务丢到延时任务里取处理

定时任务:定时执行某件事情,比如每天数据统计

django项目中使用

# 1)创建app + 任务

# 2)启动celery(app)服务:
# 非windows
# 命令:celery worker -A celery_task -l info
# windows:
# pip3 install eventlet
# celery worker -A celery_task -l info -P eventlet

# 3)添加任务:自动添加任务,所以要启动一个添加任务的服务
# 命令:celery beat -A celery_task -l info

# 4)获取结果

celery.py(celery服务)

# 重点:要将 项目的配置文件 所在的文件夹添加到环境变量
# import sys
# sys.path.append(r'项目的配置文件所在的文件夹')

# 开启django支持
import os
os.environ.setdefault('DJANGO_SETTINGS_MODULE', '项目名.settings.dev')
import django
django.setup()

from celery import Celery

# broker:任务仓库
broker = 'redis://127.0.0.1:6379/1'
# backend:任务结果仓库
backend = 'redis://127.0.0.1:6379/2'
# include:任务(函数)所在文件
app = Celery(broker=broker, backend=backend, include=['celery_task.tasks'])

# 时区
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False

# 自动任务的定时配置
from celery.schedules import crontab
from datetime import timedelta
app.conf.beat_schedule = {
    # 定时任务:任务名自定义
    'update_banner_cache': {
        'task': 'celery_task.tasks.update_banner_cache',  # 任务源
        'args': (),  # 任务参数
        'schedule': timedelta(seconds=10), # 定时添加任务的时间
        # 'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点
    }
}

tasks.py(任务)

from .celery import app
# 获取项目中的模型类
from home.models import Banner
from django.conf import settings
from django.core.cache import cache
from home.serializers import BannerModelSerializer
@app.task
def update_banner_cache():
    # 查询数据库
    banner_query = Banner.objects.filter(is_delete=False, is_show=True).order_by("-order")[:settings.BANNER_COUNT]
    # 序列化数据对象得到序列化后的json列表
    banner_list = BannerModelSerializer(banner_query, many=True).data
    # 设置缓存,走的是默认项目配置的缓存数据库
    cache.set('banner_list', banner_list)

坑点:

windows下celery处理post请求会报错。

错误:wrap_socket() got an unexpected keyword argument ‘_context‘

原因:

启动服务器错误
requests包的requests.post发送后,传不回数据

改变服务器启动方法不要用eventlet,加个参数
celery worker -A celery_name --loglevel=info --pool=solo

原文地址:https://www.cnblogs.com/XuChengNotes/p/12019811.html

时间: 2024-08-02 07:13:14

Celery异步任务框架的相关文章

celery异步执行任务框架

Celery 官方 Celery 官网:http://www.celeryproject.org/ Celery 官方文档英文版:http://docs.celeryproject.org/en/latest/index.html Celery 官方文档中文版:http://docs.jinkan.org/docs/celery/ Celery异步任务框架 """ 1)可以不依赖任何服务器,通过自身命令,启动服务(内部支持socket) 2)celery服务为为其他项目服务提

Django使用Celery异步任务队列

1  Celery简介 Celery是异步任务队列,可以独立于主进程运行,在主进程退出后,也不影响队列中的任务执行. 任务执行异常退出,重新启动后,会继续执行队列中的其他任务,同时可以缓存停止期间接收的工作任务,这个功能依赖于消息队列(MQ.Redis). 1.1  Celery原理 Celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成. 消息中间件:Celery本身不提供消息服务,但

平台项目 ~ celery 异步之异步处理功能

一 简介:今天来聊聊celery两大功能之一的异步处理 二  标准流程: 1 建立 config 文件           class Config:             ENABLE_UTC = False             CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/5'             BROKER_URL = 'redis://127.0.0.1:6379/6'            注意 1 所有参数都必须大写,并且

基于redis AE的异步网络框架

最近一直在研究redis的源码,redis的高效率令人佩服. 在我们的linux机器上,cpu型号为, Intel(R) Pentium(R) CPU G630 @ 2.70GHz Intel(R) Pentium(R) CPU G630 @ 2.70GHz 上 set,get 都能达到每秒钟15W的请求处理量,真是佩服这代码的效率. 前几篇文章,主要是介绍了基本的代码,比如字符串处理,链表处理,hash等.这篇文章介绍网络的核心,基于事件反映的异步网络框架. 异步网络处理,是基于epoll的.

Voovan 是一个高性能异步网络框架和 HTTP(Java)

Voovan 是一个高性能异步网络框架和 HTTP 服务器框架,同时支持 HTTP 客户端抓取.动态编译支持.数据库访问封装以及 DateTime.String.Log.反射.对象工具.流操作.文件操作.异步双向通道等功能.旨在提供可靠.方便.可单元测试的代码.它是一个无任何依赖的独立工具包,希望能够方便广大开发者快速的实现应用. 作者:@愚民日记 地址:http://git.oschina.net/helyho/Voovan http://www.oschina.net/news/80909/

Python 开源异步并发框架的未来(转)

Python 开源异步并发框架的未来 fantix 1.1k 2014年04月16日 发布 推荐 4 推荐 收藏 31 收藏,8.9k 浏览 呵呵,这个标题有点大,其实只是想从零开始介绍一下异步的基础,以及 Python 开源异步并发框架的发展和互操作性. 另外,这是我在 OSTC 2014 做的一个 20140330-OSTC-分论坛1王川 http://v.youku.com/v_show/id_XNjk2ODI0ODQ4.html ,幻灯片在这里,欢迎拍砖. 开源 Python 是开源的,

Java异步NIO框架Netty实现高性能高并发

1. 背景 1.1. 惊人的性能数据 近期一个圈内朋友通过私信告诉我,通过使用Netty4 + Thrift压缩二进制编解码技术,他们实现了10W TPS(1K的复杂POJO对象)的跨节点远程服务调用.相比于传统基于Java序列化+BIO(同步堵塞IO)的通信框架.性能提升了8倍多. 其实,我对这个数据并不感到吃惊,依据我5年多的NIO编程经验.通过选择合适的NIO框架,加上高性能的压缩二进制编解码技术,精心的设计Reactor线程模型,达到上述性能指标是全然有可能的. 以下我们就一起来看下Ne

Python开源异步并发框架

Python开源异步并发框架的未来 2014年3月30日,由全球最大的中文IT社区CSDN主办的“开源技术大会·2014” (Open Source Technology Conference 2014,简称OSTC 2014)在北京丽亭华苑酒店召开. 本次大会以“启蒙·开源”(Open Mind, Open Source)为主题,邀请到了来自全国各地的30多位开源业界资深人士发表主题演讲,数十个开源社区现场参与,到场的开源软件开发者.贡献者和开源爱好 者总人数超过500人.作为一场“接地气”的

多线程实现简单的事件异步处理框架

老实说,多线程在web开发里面非常常见,很多web容器本身就支持多线程,所以很多时候我们在进行web开发的时候并不需要考虑多线程相关的负责问题,而只需要实现相关的业务功能即可.所以,可以概括地讲,很多时候的web开发,并没有多线程方面的考虑,因为web应用本身就是在多线程基础上的了. 但是,有些时候为了提高程序性能,在用户的一个请求中中如果包含过多的业务操作或者包含耗时比较长的业务操作,我们就需要考虑使用异步的方式来提高程序响应的速度了.这篇博客简单介绍了在java中如何使用多线程实现一个简单的