在flask中使用celery的实践

前言

在web开发中我们经常会遇到一些耗时的操作,比如发送邮件/短信,执行各种任务等等,这时我们会采取异步的方式去执行这些任务,而celery就是这样的一个异步的分布式任务处理框架,官方文档
今天,我们的主题是celery如何与flask一起工作,我们都知道,flask是一个非常小巧的web框架,有许许多多的扩展,celery也不例外,我们先看下目前常用的几个flask-celery的扩展:

  1. Flask-Celery: celery作者本人开发的,其实不算扩展,功能就是安装celery及其相关组件,这里不谈。
  2. Flask-Celery-Helper:曾经的扩展,作者已不维护,不支持现在的4.0版本
  3. Flask-CeleryExt:支持4.0版本,目前比较好用的扩展

除这些扩展之外,其实flask的官方文档中已经给出了在flask中使用celery的方式,不过,那是一个单文件中运行flask的demo,在实际项目中使用,还是有许多需要注意的地方,接下来,我们就一起探究下如何在flask项目中使用celery。

项目结构

├── celery_task                   # celery任务相关
│?? ├── __init__.py
│?? ├── tasks.py
│?? └── test.py
├── manage.py                     # celery worker实例
├── requirements.txt              # 依赖包
└── test_api                      # flask 项目
    ├── api                       # 蓝本相关
    │?? ├── __init__.py
    │?? └── v1
    │??     ├── __init__.py
    │??     └── views.py
    ├── extensions.py             # 扩展初始化
    ├── __init__.py               # flask app
    ├── models.py                 # 模型文件
    └── settings.py               # 配置文件

官方示例代码

本项目中没有使用扩展,只是基于官方文档中的示例做进一步的应用。

from celery import Celery

def make_celery(app):
    celery = Celery(
        app.import_name,
        backend=app.config[‘CELERY_RESULT_BACKEND‘],
        broker=app.config[‘CELERY_BROKER_URL‘]
    )
    celery.conf.update(app.config)

    class ContextTask(celery.Task):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask
    return celery

这是一个celery的工厂函数,使用flask app中的配置设置celery相关的属性,并且更改了celery对象的Task,使其能够使用flask的应用上下文,这一点非常重要。我们将这段代码放置到flask项目初始化文件中去也就是testapi/__init_\.py

构建celery对象

celerytask/__init_\.py

rom test_api import create_app, make_celery

app = create_app()
celery = make_celery(app)

class MyTask(celery.Task): # celery 基类

    def on_success(self, retval, task_id, args, kwargs):
        # 执行成功的操作
        print(‘MyTasks 基类回调,任务执行成功‘)
        return super(MyTask, self).on_success(retval, task_id, args, kwargs)

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        # 执行失败的操作
        # 任务执行失败,可以调用接口进行失败报警等操作
        print(‘MyTasks 基类回调,任务执行失败‘)
        return super(MyTask, self).on_failure(exc, task_id, args, kwargs, einfo)

这里我对Task做了进一步的定制,用于添加一些任务信息。

编写任务


import datetime
import time
import os
import random
from flask import current_app
from test_api.models import User
from test_api.extensions import db

from celery_task import celery, MyTask

@celery.task(bind=True, base=MyTask)
def apptask(self):
    print(current_app.config)
    print("==============%s " % current_app.config["SQLALCHEMY_DATABASE_URI"])
    print("++++++++++++++%s " % os.getenv("DATABASE_URL"))
    time.sleep(5)
    user = User(username="user%s" % random.randint(1,100))
    db.session.add(user)
    db.session.commit()
    return ‘success‘

这个任务很简单,使用User模型类异步向数据库中添加数据,为了体现耗时操作,使用sleep函数模拟。

视图函数中使用

test_api/api/v1/views.py

from flask import jsonify
from celery_task.tasks import apptask
from test_api.api.v1 import api_v1
from test_api.extensions import db
from flask import current_app

@api_v1.route("/", methods=["GET"])
def index():
    r = apptask.apply_async()
    return jsonify({"status": "success"})

视图函数非常的简单,只做了提交任务的操作。

启动并测试

启动celery

为了避免循环导入问题,我们在项目根目录下新建manage.py

from test_api import create_app, make_celery

app = create_app()
celery = make_celery(app)

