python3+celery+redis实现异步任务

一、原理

Celery是基于Python开发的一个分布式任务队列框架,支持使用任务队列的方式在分布的机器/进程/线程上执行任务调度。它是Python写的库,但是它实现的通讯协议也可以使用ruby,php,javascript等调用。异步任务除了消息队列的后台执行的方式,还是一种则是定时计划任务。

Celery 是一个强大的分布式任务队列,它可以让任务的执行完全脱离主程序,甚至可以被分配到其他主机上运行。我们通常使用它来实现异步任务(async task)和定时任务(crontab)。它的架构组成如下图

组件:

1、任务(tasks)--用户定义的函数,用于实现用户的功能,比如执行一个耗时很长的任务

2、中间介(Broker)--用于存放tasks的地方,但是这个中间介需要解决一个问题,就是可能需要存放非常非常多的tasks,而且要保证Worker能够从这里拿取

3、执行者(Worker)--用于执行tasks,也就是真正调用我们在tasks中定义的函数

4、存储(Backend)--把执行tasks返回的结果进行存储,以供用户查看或调用

二、实现过程

1.环境安装(RabbitMQ/Redis、Celery、django-celery、flower)

2.创建工程

红圈为本工程所需:

web_order下面需要修改的文件:celery.py、__init__.py、settings文件

web_test下面需要修改的文件:tasks.py文件、longTask.py文件

3.修改文件过程

1)修改settings.py。在settings的最后加上如下代码:

1 # CELERY SETTING
2 BROKER_URL = ‘redis://localhost:6379‘    #指定消息中间件
3 CELERY_RESULT_BACKEND = ‘redis://localhost:6379‘   ##指定结果存储位置为本地数据库
4 CELERY_ACCEPT_CONTENT = [‘application/json‘]  #
5 CELERY_TASK_SERIALIZER = ‘json‘
6 CELERY_RESULT_SERIALIZER = ‘json‘
7 CELERY_TIMEZONE = ‘Asia/Shanghai‘
8 CELERY_IMPORTS = ("web_test.tasks")   #注册任务,

2)__init__ 文件。

#绝对导入,以免celery和标准库中的celery模块冲突
from __future__ import absolute_import

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.

#以下导入时为了确保在Django启动时加载app,shared_task在app中会使用到
from .celery import app as celery_app

__all__ = [‘celery_app‘]

3)celery文件

from __future__ import absolute_import,unicode_literals
import os
from celery import Celery
from django.conf import settings

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "web_order.settings")  # 设置celery可以在命令行中使用
app = Celery(‘web_order‘)  # 创建app实例
# app = Celery(‘tcelery‘, backend=‘redis://localhost:6379/0‘, broker=‘redis://localhost:6379/0‘)
app.conf.CELERY_IGNORE_RESULT = False  # 结果不忽略
# app.conf.CELERY_RESULT_BACKEND = ‘redis://localhost:6379/0‘ #结果保存在redis中

app.config_from_object(‘django.conf:settings‘)  # 从文件中加载实例
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)  # 自动加载tasks,注意:他会去app下面查找tasks.py文件,所以我们必须将task放在tasks.py文件中

4)longTasks.py

from django.http import JsonResponse
import json
from .tasks import test   #导入异步任务的方法

def sendlongtask(request):
    #由此处调用test方法,执行异步命令
    run_res = test(sh, userIp, username)
    return JsonResponse("执行成功", safe=False)

5)Tasks.py

from celery import shared_task

@shared_task
def test(x, y):
    return (x+y)

4.启动Django和celery

  在项目根目录执行:

  python manage.py runserver 0.0.0.0:8000

  python manage.py celery worker -c 4 --loglevel=info

5.另外,Celery提供了一个工具flower,将各个任务的执行情况、各个worker的健康状态进行监控并以可视化的方式展现,如下图所示:

    Django下实现的方式如下: 

      1.) 安装flower:

    pip install flower

      2.) 启动flower(默认会启动一个webserver,端口为5555):

python manage.py celery flower

     3.) 进入http://localhost:5555即可查看。

6.python3踩过的坑:python3.7与celery不兼容

1.出现这个错误时,需要将报错文件中所有的async改为asynchronous或者其他变量名即可。

执行下面的脚本也可快速修改

TARGET=/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/celery/backends
cd $TARGET
if [ -e async.py ]
then
    mv async.py asynchronous.py
    sed -n ‘s/async/asynchronous/g‘ redis.py
    sed -n ‘s/async/asynchronous/g‘ rpc.py
