Celery框架的基本使用方法

一. Celery简介

  Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统,专注于实时处理的异步任务队列,同时也支持任务调度。

  Celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。

  消息中间件:Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等。

  任务执行单元:Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。

  任务结果存储:Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等。

  版本支持情况:

Celery version 4.0 runs on
        Python ?2.7, 3.4, 3.5?
        PyPy ?5.4, 5.5?
    This is the last version to support Python 2.7, and from the next version (Celery 5.x) Python 3.5 or newer is required.

    If you’re running an older version of Python, you need to be running an older version of Celery:

        Python 2.6: Celery series 3.1 or earlier.
        Python 2.5: Celery series 3.0 or earlier.
        Python 2.4 was Celery series 2.2 or earlier.

    Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform.

  Celery多用来执行异步任务,将耗时的操作交由Celery去异步执行,比如发送邮件、短信、消息推送、音视频处理等。还可以执行定时任务,定时执行某件事情,比如Redis中的数据每天凌晨两点保存至mysql数据库,实现Redis的持久化。

二. Celery的使用

  这里以用用户注册,然后celery发送激活邮件为例子说明celery的基本用法。

2.1 安装celery及配置Redis

pip install celery
pip install django-redis
# Windows中还需要安装以下模块,用于任务执行单元
pip install eventlet

  在项目的配置文件中配置redis:

CACHES = {
    "default": {
        "BACKEND": "django_redis.cache.RedisCache",
        "LOCATION": "redis://127.0.0.1:6379",
        "OPTIONS": {
            "CLIENT_CLASS": "django_redis.client.DefaultClient",
            "CONNECTION_POOL_KWARGS": {"max_connections": 100}
            # "PASSWORD": "123",
        }
    }
}

2.2 Celery执行异步任务发送邮件

  首先在django中,celery要建立以下目录结构:

pro_cel
    ├── celery_task# celery相关文件夹
    │   ├── celery.py   # celery连接和配置相关文件,必须叫这个名字
    │   └── tasks1.py    #  所有任务函数
    │   └── tasks2.py    #  所有任务函数
    ├── check_result.py # 检查结果
    └── send_task.py    # 触发任务

  注意,检查结果与触发任务的模块不能写在celery_task模块中,不然会报导入celery的错误。

  比如这里建的目录如下:

  首先celery.py中生成Celery对象,同时里面演示了一下定时任务,后面还会再提:

from celery import Celery
from celery.schedules import crontab
from datetime import timedelta

# 消息中间件,密码是你redis的密码
# broker=‘redis://:[email protected]:6379/2‘ 密码123456
broker = ‘redis://127.0.0.1:6379/0‘  # 无密码
# 任务结果存储
backend = ‘redis://127.0.0.1:6379/1‘

# 生成celery对象,‘task‘相当于key,用于区分celery对象
# include参数需要指定任务模块
app = Celery(‘task‘, broker=broker, backend=backend, include=[
    ‘celery_task.add_task‘,
    ‘celery_task.send_email‘
])

# 时区
app.conf.timezone = ‘Asia/Shanghai‘
# 是否使用UTC
app.conf.enable_utc = False

# 定时执行
app.conf.beat_schedule = {
    # 名字随意命名
    ‘add-every-5-seconds‘: {
        # 执行add_task下的addy函数
        ‘task‘: ‘celery_task.add_task.add‘,
        # 每10秒执行一次
        ‘schedule‘: timedelta(seconds=10),
        # add函数传递的参数
        ‘args‘: (1, 2)
    },
    ‘add-every-10-seconds‘: {
        ‘task‘: ‘celery_task.add_task.add‘,
        # crontab不传的参数默认就是每的意思,比如这里是每年每月每日每天每小时的5分执行该任务
        ‘schedule‘: crontab(minute=5),
        ‘args‘: (1, 2)
    }
}

  然后写send_msg.py发送邮件的任务,首先要去项目的配置文件中配置邮箱:

# EMAIL_BACKEND = ‘django.core.mail.backends.smtp.EmailBackend‘
EMAIL_HOST = ‘smtp.qq.com‘  # 如果是 163 改成 smtp.163.com
EMAIL_PORT = 465
EMAIL_HOST_USER = ‘[email protected]‘  # 发送邮件的邮箱帐号
EMAIL_HOST_PASSWORD = ‘授权码‘  # 授权码,各邮箱的设置中启用smtp服务时获取
DEFAULT_FROM_EMAIL = EMAIL_HOST_USER
# 这样收到的邮件,收件人处就会这样显示
# DEFAULT_FROM_EMAIL = ‘2333<‘[email protected]>‘
EMAIL_USE_SSL = True   # 使用ssl
# EMAIL_USE_TLS = False # 使用tls
# EMAIL_USE_SSL 和 EMAIL_USE_TLS 是互斥的,即只能有一个为 True

  然后写发送邮件的任务send_msg.py代码如下:

