Celery是一个功能完备即插即用的任务队列。它使得我们不需要考虑复杂的问题,使用非常简单。celery看起来似乎很庞大。celery适用异步处理问题,当发送邮件、或者文件上传, 图像处理等等一些比较耗时的操作,我们可将其异步执行,这样用户不需要等待很久,提高用户体验。 celery的特点是:
简单,易于使用和维护,有丰富的文档。
高效,单个celery进程每分钟可以处理数百万个任务。
灵活,celery中几乎每个部分都可以自定义扩展。
celery非常易于集成到一些web开发框架中。
任务队列是一种跨线程、跨机器工作的一种机制。
任务队列中包含称作任务的工作单元。有专门的工作进程持续不断的监视任务队列,并从中获得新的任务并处理。
celery通过消息进行通信,通常使用一个叫Broker(中间人)来协client(任务的发出者)和worker(任务的处理者). clients发出消息到队列中,broker将队列中的信息派发给worker来处理。
一个celery系统可以包含很多的worker和broker,可增强横向扩展性和高可用性能。
安装celery
pip install -U Celery
celery支持多种消息中介
其中最完备的是RabbitMQ和Redis。
pip install -U flower #安装任务监控工具
usage: celery <command> [options]
可选参数
Global Options:
-A APP, --app APP
-b BROKER, --broker BROKER
--result-backend RESULT_BACKEND
--loader LOADER
--config CONFIG
--workdir WORKDIR
--no-color, -C
--quiet, -q
具体实现简单的任务,我这里使用的rabbitmq作为borker
#addtask.pyfrom celery import Celery app = Celery("addtask",borker="amqp://admin:[email protected]//") #使用rabbitmq @app.task def add(x,y): return x + y
第二个脚本
#run.py
import addtask
if __name__ == "__main__":
result = addtask.add.delay(5,5) #delay是apply_async()方法的快件方式让我们更好的执行任务。#my_task.apply_async((2, 2), queue=‘my_queue‘, countdown=10) 任务my_task将会被发送到my_queue队列中,并且在发送10秒之后执行
print(result) #result.result 获取结果
运行celery服务
celery -A addtask worker --loglevel=info
使用redis
#tasks.py from celery import Celery # 我们这里案例使用redis作为broker app = Celery(‘demo‘, broker=‘redis://:[email protected]/1‘) # 创建任务函数 @app.task def my_task(): print("任务函数正在执行....")
celery -A tasks worker --loglevel=info
#run.py import tasks from tasks import my_task my_task.delay()
使用Redis作为存储结果的方案,任务结果存储配置我们通过Celery的backend参数来设定。我们将tasks模块修改如下: from celery import Celery # 我们这里案例使用redis作为broker app = Celery(‘demo‘, backend=‘redis://:[email protected]:6379/2‘, broker=‘redis://:[email protected]:6379/1‘) # 创建任务函数 @app.task def my_task(a, b): print("任务函数正在执行....") return a + b
配置celery
通过APP配置celery
from celery import Celery app = Celery(‘demo‘) # 增加配置 app.conf.update( result_backend=‘redis://:[email protected]:6379/2‘, broker_url=‘redis://:[email protected]:6379/1‘, )
转有配置文件
下面我们在tasks.py模块 同级目录下创建配置模块celeryconfig.py: result_backend = ‘redis://:[email protected]:6379/2‘ broker_url = ‘redis://:[email protected]:6379/1‘
tasks.py
from celery import Celery import celeryconfig # 我们这里案例使用redis作为broker app = Celery(‘demo‘) # 从单独的配置模块中加载配置 app.config_from_object(‘celeryconfig‘)
原文地址:https://www.cnblogs.com/thotf/p/11136856.html