44. Python Celery多实例 定时任务

celery是一个分布式的任务调度模块,那么celery是如何和分布式挂钩呢?

celery可以支持多台不同的计算机执行不同的任务或者相同的任务。

如果要说celery的分布式应用的话,就要提到celery的消息路由机制,提到AMQP协议。

具体可以查看AMQP文档详细了解。

简单理解:

可以有多个"消息队列"(message Queue),不同的消息可以指定发送给不同的Message Queue,

而这是通过Exchange来实现的,发送消息到"消息队列"中时,可以指定routiing_key,Exchange通过routing_key来吧消息路由(routes)到不同的"消息队列"中去。

如图:

exchange 对应 一个消息队列(queue),即:通过"消息路由"的机制使exchange对应queue,每个queue对应每个worker

写个例子:

vim demon3.py

from celery import Celery
app = Celery()
app.config_from_object("celeryconfig")
@app.task
def taskA(x, y):
    return x * y
@app.task
def taskB(x, y, z):
    return x + y + z
@app.task
def add(x, y):
    return x + y

vim celeryconfig.py

from kombu import Queue
BORKER_URL = "redis://192.168.48.131:6379/1"				#1库
CELERY_RESULT_BACKEND = "redis://192.168.48.131:6379/2"	#2库
CELERY_QUEUES = {
    Queue("default", Exchange("default"), routing_key = "default"),
    Queue("for_task_A", Exchange("for_task_A"), routing_key = "for_task_A"),
    Queue("for_task_B", Exchange("for_task_B"), routing_key = "for_task_B")
}
#路由
CELERY_ROUTES = {
    "demon3.taskA":{"queue": "for_task_A",  "routing_key": "for_task_A"},
    "demon3.taskB":{"queue": "for_task_B",  "routing_key": "for_task_B"}
}

下面把两个脚本导入服务器:

指定taskA启动一个worker:

# celery -A demon3 worker -l info -n workerA.%h -Q for_task_A

同理:

# celery -A demon3 worker -l info -n workerB.%h -Q for_task_B

下面远程客户端调用:新文件

vim remote.py

from demon3 import *
r1 = taskA.delay(10, 20)
print (r1.result)
print (r1.status)
r2 = taskB.delay(10, 20, 30)
time.sleep(1)
prnit (r2.result)
print (r2.status)
#print (dir(r2))
r3 = add.delay(100, 200)
print (r3.result)
print (r3.status)	#PENDING

看到状态是PENDING,表示没有执行,这个是因为没有celeryconfig.py文件中指定改route到哪一个Queue中,所以会被发动到默认的名字celery的Queue中,但是我们还没有启动worker执行celery中的任务。

下面,我们来启动一个worker来执行celery队列中的任务

# celery -A tasks worker -l info -n worker.%h -Q celery 		##默认的

可以看到这行的结果为success

print(re3.status)    #SUCCESS

定时任务:

Celery 与 定时任务

在celery中执行定时任务非常简单,只需要设置celery对象中的CELERYBEAT_SCHEDULE属性即可。

下面我们接着在配置文件:celeryconfig.py,添加关于 CELERYBEAT_SCHEDULE 变量到脚本中去:

CELERY_TIMEZONE = 'UTC'
CELERYBEAT_SCHEDULE = {
    'taskA_schedule' : {
        'task':'tasks.taskA',
        'schedule':20,
        'args':(5,6)
    },
'taskB_scheduler' : {
    'task':"tasks.taskB",
    "schedule":200,
    "args":(10,20,30)
    },
'add_schedule': {
    "task":"tasks.add",
    "schedule":10,
    "args":(1,2)
    }
}

注意格式,否则会有问题

启动:

celery -A demon3 worker -l info -n workerA.%h -Q for_task_A

celery -A demon3 worker -l info -n workerB.%h -Q for_task_B

celery -A tasks worker -l info -n worker.%h -Q celery

celery -A demon3 beat

时间: 2024-11-13 14:05:54

44. Python Celery多实例 定时任务的相关文章

python celery多worker、多队列、定时任务

