Celery 基本使用

1. 认识 Celery

Celery 是一个 基于 Python 开发的分布式异步消息任务队列,可以实现任务异步处理,制定定时任务等。

  • 异步消息队列:执行异步任务时,会返回一个任务 ID 给你,过一段时间后拿着任务 ID 去取执行结果
  • 定时任务:类似于 Windows / Linux 上的定时任务,到点执行任务

Celery 在执行任务时需要通过一个消息中间件来接收和发送任务消息,以及存储任务结果, 一般使用 rabbitMQRedis(默认采用 RabbitMQ)

优点:

  • 简单易用
  • 高可用:即使执行失败或执行过程发生中断,也会尝试再次执行
  • 快速:一个单进程的 Celery 每分钟可以执行上百万个任务
  • 拓展性强:Celery 的各个组件都可以拓展和自定制


Celery 构成

Celery 主要模块:

  • 任务模块 Task:异步和定时任务
  • 消息中间件 Broker:即任务调度队列,接收生产者发来的任务,将任务存入队列。Celery 本身不提供队列服务,官方推荐 RabbitMQ 或 Redis 等
  • 任务执行单元 Worker:处理任务,实时监控消息队列,获取队列中调度的任务,并执行它。
  • 结果存储 Backend:存储任务执行结果,以便查询,与中间件一样,也可以使用 RabbitMQ、Redis 或 MongoDB 存储

2. 异步任务

实现异步任务步骤:

  • 创建一个 Celery 实例
  • 启动 Celery Worker
  • 应用程序调用异步任务

1、安装

pip3 install 'celery[redis]'
pip3 install celery

2、创建 Celery 实例

C1/tasks.py

# -*- coding: utf-8 -*-

import time
from celery import Celery

broker = 'redis://127.0.0.1:6379'       # 消息中间件
backend = 'redis://127.0.0.1:6379/0'    # backend ,存储结果

app = Celery('my_task', broker=broker, backend=backend)     # 创建实例

# 创建一个任务,5s 后执行
@app.task(name='tasks.add')
def add(x, y):
    time.sleep(5)   # 模拟耗时操作
    return x + y

3、启动 Celery Worker

打开 Ubuntu 终端,输入:celery worker -A C1.tasks --loglevel=info,看到如下图就表示启动成功了:

参数:

  • A:指定实例所在位置
  • --loglevel:指定日志级别,有:warning、debug、info、error、fatal ,默认 warning

4、调用任务

另起一个终端,进入 Python 环境,执行任务:

# Celery 提供两种方法来调用任务,delay() 或 apply_async() 方法
python3
>>> from tasks import add
>>> add.delay(6, 8)     # 调用任务,并返回一个任务 ID
<AsyncResult: 194e99af-d0bd-481b-a500-433ec19117e4>

判断任务是否完成:

>>> result = add.delay(6, 8)
>>> result.ready()          # True 表示已完成
True

获取任务结果:

>>> result.get()
14

踩坑:在调用任务时出现Received unregistered task of type ‘tasks.add‘.

  • 原因:Celery 没有找到读取到任务
  • 解决办法:在装饰器出加上 name=‘tasks.add‘

参考博客:Received unregistered task of type ‘XXX’ Celery报错

3. 项目中使用 celery

celery 还可以配置成一个应用,放置在项目中使用,其结构为:

Tips:

  • 项目应该是个包文件
  • 必须命名为 celery.py,否则报错 AttributeError:module ‘proj‘ has no attribute ‘celery‘

1、proj/celery.py

from __future__ import absolute_import, unicode_literals        # 将相对路径转换为绝对路径
from celery import Celery
# 创建一个Celery的实例
app = Celery('tasks',
             # redis://:[email protected]:port/db_number  有密码认证的连接
             broker='redis://127.0.0.1:6379',
             # broker='redis://:密码@192.168.2.105:6379/0',
             backend='redis://127.0.0.1:6379/0',  # 用于Celery的返回结果的接收
             include=['proj.tasks']       # 用于声明Celery要执行的tasks任务的位置
             )
# 配置结果超时时间
app.conf.update(
    result_expires=3600,   # Celery结果存在中间件Redis的超时时间[仅针对当前的Celery的App]
)
if __name__ == '__main__':
    app.start()

2、proj/tasks.py

from __future__ import absolute_import, unicode_literals
from .celery import app  # 从我的Celery中导入App
import time

@app.task(name='tasks.add')     # 需要配置 name='tasks.add',否则报 Received unregistered task of type 'app.tasks.add'.
def add(x, y):
    time.sleep(10)
    return x + y

@app.task(name='tasks.mul')
def mul(x, y):
    time.sleep(10)
    return x * y