if __name__ == ‘__main__‘:
    app.run()

这个文件只用来启动celery,启动命令如下:

# celery worker -A manage:celery -l debug

看到如下输出,表明启动成功:

-------------- [email protected] v4.4.0 (cliffs)
--- ***** -----
-- ******* ---- Linux-3.10.0-693.2.2.el7.x86_64-x86_64-with-centos-7.4.1708-Core 2020-03-03 21:14:13
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app:         test_api:0x7f87c31a4e48
- ** ---------- .> transport:   redis://127.0.0.1:6379/3
- ** ---------- .> results:     redis://127.0.0.1:6379/4
- *** --- * --- .> concurrency: 2 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery

[tasks]
  . celery.accumulate
  . celery.backend_cleanup
  . celery.chain
  . celery.chord
  . celery.chord_unlock
  . celery.chunks
  . celery.group
  . celery.map
  . celery.starmap
  . celery_task.tasks.apptask

[2020-03-03 21:14:13,632: DEBUG/MainProcess] | Worker: Starting Hub
[2020-03-03 21:14:13,632: DEBUG/MainProcess] ^-- substep ok
[2020-03-03 21:14:13,632: DEBUG/MainProcess] | Worker: Starting Pool
[2020-03-03 21:14:13,690: DEBUG/MainProcess] ^-- substep ok
[2020-03-03 21:14:13,691: DEBUG/MainProcess] | Worker: Starting Consumer
[2020-03-03 21:14:13,691: DEBUG/MainProcess] | Consumer: Starting Connection
[2020-03-03 21:14:13,708: INFO/MainProcess] Connected to redis://127.0.0.1:6379/3
[2020-03-03 21:14:13,708: DEBUG/MainProcess] ^-- substep ok
[2020-03-03 21:14:13,708: DEBUG/MainProcess] | Consumer: Starting Events
[2020-03-03 21:14:13,718: DEBUG/MainProcess] ^-- substep ok
[2020-03-03 21:14:13,718: DEBUG/MainProcess] | Consumer: Starting Mingle
[2020-03-03 21:14:13,718: INFO/MainProcess] mingle: searching for neighbors
[2020-03-03 21:14:14,743: INFO/MainProcess] mingle: all alone
[2020-03-03 21:14:14,743: DEBUG/MainProcess] ^-- substep ok
[2020-03-03 21:14:14,744: DEBUG/MainProcess] | Consumer: Starting Gossip
[2020-03-03 21:14:14,748: DEBUG/MainProcess] ^-- substep ok
[2020-03-03 21:14:14,748: DEBUG/MainProcess] | Consumer: Starting Heart
[2020-03-03 21:14:14,750: DEBUG/MainProcess] ^-- substep ok
[2020-03-03 21:14:14,750: DEBUG/MainProcess] | Consumer: Starting Tasks
[2020-03-03 21:14:14,756: DEBUG/MainProcess] ^-- substep ok
[2020-03-03 21:14:14,756: DEBUG/MainProcess] | Consumer: Starting Control
[2020-03-03 21:14:14,759: DEBUG/MainProcess] ^-- substep ok
[2020-03-03 21:14:14,759: DEBUG/MainProcess] | Consumer: Starting event loop
[2020-03-03 21:14:14,759: DEBUG/MainProcess] | Worker: Hub.register Pool...
[2020-03-03 21:14:14,760: INFO/MainProcess] [email protected] ready.
[2020-03-03 21:14:14,760: DEBUG/MainProcess] basic.qos: prefetch_count->8

启动flask:

# flask run

* Serving Flask app "test_api" (lazy loading)
 * Environment: development
 * Debug mode: on
 * Running on http://0.0.0.0:5000/ (Press CTRL+C to quit)
 * Restarting with stat
 * Debugger is active!
 * Debugger PIN: 237-492-852

调试接口:

# curl http://127.0.0.1:5000/api/v1/
{
  "status": "success"
}

查看celery日志:

[2020-03-03 21:17:31,330: WARNING/ForkPoolWorker-2]
[2020-03-03 21:17:31,330: DEBUG/MainProcess] Task accepted: celery_task.tasks.apptask[5f27a148-161f-4485-931f-17d94637168e] pid:2341
[2020-03-03 21:17:36,391: WARNING/ForkPoolWorker-2] MyTasks 基类回调,任务执行成功
[2020-03-03 21:17:36,392: INFO/ForkPoolWorker-2] Task celery_task.tasks.apptask[5f27a148-161f-4485-931f-17d94637168e] succeeded in 5.0624741315841675s: ‘success‘

