并发编程之多进程3 (生产者与消费者模型) 回调函数

一、生产者消费模型补充

  总结:

    ---生产者消费者模型程序中两种角色:①负责生产数据(生产者);②负责处理数据(消费者)

    ---生产者消费者模型的作用:平衡生产者与消费者之间的速度差。

    ---实现方式:生产者——>队列——>消费者

  如上篇博客内容关于生产消费模型内容,在生产者生产数据的过程结束后,即使消费者已将数据完全获取,消费者程序也不能结束,需由主进程或者生产者在结束生产程序后发送给消费者结束口令,消费者程序才会结束。但是如果出现多个消费者和多个生产者,这种情况又该如何解决?方法如下两种:

1、根据消费者数量传送结束信号(low)

from multiprocessing import Process,Queue
import time,random,os

def procducer(q):
    for i in range(10):
        res=‘包子%s‘ %i
        time.sleep(0.5)
        q.put(res)
        print(‘%s 生产了 %s‘ %(os.getpid(),res))

def consumer(q):
    while True:
        res=q.get()
        if res is None:
            break
        print(‘%s 吃 %s‘ %(os.getpid(),res))
        time.sleep(random.randint(2,3))

if __name__ == ‘__main__‘:
    q=Queue()
    p=Process(target=procducer,args=(q,))
    c=Process(target=consumer,args=(q,))

    p.start()
    c.start()

    p.join()
    q.put(None)
    print(‘主‘)

from multiprocessing import Process,Queue
import time
import random
import os
def producer(name,q):
    for i in range(10):
        res=‘%s%s‘ %(name,i)
        time.sleep(random.randint(1, 3))
        q.put(res)
        print(‘%s生产了%s‘ %(os.getpid(),res))
def consumer(name,q):
    while True:
        res=q.get()
        if not res:break
        print(‘%s吃了%s‘ %(name,res))
if __name__==‘__main__‘:
    q=Queue()
    p1=Process(target=producer,args=(‘巧克力‘,q))
    p2=Process(target=producer,args=(‘甜甜圈‘,q))
    p3=Process(target=producer, args=(‘奶油蛋糕‘,q))
    c1=Process(target=consumer,args=(‘alex‘,q))
    c2=Process(target=consumer,args=(‘egon‘,q))

    _p=[p1,p2,p3,c1,c2]
    for p in _p:
        p.start()
    p1.join()
    p2.join()
    p3.join()
    ‘‘‘保证生产程序结束后,再发送结束信号,发送数量和消费者数量一致‘‘‘
    q.put(None)
    q.put(None)

天啊噜

2、JoinableQueue队列机制

 JoinableQueue与Queue队列基本相似,但前者队列允许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。Queue实例的对象具有的方法JoinableQueue同样具有,除此JoinableQueue还具有如下方法:

  ①q.join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止

  ②q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常

from multiprocessing import Process,JoinableQueue
import time
import random
def producer(name,food,q):
    for i in range(10):
        res=‘%s%s‘ %(food,i)
        time.sleep(random.randint(1, 3))
        q.put(res)
        print(‘%s生产了%s‘ %(name,res))
    q.join()  #阻塞生产者进程,保证此进程结束时消费者进程已处理完其产生的数据
def consumer(name,q):
    while True:
        res=q.get()
        if not res:break
        print(‘%s吃了%s‘ %(name,res))
        q.task_done()
if __name__==‘__main__‘:
    q=JoinableQueue()
    p1=Process(target=producer,args=(1,‘巧克力‘,q))
    p2=Process(target=producer,args=(2,‘奶油蛋糕‘,q))
    p3 = Process(target=producer, args=(3,‘冰糖葫芦‘, q))
    c1=Process(target=consumer,args=(‘lishi‘,q))
    c2=Process(target=consumer,args=(‘jassin‘,q))
    ‘‘‘守护进程保证主进程结束时,守护进程也立即结束‘‘‘
    c1.daemon=True
    c2.daemon=True

    _p=[p1,p2,p3,c1,c2]
    for p in _p:
        p.start()
    p1.join()
    p2.join()
    p3.join()

