python全栈开发day33-进程间的通信、进程间的数据共享,进程池

一、昨日内容回顾:

    1.  守护进程

        1)、p.saemon,

        2 )、p.terminate

        3 )、p.join

    2.  同步控制

      1)、锁,Lock

        互斥锁,解决数据安全、进程之间资源抢占问题。

      2)、信号量,Semaphore

        锁+计数器

      3)、事件,Event

        通过一个标志位flag来控制进程的阻塞和执行。

    3.  多进程实现tcp协议的socket的sever端

        1)子进程中不能使用input

        2)允许端口的重用设置

        3)妥善处理sk的close确保操作系统的资源能够被及时回收。

        

import socket
from multiprocessing import Process

def func(conn):
    conn.send(b‘hello‘)
    data = conn.recv(1024)
    print(data.decode(‘utf-8‘))
    conn.close()

if __name__ == ‘__main__‘:
    sk = socket.socket()
    sk.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sk.bind((‘127.0.0.1‘, 9000))
    sk.listen(5)
    try:
        while True:
            con, addr = sk.accept()
            p = Process(target=func, args=(con,))
            p.start()
    finally:
        sk.close()

server

import socket

sk = socket.socket()
sk.connect((‘127.0.0.1‘, 9000))
data = sk.recv(1024)
print(data)
msg = input(‘>>>‘).encode(‘utf-8‘)
sk.send(msg)

client

二、今日内容总结:

    1、进程间的通信:

        1)、队列 Queue:队列是加锁的,在多进程之间对数据的管理是安全的

            维护了一个先进先出的顺序,且保证了数据在进程之间是安全的。

            put,get,full,empty,get_nowait,put_nowait

          生产者和消费者模型:

            (1)、解决生产消费供需关系,生产的东西不够吃,就再开启一个进程生产。。。

             (2)、解决消费者不能结束消费完物品的循环和阻塞问题,队列中引入None,

                让消费者再取的时候判断是否遇到None,遇到则结束。有几个消费者就队列中就put几个None

             (3)、解决生产者生产完成后主程序才结束问题,对生产者的进程进程join阻塞

          JoinableQueue:

            join和task_done方法:

            join会阻塞队列,直至队列中的数据被取完,且执行了一个task_done,程序才会继续执行。

from multiprocessing import Process, Queue
import time, random

def consumer(name, q):
    while True:
        time.sleep(random.randint(1, 3))
        food = q.get()
        if food is None: break
        print(‘%s吃了%s‘ % (name, food))

def producer(name, food, q):
    for i in range(10):
        time.sleep(random.randint(1, 5))
        q.put(‘%s生产了%s%s‘ % (name, food, i))
        print(‘%s生产了数据%s%s‘ % (name, food, i))

if __name__ == ‘__main__‘:
    q = Queue()
    p1 = Process(target=producer, args=(‘egon‘, ‘面包‘, q))
    p2 = Process(target=producer, args=(‘taibai‘, ‘骨头‘, q))
    p1.start()
    p2.start()
    c1 = Process(target=consumer, args=(‘alex‘, q))
    c2 = Process(target=consumer, args=(‘firedragon‘, q))
    c1.start()
    c2.start()
    p1.join()
    p2.join()
    q.put(None)
    q.put(None)

生产者和消费者模型例子None

from multiprocessing import JoinableQueue,Process
import time

def consumer(name,q):
    while True:
        obj = q.get()
        time.sleep(0.3)
        print(‘%s吃了一个%s‘ % (name, obj))
        q.task_done()

if __name__ == ‘__main__‘:
    q = JoinableQueue()
    for i in range(10):
        q.put(‘food%s‘ % i)
    p1 = Process(target=consumer, args=(‘alex‘, q))
    p1.daemon = True
    p1.start()
    q.join()    # 阻塞队列,直至队列的数据被取完,且执行了一个task_done()
                # p1为守护进程,主程序代码执行完毕后,守护进程随之结束,里边的循环自然也结束了。

