python:利用celery分布任务

Celery是一个功能完备即插即用的任务队列。它使得我们不需要考虑复杂的问题,使用非常简单。celery看起来似乎很庞大。celery适用异步处理问题,当发送邮件、或者文件上传, 图像处理等等一些比较耗时的操作,我们可将其异步执行,这样用户不需要等待很久,提高用户体验。 celery的特点是:

  简单,易于使用和维护,有丰富的文档。

  高效,单个celery进程每分钟可以处理数百万个任务。

  灵活,celery中几乎每个部分都可以自定义扩展。

celery非常易于集成到一些web开发框架中。

任务队列是一种跨线程、跨机器工作的一种机制。

任务队列中包含称作任务的工作单元。有专门的工作进程持续不断的监视任务队列,并从中获得新的任务并处理。

celery通过消息进行通信,通常使用一个叫Broker(中间人)来协client(任务的发出者)和worker(任务的处理者). clients发出消息到队列中,broker将队列中的信息派发给worker来处理。

一个celery系统可以包含很多的worker和broker,可增强横向扩展性和高可用性能。

安装celery

pip install -U Celery

celery支持多种消息中介

其中最完备的是RabbitMQ和Redis。

pip install -U flower #安装任务监控工具

usage: celery <command> [options]

可选参数

Global Options:
-A APP, --app APP
-b BROKER, --broker BROKER
--result-backend RESULT_BACKEND
--loader LOADER
--config CONFIG
--workdir WORKDIR
--no-color, -C
--quiet, -q

具体实现简单的任务,我这里使用的rabbitmq作为borker

#addtask.pyfrom celery import Celery

app = Celery("addtask",borker="amqp://admin:[email protected]//")  #使用rabbitmq

@app.task
def add(x,y):
    return x + y

第二个脚本

#run.py
import addtask

if __name__ == "__main__":
    result = addtask.add.delay(5,5)   #delay是apply_async()方法的快件方式让我们更好的执行任务。#my_task.apply_async((2, 2), queue=‘my_queue‘, countdown=10) 任务my_task将会被发送到my_queue队列中,并且在发送10秒之后执行
print(result) #result.result 获取结果
运行celery服务
celery -A addtask worker --loglevel=info
使用redis
#tasks.py
from celery import Celery

