Celery,Tornado,Supervisor构建和谐的分布式系统

Celery 分布式的任务队列

与rabbitmq消息队列的区别与联系:

  • rabbitmq 调度的是消息,而Celery调度的是任务.
  • Celery调度任务时,需要传递参数信息,传输载体可以选择rabbitmq.
  • 利用rabbitmq的持久化和ack特性,Celery可以保证任务的可靠性.

优点:

  • 轻松构建分布式的Service Provider。
  • 高可扩展性,增加worker也就是增加了队列的consumer。
  • 可靠性,利用消息队列的durable和ack,可以尽可能降低消息丢失的概率,当worker崩溃后,未处理的消息会重新进入消费队列。
  • 用户友好,利用flower提供的管理工具可以轻松的管理worker。
    flower
  • 使用tornado-celery,结合tornado异步非阻塞结构,可以提高吞吐量,轻松创建分布式服务框架。
  • 学习成本低,可快速入门

快速入门

定义一个celery实例main.py:

1234
from celery import Celeryapp = Celery(‘route_check‘, include=[‘check_worker_path‘],         broker=‘amqp://user:[email protected]_host:port//‘)app.config_from_object(‘celeryconfig‘)

include指的是需要celery扫描是否有任务定义的模块路径。例如add_task 就是扫描add_task.py中的任务

celery的配置文件可以从文件、模块中读取,这里是从模块中读取,celeryconfig.py为:

12345678910111213141516171819202122232425262728293031323334353637383940
from multiprocessing import cpu_count

from celery import platformsfrom kombu import Exchange, Queue

CELERYD_POOL_RESTARTS = FalseCELERY_RESULT_BACKEND = ‘redis://:[email protected]_host:port/db‘CELERY_QUEUES = (    Queue(‘default‘, Exchange(‘default‘), routing_key=‘default‘),    Queue(‘common_check‘, Exchange(‘route_check‘), routing_key=‘common_check‘),    Queue(‘route_check‘, Exchange(‘route_check‘), routing_key=‘route_check‘, delivery_mode=2),    Queue(‘route_check_ignore_result‘, Exchange(‘route_check‘), routing_key=‘route_check_ignore_result‘,          delivery_mode=2))CELERY_ROUTES = {    ‘route_check_task.check_worker.common_check‘: {‘queue‘: ‘common_check‘},    ‘route_check_task.check_worker.check‘: {‘queue‘: ‘route_check‘},    ‘route_check_task.check_worker.check_ignore_result‘: {‘queue‘: ‘route_check_ignore_result‘}}CELERY_DEFAULT_QUEUE = ‘default‘CELERY_DEFAULT_EXCHANGE = ‘default‘CELERY_DEFAULT_EXCHANGE_TYPE = ‘direct‘CELERY_DEFAULT_ROUTING_KEY = ‘default‘# CELERY_MESSAGE_COMPRESSION = ‘gzip‘CELERY_ACKS_LATE = TrueCELERYD_PREFETCH_MULTIPLIER = 1CELERY_DISABLE_RATE_LIMITS = TrueCELERY_TIMEZONE = ‘Asia/Shanghai‘CELERY_ENABLE_UTC = TrueCELERYD_CONCURRENCY = cpu_count() / 2CELERY_TASK_SERIALIZER = ‘json‘CELERY_RESULT_SERIALIZER = ‘json‘CELERY_TASK_PUBLISH_RETRY = TrueCELERY_TASK_PUBLISH_RETRY_POLICY = {    ‘max_retries‘: 3,    ‘interval_start‘: 10,    ‘interval_step‘: 5,    ‘interval_max‘: 20}platforms.C_FORCE_ROOT = True

这里面是一些celery的配置参数

在上面include的add_task.py定义如下:

1234567
#encoding:utf8

from main import app

@app.taskdef add(x,y):    return x+y

启动celery
celery -A main worker -l info -Ofair

  • -A 后面是包含celery定义的模块,我们在main.py中定义了app = Celery...
    测试celery:
  • -l 日志打印的级别,这里是info
  • -Ofair 这个参数可以让Celery更好的调度任务
12345678910
# encoding:utf8__author__ = ‘brianyang‘

import add_task

result = add_task.add.apply_async((1,2))print type(result)print result.ready()print result.get()print result.ready()

输出是

1234
<class ‘celery.result.AsyncResult‘>False3True

当调用result.get()时,如果还没有返回结果,将会阻塞直到结果返回。这里需要注意的是,如果需要返回worker执行的结果,必须在之前的config中配置CELERY_RESULT_BACKEND这个参数,一般推荐使用Redis来保存执行结果,如果不关心worker执行结果,设置CELERY_IGNORE_RESULT=True就可以了,关闭缓存结果可以提高程序的执行速度。
在上面的测试程序中,如果修改为:

12345678
# encoding:utf8__author__ = ‘brianyang‘

import add_task

result = add_task.add.(1,2)print type(result)print result

输出结果为:

12
<type ‘int‘>3

相当于直接本地调用了add方法,并没有走Celery的调度。
通过flower的dashbord可以方便的监控任务的执行情况:
task list
task detail
还可以对worker进行重启,关闭之类的操作
taks_op
使用Celery将一个集中式的系统拆分为分布式的系统大概步骤就是:

  • 根据功能将耗时的模块拆分出来,通过注解的形式让Celery管理
  • 为拆分的模块设置独立的消息队列
  • 调用者导入需要的模块或方法,使用apply_async进行异步的调用并根据需求关注结果。
  • 根据性能需要可以添加机器或增加worker数量,方便弹性管理。

需要注意的是:

  • 尽量为不同的task分配不同的queue,避免多个功能的请求堆积在同一个queue中。
  • celery -A main worker -l info -Ofair -Q add_queue启动Celery时,可以通过参数Q加queue_name来指定该worker只接受指定queue中的tasks.这样可以使不同的worker各司其职。
  • CELERY_ACKS_LATE可以让你的Celery更加可靠,只有当worker执行完任务后,才会告诉MQ,消息被消费。
  • CELERY_DISABLE_RATE_LIMITS Celery可以对任务消费的速率进行限制,如果你没有这个需求,就关闭掉它吧,有益于会加速你的程序。

tornado-celery

tornado应该是python中最有名的异步非阻塞模型的web框架,它使用的是单进程轮询的方式处理用户请求,通过epoll来关注文件状态的改变,只扫描文件状态符发生变化的FD(文件描述符)。
由于tornado是单进程轮询模型,那么就不适合在接口请求后进行长时间的耗时操作,而是应该接收到请求后,将请求交给背后的worker去干,干完活儿后在通过修改FD告诉tornado我干完了,结果拿走吧。很明显,Celery与tornado很般配,而tornado-celery是celery官方推荐的结合两者的一个模块。
整合两者很容易,首先需要安装:

  • tornado-celery
  • tornado-redis
    tornado代码如下:
12345678910111213141516171819202122232425262728293031
# encoding:utf8__author__ = ‘brianyang‘

import tceleryimport tornado.genimport tornado.web

from main import appimport add_task

tcelery.setup_nonblocking_producer(celery_app=app)

class CheckHandler(tornado.web.RequestHandler):    @tornado.web.asynchronous    @tornado.gen.coroutine    def get(self):        x = int(self.get_argument(‘x‘, ‘0‘))        y = int(self.get_argument(‘y‘, ‘0‘))        response = yield tornado.gen.Task(add_task.add.apply_async, args=[x, y])        self.write({‘results‘: response.result})        self.finish

application = tornado.web.Application([    (r"/add", CheckHandler),])

if __name__ == "__main__":    application.listen(8889)    tornado.ioloop.IOLoop.instance().start()

在浏览器输入:http://127.0.0.1:8889/add?x=1&y=2
结果为:

通过tornado+Celery可以显著的提高系统的吞吐量。

Benchmark

使用Jmeter进行压测,60个进程不间断地的访问服务器:
接口单独访问响应时间一般在200~400ms

  • uwsgi + Flask方案:
    uwsgi关键配置:

    12
    processes       = 10threads         = 3

Flask负责接受并处理请求,压测结果:
qps是46,吞吐量大概是2700/min
uwsgi+Flask

  • tornado+Celery方案:
    Celery配置:
    CELERYD_CONCURRENCY = 10也就是10个worker(进程),压测结果:
    qps是139,吞吐量大概是8300/min
    tornado+Celery
    从吞吐量和接口相应时间各方面来看,使用tornado+Celery都能带来更好的性能。

Supervisor

  • 什么是supervisor
    supervisor俗称Linux后台进程管理器
  • 适合场景
    – 需要长期运行程序,除了nohup,我们有更好的supervisor
    – 程序意外挂掉,需要重启,让supervisor来帮忙
    – 远程管理程序,不想登陆服务器,来来来,supervisor提供了高大上(屁~)的操作界面.
    之前启动Celery命令是celery -A main worker -l info -Ofair -Q common_check,当你有10台机器的时候,每次更新代码后,都需要登陆服务器,然后更新代码,最后再杀掉Celery进程重启,恶不恶心,简直恶心死了。
    让supervisor来,首先需要安装:
    pip install supervisor
    配置文件示例:
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354
[unix_http_server]file=/tmp/supervisor.sock   ; path to your socket filechmod=0777username=adminpassword=admin

[inet_http_server]port=0.0.0.0:2345username=adminpassword=admin

[supervisord]logfile=/var/log/supervisord.log ; supervisord log filelogfile_maxbytes=50MB       ; maximum size of logfile before rotationlogfile_backups=10          ; number of backed up logfilesloglevel=info               ; info, debug, warn, tracepidfile=/var/run/supervisord.pid ; pidfile locationnodaemon=false              ; run supervisord as a daemonminfds=1024                 ; number of startup file descriptorsminprocs=200                ; number of process descriptorsuser=root                   ; default userchildlogdir=/var/log/            ; where child log files will live

[rpcinterface:supervisor]supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface

[supervisorctl]serverurl=unix:///tmp/supervisor.sock ; use unix:// schem for a unix sockets.username=adminpassword=admin[program:celery]command=celery -A main worker -l info -Ofair

directory=/home/q/celeryTestuser=rootnumprocs=1stdout_logfile=/var/log/worker.logstderr_logfile=/var/log/worker.logautostart=trueautorestart=truestartsecs=10

; Need to wait for currently executing tasks to finish at shutdown.; Increase this if you have very long running tasks.stopwaitsecs = 10

; When resorting to send SIGKILL to the program to terminate it; send SIGKILL to its whole process group instead,; taking care of its children as well.killasgroup=true

; Set Celery priority higher than default (999); so, if rabbitmq is supervised, it will start first.priority=1000

示例文件很长,不要怕,只需要复制下来,改改就可以
比较关键的几个地方是:

1234
[inet_http_server]port=0.0.0.0:2345username=adminpassword=admin

这个可以让你通过访问http://yourhost:2345 ,验证输入admin/admin的方式远程管理supervisor,效果如下:
remote supervisor
[program:flower]这里就是你要托管给supervisor的程序的一些配置,其中autorestart=true可以在程序崩溃时自动重启进程,不信你用kill试试看。
剩下的部分就是一些日志位置的设置,当前工作目录设置等,so esay~

supervisor优点:

  • 管理进程简单,再也不用nohup & kill了。
  • 再也不用担心程序挂掉了
  • web管理很方便

缺点:

  • web管理虽然方便,但是每个页面只能管理本机的supervisor,如果我有一百台机器,那就需要打开100个管理页面,很麻烦.

怎么办~

supervisor-easy闪亮登场

通过rpc调用获取配置中的每一个supervisor程序的状态并进行管理,可以分组,分机器进行批量/单个的管理。方便的不要不要的。来两张截图:

    • 分组管理:
      group
    • 分机器管理:
      server
      通过简单的配置,可以方便的进行管理。
时间: 2024-10-07 19:08:13

Celery,Tornado,Supervisor构建和谐的分布式系统的相关文章

celery和supervisor配合使用,实现supervisor管理celery进程

在这里我选择redis作为celery异步任务的中间人,系统选择CentOS6.5 64位.redis.celery和supervisor的安装参见官方文档. 安装完毕后: 1, 创建celery的实例/home/user_00/learn/tasks.py文件 tasks.py: # -*-coding:utf-8-*- from celery import Celery, platforms app = Celery('tasks', backend='redis://localhost:6

面对软件错误构建可靠的分布式系统(阅读笔记)

阅读笔记 joe Armstrong 段先德 译 核心问题:如何在存在软件错误的情况下编写具有合理行为的软件 ,如何避免像死锁.死循环等问题 ERLANG的世界观,一切皆进程.将任务分离成层次化的一系列任务,强隔离的进程负责来执行每个具体化的任务,进程之间不共享状态(实际上ETS跨越了这个准则). 只能通过消息传递来通信,必须注意进程消息的堵塞问题 工作者和监督者构成一个完整的系统,监督者的作用就是监控整个系统的运行状况.并对突发情况进行可靠的处理. behaviour库的设计思想就是将程序的并

ubuntu下python+tornado+supervisor+nginx部署

由于之前在医院采集的数据都是拍照得到的处方图片,而需要用到的是处方的文本形式.因此这两个星期写了个小程序把服务器的图片显示给用户(到时候雇一些人),让用户根据图片录入文字信息. 之前都是用java写web,想到自己最近学机器学习要用python,所以用python来写一下,此外,因为想用点新东西,也介于程序比较小,所以考虑用mongodb来存储(虽然确实没有必要). 基本架构是这样:(后台语言)python +(web框架和web服务器)tornado + (数据库)mongodb  +(进程管

Nginx + tornado + supervisor部署

参考链接:supervisor + Tornado + Nginx 使用详解, 用tornado ,Supervisord ,nginx架网站, tornado官方文档 项目文档树: . ├── chnservices │   └── channels.py ├── etc │   ├── chnservices.conf │   ├── nginx │   │   └── nginx.conf │   ├── supervisord.conf │   └── supervisord.conf.

[后端]nginx+tornado+supervisor提升并发量 @ 备忘

部署有nginx的机器每一个核都会启动一个worker进程,用来接受处理客户端发来的请求.为了做负载均衡,worker会根据一定的规则将请求分发到后面的某一台机器上.由于我的nginx机器后面只有一台四核机器,所以我是这样分发请求的,配置文件中相应位置这样写: upstream news_baijia{ server 0.0.0.0:9999; server 0.0.0.0:9998; server 0.0.0.0:9997; server 0.0.0.0:9996; } 这相当于将请求分发到0

tornado + supervisor + nginx 的一点记录

看了比较多的blog基本都是这个架构: supervisor ------------ app1 |-------app2 |-------.... |-------appn |-------nginx |-------redis 统一都交给supervisor来管理.总觉得哪里不对: 1) nginx作为supervisor的子进程,会有问题,它貌似会不断的去执行启动(导致大量的错误日志:端口已经被占用) 2)   nginx 和 redis 的启动与配置与app之间应该是没有耦合关系的,和s