多worker.多队列 celery是一个分布式的任务调度模块,那么怎么实现它的分布式功能呢,celery可以支持多台不同的计算机执行不同的任务或者相同的任务. 如果要说celery的分布式应用的话,就要提到celery的消息路由机制,提到AMQP协议. 简单理解: 可以有多个"消息队列"(message Queue),不同的消息可以指定发送给不同的Message Queue, 而这是通过Exchange来实现的,发送消息到"消息队列"中时,可以指定routiing

【Python celery】 -- 2019-08-08 18:03:28

目录 原文: http://106.13.73.98/__/156/ 安装:pip install celery celery 是基于 Python 实现的模块,用于执行异步定时周期任务. celery 组成结构: 用户任务 app: 用于生成任务 管道 broker 与 backend:前者用于存放任务,后者用于存放任务执行结果 员工 worker:负责执行任务 @(Python celery) 简单示例 员工文件(workers.py): import time from celery im

【Python celery】 -- 2019-08-08 20:39:56

目录 原文: http://106.13.73.98/__/156/ 安装:pip install celery celery 是基于 Python 实现的模块,用于执行异步定时周期任务. celery 组成结构: 用户任务 app: 用于生成任务 管道 broker 与 backend:前者用于存放任务,后者用于存放任务执行结果 员工 worker:负责执行任务 @(Python celery) 简单示例 员工文件(workers.py): import time from celery im

Python Celery队列

Celery队列简介: Celery 是一个 基于python开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理, 如果你的业务场景中需要用到异步任务,就可以考虑使用celery. 使用场景: 1.你想对100台机器执行一条批量命令,可能会花很长时间 ,但你不想让你的程序等着结果返回,而是给你返回 一个任务ID,你过一段时间只需要拿着这个任务id就可以拿到任务执行结果, 在任务执行ing进行时,你可以继续做其它的事情. 2.你想做一个定时任务,比如每天检测一下你们所有客户的资料,如

python celery介绍和基本使用

08 python celery介绍和基本使用 celery分布式任务队列 RPC远程,当执行一条命令,等待远程执行结果返回客户端. 在Linux上可以在后台执行,不影响其他任务执行.(涉及到异步) 1.分布式任务运算celery 参考:https://www.cnblogs.com/alex3714/p/6351797.html 任务计划:https://www.cnblogs.com/peida/archive/2013/01/08/2850483.html Crontab操作系统本身任务计

【引用】python 静态函数 类函数 实例函数

1.关于定义类的一些奇特之处  今天在Python中定义一个类,很奇怪,不需要事先声明它的成员变量吗?暂时不知,先记录下来: class Account(object):    "一个简单的类"    account_type="Basic"    def __init__(self,name,balance):        "初始化一个新的Account实例"        self.name=name        self.balance

python 类和实例

面向对象最重要的概念就是类(Class)和实例(Instance),必须牢记类是抽象的模板,比如Student类,而实例是根据类创建出来的一个个具体的“对象”,每个对象都拥有相同的方法,但各自的数据可能不同. 仍以Student类为例,在Python中,定义类是通过class关键字: class Student(object): pass class后面紧接着是类名,即Student,类名通常是大写开头的单词,紧接着是(object),表示该类是从哪个类继承下来的,继承的概念我们后面再讲,通常,

Python logging模块实例教程

position:static(静态定位) 当position属性定义为static时,可以将元素定义为静态位置,所谓静态位置就是各个元素在HTML文档流中应有的位置 podisition定位问题.所以当没有定义position属性时,并不说明该元素没有自己的位置,它会遵循默认显示为静态位置,在静态定位状态下无法通过坐标值(top,left,right,bottom)来改变它的位置. position:absolute(绝对定位) 当position属性定义为absolute时,元素会脱离文档流

关于python的单实例模式

单实例模式一直是常用的设计模式,对于python的单实例模式,其实其本身就有实现 http://stackoverflow.com/questions/31875/is-there-a-simple-elegant-way-to-define-singletons-in-python/31887#31887 里面说到module,module只会初始化一次,天然的singleton.这是最为python的解决方案.将你所需要的属性和方法,直接暴露在模块中变成模块的全局变量和方法即可. 另外,如果