celery实现任务统一收集、分发执行

首先解释下目标的概念:celery任务消息会由各种途径(比如手动通过python shell触发、通过tornado触发等)发往统一的一个celery broker,然后任务消息会由不同server上的worker去获取并执行。具体点说就是,借助celery消息路由机制,celery broker中开不同的消息队列来接收相应类型的任务消息,然后不同server上开启worker来处理目标消息队列里面的任务消息,即任务统一收集、分发到不同server上执行。

测试

项目架构如下:一个服务,一部分task运行在server1上,一部分task运行在server2上,所有的任务都可以通过网页向tornado(部署在server1上)发起、tornado接到网页请求调用相应的task handler、task handler向celery broker相应的queue发任务消息、最后server1上的worker和server2上的worker各自去相应的队列中获取任务消息并执行任务。server1是上海集群的10.121.72.94,server2是济阳集群的10.153.104.76,celery
broker是redis数据库:redis://10.121.76.204:17016/1。

首先来看一下server1上的代码结构,

| start_worker.sh

| proj

|__init__.py      (空文件)

|celery.py

|hotplay_task.py

| hotplay_tornado_server.py

上面的代码包含了响应网页请求的tornado server构建代码、server1上的celery服务。

先来看server1上的celery调度器,

celery.py


#-*-coding=utf-8-*-

from __future__ import absolute_import

from celery import Celery

from kombu import Queue

 

app = Celery("proj",

        broker = "redis://10.121.76.204:17016/1",

             include = [‘proj.hotplay_task‘]

             )

 

app.conf.update(

        CELERY_DEFAULT_QUEUE = ‘hotplay_sh_default_queue‘,

        #CELERY_QUEUES = (Queue(‘hotplay_jy_queue‘),),  #该队列是给server2用的,并不需要在这里申明

    )

hotplay_task.py


from __future__ import absolute_import

 

import sys

import os

import hashlib

import time

import subprocess

 

 

from proj.celery import app

 

 

reload(sys)

sys.setdefaultencoding(‘utf-8‘)

 

sys.path.append(os.path.join(os.path.dirname(__file__), "./"))

 

HOTPLAY_CATCHUP_DIR = ‘/home/uaa/prog/hotplay_v2/online_task/catch_up‘

 

@app.task(bind=True)

def do_init_catchup(self, user_name, album_id, album_name, channel_name):

    print ‘start to init catch up of user %s album %s:%s in channel %s‘%(user_name, album_id, album_name, channel_name)

    job_args = ‘source %s/init_catch_up.sh %s %s %s %s > ./logs/%s_%s.log‘%(HOTPLAY_CATCHUP_DIR, user_name, album_id, album_name, channel_name, album_id, user_name)

    print ‘job_args:‘, job_args

    P = subprocess.Popen(job_args,shell=True)  

    rt_code = P.wait()

    if rt_code == 0:

        print ‘job success...‘

    else:

        print ‘job error:%d‘%(rt_code)

    #    print ‘job error:%d, will retry in 5 min‘%(rt_code)

    #    raise self.retry(countdown=300)

 

@app.task(bind=True)

def do_catchup(self, hotplay_id, start_dt, end_dt):

    print ‘start to catch up of %s:%s-%s‘%(hotplay_id, start_dt, end_dt)

    job_args = ‘source %s/catch_up_all_run.sh %s %s %s > ./logs/%s.log 2>&1‘%(HOTPLAY_CATCHUP_DIR, hotplay_id, start_dt, end_dt, hotplay_id)

    print ‘job_args:‘, job_args

    P = subprocess.Popen(job_args,shell=True)  

    rt_code = P.wait()

    if rt_code == 0:

        print ‘job success...‘

    else:

        print ‘job error:%d‘%(rt_code)

    #    print ‘job error:%d, will retry in 5 min‘%(rt_code)

    #    raise self.retry(countdown=300)

start_worker.sh


nohup celery -A proj worker -n hotplay_default_worker -c 3 -Q hotplay_sh_default_queue -l info &

上面的代码定义了一个celery实例,该实例有两个队列,注册了两个celery task function,最后启动一个worker来处理默认队列hotplay_sh_default_queue(celery.py中重命名过的默认队列)中的任务消息。

tornado server是所有celery任务的发起者,server1和server2上celery task都由tornado
server相应的handler发起。

hotplay_tornado_server.py


#-*-coding=utf-8-*-

from __future__ import absolute_import

import sys

import os

import tornado.web

import tornado.ioloop

import tornado.httpserver

 

from celery.execute import send_task

from proj.hotplay_task import do_init_catchup, do_catchup

 

reload(sys)

sys.setdefaultencoding(‘utf-8‘)

 

 

TORNADO_SERVER_PORT=10501

 

