多线程、生产者消费者模型

目录

  • 生产者消费者模型

    • 生产者消费者模型
    • 为什么要使用生产者和消费者模式
    • 什么是生产者消费者模式
    • 基于队列实现生产者消费者模型
  • 多线程
    • 什么是线程
    • 开启线程的两种方式
    • 线程与进程区别
    • Tread类的常用属性
    • 守护线程
    • 线程锁

生产者消费者模型

生产者消费者模型

在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。

为什么要使用生产者和消费者模式

在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。

什么是生产者消费者模式

生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

基于队列实现生产者消费者模型

from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
    while True:
        res=q.get()
        time.sleep(random.randint(1,3))
        print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))

def producer(q):
    for i in range(10):
        time.sleep(random.randint(1,3))
        res='包子%s' %i
        q.put(res)
        print('\033[44m%s 生产了 %s\033[0m' %(os.getpid(),res))

if __name__ == '__main__':
    q=Queue()
    #生产者们:即厨师们
    p1=Process(target=producer,args=(q,))

    #消费者们:即吃货们
    c1=Process(target=consumer,args=(q,))

    #开始
    p1.start()
    c1.start()
    print('主')

生产者消费者模型总结:

#程序中有两类角色
    一类负责生产数据(生产者)
    一类负责处理数据(消费者)

#引入生产者消费者模型为了解决的问题是:
    平衡生产者与消费者之间的工作能力,从而提高程序整体处理数据的速度

#如何实现:
    生产者<-->队列<——>消费者
#生产者消费者模型实现程序的解耦和

多线程

什么是线程

线程指的是一条流水线的工作过程的总称

线程是CPU的基本执行单位

对比进程而言,进程仅仅是一个资源单位其包含了程序运行所需的资源,就像一个车间

而单有资源是无法生产出产品的,必须有具体的生产产品的逻辑代码

线程就相当于车间中的一条流水线,而你的代码就是流水线上的一道道工序

特点

1.每个进程都会有一个默认的线程

2.每个进程可以存在多个线程

3.同一进程中的所有线程之间数据是共享的

4.创建线程的开销远比创建进程小的多

主线程与子线程的区别

1.线程之间是没有父子之分,是平等的

2.主线程是由操作系统自动开启的,而子线是由程序主动开启

3.即时主线程的代码执行完毕,也不会结束进程,会等待所有线程执行完毕,进程才结束

开启线程的两种方式

1.实例化Tread类,target参数用于指定子线程要执行的任务

from threading import  Thread

def task():
    print("子线程 run........")

t = Thread(target=task)
t.start()
print("over")

2.继承Tread类,覆盖run方法

from threading import  Thread

class MyThread(Thread):
    def run(self):
        print("子线程 run........")

t = MyThread()
t.start()
print("over")

与进程在使用方法上没有任何区别,不同的是开启子线程的代码可以写在任意位置

之所以使用方法完全相同是因为,多进程其实是为了弥补多线程的缺憾而诞生的。详见GIL锁

线程与进程区别

1.同一进程中 线程之间数据共享

a = 100
def task():
    global a
    print("子线程 run........")
    a = 1

t = Thread(target=task)
t.start()

print(a) # 1
print("over")

2.创建线程的开销远比创建进程小的多

from threading import  Thread
from multiprocessing import  Process
import time

def task():
    pass

if __name__ == '__main__':
    start = time.time()
    for i in range(100):
        p = Thread(target=task)
        p.start()
    print(time.time()-start)
# 修改Thread 为Process类 查看结果

3.无论开启了多少子线程PID是不会变的

from threading import  Thread
import os

def task():
    print(os.getpid())

for i in range(100):
    p = Thread(target=task)
    p.start()

Tread类的常用属性

# threading模块包含的常用方法
import threading
print(threading.current_thread().name) #获取当前线程对象
print(threading.active_count()) # 获取目前活跃的线程数量
print(threading.enumerate()) # 获取所有线程对象

t = Thread(name="aaa")
# t.join() # 主线程等待子线程执行完毕
print(t.name) # 线程名称
print(t.is_alive()) # 是否存活
print(t.isDaemon()) # 是否为守护线程

守护线程

设置守护线程的语法与进程相同,相同的是也必须放在线程开启前设置,否则抛出异常。

守护线程的特点:

守护线程会在被守护线程结束后立即结束

from threading import  Thread
import time

def task():
    print("start......")
    time.sleep(5)
    print("end......")

t = Thread(target=task)
# t.setDaemon(True)
t.daemon = True
t.start()
print("main over!")

疑惑:

from threading import  Thread
import time

def task():
    print("start....1")
    time.sleep(3)
    print("end......1")

def task2():
    print("start....2")
    time.sleep(4)
    print("end......2")

t = Thread(target=task)
t.daemon = True
t.start()