import os

if __name__ == "celery_task.send_email":
    # 因为需要用到django中的内容,所以需要配置django环境
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "do_celery.settings")
    import django
    django.setup()
    # 导入celery对象app
    from celery_task.celery import app
    from app01 import models
    # 导入django自带的发送邮件模块
    from django.core.mail import send_mail
    import threading
    from do_celery import settings

    @app.task
    def send_email1(id):  # 此时可以直接传邮箱,还能减少一次数据库的IO操作
        # 此处的id由用户注册的视图函数中传入
        user_obj = models.UserInfo.objects.filter(pk=id).first()
        email = user_obj.email
        # 启用线程发送邮件,此处最好加线程池
        t = threading.Thread(target=send_mail, args=(
            "激活邮件,点击激活账号",  # 邮件标题
            ‘点击该邮件激活你的账号,否则无法登陆‘,  # 给html_message参数传值后,该参数信息失效
            settings.EMAIL_HOST_USER,  # 用于发送邮件的邮箱地址
            [email],  # 接收邮件的邮件地址,可以写多个
            ),
            # html_message中定义的字符串即HTML格式的信息,可以在一个html文件中写好复制出来放在该字符串中
            kwargs={‘html_message‘: "<a href=‘http://127.0.0.1:8000/active_user/?id=%s‘>点击激活gogogo</a>" % id}
        )
        t.start()

  关于发送邮件的更多信息看该博客:https://www.cnblogs.com/liuqingzheng/articles/10072695.html#_label3

  直接写查看结果的check_result.py文件:

from celery.result import AsyncResult
from celery_task.celery import app

def check_result(task_id):
    async1 = AsyncResult(id=task_id, app=app)

    if async1.successful():
        result = async1.get()
        print(result)
        return result
        # result.forget() # 将结果删除
        # async.revoke(terminate=True)  # 无论现在是什么时候,都要终止
        # async.revoke(terminate=False) # 如果任务还没有开始执行呢,那么就可以终止。
    elif async1.failed():
        print(‘执行失败‘)
        return ‘执行失败‘
    elif async1.status == ‘PENDING‘:
        print(‘任务等待中被执行‘)
        return ‘任务等待中被执行‘
    elif async1.status == ‘RETRY‘:
        print(‘任务异常后正在重试‘)
        return ‘任务异常后正在重试‘
    elif async1.status == ‘STARTED‘:
        print(‘任务已经开始被执行‘)
        return ‘任务已经开始被执行‘

  后续在App中建立模型表,然后开路由,写对应的视图函数即可,代码如下:

from django.conf.urls import url
from django.contrib import admin
from app01 import views
urlpatterns = [
    url(r‘^admin/‘, admin.site.urls),
    # 注册路由
    url(r‘^register/‘, views.register),
    # 用户点击邮件后的激活路由
    url(r‘^active_user/‘, views.active_user),
    # index路由只是用来测试add任务的
    url(r‘^index/‘, views.index),
    url(r‘^login/‘, views.login),
]

urls.py

from django.db import models

# Create your models here.

class UserInfo(models.Model):
    name = models.CharField(max_length=32)
    password = models.CharField(max_length=32)
    email = models.EmailField(null=True)
    # 记录激活状态
    is_active = models.BooleanField(default=0)

models.py

from django.shortcuts import render, HttpResponse, redirect
from app01 import models
import json
# Create your views here.
from celery_task.add_task import add
from celery_task.send_email import send_email1
from check_result import check_result

def index(request):
    ret = add.delay(1, 2)
    return HttpResponse(ret.id)

def register(request):
    if request.method == ‘POST‘:
        dic = json.loads(request.body.decode(‘utf-8‘))
        name = dic.get(‘name‘)
        password = dic.get(‘password‘)
        email = dic.get(‘email‘)
        user_obj = models.UserInfo.objects.filter(name=name).first()
        if user_obj:
            return HttpResponse(‘用户已存在‘)
        user_obj = models.UserInfo.objects.create(name=name, password=password, email=email)
        # 调用celery的发送邮件任务,将其加入消息队列,并将用户id传入
        result = send_email1.delay(user_obj.id)
        print(check_result(result.id))
        return HttpResponse(‘注册成功,已向你发送一封激活邮件‘)
    return HttpResponse(‘ok‘)

def active_user(request):
    uid = request.GET.get(‘id‘)
    models.UserInfo.objects.filter(id=uid).update(is_active=1)
    return redirect(‘/login/‘)

def login(request):
    # 此处写登录的逻辑即可
    return HttpResponse(‘OK‘)

views.py

  然后运行程序,先用pycharm启用任务执行单元worker(以windows为例):

celery worker -A celery_task -l info  -P  eventlet

  使用app.conf.beat_schdule定时任务时,还需要启用beat,用于定时朝消息队列提交任务:

