分布式计算--(分布式+多进程+多线程+多协程)

先来个最简单的例子:

把1-10000每个数求平方

服务器server:

用两个队列存储任务、结果

定义两个函数

要实现分布式得继承multiprocessing.managers.BaseManager

在主函数里multiprocessing.freeze_support()开启分布式支持

注册两个函数给客户端调用

创建管理器,设置ip地址和开启端口、链接密码。

用两个队列加任务、收结果。用刚刚注册的函数

把1-10000压入队列,

把结果压入队列

最后完成关闭服务器

客户端client:

也需要继承multiprocessing.managers.BaseManager

定义一个协程处理一个数据,同时把结果压入结果队列

定义一个线程处理10个数据,开启10个协程

定义一个进程,进程驱动10个线程

主函数:同客户端注册两个函数

同客户端创建管理器,设置ip地址和开启端口、链接密码。

链接服务器

同客户端调用注册的函数,两个队列

套四层循环:10个进程、100个线程、1000个协程

循环进程函数

上代码:

服务器server:

#coding:utf-8
import multiprocessing  #分布式进程
import multiprocessing.managers #分布式进程管理器
import random,time  #随机数,时间
import Queue #队列

task_queue=Queue.Queue() #任务
result_queue=Queue.Queue() #结果

def  return_task(): #返回任务队列
    return task_queue
def return_result(): #返回结果队列
    return   result_queue

class  QueueManger(multiprocessing.managers.BaseManager):#继承,进程管理共享数据
    pass

if __name__=="__main__":
    multiprocessing.freeze_support()#开启分布式支持
    QueueManger.register("get_task",callable=return_task)#注册函数给客户端调用
    QueueManger.register("get_result", callable=return_result)
    manger=QueueManger(address=("192.168.112.11",8848),authkey="123456") #创建一个管理器,设置地址与密码
    manger.start() #开启
    task,result=manger.get_task(),manger.get_result() #任务,结果
    for  i  in range(10000):
        print "task add data",i
        task.put(i)
    print "waitting for------"
    for  i  in range(10000):
        res=result.get(timeout=100)
        print "get data",res

    manger.shutdown()#关闭服务器

客户端client:

#coding:utf-8
import multiprocessing  #分布式进程
import multiprocessing.managers  # 分布式进程管理器
import random,time  #随机数,时间
import Queue #队列
import threading
import gevent
import gevent.monkey

class  QueueManger(multiprocessing.managers.BaseManager):# 继承,进程管理共享数据
    pass
def  gevetygo(num ,result): #协程处理一个数据
    print num*num
    result.put(num*num)

def  threadgo(datalist,result): # 线程处理10个数据,开启10个协程
    tasklist=[]
    for  data  in datalist:
        tasklist.append(gevent.spawn(gevetygo, data,result))
    gevent.joinall(tasklist)

def  processgo(ddatalist,result): # [[1,2,3],[4,5,6]] 进程驱动了10个线程
    threadlist=[]
    for  datalist in ddatalist:
        mythread=threading.Thread(target=threadgo,args=(datalist,result))
        mythread.start()
        threadlist.append(mythread)
    for mythread in threadlist:
        mythread.join()

if __name__=="__main__":
    QueueManger.register("get_task")  # 注册函数调用服务器
    QueueManger.register("get_result")
    manger=QueueManger(address=("192.168.112.11",8848),authkey="123456")
    manger.connect()  # 链接服务器
    task= manger.get_task()
    result =manger.get_result()  # 任务,结果

    # 1000
    # 10个进程
    # 100个线程
    # 1000个协程

    for  i  in range(10):
        cubelist = []  # [[[1],[2]]]
        for j in range(10):
            arealist = []
            for k in range(10):
                linelist = []
                for l in range(10):
                    data = task.get()
                    linelist.append(data)
                arealist.append(linelist)
            cubelist.append(arealist)

        processlist = []
        for myarealist in cubelist:
            process = multiprocessing.Process(target=processgo, args=(myarealist, result))
            process.start()
            processlist.append(process)
        for process in processlist:
            process.join()

遇到的坑:一个月之前弄分布式的时候写ip地址怎么都开启不了,后来换了台电脑就支持了= =。

如果只是在自己电脑上弄的话,写127.0.0.1也可以运行,如果你也遇到ip地址怎么都开启不了的情况

原文地址:https://www.cnblogs.com/symkmk123/p/9418304.html

时间: 2024-08-30 01:13:37

分布式计算--(分布式+多进程+多线程+多协程)的相关文章

python并发之多进程、多线程、协程和异步