3、启动 worker,分为前台和后台启动(无需关心起行为):

# 前台
celery -A proj worker -l info

运行结果如下:

4、调用任务:

# 在这里使用终端调用,也可以再项目中调用
>>> from proj.tasks import add, mul

>>> result1 = add.delay(5, 8)
>>> result2 = mul.delay(5, 8)
>>> result1.get()       # 取值
13
>>> result2.get()
40

worker 放在后台继续运行,我们可以继续做别的事情:

# w1:worker
celery multi start w1 -A proj -l info       # 启动 worker
celery multi restart w1 -A proj -l info     # 重启
celery multi stop w1 -A proj -l info        # 关闭
ps -ef | grep celery                        # 查看目前还有几个 worker 正在运行



参考文章

4. 定时任务

celery 通过 celery beat 模块即可实现定时任务功能。

4.1 小试牛刀

1、新建一个 c1\task1.py,编辑如下:

from celery import Celery
from celery.schedules import crontab

app = Celery()

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # 每过 10 s,执行一次 hello
    sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')

    # 每过 30 s,执行一次 world
    sender.add_periodic_task(30.0, test.s('world'), expires=10)

    # 每周一七点三十执行一次 Happy Mondays!
    sender.add_periodic_task(
        crontab(hour=7, minute=30, day_of_week=1),
        test.s('Happy Mondays!'),
    )

@app.task
def test(arg):
    print(arg)

也可以配置成下面这样,或许更好理解:

# 可以配置多个
app.conf.beat_schedule = {
    'add-every-30-seconds': {           # 任务名字
        'task': 'tasks.add',            # 执行 tasks 中的 add 函数
        'schedule': 30.0,               # 时间,也可以用 timedelta(seconds=20),
        'args': (16, 16)                # 参数
    },
}
app.conf.timezone = 'UTC'               # 时区

2、启动 beat 进程,监控是否有任务:

[email protected]:~/桌面/c1$ celery -A task1 beat

3、启动 worker 执行任务:

[email protected]:~/桌面/c1$ celery -A task1 worker

从上图中可以看到,每过 10s,就会输出一个 hello,每过 30s 输出一个 world,当然这只是几个比较简单的任务示例。

beat 需要将任务的最后运行时间存储在本地数据库文件中(默认名称为 celerybeat-schedule),因此需要访问当前目录中的写入,或者您可以为此文件指定自定义位置:

# beat 运行时,会产生几个文件
[email protected]:~/桌面/c1$ ls
celerybeat.pid  celerybeat-schedule  __pycache__  task1.py

# 指定文件位置
celery -A task1 beat -s /home/celery/var/run/celerybeat-schedule

4.2 使用 crontab 构建复杂定时任务

如果你只是想每过多少秒输出一个 hello 的话,那么上面的功能就能实现。但是若你想每周一的早上七点半定时发送一封邮件或提醒做什么事的话,那么就只能使用 crontab 才能实现(与 Linux 自带的 crontab功能是一样的)。

from celery.schedules import crontab
from datetime import timedelta

app.conf.beat_schedule = {
     # 任务一
    'sum-task':{                # 任务名
        'task':'tasks.add',     # 执行 tasks.py 中的 add 函数
        'schedule':timedelta(seconds=20),       # 时间
        'args':(5, 6)           # 参数
    },
    # 任务二
    'multi-task': {
        'task': 'tasks.multi',
        'schedule': crontab(hour=4, minute=30, day_of_week=1),
        'args': (3, 4)
    }
}

更多关于 crontab

示例 说明
crontab() 每分钟执行一次
crontab(minute=0, hour=0) 每天午夜执行
crontab(minute=0, hour=‘*/3‘) 每三个小时执行一次
crontab(minute=0,``hour=‘0,3,6,9,12,15,18,21‘) 与上面相同
crontab(minute=‘*/15‘) 每 15min执行一次
crontab(day_of_week=‘sunday‘) 周日每分钟执行一次
crontab(minute=‘*‘,``hour=‘*‘,``day_of_week=‘sun‘) 与上面相同
crontab(minute=‘*/10‘,``hour=‘3,17,22‘,``day_of_week=‘thu,fri‘) 每周四或周五凌晨3-4点,下午5-6点和晚上10-11点
crontab(minute=0,hour=‘*/2,*/3‘) 每过一个小时执行一次, 以下时间除外: 1am, 5am, 7am, 11am, 1pm, 5pm, 7pm, 11pm
crontab(minute=0, hour=‘*/5‘) 执行小时可被5整除,比如下午三点(十五点)触发
crontab(minute=0, hour=‘*/3,8-17‘) 执行时间能被 2整除,在办公时间 8-17点,每小时执行一次
crontab(0, 0,day_of_month=‘2‘) 每个月第二天执行
crontab(0, 0,``day_of_month=‘2-30/3‘) 每个偶数日执行
crontab(0, 0,``day_of_month=‘1-7,15-21‘) 在本月的第一周和第三周执行
crontab(0, 0,day_of_month=‘11‘,``month_of_year=‘5‘) 每年5月11日执行
crontab(0, 0,``month_of_year=‘*/3‘) 每个季度第一个月执行

