flask celery 使用方法

一、安装

由于celery4.0不支持window,如果在window上安装celery4.0将会出现下面的错误
flask_clery

你现在只能安装
pip install celery==3.1

二、安装py for redis 模块

pip install redis

三、安装redis服务

网上很多文章都写得模棱两可,把人坑的不要不要的!!!

Redis对于Linux是官方支持的,但是不支持window,网上很多作者写文章都不写具体的系统环境,大多数直接说pip install redis就可以使用redis了,真的是坑人的玩意,本人深受其毒害

对于windows,如果你需要使用redis服务,那么进入该地址下载
https://github.com/MSOpenTech/redis/releases

redis安装包,双击完成就可以了

如果你在window上不安装该redis包,将会提示

redis.exceptions.ConnectionError: Error 10061 connecting to localhost:6379.

或者

redis.exceptions.ConnectionError

需要注意是:安装目录不能安装在C盘,否则会出现权限依赖错误

四、添加redis环境变量

D:\Program Files\Redis

五、初始化redis

进入redis安装目录,打开cmd运行命令redis-server.exe redis.windows.conf,如果出错

  • 双击目录下的redis-cli.exe
  • 在出现的窗口中输入shutdown
  • 继续输入exit

六、lask 集成celyer

在Flask配置中添加配置

123
 # Celery 配置CELERY_BROKER_URL = ‘redis://localhost:6379/0‘ # broker是一个消息传输的中间件CELERY_RESULT_BACKEND = ‘redis://localhost:6379/1‘ # 任务执行器
  • 在flask工程的__init__目录下生产celery实例
    注意!!以下代码必须在 flask app读取完配置文件后编写,否则会报错
123456789101112131415161718
def make_celery(app):    celery = Celery(app.import_name, broker=app.config[‘CELERY_BROKER_URL‘],                    backend=app.config[‘CELERY_RESULT_BACKEND‘])    celery.conf.update(app.config)    TaskBase = celery.Task

    class ContextTask(TaskBase):        abstract = True

        def __call__(self, *args, **kwargs):            with app.app_context():                return TaskBase.__call__(self, *args, **kwargs)

    celery.Task = ContextTask    return celery

celery = make_celery(app)

完整示例如下

123456789101112131415161718192021
app = Flask(__name__)app.config.from_object(config[‘default‘])

def make_celery(app):    celery = Celery(app.import_name, broker=app.config[‘CELERY_BROKER_URL‘],                    backend=app.config[‘CELERY_RESULT_BACKEND‘])    celery.conf.update(app.config)    TaskBase = celery.Task

    class ContextTask(TaskBase):        abstract = True

        def __call__(self, *args, **kwargs):            with app.app_context():                return TaskBase.__call__(self, *args, **kwargs)

    celery.Task = ContextTask    return celery

celery = make_celery(app)

在cmd中启动celery服务

执行命令celery -A your_application.celery worker loglevel=info,your_application为你工程的名字,在这里为 get_tieba_film

调用

123456789101112131415
@app.route(‘/‘)@app.route(‘/index‘)def index():    print ("耗时的任务")    # 任务已经交给异步处理了    result = get_film_content.apply_async(args=[1])    # 如果需要等待返回值,可以使用get()或wait()方法    # result.wait()    return ‘耗时的任务已经交给了celery‘

@celery.task()def get_film_content(a):    util = SpiderRunUtil.SpiderRun(TieBaSpider.FilmSpider())    util.start()

绑定

一个绑定任务意味着任务函数的第一个参数总是任务实例本身(self),就像 Python 绑定方法类似:

123
@task(bind=True) def add(self, x, y):     logger.info(self.request.id)

任务继承

任务装饰器的 base 参数可以声明任务的基类

1234567
import celeryclass MyTask(celery.Task):    def on_failure(self, exc, task_id, args, kwargs, einfo):        print(‘{0!r} failed: {1!r}‘.format(task_id, exc))@task(base=MyTask)def add(x, y):    raise KeyError()

任务名称

每个任务必须有不同的名称。
如果没有显示提供名称,任务装饰器将会自动产生一个,产生的名称会基于这些信息:
1)任务定义所在的模块,
2)任务函数的名称

显示设置任务名称的例子:

12345
>>> @app.task(name=‘sum-of-two-numbers‘)>>> def add(x, y):... return x + y>>> add.name‘sum-of-two-numbers‘

最佳实践是使用模块名称作为命名空间,这样的话如果有一个同名任务函数定义在其他模块也不会产生冲突。

123
>>> @app.task(name=‘tasks.add‘)>>> def add(x, y):... return x + y

七、安装flower

将各个任务的执行情况、各个worker的健康状态进行监控并以可视化的方式展现

1
pip install flower

启动flower(默认会启动一个webserver,端口为5555):

1
celery flower --address=127.0.0.1 --port=5555

进入http://localhost:5555即可查看。

doc

八、常见错误

1
ERROR/MainProcess] consumer: Cannot connect to redis://localhost:6379/0:

原因是:redis-server 没有启动
解决方案:到redis安装目录下执行redis-server.exe redis.windows.conf
检查redis是否启动:redis-cli ping

1
line 442, in on_task_received

解决:

12345678910
Did you remember to import the module containing this task?Or maybe you are using relative imports?Please see http://bit.ly/gLye1c for more information.The full contents of the message body was:{‘timelimit‘: (None, None), ‘utc‘: True, ‘chord‘: None, ‘args‘: [4, 4], ‘retries‘: 0, ‘expires‘: None, ‘task‘: ‘main.add‘, ‘callbacks‘: None,‘errbacks‘: None, ‘taskset‘: None, ‘kwargs‘: {}, ‘eta‘: None, ‘id‘: ‘97000322-93be-47e9-a082-4620e123dc5e‘} (210b)Traceback (most recent call last):  File "d:\vm_env\flask_card\lib\site-packages\celery\worker\consumer.py", line 442, in on_task_received    strategies[name](message, body,KeyError: ‘main.add‘