一.多线程 二.协程(又称微线程,纤程) 协程,与线程的抢占式调度不同,它是协作式调度.协程在python中可以由generator来实现. 首先要对生成器和yield有一个扎实的理解. 调用一个普通的python函数,一般是从函数的第一行代码开始执行,结束于return语句.异常或者函数执行(也可以认为是隐式地返回了None). 一旦函数将控制权交还给调用者,就意味着全部结束.而有时可以创建能产生一个序列的函数,来“保存自己的工作”,这就是生成器(使用了yield关键字的函数). 能够“产生一

多线程配合协程

协程配合线程 asyncio.run_coroutine_threadsafe 该方法的语法如下: asyncio.run_coroutine_threadsafe(coro, loop) 其实在协程中也可以使用多线程,有时候我们需要在主线程中启动一个子线程去做别的任务,这个时候我们就要用到下面的方法了,先上一个流畅的Python中的代码. import time import asyncio from threading import Thread now = lambda: time.tim

游戏服务器之多进程架构通信 协程切换只是简单地改变执行函数栈,不涉及内核态与用户态转化,也涉及上下文切换,

游戏服务器之多进程架构通信 https://gameinstitute.qq.com/community/detail/124098 https://www.zhihu.com/question/23508968 游戏服务器与普通服务器有什么区别? 游戏开发中的TCP.UDP.HTTP.WebSocket四种网络通讯协议对比 https://gameinstitute.qq.com/community/detail/127562 https://www.jianshu.com/p/4eb37c1

flask多线程多协程操作

local的作用:各个线程各开辟一块空间互不影响 基于local""" import threading from threading import local import time obj = local() def task(i): obj.xxxxx = i time.sleep(2) print(obj.xxxxx,i) for i in range(10): t = threading.Thread(target=task,args=(i,)) t.start()

Cpython解释器下实现并发编程——多进程、多线程、协程、IO模型

一.背景知识 进程即正在执行的一个过程.进程是对正在运行的程序的一个抽象. 进程的概念起源于操作系统,是操作系统最核心的概念,也是操作系统提供的最古老也是最重要的抽象概念之一.操作系统的其他所有内容都是围绕进程的概念展开的.   一.操作系统相关的知识 详情见链接:http://www.cnblogs.com/linhaifeng/p/6295875.html 即使可以利用的CPU只有一个(早期的计算机确实如此),也能保证支持(伪)并发的能力.将一个单独的CPU变成多个虚拟的CPU(多道技术:时

协程Coroutine

协程是一种用户态的轻量级线程. server的发展如下: IO密集型应用: 多进程->多线程->事件驱动->协程 CPU密集型应用:多进程-->多线程 如果说多进程对于多CPU,多线程对应多核CPU,那么事件驱动和协程则是在充分挖掘不断提高性能的单核CPU的潜力. 异步事件驱动模型中,把会导致阻塞的操作转化为一个异步操作,主线程负责发起这个异步操作,并处理这个异步操作的结果.由于所有阻塞的操作都转化为异步操作,理论上主线程的大部分时间都是在处理实际的计算任务,少了多线程的调度时间,

进程、线程和协程的理解-自己随笔

1. IO 操作不占用CPU(从硬盘读数据,从网络读数据,从内存读取数据) 计算占用CPU,例如1+1=2的计算就是占用CPU的. python 多线程,不适合CPU密集操作系统的任务,适合IO操作密集型的任务. 2. 进程.线程和协程之间的关系和区别也困扰我一阵子了,最近有一些心得,写一下. 进程拥有自己独立的堆和栈,既不共享堆,亦不共享栈,进程由操作系统调度. 线程拥有自己独立的栈和共享的堆,共享堆,不共享栈,线程亦由操作系统调度(标准线程是的). 协程和线程一样共享堆,不共享栈,协程由程序

浅谈我对协程的理解

我心中的协程 最近在研究网络服务框架方面的东西,发现了一个神奇的东西-协程. 一句话说明什么是线程:协程是一种用户态的轻量级线程. 一句话并不能完全概括协程的全部,但是起码能让我们对协程这个概念有一个基本的印象. 从硬件发展来看,从最初的单核单CPU,到单核多CPU,多核多CPU,似乎已经到了极限了,但是单核CPU性能却还在不断提升.server端也在不断的发展变化.如果将程序分为IO密集型应用和CPU密集型应用,二者的server的发展如下: IO密集型应用: 多进程->多线程->事件驱动-

单线程、多线程、多进程、协程比较,以爬取新浪军事历史为例

演示python单线程.多线程.多进程.协程 1 import requests,json,random 2 import re,threading,time 3 from lxml import etree 4 5 lock=threading.Lock() 6 semaphore=threading.Semaphore(100) ###每次限制只能100线程 7 8 user_agent_list = [ 9 "Mozilla/5.0 (Windows NT 6.1; WOW64) Appl