参考文章

5. Django 中使用 Celery

5.1 构建简单的异步任务

- project/          # 项目主目录
  - app/            # app
        - urls.py
        - views.py
        - tasks.py  # celery 任务,名字必须是 tasks.py
  - project/            # 项目文件
        - __init__.py
        - settings.py
        - urls.py
        - celery.py     # 创建 Celery 实例,加载 redis 配置文件
  - manage.py

在 Django 中使用 Celery ,依赖 django_celery_beat,因此先要安装它:

pip3 install django_celery_beat

并将其添加到 settings.py 中:

INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'app',
    'django_celery_beat',
]

...
# redis  连接
CELERY_BROKER_URL = 'redis://127.0.0.1:6397'
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6397/0'

1、project/celery

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery, platforms

# 使用 Django 环境
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'Project.settings')

app = Celery('celery_task')

app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

# 运行 root 用户运行 celery
platforms.C_FORCE_ROOT = True

@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

2、project/__init__.py

from __future__ import absolute_import, unicode_literals

# 确保导入应用,Django 启动就能使用 app 

from .celery import app as celery_app

__all__ = ['celery_app']

3、创建任务 app/tasks.py

from __future__ import absolute_import, unicode_literals
from celery import shared_task
import time 

@shared_task
def add(x, y):
    time.sleep(10)
    return x + y

@shared_task
def multi(x, y):
    time.sleep(10)
    return x * y

tasks.py 必须在各个 app 根目录下,且只能叫 tasks.py

4、视图中调用任务 views.py

  • ready():判断任务是否执行完毕
  • get(timeout=1):获取结果
  • traceback():获取原始回溯信息
from django.shortcuts import render, HttpResponse
from celery.result import AsyncResult

def celery_test(request):
    # 调用任务
    task = add.delay(4,22)

    return HttpResponse(task.id)    # 获取任务 id

def celery_res(request):
    # 获取任务结果
    task_id = 'b3fbe0da-57bb-4055-aea2-160afd6ae801'
    res = AsyncResult(id=task_id)
    return HttpResponse(res.get())      # 获取结果

5、路由配置 app/urls.py

path('celery_test/', views.celery_test, name='celery_test'),
path('celery_result/', views.celery_result, name='celery_result'),

6、打开终端启动 worker

celery -A project worker -l info

访问 127.0.0.1:8000/app/celery_test 调用执行任务:

访问 127.0.0.1:8000/app/celery_result 查看任务结果:

因为这是异步处理的,所有再执行任务时,其他代码照样执行。

5.2 在 Django 中使用定时任务

在 Django 也能设置定时任务,依赖于 django_celery_beatcrontab

1、在 project/celery.py 添加定时任务:

from celery.schedules import crontab
from datetime import timedelta

app.conf.update(
        CELERYBEAT_SCHEDULE = {
            # 任务一
            'sum-task':{
                'task':'app.tasks.add',
                'schedule':timedelta(seconds=20),
                'args':(5, 6)
                },
            # 任务二
            'multi-task': {
                'task': 'app.tasks.multi',
                'schedule': crontab(hour=4, minute=30, day_of_week=1),
                'args': (3, 4)
                }
            }
        )

在上面添加了两个定时任务 sum-taskmulti-task

  • sum-task :每过 20 s执行一次 add() 函数
  • multi-task:每周一的早上四点三十分执行一次 multi() 函数

启动 celery beat ,celery 启动一个 beat 进程不断检查是否有任务要执行:

celery -A project beat -l info

timedelta

timedelta 是datetime 的一个对象,需要引入 from datatime import timedelta,参数如下:

  • days:天
  • seconds:秒
  • microseconds:微秒
  • milliseconds:毫秒
  • minutes:分钟
  • hours:小时

crontab

  • month_of_year:月份
  • day_of_month:日期
  • day_of_week:周
  • hour:小时
  • minute:分钟


总结

  • 同时启动异步任务和定时任务:celery -A project worker -b -l info
  • 使用 RabbitMQ,配置:broker=‘amqp://admin:[email protected]‘
  • Celery 长时间运行避免内存泄露,添加配置:CELERY_MAX_TASKS_PER_CHILD = 10