原因:任务没有注册或注册不成功,只有在启动的时候提示有任务的时候,才能使用该任务
flask_celery
解决:

  1. 你在那个类中使用celery就在哪个类中执行celery -A 包名.类名.celery worker -l info
  2. 根据上一部提示的任务列表给任务设置对应的名称
    如在Test中
123456
from main import app, celery

@celery.task(name="main.Test.add".)def add(x, y):    print "ddddsws"    return x + y

目录结构:

123456
+ Card  # 工程    + main       + admin          - Task.py        - __init__.py        - Test.py

则应该启动的命令为:

1
celery -A main.Test.celery worker -l info

同时,如果你的Task.py也有任务,那么你还应该重新创建一个cmd窗口执行

1
celery -A main.admin.Task.celery worker -l info

celery的工作进程可以创建多个
flask_celery
flask_celery

参考链接

转自:https://www.laoyuyu.me/2018/02/10/python_flask_celery/

celery用户指南,强烈推荐看

redis安装

celery使用
https://redis.io/topics/quickstart

原文地址:https://www.cnblogs.com/huchong/p/9078661.html

时间: 2024-11-13 09:44:14

flask celery 使用方法的相关文章

flask celery 安装说明

操作系统环境 CentOS 7.4 X64 rabbitmq-server # yum install -y epel-release # yum install erlang # yum install -y rabbitmq-server 也可以添加-detached属性来后台运行 rabbitmq-server -detached 不要kill停止RabbitMQ,使用rabbitmqctl命令 rabbitmqctl stop 配置用户和权限 [[email protected] sof

Flask实战第67天:Flask+Celery实现邮件和短信异步发送

之前在项目中我们发送邮件和 短信都是阻塞的,现在我们来利用Celery来优化它们 官方使用文档: http://flask.pocoo.org/docs/1.0/patterns/celery/ redis服务器及插件,还有cerely在上节我们已经安装好,这里就不重复过程了. 首先,来完成邮件 在项目下新建tasks.py from flask import Flask import config from celery import Celery from flask_mail import

flask celery 的神坑

一.flask运行在debug模式的时候,celery无法收到flask中发送给celery的异步任务 run.py if __name__ == '__main__': # app.run(host="0.0.0.0", port=8000, debug=True) # 以debug模式运行flask # 使用debug模式时,celery异步任务不能执行,但定时任务可以执行 app.run(host="0.0.0.0", port=8000) task.py f

【译】在Flask中使用Celery

为了在后台运行任务,我们可以使用线程(或者进程). 使用线程(或者进程)的好处是保持处理逻辑简洁.但是,在需要可扩展的生产环境中,我们也可以考虑使用Celery代替线程. Celery是什么? Celery是个异步分布式任务队列. 通过Celery在后台跑任务并不像用线程那么的简单,但是用Celery的话,能够使应用有较好的可扩展性,因为Celery是个分布式架构.下面介绍Celery的三个核心组件. 生产者(Celery client).生产者(Celery client)发送消息.在Flas

python之celery在flask中使用

现在继续学习在集成的框架中如何使用celery. 在Flask中使用celery 在Flask中集成celery需要做到两点: 创建celery的实例对象的名字必须是flask应用程序app的名字,否则celery启动会失败: celery必须能顺利加载初始化文件. celery在flask中初始化 由于celery进程的运行和flask进程的运行是相互独立的,但是在框架中我们希望只使用一份配置文件,这样可以简化配置的工作. from celery import Celery from flas

Celery的实践指南

celery原理: celery实际上是实现了一个典型的生产者-消费者模型的消息处理/任务调度统,消费者(worker)和生产者(client)都可以有任意个,他们通过消息系统(broker)来通信. 典型的场景为: 客户端启动一个进程(生产者),当用户的某些操作耗时较长或者比较频繁时,考虑接入本消息系统,发送一个task任务给broker. 后台启动一个worker进程(消费者),当发现broker中保存有某个任务到了该执行的时间,他就会拿过来,根据task类型和参数执行. 实践中的典型场景:

Python Flask框架——全栈开发(知了课堂)

章节1:Flask视图和URL 课时1[虚拟环境]为什么需要虚拟环境06:28 课时2[虚拟环境]virtualenv创建虚拟环境13:55 课时3[虚拟环境]virtualenvwrapper使用16:42 课时4[Flask预热]课程介绍45:34 课时5[Flask预热]Flask课程准备工作11:30 课时6[Flask预热]URL组成部分详解14:02 课时7[Flask预热]web服务器+应用服务器+web应用框架14:16 课时8[Flask URL]第一个flask程序详解24:

Flask插件系列之flask_celery

现在继续学习在集成的框架中如何使用celery. 在Flask中使用celery 在Flask中集成celery需要做到两点: 创建celery的实例对象的名字必须是flask应用程序app的名字,否则celery启动会失败: celery必须能顺利加载初始化文件. celery在flask中初始化 由于celery进程的运行和flask进程的运行是相互独立的,但是在框架中我们希望只使用一份配置文件,这样可以简化配置的工作. from celery import Celery from flas

flask笔记一

最近学习flask,由于web开发方面接触的并不是很多,所以看官方文档有点焦头烂额,好多的概念不理解. <Flask web 开发>比较基础,先用这本书做个入门. 1.Flask实例化对象用来接收客户端的所有请求. from flask import Flaskapp = Flask(__name__) 2.路由实现URL和python函数之间的映射,应用python中的修饰器. @app.route('/')def index():return '<h1>Hello World!