Python 并行分布式框架:Celery

Celery (芹菜)是基于Python开发的分布式任务队列。它支持使用任务队列的方式在分布的机器/进程/线程上执行任务调度。

架构设计

Celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。

  • 消息中间件

    Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQRedisMongoDB (experimental), Amazon SQS (experimental),CouchDB (experimental), SQLAlchemy (experimental),Django ORM (experimental), IronMQ

  • 任务执行单元

    Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。

  • 任务结果存储

    Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, Redis,memcached, MongoDB,SQLAlchemy, Django ORM,Apache Cassandra, IronCache

另外, Celery还支持不同的并发和序列化的手段

  • 并发

    PreforkEventletgevent, threads/single threaded

  • 序列化

    picklejsonyamlmsgpackzlibbzip2 compression, Cryptographic message signing 等等

安装和运行

Celery的安装过程略为复杂,下面的安装过程是基于我的AWS EC2的Linux版本的安装过程,不同的系统安装过程可能会有差异。大家可以参考官方文档。

首先我选择RabbitMQ作为消息中间件,所以要先安装RabbitMQ。作为安装准备,先更新YUM。

?


1

sudo 
yum -y update

RabbitMQ是基于erlang的,所以先安装erlang

?


1

2

3

4

5

6

7

8


# Add and enable relevant application repositories:

# Note: We are also enabling third party remi package repositories.

wget http:
//dl
.fedoraproject.org
/pub/epel/6/x86_64/epel-release-6-8
.noarch.rpm

wget http:
//rpms
.famillecollet.com
/enterprise/remi-release-6
.rpm

sudo 
rpm -Uvh remi-release-6*.rpm epel-release-6*.rpm

# Finally, download and install Erlang:

yum 
install 
-y erlang

然后安装RabbitMQ

?


1

2

3

4

5

6


# Download the latest RabbitMQ package using wget:

wget  

# Add the necessary keys for verification:

rpm --
import

# Install the .RPM package using YUM:

yum 
install 
rabbitmq-server-3.2.2-1.noarch.rpm

启动RabbitMQ服务

?


1

rabbitmq-server start

RabbitMQ服务已经准备好了,然后安装Celery, 假定你使用pip来管理你的python安装包

?


1

pip 
install 
Celery

为了测试Celery是否工作,我们运行一个最简单的任务,编写tasks.py

?


1

2

3

4

5

6

7

8


from 
celery 
import 
Celery

app 
= 
Celery(
‘tasks‘
, backend
=
‘amqp‘
, broker
=
‘amqp://[email protected]//‘
)

app.conf.CELERY_RESULT_BACKEND 
= 
‘db+sqlite:///results.sqlite‘

@app
.task

def 
add(x, y):

    
return 

+ 
y

在当前目录运行一个worker,用来执行这个加法的task

?


1

celery -A tasks worker --loglevel=info

其中-A参数表示的是Celery App的名字。注意这里我使用的是SQLAlchemy作为结果存储。对应的python包要事先安装好。

worker日志中我们会看到这样的信息

?


1

2

3

4

5


- ** ---------- [config]

- ** ---------- .> app:         tasks:0x1e68d50

- ** ---------- .> transport:   amqp:
//guest
:**@localhost :5672
//

- ** ---------- .> results:     db+sqlite:
///results
.sqlite

- *** --- * --- .> concurrency: 8 (prefork)

其中,我们可以看到worker缺省使用prefork来执行并发,并设置并发数为8

下面的任务执行的客户端代码:

?


1

2

3

4

5

6

7

8

9


from 
tasks 
import 
add

import 
time

result 
= 
add.delay(
4
,
4
)

while 
not 
result.ready():

  
print 
"not ready yet"

  
time.sleep(
5
)

print 
result.get()

用python执行这段客户端代码,在客户端,结果如下

?


1

2


not ready   

8

Work日志显示

?


1

2


[2015-03-12 02:54:07,973: INFO
/MainProcess
] Received task: tasks.add[34c4210f-1bc5-420f-a421-1500361b914f]

[2015-03-12 02:54:08,006: INFO
/MainProcess
] Task tasks.add[34c4210f-1bc5-420f-a421-1500361b914f] succeeded 
in 
0.0309705100954s: 8

这里我们可以发现,每一个task有一个唯一的ID,task异步执行在worker上。

这里要注意的是,如果你运行官方文档中的例子,你是无法在客户端得到结果的,这也是我为什么要使用SQLAlchemy 来存储任务执行结果的原因。官方的例子使用AMPQ,有可能Worker在打印日志的时候取出了task的运行结果显示在worker日志中,然而 AMPQ作为一个消息队列,当消息被取走后,队列中就没有了,于是客户端总是无法得到任务的执行结果。不知道为什么官方文档对这样的错误视而不见。

如果大家想要对Celery做更进一步的了解,请参考官方文档