原文地址:https://www.cnblogs.com/midworld/p/10960465.html

时间: 2024-11-03 16:26:42

Celery 基本使用的相关文章

分布式队列 Celery

详情参见: 分布式队列神器 Celery 个人学习总结后续更新……

异步任务利器Celery(一)介绍

django项目开发中遇到过一些问题,发送请求后服务器要进行一系列耗时非常长的操作,用户要等待很久的时间.可不可以立刻对用户返回响应,然后在后台运行那些操作呢? crontab定时任务很难达到这样的要求 ,异步任务是很好的解决方法,有一个使用python写的非常好用的异步任务工具Celery. broker.worker和backend Celery的架构由三部分组成,消息中间件(broker),任务执行单元(worker)和任务执行结果存储(result backends)组成. 应用程序调用

django celery的分布式异步之路(一) hello world

设想你遇到如下场景: 1)高并发 2)请求的执行相当消耗机器资源,流量峰值的时候可能超出单机界限 3)请求返回慢,客户长时间等在页面等待任务返回 4)存在耗时的定时任务 这时你就需要一个分布式异步的框架了. celery会是一个不错的选择.本文将一步一步的介绍如何使用celery和django进行集成,并进行分布式异步编程. 1.安装依赖 默认你已经有了python和pip.我使用的版本是: python 2.7.10 pip 9.0.1virtualenv 15.1.0 创建沙盒环境,我们生产

结合Django+celery二次开发定时周期任务

需求: 前端时间由于开发新上线一大批系统,上完之后没有配套的报表系统.监控,于是乎开发.测试.产品.运营.业务部.财务等等各个部门就跟那饥渴的饿狼一样需要 各种各样的系统数据满足他们.刚开始一天一个还能满足他们,优化脚本之后只要开发提供查询数据的SQL.收件人.执行时间等等参数就可以几分钟写完一个定时任务脚本 ,到后面不知道是不是吃药了一天三四个定时任务,不到半个月手里一下就20多个定时任务了,渐渐感到力不从心了,而且天天还要给他们修改定时任务的SQL.收件人.执 行时间等等,天天写定时任务脚本

celery出现警告或异常的解决方式

做个笔记,记录下使用celery踩过的坑,不定期更新.  warnings.warn(CDeprecationWarning(W_PICKLE_DEPRECATED)) 我用的是Flask,所以在Flask的配置文件 confg.py 中,设置好CELERY_ACCEPT_CONTENT这个属性即可.  WARNING/MainProcess 一样对配置文件做下修改 增加属性 CELERY_REDIRECT_STDOUTS_LEVEL = 'INFO' p.p1 { margin: 0.0px

celery queue

1.vi tasks.py #coding:utf-8 from server import app import random,string,smtplib @app.task def add(x,y):     return x+y @app.task def send_mail():         SUBJECT="临时登录密码"         HOST="smtp.163.com"         # TO=passwords['config']['em

Celery/RabbitMQ在Ubuntu上的安装

1.安装RabbitMQ sudo apt-get install rabbitmq-server sudo rabbitmqctl add_user [username] [password] sudo rabbitmqctl add_vhost [vhostname] sudo rabbitmqctl set_user_tags [username] [tagname] sudo rabbitmqctl set_permissions -p [vhostname] [username]".*

Celery的实践指南

celery原理: celery实际上是实现了一个典型的生产者-消费者模型的消息处理/任务调度统,消费者(worker)和生产者(client)都可以有任意个,他们通过消息系统(broker)来通信. 典型的场景为: 客户端启动一个进程(生产者),当用户的某些操作耗时较长或者比较频繁时,考虑接入本消息系统,发送一个task任务给broker. 后台启动一个worker进程(消费者),当发现broker中保存有某个任务到了该执行的时间,他就会拿过来,根据task类型和参数执行. 实践中的典型场景:

redis celery too many connection

用django 框架,异步任务用celery,队列用redis 出现了这个问题,too many connection Couldn't ack '5f41afc62d-a112-bef34d5de1cc', reason:ConnectionError('Too many connections',) Traceback (most recent call last): File "/srv/www/wom/env/lib/python2.6/site-packages/kombu/messa

在tornado中使用celery实现异步任务处理之一

一.简介 tornado-celery是用于Tornado web框架的非阻塞 celery客户端. 通过tornado-celery可以将耗时任务加入到任务队列中处理, 在celery中创建任务,tornado中就可以像调用AsyncHttpClient一样调用这些任务. ? Celery中两个基本的概念:Broker.Backend Broker : 其实就是一开始说的 消息队列 ,用来发送和接受消息. Broker有几个方案可供选择:RabbitMQ,Redis,数据库等 Backend: