Python使用multiprocessing实现一个最简单的分布式作业调度系统

Python使用multiprocessing实现一个最简单的分布式作业调度系统

介绍

Python的multiprocessing模块不但支持多进程,其中managers子模块还支持把多进程分布到多台机器上。一个服务进程可以作为调度者,将任务分布到其他多个机器的多个进程中,依靠网络通信。

想到这,就在想是不是可以使用此模块来实现一个简单的作业调度系统。

实现

Job

首先创建一个Job类,为了测试简单,只包含一个job id属性

job.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

class Job:
    def __init__(self, job_id):
        self.job_id = job_id

Master

Master用来派发作业和显示运行完成的作业信息

master.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from Queue import Queue
from multiprocessing.managers import BaseManager
from job import Job

class Master:

    def __init__(self):
        # 派发出去的作业队列
        self.dispatched_job_queue = Queue()
        # 完成的作业队列
        self.finished_job_queue = Queue()

    def get_dispatched_job_queue(self):
        return self.dispatched_job_queue

    def get_finished_job_queue(self):
        return self.finished_job_queue

    def start(self):
        # 把派发作业队列和完成作业队列注册到网络上
        BaseManager.register(‘get_dispatched_job_queue‘, callable=self.get_dispatched_job_queue)
        BaseManager.register(‘get_finished_job_queue‘, callable=self.get_finished_job_queue)

        # 监听端口和启动服务
        manager = BaseManager(address=(‘0.0.0.0‘, 8888), authkey=‘jobs‘)
        manager.start()

        # 使用上面注册的方法获取队列
        dispatched_jobs = manager.get_dispatched_job_queue()
        finished_jobs = manager.get_finished_job_queue()

        # 这里一次派发10个作业,等到10个作业都运行完后,继续再派发10个作业
        job_id = 0
        while True:
            for i in range(0, 10):
                job_id = job_id + 1
                job = Job(job_id)
                print(‘Dispatch job: %s‘ % job.job_id)
                dispatched_jobs.put(job)

            while not dispatched_jobs.empty():
                job = finished_jobs.get(60)
                print(‘Finished Job: %s‘ % job.job_id)

        manager.shutdown()

if __name__ == "__main__":
    master = Master()
    master.start()

Slave

Slave用来运行master派发的作业并将结果返回

slave.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import time
from Queue import Queue
from multiprocessing.managers import BaseManager
from job import Job

class Slave:

    def __init__(self):
        # 派发出去的作业队列
        self.dispatched_job_queue = Queue()
        # 完成的作业队列
        self.finished_job_queue = Queue()

    def start(self):
        # 把派发作业队列和完成作业队列注册到网络上
        BaseManager.register(‘get_dispatched_job_queue‘)
        BaseManager.register(‘get_finished_job_queue‘)

        # 连接master
        server = ‘127.0.0.1‘
        print(‘Connect to server %s...‘ % server)
        manager = BaseManager(address=(server, 8888), authkey=‘jobs‘)
        manager.connect()

        # 使用上面注册的方法获取队列
        dispatched_jobs = manager.get_dispatched_job_queue()
        finished_jobs = manager.get_finished_job_queue()

        # 运行作业并返回结果,这里只是模拟作业运行,所以返回的是接收到的作业
        while True:
            job = dispatched_jobs.get(timeout=1)
            print(‘Run job: %s ‘ % job.job_id)
            time.sleep(1)
            finished_jobs.put(job)

if __name__ == "__main__":
    slave = Slave()
    slave.start()

测试

分别打开三个linux终端,第一个终端运行master,第二个和第三个终端用了运行slave,运行结果如下

master

$ python master.py
Dispatch job: 1
Dispatch job: 2
Dispatch job: 3
Dispatch job: 4
Dispatch job: 5
Dispatch job: 6
Dispatch job: 7
Dispatch job: 8
Dispatch job: 9
Dispatch job: 10
Finished Job: 1
Finished Job: 2
Finished Job: 3
Finished Job: 4
Finished Job: 5
Finished Job: 6
Finished Job: 7
Finished Job: 8
Finished Job: 9
Dispatch job: 11
Dispatch job: 12
Dispatch job: 13
Dispatch job: 14
Dispatch job: 15
Dispatch job: 16
Dispatch job: 17
Dispatch job: 18
Dispatch job: 19
Dispatch job: 20
Finished Job: 10
Finished Job: 11
Finished Job: 12
Finished Job: 13
Finished Job: 14
Finished Job: 15
Finished Job: 16
Finished Job: 17
Finished Job: 18
Dispatch job: 21
Dispatch job: 22
Dispatch job: 23
Dispatch job: 24
Dispatch job: 25
Dispatch job: 26
Dispatch job: 27
Dispatch job: 28
Dispatch job: 29
Dispatch job: 30

slave1

$ python slave.py
Connect to server 127.0.0.1...
Run job: 1
Run job: 2
Run job: 3
Run job: 5
Run job: 7
Run job: 9
Run job: 11
Run job: 13
Run job: 15
Run job: 17
Run job: 19
Run job: 21
Run job: 23

slave2

$ python slave.py
Connect to server 127.0.0.1...
Run job: 4
Run job: 6
Run job: 8
Run job: 10
Run job: 12
Run job: 14
Run job: 16
Run job: 18
Run job: 20
Run job: 22
Run job: 24