生产者和消费者模型JoinableQueue

          

        2)、管道 Pipe:底层实现是pickle,对数据的管理是不安全的,队列的实现机制就是管道+锁

            双向通信:利用pickle实现的

            收不到,就阻塞

          # 管道的EOFError是怎么报出来的(同时关闭主进程的lp和子进程的lp就会报出EOFError)
          # 管道在数据管理上是不安全的
          # 队列的实现机制 就是 管道+锁

            

from multiprocessing import Pipe,Process
# lp,rp = Pipe()
# lp.send(‘hello‘)
# print(rp.recv())
# # print(rp.recv())    # 没有数据在此阻塞进程
# # rp.send()   # 不能发送空数据
# lp.send([1,2,3])
# print(rp.recv())

def consumer(lp,rp):
    lp.close()   #1.这个发关闭
    while True:
        print(rp.recv())

if __name__ == ‘__main__‘:
    lp, rp = Pipe()
    Process(target=consumer, args=(lp, rp)).start()
    Process(target=consumer, args=(lp, rp)).start()
    Process(target=consumer, args=(lp, rp)).start()
    Process(target=consumer, args=(lp, rp)).start()
    #rp.close()
    for i in range(100):
        lp.send(‘food%i‘ % i)
    lp.close()  #2.这个关闭 这两关闭才会报错EOFError

管道例子

    2、进程之间的数据共享,Manager

         Manager创建的数据(如字典等)可以在进程之间共享,涉及数据操作要加上锁,不然会出现数据错乱。

         m = Manager()            dic = m.dict({‘count’:100})

          with Manager() as m:            dic = m.dict({‘count’:100}) 但涉及dic的操作代码必须在with的缩进执行

        

from multiprocessing import Lock,Manager,Process

def func(dic_tmp, lock_tmp):
    with lock_tmp:
        dic_tmp[‘count‘] -= 1

if __name__ == ‘__main__‘:
    lock = Lock()
    with Manager() as m:
        dic = m.dict({‘count‘: 50})
        p_lst = []
        for i in range(50):
            p = Process(target=func, args=(dic, lock))
            p.start()
            p_lst.append(p)
        for i in p_lst:
            i.join()
        print(dic)

50个进程同时操作一个字典的例子

    3、进程池

       进程池使用场景PK多进程: 

        1.对于纯计算的代码,使用进程池更好(个人理解,高效利用cpu没有了节省了进程的开启和回收时间,也节省操作系统调度进程切换的时间)

        2.对于高IO的代码,没有更好选择的情况下使用多进程。

        总结:使用进程池比起多进程,节省了开启进程回收进程资源的时间,给操作系统调度进程降低了难度。

       进程池apply(同步)添加入池方法和apply_async(异步)

       使用进程池提交任务方法:

          p = Pool(5)

          p.appy(func=***,args=(,)))   #同步提交任务 没有多进程的优势

          p.apply_async(func=***,args=(,))  #异步提交任务

          p.close()   #关闭进程池,阻止向进程池添加新的任务

          p.join()  #依赖close,进程池必须先close后join(个人理解应该是要阻塞执行完进程池的任务,才进入非阻塞状态)

         ------代码-------

        

from multiprocessing import Pool,Process
import time,random

def wahaha(num):
    time.sleep(random.randint(1,3))
    print(‘num:%s‘ % num**num)

if __name__ == ‘__main__‘:

    # -------------------------适合高计算--------------------------------------------
    p = Pool(5)
    # start = time.time()
    # for i in range(100):
    #     p.apply_async(func=wahaha,args=(i,))
    #
    # p.close()
    # p.join()
    # print(time.time()-start)

    start = time.time()
    p.map(func=wahaha,iterable=range(101))
    print(time.time()-start)