时间: 2024-10-20 04:54:50

Python 并行分布式框架:Celery的相关文章

Python 并行分布式框架 Celery

Celery 简介 除了redis,还可以使用另外一个神器---Celery.Celery是一个异步任务的调度工具. Celery 是 Distributed Task Queue,分布式任务队列,分布式决定了可以有多个 worker 的存在,队列表示其是异步操作,即存在一个产生任务提出需求的工头,和一群等着被分配工作的码农. 在 Python 中定义 Celery 的时候,我们要引入 Broker,中文翻译过来就是"中间人"的意思,在这里 Broker 起到一个中间人的角色.在工头提

【转】Python 并行分布式框架 Celery

原文链接:https://blog.csdn.net/freeking101/article/details/74707619 Celery 官网:http://www.celeryproject.org/ Celery 官方文档英文版:http://docs.celeryproject.org/en/latest/index.html Celery 官方文档中文版:http://docs.jinkan.org/docs/celery/ celery配置:http://docs.jinkan.o

分布式任务队列celery用法详解

celery基础介绍:这个图我们可以看出,celery基本结构也就是三部分1 第一部分 broker也就是中间件消息队列,作用就是用来接收应用的请求这一部分常见玩法可以是rabbitmq和redis等2 第二部分 worker 也就是工作队列 也就是celery本身的任务队列服务,一般情况下大型的生产应用我们会结合supervisor来管理这么多的worker3 第三部分 result 存储,就是把执行的结果,状态等信息进行存储,常规用法我们可以用rabbitmq redis,mysql,mon

量化投资策略:常见的几种Python回测框架(库)

量化投资策略:常见的几种Python回测框架(库) 原文地址:http://blog.csdn.net/lawme/article/details/51454237 本文章为转载文章.这段时间在研究量化策略方向,研究了Zipline一段时间,但是后续发现他仅支持美国股票,收集量化策略文章,转载到博客中. 在实盘交易之前,必须对量化交易策略进行回测.在此,我们评价一下常用的Python回测框架(库).评价的尺度包括用途范围(回测.虚盘交易.实盘交易),易用程度(结构良好.文档完整)和扩展性(速度快

Python 常用Web框架的比较

从GitHub中整理出的15个最受欢迎的Python开源框架.这些框架包括事件I/O,OLAP,Web开发,高性能网络通信,测试,爬虫等. Django: Python Web应用开发框架 Django 应该是最出名的Python框架,GAE甚至Erlang都有框架受它影响.Django是走大而全的方向,它最出名的是其全自动化的管理后台:只需要使用起ORM,做简单的对象定义,它就能自动生成数据库结构.以及全功能的管理后台. Diesel:基于Greenlet的事件I/O框架 Diesel提供一个

并行编程框架 ForkJoin

本文假设您已经了解一般并行编程知识,了解Java concurrent部分如ExecutorService等相关内容. 虽说是Java的ForkJoin并行框架,但不要太在意Java,其中的思想在其它语言环境也是同样适用的.因为并发编程在本质上是一样的.就好像如何找到优秀的Ruby程序员?其实要找的只是一个优秀的程序员.当然,如果语言层面直接支持相关的语义会更好. 引言 Java 语言从一开始就支持线程和并发性语义.Java5增加的并发工具又解决了一般应用程序的并发需求,Java6.Java7又

Python之Web框架介绍

所有的语言Web框架本质其实就是起一个socket服务端,监听一个端口,然后运行起来 Web框架包含两部分,一部分是socket,另外一部分是业务的逻辑处理,根据请求的不同做不同的处理 Python的Web框架分成了两类, 即包含socket也包含业务逻辑处理的(tornado) 不包含socket(框架本身通过第三方模块实现socket)只包含业务逻辑处理(django,Flask) WSGI的全称是Web Server Gateway Interface,翻译过来就是Web服务器网关接口.具

java并行调度框架封装及示例

参考资料:  阿里巴巴开源项目 CobarClient  源码实现. 分享作者:闫建忠 分享时间:2014年5月7日 --------------------------------------------------------------------------------------- 并行调度封装类设计: BXexample.java package org.hdht.business.ordermanager.quartzjob; import java.util.ArrayList;

Python开源爬虫框架scrapy的了解与认识

很多学习Python编程语言的朋友都会学习Python网络爬虫技术,也有专门学习网络爬虫技术的,那么如何学习Python爬虫技术呢,今天就给大家讲讲使用Python抓取数据时非常受欢迎的Python抓取框架scrapy,下面一起学习下Scrapy的架构,便于更好的使用这个工具. 一.概述 下图显示了Scrapy的大体架构,其中包含了它的主要组件及系统的数据处理流程(绿色箭头所示).下面就来一个个解释每个组件的作用及数据的处理过程. 二.组件 1.Scrapy Engine(Scrapy引擎) S