转载请以链接形式标明本文地址

本文地址:http://blog.csdn.net/kongxx/article/details/50883804

时间: 2024-10-31 12:33:11

Python使用multiprocessing实现一个最简单的分布式作业调度系统的相关文章

用python+django+twistd 开发一个属于自己的运维系统

开源的运维系统不少,比如nagios.zabbix.cati等等,但是遇到自己个性化的运维需求的时候,总是显的力不从心!最近在学习python,所以就考虑用python+django+twisted来定做一个完全个性化的运维系统. 运维系统有几个主要的功能:监控.分析.报警.更甚者直接根据分析的结果进行反应操作.而以上几点通过上述的框架可以比较容易的实现. 下面上图说明: 使用freemind整理了下思路: 下面是一些代码段,完整的代码下载见文档底部: Server: #!/usr/bin/en

实现一个基于WCF的分布式缓存系统

前言: 用到分布式的东西很多了,一直想做一个简单的分布式小项目练练手学习下.后来决定来一个简单的分布式缓存的系统. 在企业应用开发中缓存的用例不胜枚举,但是每次更多的是单机的部署与使用,没有对应的需求是一个原因,另一个原因总是好高骛远做过的总是不想再进行修正. 这次的分布式就从最简单的分布式缓存开始.说简单是因为没有实现分布式缓存高深的寻址,或者对备份处理的牛X实现.只是实现了“分布”这个目的,不足之处还请大家指导. 分布的实现方式有哪些? 既然做“分布”,当然要看看主流的“分布”实现方式.小弟

python django TDD实现一个带简单注册的记事本例子

1.新建 django应用   NotPad 2.新建项目 note 3.新建功能测试 mkdir function_test touch function_test/ __init__.py 在 /function_test下新建tests.py 功能测试   /function_test/tests.py代码 from selenium import webdriver from selenium.webdriver.common.keys import Keys import time f

用Python写一个最简单的网络爬虫

什么是网络爬虫?这是百度百科的解释: 网络爬虫(又被称为网页蜘蛛,网络机器人,在FOAF社区中间,更经常的称为网页追逐者),是一种按照一定的规则,自动的抓取万维网信息的程序或者脚本.另外一些不常使用的名字还有蚂蚁,自动索引,模拟程序或者蠕虫. 爬虫可以做什么?爬虫可以帮助我们在茫茫互联网中爬取我们需要的特定数据,这个特定数据可以是任何想获得的数据. 爬虫是一个让人热血的话题,因为当你在写爬虫的时候,你会感觉到自己是在做一件很NB的事,而每当写出一个爬虫,就会在此基础上不断尝试写出更NB的爬虫,有

python爬虫的一个常见简单js反爬

python爬虫的一个常见简单js反爬 我们在写爬虫是遇到最多的应该就是js反爬了,今天分享一个比较常见的js反爬,这个我已经在多个网站上见到过了. 我把js反爬分为参数由js加密生成和js生成cookie等来操作浏览器这两部分,今天说的是第二种情况. 目标网站 列表页url:http://www.hnrexian.com/archives/category/jk. 正常网站我们请求url会返回给我们网页数据内容等,看看这个网站返回给我们的是什么呢? 我们把相应中返回的js代码格式化一下,方便查

使用Python 写一个最简单的WebServer

import  socket def handle_request(client):    buf = client.recv(1024)    client.send("HTTP/1.1 200 OK\r\n\r\n")    client.send('<h30>Hello World</h10>') def main():    sock=socket.socket(socket.AF_INET,socket.SOCK_STREAM)    sock.bin

python ---多进程 Multiprocessing

和 threading 的比较 多进程 Multiprocessing 和多线程 threading 类似, 他们都是在 python 中用来并行运算的. 不过既然有了 threading, 为什么 Python 还要出一个 multiprocessing 呢? 原因很简单, 就是用来弥补 threading 的一些劣势, 比如在 threading 教程中提到的GIL. 创建多进程 import multiprocessing as mp import threading as td def

【机器学习算法-python实现】采样算法的简单实现

1.背景 采样算法是机器学习中比较常用,也比较容易实现的(出去分层采样).常用的采样算法有以下几种(来自百度知道): 一.单纯随机抽样(simple random sampling) 将调查总体全部观察单位编号,再用抽签法或随机数字表随机抽取部分观察单位组成样本. 优点:操作简单,均数.率及相应的标准误计算简单. 缺点:总体较大时,难以一一编号. 二.系统抽样(systematic sampling) 又称机械抽样.等距抽样,即先将总体的观察单位按某一顺序号分成n个部分,再从第一部分随机抽取第k

python的multiprocessing模块进程创建、资源回收-Process,Pool

python的multiprocessing有两种创建进程的方式,每种创建方式和进程资源的回收都不太相同,下面分别针对Process,Pool及系统自带的fork三种进程分析. 1.方式一:fork() 举例: 1 import os 2 pid = os.fork() # 创建一个子进程 3 os.wait() # 等待子进程结束释放资源 4 pid为0的代表子进程. 缺点:1.兼容性差,只能在类linux系统下使用,windows系统不可使用:2.扩展性差,当需要多条进程的时候,进程管理变得