class InitCatchupHandler(tornado.web.RequestHandler):

    def get(self, path):

        user_name = self.get_argument("user_name", None)

        album_id = self.get_argument("album_id",None)

        album_name = self.get_argument("album_name",None)

        channel_name = self.get_argument("channel_name", None)

        print "request user_name+album_id+album_name+channel_name:%s+%s_%s+%s"%(user_name, album_id, album_name, channel_name)

        if album_id == ‘0‘:

            self.write(‘test tornado server init catch up handler. sucess. just return\n‘)

            return

        

        try:

            self.write("0")

            do_init_catchup.delay(user_name, album_id, album_name, channel_name)

        except:

            self.write("-1")

 

class DoCatchupHandler(tornado.web.RequestHandler):

    def get(self, path):

        hotplay_id = self.get_argument("hotplay_id",None)

        start_dt = self.get_argument("start_dt",None)

        end_dt = self.get_argument("end_dt",None)

        print "request hotplay_id+start_dt+end_dt:%s+%s+%s"%(hotplay_id, start_dt, end_dt)

        if hotplay_id == ‘0‘:

            self.write(‘test tornado server catch up handler. sucess. just return\n‘)

            return

        

        try:

            self.write("0")

            do_catchup.delay(hotplay_id, start_dt, end_dt)

        except:

            self.write("-1")

 

class DoCatchupJYHandler(tornado.web.RequestHandler):

    def get(self, path):

        hotplay_id = self.get_argument("hotplay_id",None)

        start_dt = self.get_argument("start_dt",None)

        end_dt = self.get_argument("end_dt",None)

        print "request jy hotplay_id+start_dt+end_dt:%s+%s+%s"%(hotplay_id, start_dt, end_dt)

        #if hotplay_id == ‘0‘:

        #    self.write(‘test tornado server catch up handler. sucess. just return\n‘)

        #    return

        send_task(‘tasks.test1‘, args=[hotplay_id, start_dt, end_dt], queue=‘hotplay_jy_queue‘)  #tasks.test1是server2上celery任务函数的file_name.func_name

                                                                                                 #file_name是任务函数所在文件相对于celery worker的路径

        #try:

        #    self.write("0")

        #    do_catchup.delay(hotplay_id, start_dt, end_dt)

        #except:

        #    self.write("-1")

 

application = tornado.web.Application(

            [

            (r"/init_catchup/(.*)", InitCatchupHandler),

            (r"/do_catchup/(.*)", DoCatchupHandler),

            (r"/do_catchup_jy/(.*)", DoCatchupJYHandler),

            ], 

            template_path = "template", static_path="static"

            )

            

if __name__ == ‘__main__‘:

    http_server = tornado.httpserver.HTTPServer(application)

    http_server.listen(TORNADO_SERVER_PORT)

    tornado.ioloop.IOLoop.instance().start()

代码中定义了3个handler,前两个负责在接收到相应的网页请求后,发起server1上定义的两个task function任务消息,消息发往celery broker的默认队列hotplay_sh_default_queue(使用task_name.delay函数发出的请求会加入到默认队列,使用task_name.apply_async或send_task函数则可以指定目标队列),最后由server1上的worker执行。网页请求的格式类似——http://10.121.72.94:10501/do_catchup_jy/?hotplay_id=pxftest&start_dt=2015-08-12&end_dt=2015-08-14。第3个handler发起一个名为tasks.test1的任务消息,发往celery
broker的另一个队列hotplay_jy_queue,tasks.test1任务并没有在server1上的celery调度器中实现(也叫注册),而是放在了server2上,相应的,处理队列hotplay_jy_queue的worker也在server2上运行。这里,由于tasks.test1task
function没有注册在server1上,所以使用send_task函数来发送任务消息;这是因为task_name.delay、task_name.apply_async函数发送任务请求需要先import task_name相应的python function,而send_task函数发送任务消息其实就相当于往celery
broker发送一个字符串类似的任务请求、不需要调用事先写好的task function,然后该字符串类似的任务消息由worker获取、worker根据任务消息去寻找实际的task function来执行。这种机制也是celery实现任务统一收集、分发执行的基础。

来看server2上的celery调度器,

|tasks.py (注意,要和tornado server中send_task()函数用的file_name一样)

|start_server.sh

由于只是功能测试,写得比较简单,

tasks.py


#-*-coding=utf-8-*-

from __future__ import absolute_import

from celery import Celery

from kombu import Queue

 

app = Celery("test",

        broker = "redis://10.121.76.204:17016/1"

    #         include = [‘test.tasks‘]

             )

 

app.conf.update(

        CELERY_DEFAULT_QUEUE = ‘hotplay_sh_default_queue‘,  #可省略,但不能和server1的配置不一样

        CELERY_QUEUES = (Queue(‘hotplay_jy_queue‘),),

    )

 