# -------------------------适合高IO--------------------------------------------
    # start = time.time()
    # p_lst = []
    # for i in range(101):
    #     p = Process(target=wahaha,args=(i,))
    #     p.start()
    #     p_lst.append(p)
    # for i in p_lst:
    #     i.join()
    # print(time.time() - start)

       使用map添加任务的方法以及它和普通(apply_async)方法的区别:

          p.map(func=***,iterable=range(101))

          优点:就是一个任务函数,个一个itetable,节省了for循环和close,join,是一种简便写法。

          区别:apply_async和map相比,操作复杂,但是可以通过get方法获取返回值,而map不行。

        

def wahaha(num):
    print(num)
    return num*‘*‘

if __name__ == ‘__main__‘:

    p = Pool(5)
    start = time.time()
    result_lst = []
    for i in range(100):
        res = p.apply_async(func=wahaha,args=(i,))
        result_lst.append(res)
    print(result_lst)
    for j in result_lst:print(j.get())
    p.close()
    p.join()
    print(time.time()-start)

apply_async使用get获取返回值

       回调函数:可以接收func函数的返回值。但callback函数在主进程中运行。

         p.apply_async(func=***,args=(*,),callback=回调函数))

        

from multiprocessing import Pool,Process
import os

def wahaha(num):
    print(‘子进程:‘,os.getpid())
    return num**num

def callb(argv):
    print(os.getpid())
    print(argv)

if __name__ == ‘__main__‘:
    print(‘主进程‘, os.getpid())
    p = Pool(5)
    p.apply_async(func=wahaha,args=(1,),callback=callb)
    p.close()
    p.join()

callback

三、预习和扩展:

原文地址:https://www.cnblogs.com/wuchenggong/p/9178393.html

时间: 2024-08-01 07:51:18

python全栈开发day33-进程间的通信、进程间的数据共享,进程池的相关文章

python全栈开发基础【第二十一篇】互斥锁以及进程之间的三种通信方式(IPC)以及生产者个消费者模型

一.互斥锁 进程之间数据隔离,但是共享一套文件系统,因而可以通过文件来实现进程直接的通信,但问题是必须自己加锁处理. 注意:加锁的目的是为了保证多个进程修改同一块数据时,同一时间只能有一个修改,即串行的修改,没错,速度是慢了,牺牲了速度而保证了数据安全. 1.上厕所的小例子:你上厕所的时候肯定得锁门吧,有人来了看见门锁着,就会在外面等着,等你吧门开开出来的时候,下一个人才去上厕所. from multiprocessing import Process,Lock import os import

python全栈开发,Day41(线程概念,线程的特点,进程和线程的关系,线程和python理论知识,线程的创建)

昨日内容回顾 队列 队列:先进先出.数据进程安全 队列实现方式:管道+锁 生产者消费者模型:解决数据供需不平衡 管道 双向通信,数据进程不安全 EOFError: 管道是由操作系统进行引用计数的 必须在所有进程中关闭管道后才能生成EOFError异常 数据共享(不常用) Manager list dict 数据进程不安全的 进程池 存放进程的容器 在进程创建之初,创建固定个数的进程 会被多个任务循环利用 节省了进程创建和销毁的时间开销 降低了操作系统调度进程的压力 信号量和进程池的区别 信号量:

python全栈开发目录

python全栈开发目录 linux命令 初识python python基础数据类型 函数编程.set.深浅拷贝 内置函数 文件操作 装饰器 迭代器和生成器 常用模块 初识类和对象 类和对象(进阶) 反射 异常处理 socket.IO多路复用 线程.进程.协程 HTML CSS JavaScript DOM文档操作 jQuery实例 web框架本质 Tornado mysql基础 mysql进阶 ..... 基本算法 递归--二分法查找 冒泡排序 更多 线程池

自学Python全栈开发第一次笔记