erl_0020 《面对软件错误构建可靠的分布式系统》读书笔记001 “面向并发COPL”

在现实世界中,顺序化的(sequential)活动非常罕见.当我们走在大街上的时候,如果只看到一件事情发生的话我们一定会感到不可思议,我们期望碰到许多同时进行的活动. 如果我们不能对同时发生的众多事件所造成的结果进行分析和预测的话,那么我们将会面临巨大的危险,像开车这类的任务我们就不可能完成了.事实上我们是可以做那些需要处理大量并发信息的事情的,这也表明我们本来就是具有很多感知机制的,正是这些机制让我们能够本能地理解并发,而无需有意识地思考. 然而对于计算机编程来说,情况却突然变得相反.把活动安

代构建高可用分布式系统的利器——Netty

特性: 高性能,事件驱动,异步非阻塞 Java 开源框架 基于 NIO 的客户端,服务端编程框架 稳定性和伸缩性 常用于建立 TCP/IP 底层的连接,能够建立高性能的 Http 服务器. 正因为高性能.异步非阻塞等特性,很多高性能项目将其作为底层的通信基础,比如阿里的 Dubbo. 活跃的主要领域: 高性能领域 多线程并发领域 异步通信领域 IO通信 关于这块,以前看到有篇写的非常好的文章解释: 关于同步.异步.并行.并发等 下面为简略的介绍. BIO 一个线程负责连接. 就是一个线程来负责监

Nginx+uwsgi+celery+supervisor部署Django前后端分离项目

本实验实现了负载均衡.反向代理.动静分离,还实现了根据客户端设备user-agent进行转发,也就是移动端和PC端访问的页面不一样. 1. 项目部署逻辑图 2. 环境准备 服务器:6台VM操作系统:CentOS7LB.www.wap:安装Nginxuwsgi1.uwsgi2:安装nfs-utils.Python3解释器.virtualenvNFS:安装NFSMRCS:安装MySQL.Redis.virtualenv 注意:这里不介绍软件的安装Nginx安装参考:http://blog.51cto