@app.task()

def test1(hotplay_id, start_dt, end_dt): #注意,名字要和tornado_server中send_task()函数用的func_name名字一样

    print ‘hotplay_id is %s, stat from %s to %s‘%(hotplay_id, start_dt, end_dt)

start_server.sh


celery -A tasks worker -n hotplay_jy_worker -c 2 -Q hotplay_jy_queue -l info

server2上调度器主要就是开了一个worker来取tornado server发往hotplay_jy_queue队列的任务并执行,当然,任务在哪里执行、相应的任务函数就应该放在哪里。此外,server2和server1上的celery实例app的消息队列配置应该保持一致,因为它们是对同一个celery
broker的配置。

总结

最后总结下上面项目架构的实现:所有的celery任务都由tornado server发起,统一由celery broker收集、不过分别由celery
broker的hotplay_sh_default_queue和hotplay_jy_queue两个消息队列接收,最后分别由server1和server2上的worker去执行。

在上面的项目架构中,tornado server是和server1上的celery调度器放在一起的,这是有必要的,因为send_task函数发送任务消息的时候,至少应该要知道celery
broker等信息,而这些信息在server1的celery调度器上有(请注意hotplay_tornado_server.py中from
proj.hotplay_taskimport
do_init_catchup,
do_catchup语句,该语句不仅import两个任务函数,还获取了celery实例app的信息,从而获得了celery broker等配置信息)。在这之后,如果有其他任务要集成进来,直接在hotplay_tornado_server.py中增加相应的handler(调用send_task函数向目标队列发送相应的任务消息,目标队列不需要在server1上申明)、并在其他server上写好相应的celery调度器(申明消息队列、实现celery
task function、开启worker)即可。
这时,tornado server负责所有任务(不止是本文提到的3个任务)的触发(通过网页触发比较方便)、然后使用send_task函数往某一个固定的celery
broker发送任务消息、不同种类的任务消息发到celery broker上特定的消息队列,每种任务的执行由任务部署的服务器上的celery调度器(就和server2上的调度器)完成,由各个服务器上的celery调度器的worker会到自己目标队列中取任务消息来执行。这样做的好处是:一个broker搞定所有任务,不过有多少种不同的任务、broker上就会有多少个消息队列。

后续

上文总结中提到tornado server需要和server1上的celery调度器放在一起,以获取celery broker的信息,经过尝试,tornado server是可以完全独立出来的。

在tornado server的py文件中添加以下代码:

from celery import Celery

app = Celery(broker = "redis://10.121.76.204:17016/1",)

接着,改send_task(‘tasks.test1‘,
args=[hotplay_id, start_dt,
end_dt],queue=‘hotplay_jy_queue‘)

app.send_task(‘tasks.test1‘, args=[hotplay_id, start_dt, end_dt], queue=‘hotplay_jy_queue‘)

然后,就可以去掉下面两行了:

from celery.executeimport
send_task

from proj.hotplay_taskimport
do_init_catchup, do_catchup

这样子,tornado server就可以完全独立出来运行,而不必再和任何任务绑在一起以获得celery broker的信息,因为celery broker的信息直接写在tornado server的代码里了。当然,hotplay_tornado_server.py代码经过上面的修改、完全独立出来后,do_init_catchup.delay(user_name,
album_id,
album_name,
channel_name)和do_catchup.delay(hotplay_id,
start_dt,
end_dt)需要用send_task函数改写,

app.send_task(‘proj.hotplay_task.do_init_catchup‘, args=[user_name, album_id, album_name, channel_name])    #send to default queue: hotplay_default_sh_queue

app.send_task(‘proj.hotplay_task.do_catchup‘, args=[hotplay_id, start_dt, end_dt])

最后说明一下,tornado server完全独立出来的好处:如果不完全独立出来,那么和tornado
server放在一起的celery调度器需要修改的话,则celery worker和tornado server也需要重启(tornado server代码调用了celery调度器的任务函数以及broker信息,所以要重启),tornado
server至少和一个celery调度器存在耦合;完全独立后,解除了tornado
server代码和celery调度器之间的耦合,这时tornado server中使用send_task函数发送任务消息、无需经过实际实现的celery任务函数,所以任何celery调度器的改动(只要别改任务函数名和任务函数的参数)都无需重启tornado
server、而只要重启celery worker即可,也就是说任务的提交和任务的执行完全分离开来了。

参考:

http://www.avilpage.com/2014/11/scaling-celery-sending-tasks-to-remote.html

https://groups.google.com/forum/#!topic/celery-users/E37wUyOcd3I

http://programming.nullanswer.com/question/29340011

http://www.imankulov.name/posts/celery-for-internal-api.html

版权声明:本文为博主原创文章,未经博主允许不得转载。

时间: 2024-10-15 02:54:46