t2 = Thread(target=task2)
t2.start()

print("main over!")

打印main over后主线程代码执行完毕,但是守护线程t1并没有立即结束,这是什么原因呢?

答:主线程会等待所有子线程执行完毕后结束

在上述例子中,一共有三个线程,主线程 ,t1,t2

虽然t1是守护线程 ,但是t2并不是所以主线程会等待t2执行结束才结束

顺序是:守护线程 等待 主线程 等待 其余子线程

换句话说,守护线程会随着所有非守护线程结束而结束。

线程锁

互斥锁

多线程的最主要特征之一是:同一进程中所有线程数据共享

一旦共享必然出现竞争问题。

a = 10
#lock = Lock()
def task():
    global a
    #lock.acquire()
    b = a - 1
    time.sleep(0.1)
    a = b
    #lock.release()
for i in  range(10):
    t = Thread(target=task)
    t.start()

for t in threading.enumerate():
    if t != threading.current_thread():
        t.join()
print(a)
# 输出 9

当多个线程要并发修改同一资源时,也需要加互斥锁来保证数据安全。

同样的一旦加锁,就意味着串行,效率必然降低。

死锁

现有两把锁l1和l2 用于表示盘子和筷子

两个线程的目标是吃饭,要吃饭的前提是同时拿到筷子和盘子,但是两个人的目标不同一个先拿筷子 ,一个先拿盘子最终造成死锁

l1 = Lock()
l2 = Lock()

def task():
    l1.acquire()
    print(threading.current_thread().name,"拿到了筷子")
    time.sleep(0.1)
    l2.acquire()
    print(threading.current_thread().name, "拿到了盘子")

    print("吃饭")
    l1.release()
    l2.release()

def task2():
    l2.acquire()
    print(threading.current_thread().name, "拿到了盘子")

    l1.acquire()
    print(threading.current_thread().name,"拿到了筷子")

    print("吃饭")

    l2.release()
    l1.release()

t1 = Thread(target=task)
t1.start()
t2 = Thread(target=task2)
t2.start()

共有两把锁,但是一人拿到了一把,并且互不释放,相互等待,导致程序卡死,这就死锁。

要发生死锁只有两种情况

1.有不止一把锁,不同线程或进程分别拿到了不同的锁不放

2.对同一把锁执行了多次acquire

其中第二种情况我们可以通过可重入锁来解决

可重入锁

Rlock 同一个线程可以多次执行acquire,释放锁时,有几次acquire就要release几次。

但是本质上同一个线程多次执行acquire时没有任何意义的,其他线程必须等到RLock全部release之后才能访问共享资源。

所以Rlock仅仅是帮你解决了代码逻辑上的错误导致的死锁,并不能解决多个锁造成的死锁问题

# 同一把RLock 多次acquire
#l1 = RLock()
#l2 = l1

# 不同的RLock 依然会锁死
#l1 = RLock()
#l2 = RLock()

def task():
    l1.acquire()
    print(threading.current_thread().name,"拿到了筷子")
    time.sleep(0.1)
    l2.acquire()
    print(threading.current_thread().name, "拿到了盘子")

    print("吃饭")
    l1.release()
    l2.release()

def task2():
    l2.acquire()
    print(threading.current_thread().name, "拿到了盘子")

    l1.acquire()
    print(threading.current_thread().name,"拿到了筷子")

    print("吃饭")

    l2.release()
    l1.release()

t1 = Thread(target=task)
t1.start()
t2 = Thread(target=task2)
t2.start()

忠告:在处理并发安全时 用完公共资源后一定要释放锁

信号量

Semaphore

信号量也是一种锁,其特殊之处在于可以让一个资源同时被多个线程共享,并控制最大的并发访问线程数量。

如果把Lock比喻为家用洗手间,同一时间只能一个人使用。

那信号量就可以看做公共卫生间,同一时间可以有多个人同时使用。

from threading import  Thread,Semaphore,current_thread
import time

s = Semaphore(3)
def task():
    s.acquire()
    print("%s running........" % current_thread())
    time.sleep(1)
    s.release()

for i in range(20):
    Thread(target=task).start()

原文地址:https://www.cnblogs.com/gaohuayan/p/11135537.html

时间: 2024-10-01 23:19:11

多线程、生产者消费者模型的相关文章

[多线程] 生产者消费者模型的BOOST实现

说明 如果 使用过程中有BUG 一定要告诉我:在下面留言或者给我邮件(sawpara at 126 dot com) 使用boost::thread库来实现生产者消费者模型中的缓冲区! 仓库内最多可以存放 capacity 个产品. 条件变量 condition_put 标记是否可以往仓库中存放一个产品. 条件变量 condition_get 标记是否可以从仓库中取出一个产品. 互斥量 mutexer 用于保证当前仓库只有一个线程拥有主权. 实现 #include <queue> #inclu