# 我们这里案例使用redis作为broker
app = Celery(‘demo‘, broker=‘redis://:[email protected]/1‘)

# 创建任务函数
@app.task
def my_task():
    print("任务函数正在执行....")
celery -A tasks worker --loglevel=info
#run.py
import tasks

from tasks import my_task
my_task.delay()
使用Redis作为存储结果的方案,任务结果存储配置我们通过Celery的backend参数来设定。我们将tasks模块修改如下:

from celery import Celery

# 我们这里案例使用redis作为broker
app = Celery(‘demo‘,
             backend=‘redis://:[email protected]:6379/2‘,
             broker=‘redis://:[email protected]:6379/1‘)

# 创建任务函数
@app.task
def my_task(a, b):
    print("任务函数正在执行....")
    return a + b

配置celery

通过APP配置celery

from celery import Celery
app = Celery(‘demo‘)
# 增加配置
app.conf.update(
    result_backend=‘redis://:[email protected]:6379/2‘,
    broker_url=‘redis://:[email protected]:6379/1‘,
)

转有配置文件

下面我们在tasks.py模块 同级目录下创建配置模块celeryconfig.py:
result_backend = ‘redis://:[email protected]:6379/2‘
broker_url = ‘redis://:[email protected]:6379/1‘

tasks.py

from celery import Celery
import celeryconfig

# 我们这里案例使用redis作为broker
app = Celery(‘demo‘)

# 从单独的配置模块中加载配置
app.config_from_object(‘celeryconfig‘)

原文地址:https://www.cnblogs.com/thotf/p/11136856.html

时间: 2024-10-17 05:20:01

python:利用celery分布任务的相关文章

python利用企业微信api来进行发送自定义报警的类实现

python利用企业微信api来进行发送自定义报警的类实现 企业微信注册 打开http://work.weixin.qq.com/企业微信主页: 点击企业注册: 填写相关信息,营业执照和注册号可以不用填,直接下一步,按照提示操作即可: 注册完成后,登陆,就显示如下界面: 点击我的企业标签: 看到如上界面,复制CorpID对应的值: 点击企业应用: 点击 创建应用: 填写对应内容,点击创建应用即可: 然后再点击企业应用,就可以在自建应用里看到自己创建的应用: 点击应用图标,看到如下图 复制Agen

【理论】python使用celery异步处理请求

Flask中使用celery队列处理执行时间较长的请求. 一. 安装celery pip install celery flask redis 二. celery简介 Celery是个异步分布式任务队列 通过Celery在后台跑任务并不像线程那么简单,但是用Celery的话,能够是应用有较好的扩展性,因为Celery是个分布式架构,下面介绍Celery的三个核心组件: 1. 生产者(Celery client): 生产者发送消息,在Flask上工作时,生产者在Flask应用内运行 2. 消费者(

python利用pymssql链接sqlserver数据库

1.引入pymssql包 import pymssql 2.建立连接: conn = pymssql.connect(server=conf['db_address'], port="1433",user=conf['db_username'], password=conf['db_password'], database=conf['db_name'], charset="UTF-8") cursor = conn.cursor() 3.执行SQL语句: curs

杂项之python利用pycrypto实现RSA

杂项之python利用pycrypto实现RSA 本节内容 pycrypto模块简介 RSA的公私钥生成 RSA使用公钥加密数据 RSA使用私钥解密密文 破解博客园登陆 pycrypto模块简介 pycrypto模块是python中用来处理加密解密等信息安全相关的一个很重要模块. 该模块支持的加密方式: 对称加密方式: AES DES ARC4 散列值计算: MD5 SHA HMAC 公钥加密和签名: RSA DSA 基本上常见的关于信息安全类的算法都可以支持,所以,这是一个很强大的模块. 安装

hbase之python利用thrift操作hbase数据和shell操作

前沿: 以前都是用mongodb的,但是量大了,mongodb显得不那么靠谱,改成hbase撑起一个量级. HBase是Apache Hadoop的数据库,能够对大型数据提供随机.实时的读写访问.HBase的目标是存储并处理大型的数据.HBase是一个开源的,分布式的,多版本的,面向列的存储模型.它存储的是松散型数据. HBase提供了丰富的访问接口. HBase Shell Java clietn API Jython.Groovy DSL.Scala REST Thrift(Ruby.Pyt

C++和python利用struct结构传输二进制数据实现

网络编程中经常会涉及到二进制数据传输的问题,在C++中常用的传输方式有文本字符串和结构体封包.如果能将要发送的数据事先放进连续的内存区,然后让send函数获取这片连续内存区的首地址就可以完成数据的发送了,文本字符串如char型数组,以字节为单位,在内存中是顺序存储的,所以可以直接用send函数发送.但是如果要同时发送多个不同类型的数据时,它们在内存中存储的地址是随机的,不是顺序存储的,而且它们之间的相对位置也无法确定,这样就需要一种数据组织方式来明确各数据之间的相对位置.结构体显然就是一种的数据

【Python】Python利用有道翻译开发API应用示例

Python源码是关于Python利用有道翻译开发API应用示例.这是一个很有意思又简单的API应用练习题,方法中用到了有道词典开放API应用,合成的类似于命令行词典应用Python小程序.功能简单,但效果却很好. 这里要注意的是:有道API的请求频率限制,限制频率为每小时1000次,如果超过限制会被封禁. 提示:如果想一直用这个可以自己申请一个KEY,申请的过程非常简单的,只要替换原有的KEY_FROM和KEY就可以了. Python利用有道翻译开发API应用示例,源码如下: #!/usr/b

python 关于celery的异步任务队列的基本使用(celery+redis)【采用配置文件设置】

工程结构说明: __init__.py:实例化celery,并加载配置模块 celeryconfig.py:配置模块 task1:任务1,实现加法 task2:任务2,实现乘法 app.py:应用,任务生产者 1.__init__.py:实例化celery,并加载配置模块 # -*- coding: utf-8 -*- from celery import Celery myapp=Celery('demo') #通过Celery实例加载配置模块celeryconfig.py myapp.con

[Python] 利用commands模块执行Linux shell命令

用Python写运维脚本时,经常需要执行linux shell的命令,Python中的commands模块专门用于调用Linux shell命令,并返回状态和结果,下面是commands模块的3个主要函数: 1. commands.getoutput('shell command') 执行shell命令,返回结果(string类型) >>> commands.getoutput('pwd') '/home/oracle' 2. commands.getstatus('file') 该函数