fi

运行后,你会发现celery可以正常使用了

2.将python版本降到3.6及以下,celery也可正常使用。

原文地址:https://www.cnblogs.com/weisunblog/p/12275261.html

时间: 2024-11-08 22:42:27

python3+celery+redis实现异步任务的相关文章

django+celery+redis环境搭建

初次尝试搭建django+celery+redis环境,记录下来,慢慢学习~ 1.安装apache 下载httpd-2.0.63.tar.gz,解压tar zxvf httpd-2.0.63.tar.gz,cd httpd-2.0.63, ./configure --prefix=/usr/local/apache --enable-mods=all --enable-cache --enable-mem-cache --enable-file-cache --enable-rewrite(这一

python3操作redis

redis也被称为缓存 1.redis是一个key-value存储系统,没有ForeignKey和ManyToMany的字段. 2.在redis中创建的数据彼此之间是没有关系的,所以也被称为是非关系型数据库 3.它支持存储包括string(字符串).list(链表).set(集合).zset(sorted set --有序集合)和hash(哈希类型)等数据类型. 4.redis支持的数据类型都支持push/pop.add/remove及取交集并集和差集及更丰富的操作,而且这些操作都是原子性的.

使用Redis实现异步消息队列

工作中涉及到redis异步消费,查阅资料,记录下~ 使用Redis实现异步消息队列 https://blog.csdn.net/b540969928/article/details/78406791 异步消息队列 https://blog.csdn.net/qq_33589510/article/details/77126852 原文地址:https://www.cnblogs.com/meixiaoqiu/p/10238227.html

Python3 下 Redis 返回 bytes 类型的问题

Python3 下 Redis 默认返回 bytes 类型数据,而 Python3 下 bytes 类型和 str 类型不能直接互用,容易出错,解决方法是在建立 Redis 连接的时候将 decode_responses 设置为 True,表示将返回的 bytes 数据解码为 str 数据 def __init__(self, host='localhost', port=6379, db=0, password=None, socket_timeout=None, socket_connect

python 关于celery的异步任务队列的基本使用(celery+redis)【采用配置文件设置】

工程结构说明: __init__.py:实例化celery,并加载配置模块 celeryconfig.py:配置模块 task1:任务1,实现加法 task2:任务2,实现乘法 app.py:应用,任务生产者 1.__init__.py:实例化celery,并加载配置模块 # -*- coding: utf-8 -*- from celery import Celery myapp=Celery('demo') #通过Celery实例加载配置模块celeryconfig.py myapp.con

Django使用Celery加redis执行异步任务

简单使用 安装celery及redis 定义celery任务 项目下新建tasks.py from celery import Celery # 创建一个Celery类的实例对象 app = Celery('celery_task.tasks', broker='redis://127.0.0.1:6379/8') # 定义任务函数 @app.task def send_register_active_email(message): with open("D:\\celery\\text.txt

django celery的分布式异步之路(一) hello world

设想你遇到如下场景: 1)高并发 2)请求的执行相当消耗机器资源,流量峰值的时候可能超出单机界限 3)请求返回慢,客户长时间等在页面等待任务返回 4)存在耗时的定时任务 这时你就需要一个分布式异步的框架了. celery会是一个不错的选择.本文将一步一步的介绍如何使用celery和django进行集成,并进行分布式异步编程. 1.安装依赖 默认你已经有了python和pip.我使用的版本是: python 2.7.10 pip 9.0.1virtualenv 15.1.0 创建沙盒环境,我们生产

使用celery的backend异步获取结果

惯例先贴出相关参考的文档: http://docs.celeryproject.org/en/stable/getting-started/next-steps.html http://docs.celeryproject.org/en/stable/userguide/tasks.html#task-result-backends 这篇紧接上篇. 其实我们一般对这种异步任务需求是可能需要回调的.比如说我现在有一个支付的异步任务发送到了队列. 生产者不需要等待,在发送到队列之后就告诉用户已经支付

Celery - 一个懂得 异步任务 , 定时任务 , 周期任务 的芹菜

1.什么是Celery?Celery 是芹菜Celery 是基于Python实现的模块, 用于执行异步定时周期任务的其结构的组成是由    1.用户任务 app    2.管道 broker 用于存储任务 官方推荐 redis rabbitMQ  / backend 用于存储任务执行结果的    3.员工 worker 2.Celery的简单实例 1 from celery import Celery 2 import time 3 4 #创建一个Celery实例,这就是我们用户的应用app 5