二、回调函数

  进程池执行完一个获得数据的进程,即刻要求通知主进程拿去解析数据。主进程调用一个函数去处理,这个函数便被称为回调函数,要求进程池进程的结果为回调函数的参数。

  爬虫实例:

线程池

import requests
from concurrent.futures import ThreadPoolExecutor(线程池),ProcessPoolExecutor(进程池)
from threading import current_thread
import time
import os

def get(url):  # 下载
    print(‘%s GET %s‘ %(current_thread().getName(),url))
    response=requests.get(url)
    time.sleep(3)
    if response.status_code == 200:   # 固定,=200表示下载完成
        return {‘url‘:url,‘text‘:response.text}

def parse(obj):  # 解析
    res=obj.result()
    print(‘[%s] <%s> (%s)‘ % (current_thread().getName(), res[‘url‘],len(res[‘text‘])))

if __name__ == ‘__main__‘:
    urls = [
        ‘https://www.python.org‘,
        ‘https://www.baidu.com‘,
        ‘https://www.jd.com‘,
        ‘https://www.tmall.com‘,
    ]
    t=ThreadPoolExecutor(2)
    for url in urls:
        t.submit(get,url).add_done_callback(parse)
    t.shutdown(wait=True)

    print(‘主‘,os.getpid())

我们可以把耗时间(阻塞)的任务放到进程池中,然后指定回调函数(主进程负责执行),这样主进程在执行回调函数时就省去了I/O的过程,直接拿到的是任务的结果。如果在主进程中等待进程池中所有任务都执行完毕后,再统一处理结果,则无需回调函数。

进程池

import requests
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
import time
import os

def get(url):
    print(‘%s GET %s‘ %(os.getpid(),url))
    response=requests.get(url)
    time.sleep(3)
    if response.status_code == 200:
        return {‘url‘:url,‘text‘:response.text}

def parse(obj):
    res=obj.result()
    print(‘[%s] <%s> (%s)‘ % (os.getpid(), res[‘url‘],len(res[‘text‘])))

if __name__ == ‘__main__‘:
    urls = [
        ‘https://www.python.org‘,
        ‘https://www.baidu.com‘,
        ‘https://www.jd.com‘,
        ‘https://www.tmall.com‘,
    ]

    t=ProcessPoolExecutor(2)
    for url in urls:
        t.submit(get,url).add_done_callback(parse)
    t.shutdown(wait=True)
    print(‘主‘,os.getpid())
时间: 2024-11-10 12:01:23

并发编程之多进程3 (生产者与消费者模型) 回调函数的相关文章

并发编程—— 阻塞队列和生产者-消费者模式

Java并发编程实践 目录 并发编程—— ConcurrentHashMap 并发编程—— 阻塞队列和生产者-消费者模式 概述 第1部分 为什么要使用生产者和消费者模式 第2部分 什么是生产者消费者模式 第3部分 代码示例 第1部分 为什么要使用生产者和消费者模式 在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程.在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据.同样的道理,如果消费者的处理能力大于生产者,那么消费

并发编程之——多进程

一.基本概念 1.1 进程 其实进程就是正在进行的一个程序或者任务,而负责执行任务的是CPU,执行任务的地方是内存.跟程序相比,程序仅仅是一堆代码而已,而程序运行时的过程才是进程.另外同一个程序执行两次就是两个进程了. 1.2 并发与并行 无论是并行还是并发,在用户看来都是'同时'运行的,不管是进程还是线程,都只是一个任务而已,真是干活的是cpu,cpu来做这些任务,而一个cpu同一时刻只能执行一个任务.对于"并发"而言,是伪并行,即看起来是同时运行,单个cpu+多道技术就可以实现并发

python并发编程之多进程1------互斥锁与进程间的通信

一.互斥锁 进程之间数据隔离,但是共享一套文件系统,因而可以通过文件来实现进程直接的通信,但问题是必须自己加锁处理. 注意:加锁的目的是为了保证多个进程修改同一块数据时,同一时间只能有一个修改,即串行的修改,没错,速度是慢了,牺牲了速度而保证了数据安全. 1.上厕所的小例子:你上厕所的时候肯定得锁门吧,有人来了看见门锁着,就会在外面等着,等你吧门开开出来的时候,下一个人才去上厕所. 1 from multiprocessing import Process,Lock 2 import os 3

