python celery多worker、多队列、定时任务

多worker、多队列

celery是一个分布式的任务调度模块,那么怎么实现它的分布式功能呢,celery可以支持多台不同的计算机执行不同的任务或者相同的任务。

如果要说celery的分布式应用的话,就要提到celery的消息路由机制,提到AMQP协议。

简单理解:

可以有多个"消息队列"(message Queue),不同的消息可以指定发送给不同的Message Queue,

而这是通过Exchange来实现的,发送消息到"消息队列"中时,可以指定routiing_key,Exchange通过routing_key来吧消息路由(routes)到不同的"消息队列"中去。

exchange 对应 一个消息队列(queue),即:通过"消息路由"的机制使exchange对应queue,每个queue对应每个worker。

下面我们来看一个列子:

vi tasks.py
#!/usr/bin/env python
#-*- coding:utf-8 -*-
from celery import Celery

app = Celery()
app.config_from_object("celeryconfig")  # 指定配置文件

@app.task
def taskA(x,y):
return x + y

@app.task
def taskB(x,y,z):
return x + y + z

@app.task
def add(x,y):
return x + y

编写配置文件,配置文件一般单独写在一个文件中。

vi celeryconfig.py
#!/usr/bin/env python
#-*- coding:utf-8 -*-

from kombu import Exchange,Queue

BROKER_URL = "redis://47.106.106.220:5000/1"
CELERY_RESULT_BACKEND = "redis://47.106.106.220:5000/2"

CELERY_QUEUES = (
Queue("default",Exchange("default"),routing_key="default"),
Queue("for_task_A",Exchange("for_task_A"),routing_key="for_task_A"),
Queue("for_task_B",Exchange("for_task_B"),routing_key="for_task_B")
)
# 路由
CELERY_ROUTES = {
‘tasks.taskA‘:{"queue":"for_task_A","routing_key":"for_task_A"},
‘tasks.taskB‘:{"queue":"for_task_B","routing_key":"for_task_B"}
}

远程客户端上编写测试脚本

vi test.py

from tasks import *
re1 = taskA.delay(100, 200)
print(re1.result)
re2 = taskB.delay(1, 2, 3)
print(re2.result)
re3 = add.delay(1, 2)
print(re3.status)

启动两个worker来分别指定taskA、taskB,开两个窗口分别执行下面语句。

celery -A tasks worker -l info -n workerA.%h -Q for_task_A

celery -A tasks worker -l info -n workerB.%h -Q for_task_B

远程客户端上执行脚本可以看到如下输出:

python test.py
300
6
PENDING

在taskA所在窗口可以看到如下输出:

.......
.......
.......
task_A

[tasks]
  . tasks.add
  . tasks.taskA
  . tasks.taskB

[2018-05-27 19:23:49,235: INFO/MainProcess] Connected to redis://47.106.106.220:5000/1
[2018-05-27 19:23:49,253: INFO/MainProcess] mingle: searching for neighbors
[2018-05-27 19:23:50,293: INFO/MainProcess] mingle: all alone
[2018-05-27 19:23:50,339: INFO/MainProcess] [email protected] ready.
[2018-05-27 19:23:56,051: INFO/MainProcess] sync with [email protected]
[2018-05-27 19:24:28,855: INFO/MainProcess] Received task: tasks.taskA[8860e78a-b82b-4715-980c-ae125dcab2f9]
[2018-05-27 19:24:28,872: INFO/ForkPoolWorker-1] Task tasks.taskA[8860e78a-b82b-4715-980c-ae125dcab2f9] succeeded in 0.0162177120219s: 300

在taskB所在窗口可以看到如下输出:

.......
.......
.......
task_B
[tasks]
  . tasks.add
  . tasks.taskA
  . tasks.taskB

[2018-05-27 19:23:56,012: INFO/MainProcess] Connected to redis://47.106.106.220:5000/1
[2018-05-27 19:23:56,022: INFO/MainProcess] mingle: searching for neighbors
[2018-05-27 19:23:57,064: INFO/MainProcess] mingle: sync with 1 nodes
[2018-05-27 19:23:57,064: INFO/MainProcess] mingle: sync complete
[2018-05-27 19:23:57,112: INFO/MainProcess] [email protected] ready.
[2018-05-27 19:24:33,885: INFO/MainProcess] Received task: tasks.taskB[5646d0b7-3dd5-4b7f-8994-252c5ef03973]
[2018-05-27 19:24:33,910: INFO/ForkPoolWorker-1] Task tasks.taskB[5646d0b7-3dd5-4b7f-8994-252c5ef03973] succeeded in 0.0235358460341s: 6

我们看到状态是PENDING,表示没有执行,这个是因为没有celeryconfig.py文件中指定改route到哪一个Queue中,所以会被发动到默认的名字celery的Queue中,但是我们还没有启动worker执行celery中的任务。下面,我们来启动一个worker来执行celery队列中的任务。

