多线程中的应用之队列(queue)

队列queue 多应用在多线程中,对于多线程访问共享变量时,队列queue是线程安全的。
从queue队列的实现来看,队列使用了1个线程互斥锁(pthread.Lock()),以及3个条件标量(pthread.condition()),
来保证了线程安全。

?self.mutex互斥锁:任何获取队列的状态(empty(),qsize()等),或者修改队列的内容的操作(get,put等)都必须持有该互斥锁。共有两种操作require获取锁,release释放锁。同时该互斥锁被三个共享变量同时享有,即操作conditiond时的require和release操作也就是操作了该互斥锁。
?self.not_full条件变量:当队列中有元素添加后,会通知notify其他等待添加元素的线程,唤醒等待require互斥锁,或者有线程从队列中取出一个元素后,通知其它线程唤醒以等待require互斥锁。
?self.not empty条件变量:线程添加数据到队列中后,会调用self.not_empty.notify()通知其它线程,唤醒等待require互斥锁后,读取队列。
?self.all_tasks_done条件变量:消费者线程从队列中get到任务后,任务处理完成,当所有的队列中的任务处理完成后,会使调用queue.join()的线程返回,表示队列中任务以处理完毕。

1,创建队列对象
import Queue
q = Queue.Queue(maxsize = 5) #设置队列长度为5,当有大于5个的数据put进队列时,将阻塞(等待其它线程取走数据,将继续执行)。
Queue.Queue类即是一个队列的同步实现。队列长度可为无限或者有限。可通过Queue的构造函数的可选参数maxsize来设定队列长度。
如果maxsize小于1就表示队列长度无限。

2,将一个值放入队列
q.put(5) #put()方法在队尾插入一个元素。
put()有两个参数,第一个item为必需的,为插入项目的值;第二个block为可选参数,默认为1。
如果队列当前已满,且block为1,put()方法就使调用线程暂停,直到空出一个位置。如果block为0,put方法将引发Full异常。

3,将一个值从队列取出
q.get() #get()方法从队头删除并返回一个元素。
可选参数为block,默认为True。如果队列为空且block为True,get()就使调用线程暂停,直至有元素可取。
如果队列为空且block为False,队列将引发Empty异常。

4,Queue模块有三种队列及构造函数:
(1),Python Queue模块的FIFO队列先进先出。 class queue.Queue(maxsize)
(2),LIFO类似于堆,即先进后出。 class queue.LifoQueue(maxsize)
(3),优先级队列,优先级越低(数字越小)越先出来。 class queue.PriorityQueue(maxsize)
(4),双端队列(collections.deque)
例(优先级队列):
格式:q.put([优先级,值])
q.put([2,"b"])
q.put([1,"a"])
q.put([3,"c"])
while True:
  data=q.get()
  print(data[1])
依次输出:a,b,c

★常用方法(queue = Queue.Queue()):
queue.qsize() 返回队列的大小
queue.empty() 如果队列为空,返回True,反之False
queue.full() 如果队列满了,返回True,反之False
queue.full 与 maxsize 大小对应
queue.get(self, block=True, timeout=None) 获取队列中的一个元素,timeout等待时间
queue.get_nowait() 相当q.get(False);无阻塞的向队列中get任务,当队列为空时,不等待,而是直接抛出empty异常。
queue.put(self, item, block=True, timeout=None) 写入队列,timeout等待时间
queue.put_nowait(item) 相当q.put(item, False);无阻塞的向队列中添加任务,当队列为满时,不等待,而是直接抛出full异常.
queue.task_done() 完成一项工作之后,q.task_done() 函数向任务已经完成的队列发送一个信号
queue.join() 阻塞等待队列中任务全部处理完毕,再执行别的操作。

★说明:
(1)queue.put(self, item, block=True, timeout=None)函数:
申请获得互斥锁,获得后,如果队列未满,则向队列中添加数据,并通知notify其它阻塞的某个线程,唤醒等待获取require互斥锁。
如果队列已满,则会wait等待。最后处理完成后释放互斥锁。其中还有阻塞block以及非阻塞,超时等。

