python之celery队列模块

一、celery队列简介

Celery 是一个 基于python开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理, 如果你的业务场景中需要用到异步任务,就可以考虑使用celery.

1.1使用场景

1.你想对100台机器执行一条批量命令,可能会花很长时间 ,但你不想让你的程序等着结果返回,而是给你返回 一个任务ID,你过一段时间只需要拿着这个任务id就可以拿到任务执行结果, 在任务执行ing进行时,你可以继续做其它的事情。

2.你想做一个定时任务,比如每天检测一下你们所有客户的资料,如果发现今天 是客户的生日,就给他发个短信祝福

1.2celery原理

Celery 在执行任务时需要通过一个消息中间件来接收和发送任务消息,以及存储任务结果, 一般使用rabbitMQ or Redis 或者是数据库来存放消息的中间结果

Celery优点:

  1. 简单:一单熟悉了celery的工作流程后,配置和使用还是比较简单的
  2. 高可用:当任务执行失败或执行过程中发生连接中断,celery 会自动尝试重新执行任务
  3. 快速:一个单进程的celery每分钟可处理上百万个任务
  4. 灵活: 几乎celery的各个组件都可以被扩展及自定制

Celery缺点:

    1.目前只能在Linux系统上有较好的支持

Celery工作流程图:

1. Celery Beat:任务调度器,Beat进程会读取配置文件的内容,周期性地将配置中到期需要执行的任务发送给任务队列。

2. Celery Worker:执行任务的消费者,通常会在多台服务器运行多个消费者来提高执行效率。

3. Broker:消息代理,或者叫作消息中间件,接受任务生产者发送过来的任务消息,存进队列再按序分发给任务消费方(通常是消息队列或者数据库)。

4. Producer:调用了Celery提供的API、函数或者装饰器而产生任务并交给任务队列处理的都是任务生产者。

5. Result Backend:任务处理完后保存状态信息和结果,以供查询。Celery默认已支持Redis、RabbitMQ、MongoDB、Django ORM、SQLAlchemy等方式。

Celery的架构图如图所示:

在传统的web应用中,Django的web页面通过url的映射到view,view再执行方法,如果方法需要调用大量的脚本,执行大量的任务,页面就会阻塞,如果在项目中使用Celery队列.首先用户的任务会被celery放到broker中进行中转,然后将任务分为一个个的task来执行,由于celery是异步机制,所以会直接给用户返回task_id,页面拿到task_id就可以执行后续的操作,比如查看任务进度,暂停任务,而无需等待所有任务全部执行完毕,才能看到页面。

二、celery安装

2.1安装:

  1.在linux(ubuntu)系统上首先安装Celery队列

    pip3 install Celery

2.在linux安装redis

    sudo apt-get install redis-server

3.在linux上安装redis-celery中间件

    pip3 install -U "celery[redis]"

4.启动redis

sudo /etc/init.d/redis-server start

三、celery使用

3.1简单使用

命名为tasks.py

from celery import Celery