celery beat -A celery_task -l info

  之后用postman朝该接口发送信息即可,效果如下:

  然后查看邮箱:

2.3 Celery执行定时任务

  设定时间让celery执行一个任务:

from celery_app_task import add
from datetime import datetime

# 方式一
# v1 = datetime(2019, 2, 13, 18, 19, 56)
# print(v1)
# v2 = datetime.utcfromtimestamp(v1.timestamp())
# print(v2)
# result = add.apply_async(args=[1, 3], eta=v2)
# print(result.id)

# 方式二
ctime = datetime.now()
# 默认用utc时间
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
from datetime import timedelta
time_delay = timedelta(seconds=10)
task_time = utc_ctime + time_delay

# 使用apply_async并设定时间,这里是10秒后执行任务
result = add.apply_async(args=[4, 3], eta=task_time)
print(result.id)

  django中celery使用crontab时,可以写以下格式:

from datetime import timedelta
from celery import Celery
from celery.schedules import crontab

cel = Celery(‘tasks‘, broker=‘redis://127.0.0.1:6379/0‘, backend=‘redis://127.0.0.1:6379/1‘, include=[
    ‘celery_task.tasks1‘,
    ‘celery_task.tasks2‘,
])
cel.conf.timezone = ‘Asia/Shanghai‘
cel.conf.enable_utc = False

cel.conf.beat_schedule = {
    # 名字随意命名
    ‘add-every-10-seconds‘: {
        # 执行tasks1下的test_celery函数
        ‘task‘: ‘celery_task.tasks1.test_celery‘,
        # 每隔2秒执行一次
        # ‘schedule‘: 1.0,
        # ‘schedule‘: crontab(minute="*/1"),
        ‘schedule‘: timedelta(seconds=2),
        # 传递参数
        ‘args‘: (‘test‘,)
    },
    # ‘add-every-12-seconds‘: {
    #     ‘task‘: ‘celery_task.tasks1.test_celery‘,
    #     每年4月11号,8点42分执行
    #     ‘schedule‘: crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
    #     ‘schedule‘: crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
    #     ‘args‘: (16, 16)
    # },
}

  然后启动beat与worker,正常执行程序即可:

# 启动一个beat
celery beat -A celery_task -l info

# 启动work执行
celery worker -A celery_task -l info -P  eventlet

三. django中使用celery的模块

  其实django中使用celery有两种方式,上面建立特定目录结构的是一种,另一种就是利用django-celery模块,不过不推荐使用后者,因为对于django版本有严格的要求,要是项目换了环境,就无法使用了,不过这里也提一下。

3.1 django-celery基本使用

  安装需要的版本:

celery==3.1.25
django-celery==3.1.20

  在项目目录下新建celeryconfig.py:

import djcelery
djcelery.setup_loader()
CELERY_IMPORTS=(
    ‘app01.tasks‘,
)
#有些情况可以防止死锁
CELERYD_FORCE_EXECV=True
# 设置并发worker数量
CELERYD_CONCURRENCY=4
#允许重试
CELERY_ACKS_LATE=True
# 每个worker最多执行100个任务被销毁,可以防止内存泄漏
CELERYD_MAX_TASKS_PER_CHILD=100
# 超时时间
CELERYD_TASK_TIME_LIMIT=12*30

  在App总穿件tasks.py,用于写任务:

from celery import task
@task
def add(a,b):
    with open(‘a.text‘, ‘a‘, encoding=‘utf-8‘) as f:
        f.write(‘a‘)
    print(a+b)

  视图函数views.py:

from django.shortcuts import render,HttpResponse
from app01.tasks import add
from datetime import datetime
def test(request):
    # result=add.delay(2,3)
    ctime = datetime.now()
    # 默认用utc时间
    utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
    from datetime import timedelta
    time_delay = timedelta(seconds=5)
    task_time = utc_ctime + time_delay
    result = add.apply_async(args=[4, 3], eta=task_time)
    print(result.id)
    return HttpResponse(‘ok‘)

  项目settings.py中还需要注册及配置:

INSTALLED_APPS = [
    ...
    ‘djcelery‘,
    ‘app01‘
]

...

from djagocele import celeryconfig
BROKER_BACKEND=‘redis‘
BOOKER_URL=‘redis://127.0.0.1:6379/1‘
CELERY_RESULT_BACKEND=‘redis://127.0.0.1:6379/2‘

原文地址:https://www.cnblogs.com/maoruqiang/p/11237148.html

时间: 2024-08-28 01:11:26

Celery框架的基本使用方法的相关文章

Celery框架 接口缓存, Celery框架, Django项目实现轮播图缓存更新

