【理论】python使用celery异步处理请求

Flask中使用celery队列处理执行时间较长的请求。

一. 安装celery

pip install celery flask  redis

二. celery简介

Celery是个异步分布式任务队列
通过Celery在后台跑任务并不像线程那么简单,但是用Celery的话,能够是应用有较好的扩展性,因为Celery是个分布式架构,下面介绍Celery的三个核心组件:
1. 生产者(Celery client): 生产者发送消息,在Flask上工作时,生产者在Flask应用内运行
2. 消费者(Celert worker): 消费者用于处理后台任务。消费者可以是本地的也可以是远程的。我们可以在运行Flask的server上运行一个单一的消费者,当业务量上涨之后再去添加更多的消费者
3. 消息传递着(Celery broker): 生产者和消费者的信息交互使用的是消息队列,Celery支持若干方式的消息队列,其中最长用的是RabbitMQ和Redis, 我们在使用过程中使用的Redis

三. redis配置与使用

redis配置文件/etc/redis.conf

1.设置为后台启动
 daemonize yes
2.redis端口设置
 port 6379 # default prot
3.日志文件
 logfile /home/liuyadong/work/log/redis.log
4.数据保存文件
 dir /home/liuyadong/data/redisData

通过下面命令指定配置文件启动redis:
redis-server /etc/redis.conf

通过下面命令测试是否启动成功:
redis-cli -p 6379
下面这样表示成功(进入了命令行模式):
redis 127.0.0.1:6379> 

查看启动端口:
sudo netstat -ntlp | grep 6379
tcp        0      0 127.0.0.1:6379              0.0.0.0:*                   LISTEN      49380/redis-server 

四. celery使用简介

1.Choosing a broker
 最常用的broker包括: RabbitMQ 和 Redis, 我们使用Redis, Redis的安装及启动等查看第二部分

2.intall celery
 pip install celery

3.Application
 使用celery的第一步是创建一个application, 通常叫做‘app‘。具体的创建一个app的代码如下:
 $ cat tasks.py
 #!/usr/bin/env python
 from celery import Celery

 app = Celery(‘tasks‘, broker=‘redis://localhost‘)
 @app.tasks
 def add(x, y):
     return x + y

 Note: Celery第一个参数必须是当前module的模拟购置,本次实例中为:tasks

4.Running the celery worker server
  $ celery -A tasks worker --loglevel=info

5.Calling the tasks
 可以通过delay()或者apply_sync()方法来调用一个task
 >>> from tasks import add
 >>> add.delay(4, 4)

6. Keeping Results
 我们可以将task的执行状态保存起来,可以保存到broker中, 可以通过CELERY_RESULT_BACKEND字段来设置保存结果。
 也可以通过Celery的backend参数来设置
 app.Celery(‘tasks‘, broker=‘redis://localhost‘, backend=‘redis://localhost‘)

 >>> result = add.delay(4, 4)
 可以通过ready()方法来判断程序执行是否完成,执行完成返回True.
 >>> result.ready()
 False

 下面是AsyncResult对象的其他调用方法介绍:
 1) AsyncResult.get(timeout=None, propagate=True, interval=0.5, no_ack=True, follow_parents=True)

 timeout    : 设置一个等待的预操作时间,单位是s, 方法返回执行结果
 propagate  : 如果task执行失败,则Re-taise Exception
 interval   : 等待一定时间重新执行操作,如果使用amqp来存储backend则此参数无效
 no_ack     : Enable amqp no ack (automatically acknowledge message)
              If this is False then the message will not be acked
 follow_parents :  Reraise any exception raised by parent task

 2) AsyncResult.state 或 status属性
 方法返回当前task的执行状态,返回值包括下面多种情况:
 PENDING: task正在等待执行
 STARTED: task已经开始执行了
 RETRY  : task重新执行了,这可能是由于第一次执行失败引起的
 FAILURE: task执行引发了异常,并且结果的属性当中包括了异常是由哪个task引起的
 SUCCESS: task执行成功,结果的属性当中包括执行结果

 3) AsyncResult.success()
 如果返回True,则表示task执行成功

 4) AsyncResult.traceback()
 得到一个执行失败的task的traceback