app = Celery(‘tasks‘,
             broker=‘redis://localhost‘,
             backend=‘redis://localhost‘)

@app.task
def add(x,y):
    print("running...",x,y)
    return x+y

启动监听并开始执行该服务

celery -A tasks worker -l debug

在开启一个终端进行测试任务

进入python环境

from tasks import add
t = add.delay(3,3) #此时worker会生成一个任务和任务id
t.get() #获取任务执行的结果
t.get(propagate=False) #如果任务执行中出现异常,在client端不会异常退出
t.ready()#查看任务是否执行完毕
t.traceback #打印异常详细信息

3.2项目中创建celery

在当前的目录下创建文件夹celery_pro

mkdir celery_pro

在此目录下创建两个文件

目录结构:

celery_proj
    /__init__.py
    /celery.py
    /tasks.py

celery.py(定义了celery的一些元信息)

from __future__ import absolute_import, unicode_literals
from celery import Celery

app = Celery(‘proj‘,
             broker=‘redis://localhost‘,   #消息中间接收
             backend=‘redis://localhost‘, #消息结果存放
             include=[‘proj.tasks‘])          #执行任务的文件   

# Optional configuration, see the application user guide.
app.conf.update(
    result_expires=3600,
)

if __name__ == ‘__main__‘:
    app.start()

tasks.py (定义任务执行的具体逻辑和调用的具体方法)

from __future__ import absolute_import, unicode_literals
from .celery import app

@app.task
def add(x, y):
    return x + y

@app.task
def mul(x, y):
    return x * y

@app.task
def xsum(numbers):
    return sum(numbers)

启动worker

celery -A celery_pro worker -l debug

再另一个窗口打开python命令模式进行测试

from celery_pro import tasks

t = tasks.add.delay(3,4)
t.get()

Celery的分布式:多启动worker就可以自动实现负载均衡,无需手动管理

Celery永驻后台(开启&重启&关闭)

celery multi start w1 -A celery_pro -l info  #开启后台celery任务
celery  multi restart w1 -A proj -l info #重启该服务
celery multi stop w1 -A proj -l info #关闭该服务

3.3 celery定时任务

在celery_pro文件夹下创建periodic_tasks.py

目录结构:

celery_proj
    /__init__.py
    /celery.py
    /tasks.py
    /periodic_tasks.py

文件内容如下:

from __future__ import absolute_import, unicode_literals
from .celery import app
from celery.schedules import crontab

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # Calls test(‘hello‘) every 10 seconds.
    sender.add_periodic_task(10.0, test.s(‘hello‘), name=‘add every 10‘)

    # Calls test(‘world‘) every 30 seconds
    sender.add_periodic_task(30.0, test.s(‘world‘), expires=10)

    # Executes every Monday morning at 7:30 a.m.
    sender.add_periodic_task(
        crontab(hour=21, minute=42, day_of_week=5),
        test.s(‘Happy Mondays!‘),
    )

@app.task
def test(arg):
    print(arg)

修改celery.py,加入periodic_task.py

from __future__ import absolute_import, unicode_literals
from celery import Celery

app = Celery(‘proj‘,
             broker=‘redis://localhost‘,
             backend=‘redis://localhost‘,
             include=[‘celery_pro.tasks‘,‘celery_pro.periodic_tasks‘])

# Optional configuration, see the application user guide.
app.conf.update(
    result_expires=3600,
)

if __name__ == ‘__main__‘:
    app.start()

在服务端启动 celery -A celery_pro worker -l debug

在客户端启动 celery -A celery_pro.periodic_tasks beat -l debug

在服务端如果看到打印的hell ,world说明定时任务配置成功

上面是通过调用函数添加定时任务,也可以像写配置文件 一样的形式添加, 下面是每30s执行的任务

在celery.py中添加

app.conf.beat_schedule = {
    ‘add-every-30-seconds‘: {
        ‘task‘: ‘cerely_pro.tasks.add‘, #执行的具体方法
        ‘schedule‘: 5.5,  #每秒钟执行
        ‘args‘: (16, 16)   #执行的具体动作的参数
    },
}
app.conf.timezone = ‘UTC‘

更多定制

上面的定时任务比较简单,但如果你想要每周一三五的早上8点给你发邮件怎么办呢?用crontab功能,跟linux自带的crontab功能是一样的,可以个性化定制任务执行时间

rom celery.schedules import crontab

app.conf.beat_schedule = {
    #在每周一早上7:30执行
    ‘add-every-monday-morning‘: {
        ‘task‘: ‘celery_pro.tasks.add‘,
        ‘schedule‘: crontab(hour=7, minute=30, day_of_week=1),
        ‘args‘: (16, 16),
    },

还有更多定时配置方式如下:

Example Meaning
crontab() Execute every minute.
crontab(minute=0, hour=0) Execute daily at midnight.
crontab(minute=0, hour=‘*/3‘) Execute every three hours: midnight, 3am, 6am, 9am, noon, 3pm, 6pm, 9pm.
crontab(minute=0,
hour=‘0,3,6,9,12,15,18,21‘)
Same as previous.
crontab(minute=‘*/15‘) Execute every 15 minutes.
crontab(day_of_week=‘sunday‘) Execute every minute (!) at Sundays.
crontab(minute=‘*‘,
hour=‘*‘,day_of_week=‘sun‘)
Same as previous.
crontab(minute=‘*/10‘,
hour=‘3,17,22‘,day_of_week=‘thu,fri‘)
Execute every ten minutes, but only between 3-4 am, 5-6 pm, and 10-11 pm on Thursdays or Fridays.
crontab(minute=0,hour=‘*/2,*/3‘) Execute every even hour, and every hour divisible by three. This means: at every hour except: 1am, 5am, 7am, 11am, 1pm, 5pm, 7pm, 11pm
crontab(minute=0, hour=‘*/5‘) Execute hour divisible by 5. This means that it is triggered at 3pm, not 5pm (since 3pm equals the 24-hour clock value of “15”, which is divisible by 5).
crontab(minute=0, hour=‘*/3,8-17‘) Execute every hour divisible by 3, and every hour during office hours (8am-5pm).
crontab(0, 0,day_of_month=‘2‘) Execute on the second day of every month.
crontab(0, 0,
day_of_month=‘2-30/3‘)
Execute on every even numbered day.
crontab(0, 0,
day_of_month=‘1-7,15-21‘)
Execute on the first and third weeks of the month.
crontab(0, 0,day_of_month=‘11‘,
month_of_year=‘5‘)
Execute on the eleventh of May every year.
crontab(0, 0,
month_of_year=‘*/3‘)
Execute on the first month of every quarter.

3.4Celery+Django实现异步任务分发

1.在setting.py的文件同一级别创建celery.py

 1 from __future__ import absolute_import, unicode_literals
 2 import os
 3 from celery import Celery
 4
 5 # 设置Django的环境变量
 6 os.environ.setdefault(‘DJANGO_SETTINGS_MODULE‘, ‘PerfectCRM.settings‘)
 7
 8 #设置app的默认处理方式,如果不设置默认是rabbitMQ
 9 app = Celery(‘proj‘,
10              broker=‘redis://localhost‘,
11              backend=‘redis://localhost‘
12 )
13
14 #配置前缀
15 app.config_from_object(‘django.conf:settings‘, namespace=‘CELERY‘)
16
17 #自动扫描app下的tasks文件
18 app.autodiscover_tasks()
19
20
21 @app.task(bind=True)
22 def debug_task(self):
23     print(‘Request: {0!r}‘.format(self.request))

2.修改当前目录下的__init__文件

1 from __future__ import absolute_import, unicode_literals
2
3 #启动时检测celery文件
4 from .celery import app as celery_app
5
6 __all__ = [‘celery_app‘]

3.在app下新增tasks文件,写要执行的任务

from __future__ import absolute_import, unicode_literals
from celery import shared_task

@shared_task
def add(x, y):
    return x + y

@shared_task
def mul(x, y):
    return x * y

@shared_task
def xsum(numbers):
    return sum(numbers)

4.在另一个app下新增tasks文件

1 from __future__ import absolute_import, unicode_literals
2 from celery import shared_task
3 import time,random
4
5 @shared_task
6 def randnum(start, end):
7     time.sleep(3)
8     return random.ranint(start,end)

5.在views下增加处理逻辑

 1 from crm import tasks
 2 from celery.result import AsyncResult
 3 import random
 4 #计算结果
 5 def celery_call(request):
 6     randnum =random.randint(0,1000)
 7     t = tasks.add.delay(randnum,6)
 8     print(‘randum‘,randnum)
 9     return HttpResponse(t.id)
10
11 #获取结果
12 def celery_result(request):
13     task_id = request.GET.get(‘id‘)
14     res = AsyncResult(id=task_id)
15     if res.ready():
16         return HttpResponse(res.get())
17     else:
18         return HttpResponse(res.ready())

6.测试

首先启动Django,从web端输入url调用celery_call方法

例:http://192.168.17.133:9000/crm/celery_call,此方法会返回一个task_id(41177118-3647-4830-b8c8-7be76d9819d7)

带着这个task_id 访问http://192.168.17.133:9000/crm/celery_result?id=41177118-3647-4830-b8c8-7be76d9819d7如果可以看到结果说明配置成功

Dnango+Celery实现定时任务

  • 安装Django,Celery中间件

    pip3 install django-celery-beat

  • 在Django的settings文件中,新增app,名称如下

   INSTALLED_APPS = (

    .....,

    ‘django_celery_beat‘, #新增的app

    )

  • 输入命令

    python manage.py migrate #创建与Django有关定时计划任务的新表

  • 通过celery beat开启定时任务

    celery -A PrefectCRM beat -l info -S django

  • 启动Django服务,进入admin配置页面

    python3 manager.py runserver 0.0.0.0:9000

    并设置settings.py中的

    ALLOW_HOSTS=[‘*‘]

  • 可以在原有业务表的基础之上看到新的三张表

    

后记:经测试,每添加或修改一个任务,celery beat都需要重启一次,要不然新的配置不会被celery beat进程读到

原文地址:https://www.cnblogs.com/wangshuyang/p/9067230.html

时间: 2024-10-10 07:35:16

python之celery队列模块的相关文章

Python 的消息队列模块(pika)

parameters = pika.ConnectionParameters('localhost',5672,'/',credentials    )     #参数 connection = pika.BlockingConnection(parameters)    #连接 channel = connection.channel() # 连接之后的 channel(引导) channel.queue_declare(queue='hello')    # 申明 print ' [*] W

Python Celery队列

Celery队列简介: Celery 是一个 基于python开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理, 如果你的业务场景中需要用到异步任务,就可以考虑使用celery. 使用场景: 1.你想对100台机器执行一条批量命令,可能会花很长时间 ,但你不想让你的程序等着结果返回,而是给你返回 一个任务ID,你过一段时间只需要拿着这个任务id就可以拿到任务执行结果, 在任务执行ing进行时,你可以继续做其它的事情. 2.你想做一个定时任务,比如每天检测一下你们所有客户的资料,如

【理论】python使用celery异步处理请求

Flask中使用celery队列处理执行时间较长的请求. 一. 安装celery pip install celery flask redis 二. celery简介 Celery是个异步分布式任务队列 通过Celery在后台跑任务并不像线程那么简单,但是用Celery的话,能够是应用有较好的扩展性,因为Celery是个分布式架构,下面介绍Celery的三个核心组件: 1. 生产者(Celery client): 生产者发送消息,在Flask上工作时,生产者在Flask应用内运行 2. 消费者(

python之消息队列

引言 你是否遇到过两个(多个)系统间需要通过定时任务来同步某些数据?你是否在为异构系统的不同进程间相互调用.通讯的问题而苦恼.挣扎?如果是,那么恭喜你,消息服务让你可以很轻松地解决这些问题.消息服务擅长于解决多系统.异构系统间的数据交换(消息通知/通讯)问题,你也可以把它用于系统间服务的相互调用(RPC).本文将要介绍的RabbitMQ就是当前最主流的消息中间件之一. RabbitMQ简介 RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的开源

Python:使用threading模块实现多线程(转)

分类: python   标签: thread    评论: 暂无评论   阅读:5,420 views 综述 Python这门解释性语言也有专门的线程模型,Python虚拟机使用GIL(Global Interpreter Lock,全局解释器锁)来互斥线程对共享资源的访问,但暂时无法利用多处理器的优势. 在Python中我们主要是通过thread和 threading这两个模块来实现的,其中Python的threading模块是对thread做了一些包装的,可以更加方便的被使用,所以我们使用

python基础31[常用模块介绍]

python基础31[常用模块介绍] python除了关键字(keywords)和内置的类型和函数(builtins),更多的功能是通过libraries(即modules)来提供的. 常用的libraries(modules)如下: 1)python运行时服务 * copy: copy模块提供了对复合(compound)对象(list,tuple,dict,custom class)进行浅拷贝和深拷贝的功能. * pickle: pickle模块被用来序列化python的对象到bytes流,从

Python高级数据结构-Collections模块

在Python数据类型方法精心整理,不必死记硬背,看看源码一切都有了之中,认识了python基本的数据类型和数据结构,现在认识一个高级的:Collections 这个模块对上面的数据结构做了封装,增加了一些很酷的数据结构,比如: a)Counter: 计数器,用于统计元素的数量 b)OrderDict:有序字典 c)defaultdict:值带有默认类型的字典 d)namedtuple:可命名元组,通过名字来访问元组元素 e)deque :双向队列,队列头尾都可以放,也都可以取(与单向队列对比,

Python之进程 - multiprocessing模块

? 我们已经了解了,运行中的程序就是一个进程.所有的进程都是通过它的父进程来创建的.因此,运行起来的python程序也是一个进程,那么我们也可以在程序中再创建进程.多个进程可以实现并发效果,也就是说,当我们的程序中存在多个进程的时候,在某些时候,就会让程序的执行速度变快.以我们之前所学的知识,并不能实现创建进程这个功能,所以我们就需要借助python中强大的模块. ? 仔细说来,multiprocess不是一个模块而是python中一个操作.管理进程的包. 之所以叫multi是取自multipl

Python 运维常用模块

基础库:sys.os(os.path.os.stat).time.logging.prarmiko.re.random Python运维常用的20个库 1.psutil是一个跨平台库(https://github.com/giampaolo/psutil)能够实现获取系统运行的进程和系统利用率(内存,CPU,磁盘,网络等),主要用于系统监控,分析和系统资源及进程的管理. 2.IPy(http://github.com/haypo/python-ipy),辅助IP规划. 3.dnspython(h