接口缓存 """ 1)什么是接口的后台缓存 前台访问后台接口,后台会优先从缓存(内存)中查找接口数据 如果有数据,直接对前台响应缓存数据 如果没有数据,与(mysql)数据库交互,得到数据,对前台响应,同时将数据进行缓存,以备下次使用 了解:前台缓存 - 前台在请求到接口数据后,在前台建立缓存,再发送同样请求时,发现前台缓存有数据,就不再对后台做请求了 2)什么的接口会进行接口缓存 i)接口会被大量访问:比如主页中的接口,几乎所有人都会访问,而且会重复访问 ii)在一定时间内

开关控件在主流前端框架中的使用方法

本文仅介绍使用方法,后续再介绍实现方案及优劣对比. jquerymobile jqm可以使用checkbox和select模拟switch控件,只需要增加data-role='flipswitch'即可: 默认开关控件: <input type="checkbox" data-role="flipswitch" /> 若要变为选中状态,只需要增加一个checked属性即可,如下: <input type="checkbox" d

960网格布局框架(前端css框架)的使用方法

960框架总宽960px CSS框架已经出现很长时间了,关于这些框架的用处也被我们讨论了很多遍了.有人说,CSS框架不够先进,还有人说这些框架大大的节省了他们的开发时间.在此,我们将不再讨论这个问题. 前段时间,我了解到了CSS框架.经过对Malo.BluePrint和960做了实验对比后,我得出一个结论:我最喜欢960CSS框架. 本教程将解释这个框架的基本原理,这样你就可以用960来快速进入开发. 基本原理 你必须知道一些基本原理来“学习这个框架是如何工作的”.你可以通过实验(或者是用fir

struts2.5框架使用通配符指定方法常见错误

struts2.5框架使用通配符指定方法(常见错误) 在学习struts框架时经常会使用到通配符调用方法,如下: <package name="shop" namespace="/" extends="struts-default"> <!-- 配置Action --> <actionname="user_*" class="userAction" method="{

dwr框架中DWRUtil的方法

dwr框架中DWRUtil的方法 2008-10-14 17:57:23|  分类: JAVA |  标签: |举报 |字号大中小 订阅 7. util.js 功能 util.js包含了一些工具函数来帮助你用javascript数据(例如从服务器返回的数据)来更新你的web页面. 你可以在DWR以外使用它,因为它不依赖于DWR的其他部分.你可以下载整个DWR或者单独下载. 4个基本的操作页面的函数:getValue[s]()和setValue[s]()可以操作大部分HTML元素除了table,l

session过期跳转到登陆页面并跳出iframe框架的两个方法

最近在做拦截器,判断用户登录后操作超时,失去权限然后要重新登录,但是用的iframe,返回的登陆页总是在框架中显示,我百度了下,总是只有其中一个方法,现在分享下两种解决方法,希望对你们有帮助: 方法一: 一般使用filter过滤用户是否登录,如果用户没有登陆则转向登陆页面,这时候可以使用response.sendRedirect().但当在页面上使用了iframe后,发现被重定向的只是父页面中的iframe区域,登陆页面内容显示在该区域中.说明在过滤器中发送重定向请求时,是在iframe页面发送

Laravel框架中的make方法详解

为什么网上已经有这么多的介绍Laravel的执行流程了,Laravel的容器详解了,Laravel的特性了,Laravel的启动过程了之类的文章,我还要来再分享呢? 因为,每个人的思维方式和方向是不一样的,所以就会出现这样的一个场景,当你遇到一个问题在网上寻求答案的时候,有很多文章都解释了你的这个问题,但是你只对其中一篇感兴趣,那是因为作者的思维方式和你的很接近而作者的文笔也可能是你喜欢的那种类型.正因如此,我也来分享一些我在研究Laravel框架时的一些观点和看法,希望给那些和我有类似思维方式

讲述了Django框架模板的使用方法

文章来源: 敏而好学论坛 嗨学网www.piaodoo.com 欢迎大家相互学习本文实例讲述了Django框架模板的使用方法.分享给大家供大家参考,具体如下: 创建模板文件夹 在项目下床架一个模板文件夹 在templates下面为了区分是哪一个应用的模板再建一个与应用同名的文件夹. 在setting.py的TEMLATES里配置模板文件的路径 在视图函数里return reder def index(request):#视图函数必须有一个参数 #进行处理,和M和T进行交互... # return

给微软的依赖注入框架写一些扩展方法

给微软的依赖注入框架写一些扩展方法 Intro 现在在项目里大多都是直接使用微软的依赖注入框架,而微软的注入方式比较简单,不如 AutoFac 使用起来灵活,于是想给微软的依赖注入增加一些扩展,使得可以像AutoFac 一样比较灵活的注册服务 Extensions RegisterTypeAsImplementedInterface 将类型注册为其实现的接口,比如 pubic class UserService:IUserService,IUserRepository{}, 注册 UserSer