celery -A tasks worker -l info -n worker.%h -Q celery

再次在远程客户端执行test.py,可以看到结果执行成功,并且刚新启动的worker窗口有如下输出:

.......
.......
.......
[tasks]
  . tasks.add
  . tasks.taskA
  . tasks.taskB

[2018-05-27 19:25:44,596: INFO/MainProcess] Connected to redis://47.106.106.220:5000/1
[2018-05-27 19:25:44,611: INFO/MainProcess] mingle: searching for neighbors
[2018-05-27 19:25:45,660: INFO/MainProcess] mingle: sync with 2 nodes
[2018-05-27 19:25:45,660: INFO/MainProcess] mingle: sync complete
[2018-05-27 19:25:45,711: INFO/MainProcess] [email protected] ready.
[2018-05-27 19:25:45,868: INFO/MainProcess] Received task: tasks.add[f9c5ca2b-623e-4c0a-9c45-a99fb0b79ed5]
[2018-05-27 19:25:45,880: INFO/ForkPoolWorker-1] Task tasks.add[f9c5ca2b-623e-4c0a-9c45-a99fb0b79ed5] succeeded in 0.0107084610499s: 3

Celery与定时任务

在celery中执行定时任务非常简单,只需要设置celery对象中的CELERYBEAT_SCHEDULE属性即可。
下面我们接着在celeryconfig.py中添加CELERYBEAT_SCHEDULE变量:

cat celeryconfig.py

#!/usr/bin/env python
#-*- coding:utf-8 -*-

from kombu import Exchange,Queue

BROKER_URL = "redis://47.106.106.220:5000/1"
CELERY_RESULT_BACKEND = "redis://47.106.106.220:5000/2"

CELERY_QUEUES = (
Queue("default",Exchange("default"),routing_key="default"),
Queue("for_task_A",Exchange("for_task_A"),routing_key="for_task_A"),
Queue("for_task_B",Exchange("for_task_B"),routing_key="for_task_B")
)

CELERY_ROUTES = {
‘tasks.taskA‘:{"queue":"for_task_A","routing_key":"for_task_A"},
‘tasks.taskB‘:{"queue":"for_task_B","routing_key":"for_task_B"}
}

# 新增加的定时任务部分
CELERY_TIMEZONE = ‘UTC‘
CELERYBEAT_SCHEDULE = {
    ‘taskA_schedule‘ : {
        ‘task‘:‘tasks.taskA‘,
        ‘schedule‘:2,
        ‘args‘:(5,6)
    },
    ‘taskB_scheduler‘ : {
        ‘task‘:"tasks.taskB",
        "schedule":10,
        "args":(10,20,30)
    },
    ‘add_schedule‘: {
        "task":"tasks.add",
        "schedule":5,
        "args":(1,2)
    }
}

还是按之前启动三个worker

celery -A tasks worker -l info -n workerA.%h -Q for_task_A

celery -A tasks worker -l info -n workerB.%h -Q for_task_B

celery -A tasks worker -l info -n worker.%h -Q celery

启动定时任务

[[email protected] scripts]# celery -A tasks beat
celery beat v4.1.1 (latentcall) is starting.
__    -    ... __   -        _
LocalTime -> 2018-05-27 19:39:29
Configuration ->
    . broker -> redis://47.106.106.220:5000/1
    . loader -> celery.loaders.app.AppLoader
    . scheduler -> celery.beat.PersistentScheduler
    . db -> celerybeat-schedule
    . logfile -> [stderr]@%WARNING
    . maxinterval -> 5.00 minutes (300s)

在之前启动worker的三个窗口分别可以看到定时任务正在运行:

celery -A tasks worker -l info -n workerA.%h -Q for_task_A

[2018-05-27 19:41:27,432: INFO/ForkPoolWorker-1] Task tasks.taskA[60f41780-c9a2-477b-be46-6620ef07631f] succeeded in 0.00289130600868s: 11
[2018-05-27 19:41:29,428: INFO/MainProcess] Received task: tasks.taskA[27220f52-dde2-471a-a87c-3f533d67217c]
............

celery -A tasks worker -l info -n workerB.%h -Q for_task_B

[2018-05-27 19:41:18,420: INFO/ForkPoolWorker-1] Task tasks.taskB[b6f9aee3-e6b4-4f10-9428-457d9bb844cf] succeeded in 0.00282042898471s: 60
[2018-05-27 19:41:28,416: INFO/MainProcess] Received task: tasks.taskB[44dfea0b-b725-4874-bea2-9b66e8da573b]............

celery -A tasks worker -l info -n worker.%h -Q celery 