(2)queue.get(self, block=True, timeout=None)函数:
从队列中获取任务,并且从队列中移除此任务。首先尝试获取互斥锁,获取成功则队列中get任务,如果此时队列为空,则wait等待生产者线程添加数据。
get到任务后,会调用self.not_full.notify()通知生产者线程,队列可以添加元素了。最后释放互斥锁。
  
(3)队列的类定义:

  1 class Queue:
  2  """Create a queue object with a given maximum size.
  3
  4  If maxsize is <= 0, the queue size is infinite.
  5  """
  6  def __init__(self, maxsize=0):
  7   self.maxsize = maxsize
  8   self._init(maxsize)
  9   # mutex must be held whenever the queue is mutating. All methods
 10   # that acquire mutex must release it before returning. mutex
 11   # is shared between the three conditions, so acquiring and
 12   # releasing the conditions also acquires and releases mutex.
 13   self.mutex = _threading.Lock()
 14   # Notify not_empty whenever an item is added to the queue; a
 15   # thread waiting to get is notified then.
 16   self.not_empty = _threading.Condition(self.mutex)
 17   # Notify not_full whenever an item is removed from the queue;
 18   # a thread waiting to put is notified then.
 19   self.not_full = _threading.Condition(self.mutex)
 20   # Notify all_tasks_done whenever the number of unfinished tasks
 21   # drops to zero; thread waiting to join() is notified to resume
 22   self.all_tasks_done = _threading.Condition(self.mutex)
 23   self.unfinished_tasks = 0
 24
 25  def task_done(self):
 26   """Indicate that a formerly enqueued task is complete.
 27
 28   Used by Queue consumer threads. For each get() used to fetch a task,
 29   a subsequent call to task_done() tells the queue that the processing
 30   on the task is complete.
 31
 32   If a join() is currently blocking, it will resume when all items
 33   have been processed (meaning that a task_done() call was received
 34   for every item that had been put() into the queue).
 35
 36   Raises a ValueError if called more times than there were items
 37   placed in the queue.
 38   """
 39   self.all_tasks_done.acquire()
 40   try:
 41    unfinished = self.unfinished_tasks - 1
 42    if unfinished <= 0:
 43     if unfinished < 0:
 44      raise ValueError(‘task_done() called too many times‘)
 45     self.all_tasks_done.notify_all()
 46    self.unfinished_tasks = unfinished
 47   finally:
 48    self.all_tasks_done.release()
 49
 50  def join(self):
 51   """Blocks until all items in the Queue have been gotten and processed.
 52
 53   The count of unfinished tasks goes up whenever an item is added to the
 54   queue. The count goes down whenever a consumer thread calls task_done()
 55   to indicate the item was retrieved and all work on it is complete.
 56
 57   When the count of unfinished tasks drops to zero, join() unblocks.
 58   """
 59   self.all_tasks_done.acquire()
 60   try:
 61    while self.unfinished_tasks:
 62     self.all_tasks_done.wait()
 63   finally:
 64    self.all_tasks_done.release()
 65
 66  def qsize(self):
 67   """Return the approximate size of the queue (not reliable!)."""
 68   self.mutex.acquire()
 69   n = self._qsize()
 70   self.mutex.release()
 71   return n
 72
 73  def empty(self):
 74   """Return True if the queue is empty, False otherwise (not reliable!)."""
 75   self.mutex.acquire()
 76   n = not self._qsize()
 77   self.mutex.release()
 78   return n
 79
 80  def full(self):
 81   """Return True if the queue is full, False otherwise (not reliable!)."""
 82   self.mutex.acquire()
 83   n = 0 < self.maxsize == self._qsize()
 84   self.mutex.release()
 85   return n
 86
 87  def put(self, item, block=True, timeout=None):
 88   """Put an item into the queue.
 89
 90   If optional args ‘block‘ is true and ‘timeout‘ is None (the default),
 91   block if necessary until a free slot is available. If ‘timeout‘ is
 92   a non-negative number, it blocks at most ‘timeout‘ seconds and raises
 93   the Full exception if no free slot was available within that time.
 94   Otherwise (‘block‘ is false), put an item on the queue if a free slot
 95   is immediately available, else raise the Full exception (‘timeout‘
 96   is ignored in that case).
 97   """
 98   self.not_full.acquire()
 99   try:
100    if self.maxsize > 0:
101     if not block:
102      if self._qsize() == self.maxsize:
103       raise Full
104     elif timeout is None:
105      while self._qsize() == self.maxsize:
106       self.not_full.wait()
107     elif timeout < 0:
108      raise ValueError("‘timeout‘ must be a non-negative number")
109     else:
110      endtime = _time() + timeout
111      while self._qsize() == self.maxsize:
112       remaining = endtime - _time()
113       if remaining <= 0.0:
114        raise Full
115       self.not_full.wait(remaining)
116    self._put(item)
117    self.unfinished_tasks += 1
118    self.not_empty.notify()
119   finally:
120    self.not_full.release()
121
122  def put_nowait(self, item):
123   """Put an item into the queue without blocking.
124
125   Only enqueue the item if a free slot is immediately available.
126   Otherwise raise the Full exception.
127   """
128   return self.put(item, False)
129
130  def get(self, block=True, timeout=None):
131   """Remove and return an item from the queue.
132
133   If optional args ‘block‘ is true and ‘timeout‘ is None (the default),
134   block if necessary until an item is available. If ‘timeout‘ is
135   a non-negative number, it blocks at most ‘timeout‘ seconds and raises
136   the Empty exception if no item was available within that time.
137   Otherwise (‘block‘ is false), return an item if one is immediately
138   available, else raise the Empty exception (‘timeout‘ is ignored
139   in that case).
140   """
141   self.not_empty.acquire()
142   try:
143    if not block:
144     if not self._qsize():
145      raise Empty
146    elif timeout is None:
147     while not self._qsize():
148      self.not_empty.wait()
149    elif timeout < 0:
150     raise ValueError("‘timeout‘ must be a non-negative number")
151    else:
152     endtime = _time() + timeout
153     while not self._qsize():
154      remaining = endtime - _time()
155      if remaining <= 0.0:
156       raise Empty
157      self.not_empty.wait(remaining)
158    item = self._get()
159    self.not_full.notify()
160    return item
161   finally:
162    self.not_empty.release()
163
164  def get_nowait(self):
165   """Remove and return an item from the queue without blocking.
166
167   Only get an item if one is immediately available. Otherwise
168   raise the Empty exception.
169   """
170   return self.get(False)
171
172  # Override these methods to implement other queue organizations
173  # (e.g. stack or priority queue).
174  # These will only be called with appropriate locks held
175
176  # Initialize the queue representation
177  def _init(self, maxsize):
178   self.queue = deque()
179
180  def _qsize(self, len=len):
181   return len(self.queue)
182
183  # Put a new item in the queue
184  def _put(self, item):
185   self.queue.append(item)
186
187  # Get an item from the queue
188  def _get(self):
189   return self.queue.popleft() 

原文地址:https://www.cnblogs.com/mountain2011/p/11343254.html

时间: 2024-10-04 03:43:35

多线程中的应用之队列(queue)的相关文章

121 python程序中的线程操作-队列queue

一.线程队列 queue队列:使用方法同进程的Queue一样 如果必须在多个线程之间安全地交换信息时,队列在线程编程中尤其有用. 重要: q.put():往队列里面放值,当参数block=Ture的时候,timeout参数将会有作用,当队列已经满了的时候,在往里面放值时,block为True程序将会等待timeout的时间,过了时间程序会报错,block如果为Flase时,程序不会等待直接报错 q.get():从队列里面取值,当参数block=Ture的时候,timeout参数将会有作用,当队列

Java多线程总结之线程安全队列Queue

在Java多线程应用中,队列的使用率很高,多数生产消费模型的首选数据结构就是队列.Java提供的线程安全的Queue可以分为阻塞队列和非阻塞队列,其中阻塞队列的典型例子是BlockingQueue,非阻塞队列的典型例子是ConcurrentLinkedQueue,在实际应用中要根据实际需要选用阻塞队列或者非阻塞队列. 注:什么叫线程安全?这个首先要明确.线程安全的类 ,指的是类内共享的全局变量的访问必须保证是不受多线程形式影响的.如果由于多线程的访问(比如修改.遍历.查看)而使这些变量结构被破坏