任务执行成功,查看数据库数据:

mysql> select * from user order by id;
+----+----------+
| id | username |
+----+----------+
|  1 | user26   |
|  2 | user69   |
|  3 | user71   |
|  4 | user35   |
|  5 | user13   |
|  6 | user54   |
|  7 | user88   |
|  8 | user63   |
|  9 | user87   |
| 10 | user90   |
| 11 | user3    |
| 12 | user18   |
| 13 | user65   |
+----+----------+

数据已被插入,实验成功!

总结

有几个坑希望大家注意下

1. app初始化文件中蓝图导入位置问题引起循环导入,导致import Error

出错文件: testapi/__init_\.py

import os
import click

from flask import Flask, jsonify
from test_api.api.v1 import api_v1   # 蓝图在上方导入,循环报错产生
from test_api.settings import config
from test_api.models import User

from celery import Celery

def make_celery(app):
...
def create_app(config_name=None):
    if config_name is None:
        config_name = os.getenv(‘FLASK_ENV‘, ‘development‘)

    app = Flask(‘test_api‘)
    app.config.from_object(config[config_name])

    register_extensions(app)
    register_blueprints(app)
    register_commands(app)
    register_errors(app)
    return app

# 注册蓝图函数
def register_blueprints(app):
    app.register_blueprint(api_v1, url_prefix=‘/api/v1‘)

启动celery和请求接口时均会报错,错误堆栈如下:

  from test_api import create_app, make_celery
  File "/tmp/test/test_api/__init__.py", line 5, in <module>
    from test_api.api.v1 import api_v1
  File "/tmp/test/test_api/api/v1/__init__.py", line 9, in <module>
    from test_api.api.v1 import views
  File "/tmp/test/test_api/api/v1/views.py", line 2, in <module>
    from celery_task.tasks import apptask
  File "/tmp/test/celery_task/__init__.py", line 1, in <module>
    from test_api import create_app, make_celery
ImportError: cannot import name ‘create_app‘

解决方法:

将蓝图的导入下放置蓝图注册函数中testapi/__init_\.py:

...
def register_blueprints(app):
    from test_api.api.v1 import api_v1
    app.register_blueprint(api_v1, url_prefix=‘/api/v1‘)
...

2. celery无法读取到flask-sqlalemy的连接配置信息

提交任务,celery报错如下:

  ...
  options = self.get_options(sa_url, echo)
  File "/tmp/py3/lib/python3.6/site-packages/flask_sqlalchemy/__init__.py", line 575, in get_options
    self._sa.apply_driver_hacks(self._app, sa_url, options)
  File "/tmp/py3/lib/python3.6/site-packages/flask_sqlalchemy/__init__.py", line 877, in apply_driver_hacks
    if sa_url.drivername.startswith(‘mysql‘):
AttributeError: ‘NoneType‘ object has no attribute ‘drivername‘

通过调试我发现,flask的app的配置是可以拿到的,因为我们在工厂函数中推送了应用上下文,我的数据库配置信息是以键值的形式写在了.env文件中,这也是目前flask推荐的方式。那为什么celery取不到数据库连接配置呢?其实,启动celery的app和我们web服务所用app是两个独立的app,celery无法通过.env中的环境变量取到相应的值,这里有三种解决办法:

  • 不使用环境变量的方式,直接将相关信息写在配置文件中例如: SQLALCHEMY_DATABASE_URI = "mysql+pymysql://xxx:[email protected]:3306/test?charset=utf8"
  • 将配置写到系统环境变量中(/etc/profile)
  • 使用dotenv加载.env中的环境变量

相比之下,方案三是采纳比较多的,于是我们在test_api/settings.py文件中加入如下代码:

from dotenv import find_dotenv, load_dotenv
load_dotenv(find_dotenv())

find_dotenv函数会在当前以及父目录中搜寻.env文件,load_dotenv函数则负责加载环境变量。如此,大功告成。我们可以继续愉快撸代码啦。
附:项目源码

原文地址:https://blog.51cto.com/hld1992/2475295