我已经跟着视频自学好几天Python全栈开发了,今天决定听老师的,开始写blog,听说大神都回来写blog来记录自己的成长. 我特别认真的跟着这个视频来学习,(他们开课前的保证书,我也写了一份,哈哈哈...)我现在是准大学生,准备学习编程,日后做一个程序员,哈哈哈.听说程序员很苦逼,不过貌似挣得也很多啊.并且我貌似也只喜欢计算机这个方面,所以我想在这个行业发光. 前些天学习了一些Linux一些命令: pwd     查看你当前所在的目录  /root=计算机/E盘 /    是根目录 cd(ch

Python 全栈开发【第一篇】:目录

Python 全栈开发[第0篇]:目录 第一阶段:Python 开发入门 Python 全栈开发[第一篇]:计算机原理&Linux系统入门 Python 全栈开发[第二篇]:Python基础语法入门 Python 全栈开发[第三篇]:数据类型.字符编码.文件操作 第二阶段:函数编程&常用标准库 Python 全栈开发[第四篇]:函数.递归.生成器.迭代器 Pyhton 全栈开发[第五篇]:常用模块学习 第三阶段:面向对象编程&网络编程基础 Python 全栈开发[第六篇]:面向对象

Python全栈开发【基础三】

Python全栈开发[基础三]  本节内容: 函数(全局与局部变量) 递归 函数 一.定义和使用 函数最重要的是减少代码的重用性和增强代码可读性 1 def 函数名(参数): 2 3 ... 4 函数体 5 ... 6 返回值 函数的定义主要有如下要点: def:表示函数的关键字 函数名:函数的名称,日后根据函数名调用函数 函数体:函数中进行一系列的逻辑计算 参数:为函数体提供数据 返回值:当函数执行完毕后,可以给调用者返回数据. 总结使用函数的好处: 1.减少代码重用 2.保持一致性,易维护

Python全栈开发【第一篇】:初识Python

Python全栈开发[第一篇] 本节内容: Python 的种类 Python 的环境 Python 入门(解释器.编码.变量.input输入.if流程控制与缩进.while循环) if流程控制与while循环练习题 基本数据类型前引 Python 的种类 Cpython Python的官方版本,使用C语言实现,使用最为广泛,CPython实现会将源文件(py文件)转换成字节码文件(pyc文件),然后运行在Python虚拟机上. Jyhton Python的Java实现,Jython会将Pyth

Python全栈开发

Python全栈开发 一文让你彻底明白Python装饰器原理,从此面试工作再也不怕了. 一.装饰器 装饰器可以使函数执行前和执行后分别执行其他的附加功能,这种在代码运行期间动态增加功能的方式,称之为“装饰器”(Decorator),装饰器的功能非常强大,但是理解起来有些困难,因此我尽量用最简单的例子一步步的说明这个原理. 1.不带参数的装饰器 假设我定义了一个函数f,想要在不改变原来函数定义的情况下,在函数运行前打印出start,函数运行后打印出end,要实现这样一个功能该怎么实现?看下面如何用

Python全栈开发【基础二】

Python全栈开发[基础二] 本节内容: Python 运算符(算术运算.比较运算.赋值运算.逻辑运算.成员运算) 基本数据类型(数字.布尔值.字符串.列表.元组.字典) 编码与进制转换 Python 运算符 1.算术运算: 2.比较运算: 3.赋值运算: 4.逻辑运算:  5.成员运算: 基本数据类型 1.数字 int(整型) 1 class int(object): 2 """ 3 int(x=0) -> integer 4 int(x, base=10) -&g

Python全栈开发【基础四】

Python全栈开发[基础四] 本节内容: 匿名函数(lambda) 函数式编程(map,filter,reduce) 文件处理 匿名函数 lambda表达式:对于简单的函数,存在一种简便的表示方式,即lambda表达式 1 #这段代码 2 def calc(n): 3 return n**n 4 print(calc(10)) 5 6 #换成匿名函数 7 calc = lambda n:n**n 8 print(calc(10)) 匿名函数主要是和其它函数搭配使用 举例: 1 ########