多线程利器---队列(queue)

列表是不安全的数据结构 import threading,time li=[1,2,3,4,5] def pri(): while li: a=li[-1] print(a) time.sleep(1) try: li.remove(a) except Exception as e: print('----',a,e) t1=threading.Thread(target=pri,args=()) t1.start() t2=threading.Thread(target=pri,args=()

Java核心知识点学习----多线程中的阻塞队列,ArrayBlockingQueue介绍

1.什么是阻塞队列? 所谓队列,遵循的是先进先出原则(FIFO),阻塞队列,即是数据共享时,A在写数据时,B想读同一数据,那么就将发生阻塞了. 看一下线程的四种状态,首先是新创建一个线程,然后,通过start方法启动线程--->线程变为可运行可执行状态,然后通过数据产生共享,线程产生互斥---->线程状态变为阻塞状态---->阻塞状态想打开的话可以调用notify方法. 这里Java5中提供了封装好的类,可以直接调用然后构造阻塞状态,以保证数据的原子性. 2.如何实现? 主要是实现Blo

iOS多线程中,队列和执行的排列组合结果分析

本文是对以往学习的多线程中知识点的一个整理. 多线程中的队列有:串行队列,并发队列,全局队列,主队列. 执行的方法有:同步执行和异步执行.那么两两一组合会有哪些注意事项呢? 如果不是在董铂然博客园看到这边文章请 点击查看原文 提到多线程,也就是四种,pthread,NSthread,GCD,NSOperation 其中phtread是跨平台的.GCD和NSOperation都是常用的,后者是基于前者的. 但是两者区别:GCD的核心概念是将一个任务添加到队列,指定任务执行的方法,然后执行. NSO

STL中队列queue的用法

头文件:#include <queue> 建立一个队列queue < 类型 > q 加入一个新的元素q.push(a) 询问队首元素q.front() 弹出队首元素q.pop() 队里面有多少个元素q.size() 原文地址:https://www.cnblogs.com/yujh01/p/queue.html

使用C#的泛型队列Queue实现生产消费模式

本篇体验使用C#的泛型队列Queue<T>实现生产消费模式. 如果把生产消费想像成自动流水生产线的话,生产就是流水线的物料,消费就是某种设备对物料进行加工的行为,流水线就是队列. 现在,要写一个体现生产消费模式的泛型帮助类,比如叫ProducerConsumer<T>. 该类肯定会维护一个有关生产.物料的Queue<T>类型的字段,还存在一个有关消费.Action<T>类型的字段. 在ProducerConsumer类的构造函数中,为Action<T&

Python--线程队列(queue)、multiprocessing模块(进程对列Queue、管道(pipe)、进程池)、协程

队列(queue) 队列只在多线程里有意义,是一种线程安全的数据结构. get与put方法 ''' 创建一个"队列"对象 import queue q = queue.Queue(maxsize = 10) queue.Queue类即是一个队列的同步实现.队列长度可为无限或者有限.可通过Queue的构造函数的可选参数maxsize来设定队列长度.如果maxsize小于1就表示队列长度无限. 将一个值放入队列中: q.put() 调用队列对象的put()方法在队尾插入一个项目.put()

ZooKeeper实现分布式队列Queue

ZooKeeper实现分布式队列Queue 让Hadoop跑在云端系列文章,介绍了如何整合虚拟化和Hadoop,让Hadoop集群跑在VPS虚拟主机上,通过云向用户提供存储和计算的服务. 现在硬件越来越便宜,一台非品牌服务器,2颗24核CPU,配48G内存,2T的硬盘,已经降到2万块人民币以下了.这种配置如果简单地放几个web应用,显然是奢侈的浪费.就算是用来实现单节点的hadoop,对计算资源浪费也是非常高的.对于这么高性能的计算机,如何有效利用计算资源,就成为成本控制的一项重要议题了. 通过