cerely异步分布式

1、释义:

  Celery 是一个 基于python开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理, 如果你的业务场景中需要用到异步任务,就可以考虑使用celery。

举几个实例场景中可用的例子:

  • 你想对100台机器执行一条批量命令,可能会花很长时间 ,但你不想让你的程序等着结果返回,而是给你返回 一个任务ID,你过一段时间只需要拿着这个任务id就可以拿到任务执行结果, 在任务执行ing进行时,你可以继续做其它的事情。
  • 你想做一个定时任务,比如每天检测一下你们所有客户的资料,如果发现今天 是客户的生日,就给他发个短信祝福

Celery 本身并不提供消息服务,在执行任务时需要通过一个消息中间件来接收和发送任务消息,以及存储任务结果, 一般使用rabbitMQ or Redis

2、Celery的优点:

  • 简单:一单熟悉了celery的工作流程后,配置和使用还是比较简单的
  • 高可用:当任务执行失败或执行过程中发生连接中断,celery 会自动尝试重新执行任务
  • 快速:一个单进程的celery每分钟可处理上百万个任务
  • 灵活: 几乎celery的各个组件都可以被扩展及自定制

3、Celery基本工作流程图

4、示例

这里我们使用redis
连接url的格式为:
redis://:[email protected]:port/db_number
例如:
BROKER_URL = ‘redis://localhost:6379/0‘

安装celery和redis

  • pip install celery
  • pip install redis

使用celery包含三个方面:

  • 定义任务函数
  • 运行celery服务
  • 客户应用程序的调用

先创建一个脚本 tasks.py

from celery import Celery        #导入了celery

broker = ‘redis://172.16.94.85:6379/1‘backend = ‘redis://172.16.94.85:6379/2‘app = Celery(‘tasks‘, broker=broker, backend=backend)   #创建了celery实例app,实力话的过程中指定任务名tasks(和文件名一致),传入了broker和backend

@app.task    #装饰器def add(x, y):         #创建任务函数add    print("running...", x, y)    return x + y

在当前命令行终端运行(启动worker,worker名要和脚本名一致):

celery -A tasks worker --loglevel=info

此时会看见一对输出,包括注册的任务

新建 test.py并执行:

#!/usr/bin/env python# -*- coding:utf-8 -*-# @Time   : 2018/5/26 8:17# @Author : JWQ# @File   : demo1.py

from tasks import add   #导入tasks模块

re = add.delay(10, 20)print(re.result)   #获取结果print(re.ready)       #是否处理print(re.get(timeout=1))      #获取结果print(re.status)      #是否处理

执行test.py后在celery行能看到相关的操作日志:

[2018-05-25 11:31:28,373: WARNING/ForkPoolWorker-1] (‘running...‘, 4, 4)
[2018-05-25
11:31:28,394: INFO/ForkPoolWorker-1] Task
tasks.add[30b145f9-14f7-4cd8-aa5e-7b6105c52325] succeeded in
0.0216804221272s: 8
[2018-05-25 11:31:58,991: INFO/MainProcess] Received task: tasks.add[7f8207cb-d561-4567-8ae7-7c035af02762]  
[2018-05-25 11:31:58,995: WARNING/ForkPoolWorker-1] (‘running...‘, 4, 4)
[2018-05-25
11:31:58,998: INFO/ForkPoolWorker-1] Task
tasks.add[7f8207cb-d561-4567-8ae7-7c035af02762] succeeded in
0.00274921953678s: 8

打开 backend的redis,也可以看见celery执行的信息。

在python环境中调用的add函数,实际上是在应用程序中调用这个方法。需要注意,如果把返回值赋值给一个变量,那么原来的应用程序也会被阻塞,需要等待异步任务返回的结果。因此,实际使用中,不需要把结果赋值。

5、Celery模块调用

既然celery是一个分布式的任务调度模块,那么celery是如何和分布式挂钩呢,celery可以支持多台不通的计算机执行不同的任务或者相同的任务。

如果要说celery的分布式应用的话,我觉得就要提到celery的消息路由机制,就要提一下AMQP协议。具体的可以查看AMQP的文档。简单地说就是可以有多个消息队列(Message Queue),不同的消息可以指定发送给不同的Message Queue,而这是通过Exchange来实现的。发送消息到Message Queue中时,可以指定routiing_key,Exchange通过routing_key来把消息路由(routes)到不同的Message Queue中去,如图:

6、多worker,多队列

先写脚本task.py

[[email protected] celery]# cat tasks.py

#!/usr/bin/env python
#-*- coding:utf-8 -*-
from celery import Celery

app = Celery()
app.config_from_object("celeryconfig")

@app.task
def taskA(x,y):
return x + y

@app.task
def taskB(x,y,z):
return x + y + z

上面的代码中,首先定义了一个Celery的对象,然后通过celeryconfig.py对celery对象进行设置。之后又分别定义了三个task,分别是taskA, taskB和add。

接下来写celeryconfig.py,需要注意代码的缩进格式:

[[email protected] celery]# cat celeryconfig.py

#!/usr/bin/env python
#-*- coding:utf-8 -*-

from kombu import Exchange,Queue

BROKER_URL = "redis://192.168.48.131:6379/1"
CELERY_RESULT_BACKEND = "redis://192.168.48.131:6379/2"

CELERY_QUEUES = (
Queue("default",Exchange("default"),routing_key="default"),
Queue("for_task_A",Exchange("for_task_A"),routing_key="for_task_A"),
Queue("for_task_B",Exchange("for_task_B"),routing_key="for_task_B")
)

CELERY_ROUTES = {
‘tasks.taskA‘:{"queue":"for_task_A","routing_key":"for_task_A"},
‘tasks.taskB‘:{"queue":"for_task_B","routing_key":"for_task_B"}
}

配置文件一般单独写在一个文件中

启动一个worker来指定taskA

celery -A tasks worker -l info -n workerA.%h -Q for_task_A
celery -A tasks worker -l info -n workerB.%h -Q for_task_B

脚本测试:

from tasks import *
re1 = taskA.delay(100, 200)
print(re1.result)
re2 = taskB.delay(1, 2, 3)
print(re2.result)
re3 = add.delay(1, 2, 3)
print(re3.status)     #PENDING

我们看到add的状态是PENDING,表示没有执行,这个是因为没有celeryconfig.py文件中指定改route到哪一个Queue中,所以会被发动到默认的名字celery的Queue中,但是我们还没有启动worker执行celery中的任务。下面,我们来启动一个worker来执行celery队列中的任务。

celery -A tasks worker -l info -n worker.%h -Q celery

print(re3.status)    #SUCCESS

7、Celery与定时任务

在celery中执行定时任务非常简单,只需要设置celery对象中的CELERYBEAT_SCHEDULE属性即可。

下面我们接着在celeryconfig.py中添加CELERYBEAT_SCHEDULE变量:

CELERY_TIMEZONE = ‘UTC‘
CELERYBEAT_SCHEDULE = {
    ‘taskA_schedule‘ : {
        ‘task‘:‘tasks.taskA‘,
        ‘schedule‘:20,
        ‘args‘:(5,6)
    },
    ‘taskB_scheduler‘ : {
        ‘task‘:"tasks.taskB",
        "schedule":200,
        "args":(10,20,30)
    },
    ‘add_schedule‘: {
        "task":"tasks.add",
        "schedule":10,
        "args":(1,2)
    }

注意格式,否则会有问题

Celery启动定时任务:

celery –A tasks beat

Celery启动定时任务:

这样taskA每20秒执行一次taskA.delay(5, 6)
taskB每200秒执行一次taskB.delay(10, 20, 30)
Celery每10秒执行一次add.delay(1, 2)

原文地址:https://www.cnblogs.com/Jweiqing/p/9096427.html

时间: 2024-07-31 13:14:01

cerely异步分布式的相关文章

python—Celery异步分布式

Celery异步分布式 Celery是一个python开发的异步分布式任务调度模块 Celery本身并不提供消息服务,使用第三方服务,也就是borker来传递任务,目前支持rebbimq,redis, 数据库等 使用redis连接url的格式为: redis://:[email protected]:port/db_number 例如: BROKER_URL = 'redis://localhost:6379/0' 1)huang.py from celery import Celery bro

【理论】python使用celery异步处理请求

Flask中使用celery队列处理执行时间较长的请求. 一. 安装celery pip install celery flask redis 二. celery简介 Celery是个异步分布式任务队列 通过Celery在后台跑任务并不像线程那么简单,但是用Celery的话,能够是应用有较好的扩展性,因为Celery是个分布式架构,下面介绍Celery的三个核心组件: 1. 生产者(Celery client): 生产者发送消息,在Flask上工作时,生产者在Flask应用内运行 2. 消费者(

一分钟读懂MySQL分布式消息的处理

在很多MYSQL环境中,对于MYSQL的分布式事物处理一直是个难题,在当前互联网环境中,大多数应用系统是基于SOA的很多复杂接口之间的调用,并且事物之间的处理优先级也是有先后的,所以对于实际入库的数据而言,不同的系统,对于当前入库的处理方式是不一样的,这样就衍生出了对于订阅MYSQL消息的需求. 在公司内部,这套分布式消息系统负责了各个子接口之间数据的衔接,同时肩负后端DW数据仓库的实时消息计算,多数的RDBMS数据,被分解成各种子消息队列,通过不同的topic被各种消费者订阅. 一.如何分解消

分布式梯队下降

分布式梯队下降 并行模型 模型并行(model parallelism): 分布式系统中的不同机器(GPU/CPU等)负责网络模型的不同部分---例如,神经网络模型的不同网络层被分配到不同的机器,或者同一层内部道德不同参数被分配到不同机器. 数据并行(data parallelism): 不同的机器有同一个模型的多个副本,每个机器分配到不同的数据,然后将所有机器的计算结构按照某种方式合并. 并行方法 同步数据并行方法(Synchronous Data Parallel Methods) 把数据切

Celery异步的分布式任务调度理解

什么是Celery呢? Celery是一个用Python开发的异步的分布式任务调度模块. Celery本身不包含消息服务,使用第三方消息服务,也就是Broker,来传递任务,目前支持的有Rebbimq,Redis,数据库以及其他的一些比如Amazon SQS,Monogdb和IronMQ . Celery支持同步和异步执行两种模式.同步模式为任务调用方等待任务执行完成,这种方式等同于RPC(Remote Procedure Call), 异步方式为任务在后台执行,调用方调用后就去做其他工作,之后

让程序员少写几万行代码:七个实用的分布式开源框架

来自:码云周刊 分布式系统的出现是为了用廉价的.普通的机器完成单个计算机无法完成的计算.存储任务.其目的是利用更多的机器,处理更多的数据.截止目前,分布式系统已普遍被应用在互联网企业中,相关的开源软件也层出不穷. 小编为大家整理了码云上广受好评的分布式服务框架,希望能给大家带来一点帮助,不足之处,欢迎讨论交流:) 1.项目名称:分布式架构开发套件 jeesuite-libs 项目简介:Jeesuite 是一个 Java 后台分布式架构开发套件.涵盖缓存.消息队列.db 操作(读写分离.分库路由.

深度强化学习(Deep Reinforcement Learning)入门:RL base & DQN-DDPG-A3C introduction

转自https://zhuanlan.zhihu.com/p/25239682 过去的一段时间在深度强化学习领域投入了不少精力,工作中也在应用DRL解决业务问题.子曰:温故而知新,在进一步深入研究和应用DRL前,阶段性的整理下相关知识点.本文集中在DRL的model-free方法的Value-based和Policy-base方法,详细介绍下RL的基本概念和Value-based DQN,Policy-based DDPG两个主要算法,对目前state-of-art的算法(A3C)详细介绍,其他

大型网站技术架构-入门梳理【转】

罗列了大型网站架构涉及到的概念,附上了简单说明 前言 本文是对<大型网站架构设计>(李智慧 著)一书的梳理,类似文字版的"思维导图" 全文主要围绕"性能,可用性,伸缩性,扩展性,安全"这五个要素 性能,可用性,伸缩性这几个要素基本都涉及到应用服务器,缓存服务器,存储服务器这几个方面 概述 三个纬度:演化.模式.要素 五个要素: 性能,可用性,伸缩性,扩展性,安全 演化历程 图例可参考 大型网站架构演化历程: 初始阶段的网站架构:一台服务器,上面同时拥有应

我的职业与事业:研发小而美的软件产品

运用逻辑设施和人文思想创作帮助人们处理问题或使人们心灵快乐的软件产品. 逻辑与情感交融的艺术. 技能总结 用户界面:           HTML/Javascript/CSS       jQuery/Bootstrap/Extjs 应用服务器:        jetty / tomcat 通信协议:           HTTP/IP/TCP/JSON/XML/字符串 服务应用程序:    JVM Platform/Java/Groovy/Mysql/Linux/CloudComputing