7.Configuration celert
 默认的配置对于大多数用户来说已经足够好了,但是我们仍有许多想让celery按照我们的想法去work,通过configuration实现是一个好的方式。 configutation可以通过app设置,也可以通过一个单独的模块进行设置。
 比如,通过app设置CELERY_TASK_SERIALIZER属性:app.conf.CELERY_TASK_SERIALIZER = ‘json‘
 如果你一次性有许多需要配置,则可以通过update()方法实现:
 app.conf.update(
    CELERY_TASK_SERIALIZER=‘json‘,
    CELERY_ACCEPT_CONTENT=[‘json‘], # Ignore other content
    CELERY_RESULT_SERIALIZER=‘json‘,
    CELERY_TIMEZONE=‘Europe/Oslo‘,
    CELERY_ENABLE_UTC=True,
 )

 你也可以通过app.config_from_object() method告诉Celery通过一个模块来生成configuration: app.config_from_object(‘celeryconfig‘) 这个模块通常叫做 celeryconfig,但实际上你可以叫任何名字。
  $ cat celeryconfig.py
  CELERY_ROUTES = {‘tasks.add‘: ‘low-priority‘, ‘tasks.add‘: {‘rate_limit‘: ‘10/m‘}

8.Where to go from here
 如果你想了解更多请阅读: http://docs.celeryproject.org/en/latest/getting-started/next-steps.html#next-steps
 之后阅读: http://docs.celeryproject.org/en/latest/userguide/index.html#guide
时间: 2024-10-13 11:39:13

【理论】python使用celery异步处理请求的相关文章

python—Celery异步分布式

Celery异步分布式 Celery是一个python开发的异步分布式任务调度模块 Celery本身并不提供消息服务,使用第三方服务,也就是borker来传递任务,目前支持rebbimq,redis, 数据库等 使用redis连接url的格式为: redis://:[email protected]:port/db_number 例如: BROKER_URL = 'redis://localhost:6379/0' 1)huang.py from celery import Celery bro

python 关于celery的异步任务队列的基本使用(celery+redis)【采用配置文件设置】

工程结构说明: __init__.py:实例化celery,并加载配置模块 celeryconfig.py:配置模块 task1:任务1,实现加法 task2:任务2,实现乘法 app.py:应用,任务生产者 1.__init__.py:实例化celery,并加载配置模块 # -*- coding: utf-8 -*- from celery import Celery myapp=Celery('demo') #通过Celery实例加载配置模块celeryconfig.py myapp.con

python中asynchat异步socket命令/响应处理

该模块基于asyncore简化了异步客户端和服务器,并使其更容易元素处理由任意的字符串结束,或者是可变长度的的协议.它提供了抽象类async_chat,提供collect_incoming_data()和found_terminator()方法.循环和asyncore的一样,有2种信道:asyncore.dispatcher和asynchat.async_chat,可以自由混合信道.通常asyncore.dispatcher服务器通道在接收到连接请求时产生新的asynchat.async_cha

Django使用Celery异步任务队列

1  Celery简介 Celery是异步任务队列,可以独立于主进程运行,在主进程退出后,也不影响队列中的任务执行. 任务执行异常退出,重新启动后,会继续执行队列中的其他任务,同时可以缓存停止期间接收的工作任务,这个功能依赖于消息队列(MQ.Redis). 1.1  Celery原理 Celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成. 消息中间件:Celery本身不提供消息服务,但

Celery异步任务框架

目录 Celery架构 消息中间件 任务执行单元 任务结果存储 Celery的安装配置 Celery执行异步任务 使用步骤 立即任务和延时任务 定时任务 使用场景 django项目中使用 celery.py(celery服务) tasks.py(任务) 坑点: Celery 官网:http://www.celeryproject.org/ Celery 官方文档英文版:http://docs.celeryproject.org/en/latest/index.html Celery 官方文档中文

[gevent源码分析] c-ares异步DNS请求

c-ares是异步DNS请求库,libcurl,libevent,wireshark都使用了c-ares,gevent1.0版本前使用的是libevent, 所以它的DNS请求也是使用c-ares,1.0版本后使用cython封装了c-ares. c-ares官方文档,http://c-ares.haxx.se/docs.html. gevent中DNS默认使用的是线程池版本的,可通过设置GEVENT_RESOLVER=ares环境变量使用c-ares异步库. 如何证明的确是异步呢,试着跑一遍你

springMVC项目异步处理请求的错误Async support must be enabled on a servlet and for all filters involved in async

从github上down下来一个项目,springMVC-chat.作者全是用的注解,也就是零配置. 这可苦了我,经过千辛万苦,最终集成到现在的项目中有一点样子了,结果报出来下面的错误.红色部分.解决方法为,在web.xml中也就是springMVC的总配置文件中加上一句话: <async-supported>true</async-supported> 这句话的位置一定要放正确,否则,一切都是徒劳.至于配置spring异步支持(其实是配置servlet异步支持)的放置位置见下图.

python + pycurl + 高效批量get请求

pycurl包是用C编写的libcurl  python接口,速度高于urllib.Libcurl 是一个支持FTP, FTPS, HTTP, HTTPS, GOPHER, TELNET, DICT, FILE 和 LDAP的客户端URL传输库.libcurl也支持HTTPS认证,HTTP POST,HTTP PUT,FTP上传,代理,Cookies,基本身份验证,FTP文件断点继传,HTTP代理通道等等. def doBatchGet(url_list): """批量get

Android中的异步网络请求

本篇文章我们来一起写一个最基本的Android异步网络请求框架,借此来了解下Android中网络请求的相关姿势.由于个人水平有限,文中难免存在疏忽和谬误,希望大家可以指出,谢谢大家:) 1. 同步网络请求 以HTTP的GET请求为例,我们来介绍一下Android中一个基本的同步请求框架的实现.直接贴代码: public class HttpUtils { public static byte[] get(String urlString) { HttpURLConnection urlConne