python并发编程之多进程

python并发编程之多进程 一.什么是进程 进程:正在进行的一个过程或者一个任务,执行任务的是CPU. 原理:单核加多道技术 二.进程与程序的区别 进程是指程序的运行过程 需要强调的是:同一个程序执行两次是两个进程,比如打开暴风影音,虽然都是同一个软件,但是一个可以播放苍井空,另一个可以播放武藤兰. 三.并发与并行 无论是并行还是并发,在用户看来都是'同时'运行的,不管是进程还是线程,都只是一个任务而已,真是干活的是cpu,cpu来做这些任务,而一个cpu同一时刻只能执行一个任务. (1)并发

并发编程之多进程

#Python 并发编程之多进程##1.1 multiprocessing 模块 Python 中的多线程无法利用多核资源,如果想要充分的使用多核 cpu 的资源,在 Python 中大部分情况需要使用多进程. Python 为我们提供了非常好用的多进程包multiprocessing! os.cpu_coutn() multiprocessing模块用来开启子进程,并在模块中执行我们定制的任务(如函数,实现功能等). multiprocessing 模块的功能有很多: ?支持子进程 ?通信和共

并发编程之多进程进程进程

Python 并发编程之多进程 1.1 multiprocessing 模块 Python 中的多线程无法利用多核资源,如果想要充分的使用多核 cpu 的资源,在 Python 中大部分情况需要使用多进程. Python 为我们提供了非常好用的多进程包multiprocessing! os.cpu_coutn() multiprocessing模块用来开启子进程,并在模块中执行我们定制的任务(如函数,实现功能等). multiprocessing 模块的功能有很多: ?支持子进程 ?通信和共享数

并发协作:多线程中的生产者与消费者模型

对于多线程程序来说,不管任何编程语言,生产者和消费者模型都是最经典的.就像学习每一门编程语言一样,Hello World!都是最经典的例子. 实际上,准确说应该是“生产者-消费者-仓储”模型,离开了仓储,生产者消费者模型就显得没有说服力了. 对于此模型,应该明确一下几点: 1.生产者仅仅在仓储未满时候生产,仓满则停止生产. 2.消费者仅仅在仓储有产品时候才能消费,仓空则等待. 3.当消费者发现仓储没产品可消费时候会通知生产者生产. 4.生产者在生产出可消费产品时候,应该通知等待的消费者去消费.

进程,操作系统,Python并发编程之多进程

1.进程基础知识 1.程序:若干文件 2.进程:一个正在执行的文件,程序 3.进程被谁执行:cpu最终运行指定的程序 4.操作系统调度作用:将磁盘上的程序加载到内存,然后交由CPU去处理,一个CPU正在运行的一个程序,就叫开启了一个进程 2.操作系统 1.操作系统:存在于硬盘与软件之间,管理.协调.控制软件与硬件的交互 2.操作系统的作用:将一些复杂的硬件封装成简单的借口,便于使用;合理地调度分配多个进程与cpu的关系,让其有序化 3.操作系统发展史 ①第一代电子计算机(1940-1955) 二

python并发编程02/多进程

目录 python并发编程02/多进程 1.进程创建的两种方式 1.1开启进程的第一种方式 1.2开启进程的第二种方式 1.3简单应用 2.进程pid 2.1命令行获取所有的进程的pid tasklist 2.2代码级别如何获取一个进程的pid 2.3获取父进程(主进程)的pid 3.验证进程之间的空间隔离 4.进程对象join方法 5.进程对象其他属性 6.守护进程 python并发编程02/多进程 1.进程创建的两种方式 1.1开启进程的第一种方式 from multiProcessing

Go语言编程:使用条件变量Cond和channel通道实现多个生产者和消费者模型

如题,使用条件变量Cond和channel通道实现多个生产者和消费者模型.Go语言天生带有C语言的基因,很多东西和C与很像,但是用起来 绝对比C语言方便.今天用Go语言来实现下多消费者和生产者模型.如果对C语言的多生产者和消费者模型感兴趣的可以看Linux系统编程:使用mutex互斥锁和条件变量实现多个生成者和消费者模型 代码实现代码实现用了Cond条件变量和channel通道. package main import ( "fmt" "math/rand" &qu