Celery的实践指南

celery原理:

celery实际上是实现了一个典型的生产者-消费者模型的消息处理/任务调度统,消费者(worker)和生产者(client)都可以有任意个,他们通过消息系统(broker)来通信。

典型的场景为:

  1. 客户端启动一个进程(生产者),当用户的某些操作耗时较长或者比较频繁时,考虑接入本消息系统,发送一个task任务给broker。
  2. 后台启动一个worker进程(消费者),当发现broker中保存有某个任务到了该执行的时间,他就会拿过来,根据task类型和参数执行。

实践中的典型场景:

  1. 简单的定时任务:

    1. 替换crontab的celery写法:

      1. from celery import Celery
        from celery.schedules import crontab

        app = Celery("tasks", backend="redis://localhost", broker="redis://localhost")

        app.conf.update(CELERYBEAT_SCHEDULE = {
            "add": {
                "task": "celery_demo.add",
                "schedule": crontab(minute="*"),
                "args": (16, 16)
            },
        })

        @app.task
        def add(x, y):
            return x + y

    2. 运行celery的worker,让他作为consumer运行,自动从broker上获得任务并执行。
      1. `celery -A celery_demo worker`
    3. 运行celery的client,让其根据schedule,自动生产出task msg,并发布到broker上。
      1. `celery -A celery_demo beat`
    4. 安装并运行flower,方便监控task的运行状态
      1. `celery flower -A celery_demo`
      2. 或者设置登录密码 `

        celery flower -A celery_demo --basic_auth=user1:password1,user2:password2
  2. 多同步任务-链式任务-
  3. 失败自动重试的task
    1. 失败重试方法: 将task代码函数参数增加self,同时绑定bind。
    2. demo代码:
      1. @app.task(bind=True, default_retry_delay=300, max_retries=5)
        def my_task_A(self):
            try:
                print("doing stuff here...")
            except SomeNetworkException as e:
                print("maybe do some clenup here....")
                self.retry(e)
    3. 自动重试后,是否将任务重新入queue后排队,还是等待指定的时间?可以通过self.retry()参数来指定。
  4. 派发到不同Queue队列的task
    1. 一个task自动映射到多个queue中的方法, 通过配置task和queue的routing_key命名模式。

      1. 比如:把queue的exchange和routing_key配置成通用模式:
      2. 再定义task的routing_key的名称:
    2. 可用的不同exchange策略:
      1. direct:直接根据定义routing_key
      2. topic:exchange会根据通配符来将一个消息推送到多个queue。
      3. fanout:将消息拆分,分别推送到不同queue,通常用于超大任务,耗时任务。
    3. 参考:http://celery.readthedocs.org/en/latest/userguide/routing.html#routers
  5. 高级配置
    1. result是否保存
    2. 失败邮件通知:
    3. 关闭rate limit:
  6. auto_reload方法(*nix系统):
    1. celery通过监控源代码目录的改动,自动地进行reload
    2. 使用方法:1.依赖inotify(Linux) 2. kqueue(OS X / BSD)
    3. 安装依赖:

      $ pip install pyinotify
    4. (可选) 指定fsNotify的依赖:

      $ env CELERYD_FSNOTIFY=stat celery worker -l info --autoreload
    5. 启动: celery -A appname worker --autoreload
  7. auto-scale方法:
    1. 启用auto-scale
    2. 临时增加worker进程数量(增加consumer):

      $ celery -A proj control add_consumer foo -d worker1.local
    3. 临时减少worker进程数量(减少consumer):
  8. 将scheduled task的配置从app.conf变成DB的方法:
    1. 需要在启动时指定custom schedule 类名,比如默认的是: celery.beat.PersistentScheduler 。

      1.  celery -A proj beat -S djcelery.schedulers.DatabaseScheduler
  9. 启动停止worker的方法:
    1. 启动 as daemon : http://docs.celeryproject.org/en/latest/tutorials/daemonizing.html#daemonizing

      1. root用户可以使用celeryd
      2. 非特权用户:celery multi start worker1 -A appName  —autoreload  --pidfile="$HOME/run/celery/%n.pid"  --logfile="$HOME/log/celery/%n.log"
      3. 或者 celery worker —detach
    2. 停止
    3. ps auxww | grep ‘celery worker‘ | awk ‘{print $2}‘ | xargs kill -9
  10. 与Flask集成的方法
    1. 集成后flask将充当producer来创建并发送task给broker,在celery启动的独立worker进程将从broker中获得task并执行,同时将结果返回。
    2. flask中异步地获得task结果的方法:add.delay(x,y),有时需要对参数进行命名后传递 或者 add.apply_async(args=(x,y), countdown=30)
    3. flask获得
  11. 与flask集成后的启动问题
    1. 由于celery的默认routing_key是根据生产者在代码中的import级别来设定的,所以worker端在启动时应该注意其启动目录应该在项目顶级目录上,否者会出现KeyError。
  12. 性能提升: eventlet 和 greenlet

官方参考:http://docs.celeryproject.org/en/latest/userguide/index.html

时间: 2024-08-03 11:35:32

Celery的实践指南的相关文章

操作系统CnetOS_7—systemd管理实践指南

systemd管理实践指南 管理systemd CentOS 7 使用systemd替换了SysV.Systemd目的是要取代Unix时代以来一直在使用的init系统,兼容SysV和LSB的启动脚本,而且够在进程启动过程中更有效地引导加载服务. systemctl命令是系统服务管理器指令,它实际上将 service 和 chkconfig 这两个命令组合到一起. Systemd :系统启动和服务器守护进程管理器,负责在系统启动或运行时,激活系统资源,服务器进程和其它进程 Systemd 新特性:

[转]《一线架构师实践指南》—— 读后总结

原文:<一线架构师实践指南>—— 读后总结 之前总觉得架构是一件很高大上的工作,跟普通的编码设计不太一样.前一段实践,自己也尝试做过架构的工作,可惜经验不足导致架构非常混乱.这里读完这本书,大体上对架构的工作有所了解,也稍微摸清了些门道. 我理解的架构 我理解的架构就是基于某些需求,设计代码的基础框架.既然是基于需求,那么肯定要面临不少需求的扩展以及变更,这时就需要架构能够灵活方便的适应变化.因此,架构的工作我的理解更多的是提前预料到未来的变化,提前做好改变的准备. 架构设计的大体思路为: 时

渗透测试实践指南(1)

各种编码方案 base64 url html unicode 渗透测试实践指南(2016/1/14) 3.3 端口扫描 3.4 漏洞扫描 工具:Nessus 3.5 如何实战 4 漏洞利用 4.2 工具:Medusa(暴力破解,以破解ssh为例),Hydra 4.3 工具:Metasploit介绍使用 4.4 john the ripper 破解系统密码 (windows linux) 4.5 密码重置 4.6 嗅探网络流量 (受限于广播或hub) 4.7 macof:泛洪攻击交换机 工具集:D

机器学习实践指南:案例应用解析

试读及购买链接 <机器学习实践指南:案例应用解析>是机器学习及数据分析领域不可多得的一本著作,也是为数不多的既有大量实践应用案例又包括算法理论剖析的著作,作者针对机器学习算法既抽象复杂又涉及多门数学学科的特点,力求理论联系实际,始终以算法应用为主线,由浅入深以全新的角度诠释机器学习. 前 言第一部分 准备篇第1章 机器学习发展及应用前景 21.1 机器学习概述 21.1.1 什么是机器学习 31.1.2 机器学习的发展 31.1.3 机器学习的未来 41.2 机器学习应用前景 51.2.1 数

PYTHON 最佳实践指南(转)

add by zhj: 本文参考了The Hitchhiker's Guide to Python,当然也加入了作者的一些东西.The Hitchhiker's Guide to Python 的github地址是https://github.com/kennethreitz/python-guide,貌似还能用pip安装该包.先占个坑,后面有时间再把文章转过来 原文:PYTHON 最佳实践指南

20个设计数据库的最佳实践指南

数据库设计看上去很简单,但是如果不经意随意设计,可能会为日后维护拓展或性能方面埋下祸根.以下是20个设计数据库的最佳实践指南: 1. 使用完整的一致的数据表名称和字段名,如:School, StudentCourse, CourseID 2.数据表名称使用单数,比如使用StudentCourse 而不是StudentCourses,数据表代表实体的一个集合,因此没有必要使用复数名称. 3. 数据表名称不要使用空格,比如StudentCourse 比Student Course更好. 4.数据表名

数据仓库专题(16)-分布式数据仓库实践指南-目录篇

前言: 准备系统化整理一套分布式数据仓库建模实践指南,先把目录列出来吧,算是给自己设计一个目标吧. 第一部分 基础篇 第一章 数据仓库概念与定义 1.1 数据管理体系 1.2 数据仓库概念 1.3 数据仓库职责 第二章 数据仓库体系结构 2.1 Inmon CIF 2.2 Kimball 2.3 对于与分析 第三章 维度建模基础 3.1 kimball四步建模法 3.2 维度设计 3.3 事实表设计 第二部分 实践篇 第一章 路线图 第二章 业务分析-深浅有度 第三章 数据分析-区别对待 第四章

celery最佳实践

作为一个Celery使用重度用户,看到Celery Best Practices这篇文章,不由得菊花一紧.干脆翻译出来,同时也会加入我们项目中celery的实战经验. 至于Celery为何物,看这里Celery. 通常在使用Django的时候,你可能需要执行一些长时间的后台任务,没准你可能需要使用一些能排序的任务队列,那么Celery将会是一个非常好的选择. 当把Celery作为一个任务队列用于很多项目中后,作者积累了一些最佳实践方式,譬如如何用合适的方式使用Celery,以及一些Celery提

lua游戏开发实践指南学习笔记1

本文是根据lua游戏开发实践指南做的一些学习笔记,仅用于继续自己学习的一些知识. Lua基础 1.  语言定义: 在lua语言中,标识符有很大的灵活性(变量和函数名),不过用户不呢个以数字作为起始符,也要避免下划线(_)接大写字母,因为这种格式为lua自身保留如_Start. 建议用户使用如下格式和命名规则来定义变量.常量和函数名: ①  常量用全大写和下划线,例如:MY_CONSTANT ②  变量第一个字母小写,例如:myVariable ③  全角变量第一个字母用小写g表示,例如:gMyG