时间: 2024-08-01 19:53:07

在flask中使用celery的实践的相关文章

在Flask中使用Celery

基于Celery的后台任务 在 Flask 中使用 Celery 原文地址:https://www.cnblogs.com/believepd/p/10645208.html

【译】在Flask中使用Celery

为了在后台运行任务,我们可以使用线程(或者进程). 使用线程(或者进程)的好处是保持处理逻辑简洁.但是,在需要可扩展的生产环境中,我们也可以考虑使用Celery代替线程. Celery是什么? Celery是个异步分布式任务队列. 通过Celery在后台跑任务并不像用线程那么的简单,但是用Celery的话,能够使应用有较好的可扩展性,因为Celery是个分布式架构.下面介绍Celery的三个核心组件. 生产者(Celery client).生产者(Celery client)发送消息.在Flas

python之celery在flask中使用

现在继续学习在集成的框架中如何使用celery. 在Flask中使用celery 在Flask中集成celery需要做到两点: 创建celery的实例对象的名字必须是flask应用程序app的名字,否则celery启动会失败: celery必须能顺利加载初始化文件. celery在flask中初始化 由于celery进程的运行和flask进程的运行是相互独立的,但是在框架中我们希望只使用一份配置文件,这样可以简化配置的工作. from celery import Celery from flas

Celery的实践指南

celery原理: celery实际上是实现了一个典型的生产者-消费者模型的消息处理/任务调度统,消费者(worker)和生产者(client)都可以有任意个,他们通过消息系统(broker)来通信. 典型的场景为: 客户端启动一个进程(生产者),当用户的某些操作耗时较长或者比较频繁时,考虑接入本消息系统,发送一个task任务给broker. 后台启动一个worker进程(消费者),当发现broker中保存有某个任务到了该执行的时间,他就会拿过来,根据task类型和参数执行. 实践中的典型场景:

celery最佳实践

作为一个Celery使用重度用户,看到Celery Best Practices这篇文章,不由得菊花一紧.干脆翻译出来,同时也会加入我们项目中celery的实战经验. 至于Celery为何物,看这里Celery. 通常在使用Django的时候,你可能需要执行一些长时间的后台任务,没准你可能需要使用一些能排序的任务队列,那么Celery将会是一个非常好的选择. 当把Celery作为一个任务队列用于很多项目中后,作者积累了一些最佳实践方式,譬如如何用合适的方式使用Celery,以及一些Celery提

celery最佳实践(转)

原文:http://my.oschina.net/siddontang/blog/284107 目录[-] 1,不要使用数据库作为你的AMQP Broker 2,使用更多的queue(不要只用默认的) 3,使用具有优先级的workers 4,使用Celery的错误处理机制 5,使用Flower 6,没事别太关注任务退出状态 7,不要给任务传递 Database/ORM 对象 最后 作为一个Celery使用重度用户,看到Celery Best Practices这篇文章,不由得菊花一紧.干脆翻译出

Flask中使用mysql

Flask中使用mysql 先安装相关模块: pip  install  Flask-MySQL 先准备一下数据库 登录: mysql  -u  root  -p 创建Database和创建Table mysql> CREATE DATABASE EmpData; mysql> use EmpData; mysql> CREATE TABLE User( userId INT NOT NULL AUTO_INCREMENT, userName VARCHAR(100) NOT NULL,

flask中&#39;bool&#39; object has no attribute &#39;__call__&#39;问题

#写flask时报错 <ul class="nav navbar-nav"> <li><a href="/">Home</a></li> </ul> </div> <ul class="nav navbar-nav navbar-right"> {% if current_user.is_authenticated() %} <li><

解决Flask中文件操作出现UnicodeDecodeError UnicodeDecodeError: &#39;ascii&#39; codec can&#39;t decode byte 0xe6 in positio

写一个Flask应用的功能时需要读文件,文件内容含指定字符串的话(即有个if key in filecontent的比较)就把文件内容输出到页面,,结果报错UnicodeDecodeError,查阅Flask的文档却似乎讲Flask默认哪里都是utf8编码,可现在却出了个由于字符是utf8而不是ascii报的错 最后解决了 我灵机一点把filecontent解码一下,写成filecontent.decode('utf8'),就顺利的运行了 版权声明:本文为博主原创文章,未经博主允许不得转载. 解