celery实现任务统一收集、分发执行的相关文章

K8S使用filebeat统一收集应用日志

今年3月份在公司的内部k8s培训会上,开发同事对应用整合进pod提出了几个问题,主要围绕在java应用的日志统一收集.集中存放和java jvm内存监控数据收集相关的点上,本文将介绍使用filebeat实现pod日志的统一收集,集中存放使用集群外的elasticsearch,后续可以加上kibana及模板文件实现更友好的数据展示. 一.准备和测试tomcat基础镜像该镜像主要是配置jdk环境变量和tomcat软件包部署,如果有特殊的需求,例如安装其他软件包.配置tomcat https等也可以在

celery 中任务的结构以及执行

起因 最近打算实现异步任务,回想起当年看celery的场景,重新整理下celery的机制 1. 任务入队列 假定一个函数定义如下 def add(a, b, c=0): print a + b + c 任务被序列化后,以字符串的形式入队列 {"body": "gAJ9cQEoVQdleHBpcmVzcQJOVQN1dGNxA4lVBGFyZ3NxBF1xBShLD0sUZVUFY2hvcmRxBk5VCWNhbGxiYWNrc3EHTlUIZXJyYmFja3NxCE5VB3

iOS 捕获全局异常,统一收集

参考博文:http://www.cnblogs.com/easonoutlook/archive/2012/12/27/2835979.html 开发程序的过程中不管我们已经如何小心,总是会在不经意间遇到程序闪退.流畅的操作被无情地Crash打断,当程序运行Crash的时候,系统会把运行的最后时刻的运行信息记录下来,存储到一个文件中,也就是我们所说的Crash文件,当时如果是真机测试离开Xcode的时候Crash掉,我们是无法知道crash的具体位置的.现在做一个程序统一记录crash的位置.先

Rsyslog 实现Nginx日志统一收集功能

一.rsyslog 介绍 ryslog 是一个快速处理收集系统日志的程序,提供了高性能.安全功能和模块化设计.rsyslog 是syslog 的升级版,它将多种来源输入输出转换结果到目的地,据官网介绍,现在可以处理100万条信息. 二.Rsyslog应用 Rsyslog服务端配置 下面有一些参数没有做中文解释,是因为我也不是太了解,没办法做出解释,测试不出具体效果 需要自己去看看文档来解决了. 配置文件/etc/rsyslog.conf $ModLoad imuxsock            

python日志添加功能,主要记录程序运行中的日志,统一收集并分析

转自:https://www.cnblogs.com/jsondai/p/9663633.html 一.日志的级别 debug(调试信息) info() warning(警告信息)error(错误信息) critical(致命信息) 从左往右越来越严重 日志等级(level) 描述 DEBUG 最详细的日志信息,典型应用场景是 问题诊断 INFO 信息详细程度仅次于DEBUG,通常只记录关键节点信息,用于确认一切都是按照我们预期的那样进行工作 WARNING 当某些不期望的事情发生时记录的信息(

celery执行异步任务和定时任务

一.什么是Clelery Celery是一个简单.灵活且可靠的,处理大量消息的分布式系统 专注于实时处理的异步任务队列 同时也支持任务调度 Celery架构 Celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成. 消息中间件 Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成.包括,RabbitMQ, Redis等等 任务执行单元 Worker是Celery提供

celery异步执行任务框架

Celery 官方 Celery 官网:http://www.celeryproject.org/ Celery 官方文档英文版:http://docs.celeryproject.org/en/latest/index.html Celery 官方文档中文版:http://docs.jinkan.org/docs/celery/ Celery异步任务框架 """ 1)可以不依赖任何服务器,通过自身命令,启动服务(内部支持socket) 2)celery服务为为其他项目服务提

基于Celery的并行处理工程-OpenWorker快速安装

OpenWorker,欢迎参与:https://github.com/supergis/OpenWorker.OpenWorker是基于Python的并行处理框架,将集成Celery.Flower.Jobtastic和Rodeo工程,可以通过控制台或Web进行管理.提交任务等. Celery 是一个简单灵活的Python并行处理框架,但是相关的几个工程需要独自安装和配置,给小白的使用带来困难.OpenWorker将这几个工程放到一起, 并增加了统一的安装脚本,让部署和安装.运行都更加方便.Ope

Python Celery队列

Celery队列简介: Celery 是一个 基于python开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理, 如果你的业务场景中需要用到异步任务,就可以考虑使用celery. 使用场景: 1.你想对100台机器执行一条批量命令,可能会花很长时间 ,但你不想让你的程序等着结果返回,而是给你返回 一个任务ID,你过一段时间只需要拿着这个任务id就可以拿到任务执行结果, 在任务执行ing进行时,你可以继续做其它的事情. 2.你想做一个定时任务,比如每天检测一下你们所有客户的资料,如