[2018-05-27 19:41:23,428: INFO/ForkPoolWorker-1] Task tasks.add[315a9cca-3c95-4517-9289-2ece15cd46a4] succeeded in 0.00355823297286s: 3
[2018-05-27 19:41:28,423: INFO/MainProcess] Received task: tasks.add[c4a1b2c7-ecb7-4af4-85c1-a341b3ec6726]............

原文地址:https://www.cnblogs.com/changkuk/p/10124171.html

时间: 2024-08-07 05:20:04

python celery多worker、多队列、定时任务的相关文章

Celery - 一个懂得 异步任务 , 定时任务 , 周期任务 的芹菜

1.什么是Celery?Celery 是芹菜Celery 是基于Python实现的模块, 用于执行异步定时周期任务的其结构的组成是由    1.用户任务 app    2.管道 broker 用于存储任务 官方推荐 redis rabbitMQ  / backend 用于存储任务执行结果的    3.员工 worker 2.Celery的简单实例 1 from celery import Celery 2 import time 3 4 #创建一个Celery实例,这就是我们用户的应用app 5

【Python celery】 -- 2019-08-08 18:03:28

目录 原文: http://106.13.73.98/__/156/ 安装:pip install celery celery 是基于 Python 实现的模块,用于执行异步定时周期任务. celery 组成结构: 用户任务 app: 用于生成任务 管道 broker 与 backend:前者用于存放任务,后者用于存放任务执行结果 员工 worker:负责执行任务 @(Python celery) 简单示例 员工文件(workers.py): import time from celery im

【Python celery】 -- 2019-08-08 20:39:56

目录 原文: http://106.13.73.98/__/156/ 安装:pip install celery celery 是基于 Python 实现的模块,用于执行异步定时周期任务. celery 组成结构: 用户任务 app: 用于生成任务 管道 broker 与 backend:前者用于存放任务,后者用于存放任务执行结果 员工 worker:负责执行任务 @(Python celery) 简单示例 员工文件(workers.py): import time from celery im

python celery介绍和基本使用

08 python celery介绍和基本使用 celery分布式任务队列 RPC远程,当执行一条命令,等待远程执行结果返回客户端. 在Linux上可以在后台执行,不影响其他任务执行.(涉及到异步) 1.分布式任务运算celery 参考:https://www.cnblogs.com/alex3714/p/6351797.html 任务计划:https://www.cnblogs.com/peida/archive/2013/01/08/2850483.html Crontab操作系统本身任务计

Python - celery 相关报错 - AttributeError: type object '_multiprocessing.win32' has no attribute 'WAIT_OBJECT_0'

报错场景 执行   celery worker -A tasks -l INFO  打开 worker 的时候报错无法进行 报错解决 Celery 的版本过高, 进行降级处理即可 pip install celery==3.1.25 降级后再次执行会触发 另一报错 此报错原因是 redis 的版本过高导致 对 redis 进行降级即可  pip install redis==2.10.6 Python - celery 相关报错 - AttributeError: type object '_m

Python数据类型:双端队列deque-比列表list性能更高的一种数据类型

Python数据类型:双端队列 说到容器类型,大家第一时间想到的多半是list,而list确实也能解决大部分的需要,但碰到列表内的数据量相当大的时候,性能问题就显得尤为重要:再或者列表被恶意注入一个无穷大的数据量时,就可能会暴露出安全问题来.较好的替代方法是:collections.deque.获得性能或安全的同时,牺牲的是数据大小. deque队列两端取出或者删除时间复杂度都是O(1),而列表都是O(n),队列的性能比列表更好 简介 欢迎关注我的公众号:Python雁横 collections

Python Celery队列

Celery队列简介: Celery 是一个 基于python开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理, 如果你的业务场景中需要用到异步任务,就可以考虑使用celery. 使用场景: 1.你想对100台机器执行一条批量命令,可能会花很长时间 ,但你不想让你的程序等着结果返回,而是给你返回 一个任务ID,你过一段时间只需要拿着这个任务id就可以拿到任务执行结果, 在任务执行ing进行时,你可以继续做其它的事情. 2.你想做一个定时任务,比如每天检测一下你们所有客户的资料,如

44. Python Celery多实例 定时任务

celery是一个分布式的任务调度模块,那么celery是如何和分布式挂钩呢? celery可以支持多台不同的计算机执行不同的任务或者相同的任务. 如果要说celery的分布式应用的话,就要提到celery的消息路由机制,提到AMQP协议. 具体可以查看AMQP文档详细了解. 简单理解: 可以有多个"消息队列"(message Queue),不同的消息可以指定发送给不同的Message Queue, 而这是通过Exchange来实现的,发送消息到"消息队列"中时,可

celery执行异步任务和定时任务

一.什么是Clelery Celery是一个简单.灵活且可靠的,处理大量消息的分布式系统 专注于实时处理的异步任务队列 同时也支持任务调度 Celery架构 Celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成. 消息中间件 Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成.包括,RabbitMQ, Redis等等 任务执行单元 Worker是Celery提供