多线程,生产者消费者模型(生产馒头,消费馒头)

先建立一个容器 /** * 容器 * 共享资源 * @author Administrator * */ public class SynStack { int index = 0; //容器 SteamBread[] stb = new SteamBread[6]; /** * 往容器中放产品 */ public synchronized void push(SteamBread st){ while(index == stb.length){ try { this.wait(); } cat

生产者消费者模型实现多线程异步交互

[Python之旅]第六篇(五):生产者消费者模型实现多线程异步交互 消息队列 生产者消费者模型 多线程异步交互 摘要:  虽然标题是"生产者消费者模型实现多线程异步交互",但这里要说的应该还包括Python的消息队列,因为这里多线程异步交互是通过Python的消息队列来实现的,因此主要内容如下: 1 2 3 4 1.生产者消费者模型:厨师做包子与顾客吃包子 2.Python的消息队列 3.利用... 虽然标题是"生产者消费者模型实现多线程异步交互",但这里要说的应

Java多线程之~~~~使用wait和notify实现生产者消费者模型

在多线程开发中,最经典的一个模型就是生产者消费者模型,他们有一个缓冲区,缓冲区有最大限制,当缓冲区满 的时候,生产者是不能将产品放入到缓冲区里面的,当然,当缓冲区是空的时候,消费者也不能从中拿出来产品,这就 涉及到了在多线程中的条件判断,java为了实现这些功能,提供了wait和notify方法,他们可以在线程不满足要求的时候 让线程让出来资源等待,当有资源的时候再notify他们让他们继续工作,下面我们用实际的代码来展示如何使用wait和 notify来实现生产者消费者这个经典的模型. 首先是

Java多线程15:Queue、BlockingQueue以及利用BlockingQueue实现生产者/消费者模型

Queue是什么 队列,是一种数据结构.除了优先级队列和LIFO队列外,队列都是以FIFO(先进先出)的方式对各个元素进行排序的.无论使用哪种排序方式,队列的头都是调用remove()或poll()移除元素的.在FIFO队列中,所有新元素都插入队列的末尾. Queue中的方法 Queue中的方法不难理解,6个,每2对是一个也就是总共3对.看一下JDK API就知道了: 注意一点就好,Queue通常不允许插入Null,尽管某些实现(比如LinkedList)是允许的,但是也不建议. Blockin

多线程学习-基础(十二)生产者消费者模型:wait(),sleep(),notify()实现

一.多线程模型一:生产者消费者模型   (1)模型图:(从网上找的图,清晰明了) (2)生产者消费者模型原理说明: 这个模型核心是围绕着一个"仓库"的概念,生产者消费者都是围绕着:"仓库"来进行操作,一个仓库同时只能被一个生产者线程或一个消费者线程所操作,synchronized锁住的也是这个仓库,仓库是一个容器,所以会有边界值,0和仓库可存放上限,在这个上限内,可以设置多种级别,不同的级别可以执行不同的策略流程. (3)本案例使用知识点: Thread.curre

生产者消费者模型 与多线程(1)

生产者消费者模型 模型就是要解决某个问题的固定方法或套路 要解决的问题 生产者:泛指生产数据的一方 消费者:泛指处理数据的一方 双方的处理速度不一致,导致总有一方会在等待 解决问题的方法 先将双方解开耦合,让不同的进程负责不同的任务 提供一个共享的容器如队列,用来平衡双方的能力,用队列是因为队列可以在进程间共享 案例: from multiprocessing import Process,Queue import request import re,os,time,random #生产者任务

Java多线程(十):BlockingQueue实现生产者消费者模型

BlockingQueue BlockingQueue.解决了多线程中,如何高效安全"传输"数据的问题.程序员无需关心什么时候阻塞线程,什么时候唤醒线程,该唤醒哪个线程. 方法介绍 BlockingQueue是Queue的子类 void put(E e) 插入指定元素,当BlockingQueue为满,则线程阻塞,进入Waiting状态,直到BlockingQueue有空闲空间再继续. 这里以ArrayBlockingQueue为例进行分析 void take() 队首出队,当Bloc

Java多线程14:生产者/消费者模型

什么是生产者/消费者模型 一种重要的模型,基于等待/通知机制.生产者/消费者模型描述的是有一块缓冲区作为仓库,生产者可将产品放入仓库,消费者可以从仓库中取出产品,生产者/消费者模型关注的是以下几个点: 1.生产者生产的时候消费者不能消费 2.消费者消费的时候生产者不能生产 3.缓冲区空时消费者不能消费 4.缓冲区满时生产者不能生产 生产者/模型作为一种重要的模型,它的优点在于: 1.解耦.因为多了一个缓冲区,所以生产者和消费者并不直接相互调用,这一点很容易想到,这样生产者和消费者的代码发生变化,