python 多线程与队列

各位好,之前写了多线程,但是在实际的生产中,往往情况比较复杂,要处理一批任务(比如要处理列表中所有元素),这时候不可能创建很多的线程,线程过多反而不好,还会造成资源开销太大,这时候想到了队列。


Queue队列

Queue用于建立和操作队列,常和threading类一起用来建立一个简单的线程队列。

  • Queue.Queue(maxsize)  FIFO(先进先出队列)
  • Queue.LifoQueue(maxsize)  LIFO(先进后出队列)
  • Queue.PriorityQueue(maxsize)  为优先级越高的越先出来,对于一个队列中的所有元素组成的entries,优先队列优先返回的一个元素是sorted(list(entries))[0]。至于对于一般的数据,优先队列取什么东西作为优先度要素进行判断,官方文档给出的建议是一个tuple如(priority, data),取priority作为优先度。
    如果设置的maxsize小于1,则表示队列的长度无限长

FIFO是常用的队列,常用的方法有:

  • Queue.qsize()   返回队列大小
  • Queue.empty()  判断队列是否为空
  • Queue.full()   判断队列是否满了
  • Queue.get([block[,timeout]])  从队列头删除并返回一个item,block默认为True,表示当队列为空却去get的时候会阻塞线程,等待直到有有item出现为止来get出这个item。如果是False的话表明当队列为空你却去get的时候,会引发异常。
    在block为True的情况下可以再设置timeout参数。表示当队列为空,get阻塞timeout指定的秒数之后还没有get到的话就引发Full异常。
  • Queue.put(...[,block[,timeout]])  向队尾插入一个item,同样若block=True的话队列满时就阻塞等待有空位出来再put,block=False时引发异常。
    同get的timeout,put的timeout是在block为True的时候进行超时设置的参数。
    Queue.task_done()  从场景上来说,处理完一个get出来的item之后,调用task_done将向队列发出一个信号,表示本任务已经完成。
  • Queue.join()  监视所有item并阻塞主线程,直到所有item都调用了task_done之后主线程才继续向下执行。这么做的好处在于,假如一个线程开始处理最后一个任务,它从任务队列中拿走最后一个任务,此时任务队列就空了但最后那个线程还没处理完。当调用了join之后,主线程就不会因为队列空了而擅自结束,而是等待最后那个线程处理完成了。

队列-单线程

import threading
import queue
import time

class worker(threading.Thread):
    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue
        self.thread_stop = False

    def run(self):
        while not self.thread_stop:
            print("thread%d %s: waiting for tast" % (self.ident, self.name))
            try:
                task = q.get(block=True, timeout=2)  # 接收消息
            except queue.Empty:
                print("Nothing to do! I will go home!")
                self.thread_stop = True
                break
            print("tasking: %s ,task No:%d" % (task[0], task[1]))
            print("I am working")
            time.sleep(3)
            print("work finished!")
            q.task_done()                           # 完成一个任务
            res = q.qsize()                         # 判断消息队列大小(队列中还有几个任务)
            if res > 0:
                print("fuck! Still %d tasks to do" % (res))

    def stop(self):
        self.thread_stop = True

if __name__ == "__main__":
    q = queue.Queue(3)                                    # 创建队列(大小为3)
    worker = worker(q)                                    # 将队列加入类中
    worker.start()                                        # 启动类
    q.put(["produce cup!", 1], block=True, timeout=None)  # 向队列中添加元素,产生任务消息
    q.put(["produce desk!", 2], block=True, timeout=None)
    q.put(["produce apple!", 3], block=True, timeout=None)
    q.put(["produce banana!", 4], block=True, timeout=None)
    q.put(["produce bag!", 5], block=True, timeout=None)
    print("***************leader:wait for finish!")
    q.join()                                             # 等待所有任务完成
    print("***************leader:all task finished!")

输出:
thread9212 Thread-1: waiting for tast
tasking: produce cup! ,task No:1
I am working
work finished!
fuck! Still 3 tasks to do
thread9212 Thread-1: waiting for tast
tasking: produce desk! ,task No:2
I am working
***************leader:wait for finish!
work finished!
fuck! Still 3 tasks to do
thread9212 Thread-1: waiting for tast
tasking: produce apple! ,task No:3
I am working
work finished!
fuck! Still 2 tasks to do
thread9212 Thread-1: waiting for tast
tasking: produce banana! ,task No:4
I am working
work finished!
fuck! Still 1 tasks to do
thread9212 Thread-1: waiting for tast
tasking: produce bag! ,task No:5
I am working
work finished!
thread9212 Thread-1: waiting for tast
***************leader:all task finished!
Nothing to do!i will go home!

队列-多线程

import threading
import time
from queue import Queue

img_lists = [‘lipei_00006.mp3‘,‘lipei_00007.mp3‘,‘lipei_00012.mp3‘,‘lipei_00014.mp3‘,
             ‘lipei_00021.mp3‘,‘lipei_00027.mp3‘,‘lipei_00028.mp3‘,‘lipei_00035.mp3‘,
             ‘lipei_00039.mp3‘,‘lipei_00044.mp3‘,‘lipei_00047.mp3‘,‘lipei_00049.mp3‘,
             ‘lipei_00057.mp3‘,‘lipei_00058.mp3‘,‘lipei_00059.mp3‘,‘lipei_00061.mp3‘,
             ‘lipei_00066.mp3‘,‘lipei_00068.mp3‘,‘lipei_00070.mp3‘,‘lipei_00081.mp3‘,
             ‘lipei_00087.mp3‘,‘lipei_00104.mp3‘,‘lipei_00106.mp3‘,‘lipei_00117.mp3‘,
             ‘lipei_00123.mp3‘,‘lipei_00129.mp3‘,]

q = Queue(10)

class Music_Cols(threading.Thread):
    def __init__(self, name):
        super().__init__(name=name)

    def run(self):
        global img_lists
        global q
        while True:
            try:
                music = img_lists.pop(0)
                q.put(music)
            except IndexError:
                break

class Music_Play(threading.Thread):
    def __init__(self, name):
        super().__init__(name=name)

    def run(self):
        global q
        while True:
            if q.not_empty:
                music = q.get()
                print(‘{}正在播放{}‘.format(threading.current_thread(), music))
                time.sleep(5)
                q.task_done()
                print(‘{}播放结束‘.format(music))
            else:
                break

if __name__ == ‘__main__‘:
    mc_thread = Music_Cols(‘music_cols‘)
    mc_thread.setDaemon(True)       # 设置为守护进程,主线程退出时,子进程也kill掉
    mc_thread.start()               # 启动进程
    for i in range(5):              # 设置线程个数(批量任务时,线程数不必太大,注意内存及CPU负载)
        mp_thread = Music_Play(‘music_play‘)
        mp_thread.setDaemon(True)
        mp_thread.start()
    q.join()                        # 线程阻塞(等待所有子线程处理完成,再退出)

输出:
<Music_Play(music_play, started daemon 1068)>正在播放lipei_00006.mp3
<Music_Play(music_play, started daemon 1072)>正在播放lipei_00007.mp3
<Music_Play(music_play, started daemon 4920)>正在播放lipei_00012.mp3
<Music_Play(music_play, started daemon 3880)>正在播放lipei_00014.mp3
<Music_Play(music_play, started daemon 5400)>正在播放lipei_00021.mp3
lipei_00014.mp3播放结束
... ...
<Music_Play(music_play, started daemon 1068)>正在播放lipei_00117.mp3
lipei_00066.mp3播放结束
<Music_Play(music_play, started daemon 1072)>正在播放lipei_00123.mp3
lipei_00104.mp3播放结束
<Music_Play(music_play, started daemon 4920)>正在播放lipei_00129.mp3
lipei_00123.mp3播放结束
lipei_00117.mp3播放结束
lipei_00087.mp3播放结束
lipei_00106.mp3播放结束
lipei_00129.mp3播放结束

或者(效果与上述一样)

import threading
import time
from queue import Queue

img_lists = [‘lipei_00006.mp3‘,‘lipei_00007.mp3‘,‘lipei_00012.mp3‘,‘lipei_00014.mp3‘,
             ‘lipei_00021.mp3‘,‘lipei_00027.mp3‘,‘lipei_00028.mp3‘,‘lipei_00035.mp3‘,
             ‘lipei_00039.mp3‘,‘lipei_00044.mp3‘,‘lipei_00047.mp3‘,‘lipei_00049.mp3‘,
             ‘lipei_00057.mp3‘,‘lipei_00058.mp3‘,‘lipei_00059.mp3‘,‘lipei_00061.mp3‘,
             ‘lipei_00066.mp3‘,‘lipei_00068.mp3‘,‘lipei_00070.mp3‘,‘lipei_00081.mp3‘,
             ‘lipei_00087.mp3‘,‘lipei_00104.mp3‘,‘lipei_00106.mp3‘,‘lipei_00117.mp3‘,
             ‘lipei_00123.mp3‘,‘lipei_00129.mp3‘,]

q = Queue(10)

class Music_Cols(threading.Thread):
    def __init__(self, name):
        super().__init__(name=name)

    def run(self):
        while True:
            try:
                music = img_lists.pop(0)
                q.put(music)
            except IndexError:
                break

class Music_Play(threading.Thread):
    def __init__(self, name):
        super().__init__(name=name)

    def run(self):
        while True:
            if q.not_empty:
                music = q.get()
                print(‘{}正在播放{}‘.format(threading.current_thread(), music))
                time.sleep(5)
                q.task_done()
                print(‘{}播放结束‘.format(music))
            else:
                break

if __name__ == ‘__main__‘:
    mc_thread = Music_Cols(‘music_cols‘)
    mc_thread.setDaemon(True)       # 设置为守护进程,主线程退出时,子进程也kill掉
    mc_thread.start()               # 启动进程
    for i in range(5):              # 设置线程个数(批量任务时,线程数不必太大,注意内存及CPU负载)
        mp_thread = Music_Play(‘music_play‘)
        mp_thread.setDaemon(True)
        mp_thread.start()
    q.join()                        # 线程阻塞(等待所有子线程处理完成,再退出)

队列-多线程—图像增强实例


"""
开启多线程:图像增强
"""
import os
import random
import queue
import numpy as np
import cv2
import time
import threading

def Affine_transformation(img_array):
    rows, cols = img_array.shape[:2]
    pointsA = np.float32([[30, 80], [180, 60], [80, 230]])  # 左偏
    pointsB = np.float32([[60, 50], [220, 70], [20, 180]])  # 右偏
    pointsC = np.float32([[70, 60], [180, 50], [50, 200]])  # 前偏
    pointsD = np.float32([[40, 50], [210, 60], [70, 180]])  # 后偏

    points1 = np.float32([[50, 50], [200, 50], [50, 200]])
    points2 = random.choice((pointsA, pointsB, pointsC, pointsD))

    matrix = cv2.getAffineTransform(points1, points2)
    Affine_transfor_img = cv2.warpAffine(img_array, matrix, (cols, rows))
    return Affine_transfor_img

def random_rotate_img(img):
    rows, cols= img.shape[:2]
    angle = random.choice([25, 90, -25, -90, 180])
    Matrix = cv2.getRotationMatrix2D((cols / 2, rows / 2), angle, 1)
    res = cv2.warpAffine(img, Matrix, (cols, rows), borderMode=cv2.BORDER_CONSTANT)
    return res

def random_hsv_transform(img, hue_vari, sat_vari, val_vari):
    """
    :param img:
    :param hue_vari: 色调变化比例范围(0,360)
    :param sat_vari: 饱和度变化比例范围(0,1)
    :param val_vari: 明度变化比例范围(0,1)
    :return:
    """
    hue_delta = np.random.randint(-hue_vari, hue_vari)
    sat_mult = 1 + np.random.uniform(-sat_vari, sat_vari)
    val_mult = 1 + np.random.uniform(-val_vari, val_vari)

    img_hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV).astype(np.float)
    img_hsv[:, :, 0] = (img_hsv[:, :, 0] + hue_delta) % 180
    img_hsv[:, :, 1] *= sat_mult
    img_hsv[:, :, 2] *= val_mult
    img_hsv[img_hsv > 255] = 255
    return cv2.cvtColor(np.round(img_hsv).astype(np.uint8), cv2.COLOR_HSV2BGR)

def random_gamma_transform(img, gamma_vari):
    """
    :param img:
    :param gamma_vari:
    :return:
    """
    log_gamma_vari = np.log(gamma_vari)
    alpha = np.random.uniform(-log_gamma_vari, log_gamma_vari)
    gamma = np.exp(alpha)
    gamma_table = [np.power(x / 255.0, gamma) * 255.0 for x in range(256)]
    gamma_table = np.round(np.array(gamma_table)).astype(np.uint8)
    return cv2.LUT(img, gamma_table)

def random_flip_img(img):
    """
    0 = X axis, 1 = Y axis,  -1 = both
    :param img:
    :return:
    """
    flip_val = [0,1,-1]
    random_flip_val = random.choice(flip_val)
    res = cv2.flip(img, random_flip_val)
    return res

def clamp(pv):     #防止像素溢出
    if pv > 255:
        return 255
    if pv < 0:
        return 0
    else:
        return pv

def gaussian_noise(image):   # 加高斯噪声
    """
    :param image:
    :return:
    """
    h, w, c = image.shape
    for row in range(h):
        for col in range(w):
            s = np.random.normal(0, 20, 3)
            b = image[row, col, 0] # blue
            g = image[row, col, 1] # green
            r = image[row, col, 2] # red
            image[row, col, 0] = clamp(b + s[0])
            image[row, col, 1] = clamp(g + s[1])
            image[row, col, 2] = clamp(r + s[2])
    return image

def get_img(input_dir):
    img_path_list = []
    for (root_path,dirname,filenames) in os.walk(input_dir):
        for filename in filenames:
            Suffix_name = [‘.png‘, ‘.jpg‘, ‘.tif‘, ‘.jpeg‘]
            if filename.endswith(tuple(Suffix_name)):
                img_path = root_path+"/"+filename
                img_path_list.append(img_path)
    return  img_path_list

class IMG_QUEUE(threading.Thread):
    def __init__(self, name):
        super().__init__(name=name)

    def run(self):
        while True:
            try:
                img_path = img_path_list.pop(0)
                q.put(img_path)
            except IndexError:
                break

class IMG_AUG(threading.Thread):
    def __init__(self, name):
        super().__init__(name=name)
        self.q = q

    def run(self):
        while True:
            if q.not_empty:
                img_path = q.get()
                try:
                    print("doing...")
                    img_array = cv2.imread(img_path)
                    Affine_transfor_img = Affine_transformation(img_array)
                    cv2.imwrite(output_dir + "/" + img_path[len(input_dir):-4] + ‘_Affine_transfor.png‘, Affine_transfor_img)

                    res_rotate = random_rotate_img(img_array)
                    cv2.imwrite(output_dir + "/" + img_path[len(input_dir):-4] + ‘_rotate_img.png‘,res_rotate)

                    GAMMA_IMG = random_gamma_transform(img_array, 0.3)
                    cv2.imwrite(output_dir + "/" + img_path[len(input_dir):-4] + ‘_GAMMA_IMG.png‘,GAMMA_IMG)

                    res_flip = random_flip_img(img_array)
                    cv2.imwrite(output_dir + "/" + img_path[len(input_dir):-4] + ‘_flip_img.png‘,res_flip)

                    G_Noiseimg = gaussian_noise(img_array)
                    cv2.imwrite(output_dir + "/" + img_path[len(input_dir):-4] + ‘_G_Noise_img.png‘,G_Noiseimg)

                    HSV_IMG = random_hsv_transform(img_array, 2, 0.3, 0.6)
                    cv2.imwrite(output_dir + "/" + img_path[len(input_dir):-4] + ‘_HSV_IMG.png‘,HSV_IMG)
                except:
                    print("图像格式错误!")
                    pass
                q.task_done()
            else:
                break

if __name__ == ‘__main__‘:
    input_dir = ‘./cccc‘
    output_dir = ‘./eeee‘
    start_time = time.time()            # 开始计时
    img_path_list = get_img(input_dir)  # 获取图像数据

    q = queue.Queue(10)                 # 设置队列元素个数
    my_thread = IMG_QUEUE(‘IMG_QUEUE‘)  # 实例化
    my_thread.setDaemon(True)           # 设置为守护进程,主线程退出时,子进程也kill掉
    my_thread.start()                   # 启动进程

    for i in range(5):                  # 设置线程个数(批量任务时,线程数不必太大,注意内存及CPU负载)
        mp_thread = IMG_AUG(‘IMG_AUG‘)
        mp_thread.setDaemon(True)
        mp_thread.start()
    q.join()                            # 线程阻塞(等待所有子线程处理完成,再退出)
    end_time = time.time()
    print("Total Spend time:", str((end_time - start_time) / 60)[0:6] + "分钟")

多线程-创建图像缩略图(等比缩放图像)

import os
from PIL import Image
import threading
import time
import queue

def get_img(input_dir):
    img_path_list = []
    for (root_path,dirname,filenames) in os.walk(input_dir):
        for filename in filenames:
            Suffix_name = [‘.png‘, ‘.jpg‘, ‘.tif‘, ‘.jpeg‘]
            if filename.endswith(tuple(Suffix_name)):
                img_path = root_path+"/"+filename
                img_path_list.append(img_path)
    return  img_path_list

class IMG_QUEUE(threading.Thread):
    def __init__(self, name):
        super().__init__(name=name)

    def run(self):
        while True:
            try:
                img_path = img_path_list.pop(0)
                q.put(img_path)
            except IndexError:
                break

class IMG_RESIZE(threading.Thread):
    def __init__(self, name):
        super().__init__(name=name)

    def run(self):
        while True:
            if q.not_empty:
                img_path = q.get()
                try:
                    im = Image.open(img_path)
                    im.thumbnail((size, size))
                    print(im.format, im.size, im.mode)
                    im.save(img_path, ‘JPEG‘)
                except:
                    print("图像格式错误!")
                    pass
                q.task_done()
            else:
                break

if __name__==‘__main__‘:
    input_dir = ‘D:\\20190112_20190114_all‘ #需要创建缩略图,图片的目录
    start_time = time.time()            # 开始计时
    img_path_list = get_img(input_dir)  # 获取图像数据

    size = 800
    q = queue.Queue(100)                # 设置队列元素个数
    my_thread = IMG_QUEUE(‘IMG_QUEUE‘)  # 实例化
    my_thread.setDaemon(True)           # 设置为守护进程,主线程退出时,子进程也kill掉
    my_thread.start()                   # 启动进程

    for i in range(5):                  # 设置线程个数(批量任务时,线程数不必太大,注意内存及CPU负载)
        mp_thread = IMG_RESIZE(str(i))
        mp_thread.setDaemon(True)
        mp_thread.start()
    q.join()                            # 线程阻塞(等待所有子线程处理完成,再退出)
    end_time = time.time()              # 计时结束
    print("Total Spend time:", str((end_time - start_time) / 60)[0:6] + "分钟")

原文地址:http://blog.51cto.com/13984132/2345569

时间: 2024-11-25 15:51:25

python 多线程与队列的相关文章

Python 多线程同步队列模型

Python 多线程同步队列模型 我面临的问题是有个非常慢的处理逻辑(比如分词.句法),有大量的语料,想用多线程来处理. 这一个过程可以抽象成一个叫"同步队列"的模型. 具体来讲,有一个生产者(Dispatcher)一方面从语料中读入句子,并且存入队列中,一方面看有没有空闲的消费者(Segmentor),如果有,就把句子从队列中弹出并交给这个空闲的消费者处理. 然后消费者把处理完成的结果交给生产者输出,生产者要保证输出与输入顺序一致. 消费者是典型的threading,它需要看见生成者

python多线程--优先级队列(Queue)

Python的Queue模块中提供了同步的.线程安全的队列类,包括FIFO(先入先出)队列Queue,LIFO(后入先出)队列LifoQueue,和优先级队列PriorityQueue.这些队列都实现了锁原语,能够在多线程中直接使用.可以使用队列来实现线程间的同步. Queue模块中的常用方法: Queue.qsize() 返回队列的大小 Queue.empty() 如果队列为空,返回True,反之False Queue.full() 如果队列满了,返回True,反之False Queue.fu

利用python多线程和队列管理shell程序

首先来描述下环境,在机器上有很多个JAVA程序,我们在每个JAVA程序里都配置了一个启动|停止|重启的脚本 举个例子: 我们现在要同时运行这些脚本,来达到快速启动所有的JAVA程序,如果我们只用多线程的话,线程是不会返回消息给父进程,我们如何才能知道这些程序是启动成功了呢? 所以我们用到了队列来管理. """我试过gevent,但是会在command这里造成阻塞""" gevent代码如下  如果有朋友知道如何优化,请您告诉我 #!/usr/bi

python多线程多队列(BeautifulSoup网络爬虫)

程序大概内容如下: 程序中设置两个队列分别为queue负责存放网址,out_queue负责存放网页的源代码. ThreadUrl线程负责将队列queue中网址的源代码urlopen,存放到out_queue队列中. DatamineThread线程负责使用BeautifulSoup模块从out_queue网页的源代码中提取出想要的内容并输出. 这只是一个基本的框架,可以根据需求继续扩展. 程序中有很详细的注释,如有有问题跪求指正啊. import Queue import threading i

Python多线程(threading)学习总结

注:此文除了例子和使用心得是自己写的,很多都是Python核心编程中的原文.原文文风应该能看出来,就不每个地方单独表明出处了. 线程(有时被称为轻量级进程)跟进程有些相似,不同的是,所有的线程运行在同一个进程中,共享相同的运行环境.它们可以想像成是在主进程或"主线程"中并行运行的"迷你进程". 线程有开始,顺序执行和结束三部分.它有一个自己的指令指针,记录自己运行到什么地方.线程的运行可能被抢占(中断),或暂时的被挂起(也叫睡眠),让其它的线程运行,这叫做让步.一个

python 多线程和C++多线程的区别

看到论坛上有人问python多线程和C++多线程的区别? 暖神是这样回答的: Python有Global Interpreter Lock,所以嘛……你懂的.C++11开始才有多线程,使用共享内存方式的线程间通信,有低级的atomic operation和memory order,以及高级的lock, condition的,却没有提供消息队列. 然后,就去找GIL(Global Interpreter Lock)的定义,下面有一个定义,wiki的定义是这样的.在stackoverflow上看到了

Python多线程目录扫描器

Python多线程目录扫描器,代码很简单,不过也花了我很多时间. 遇到文件如下: 多线程执行问题 队列执行问题 编码问题 目录问题 自己逻辑问题 报告长官,总结完毕,以下是成果,请长官查收: # coding:utf-8 import requests import threading import Queue import sys url_list = Queue.Queue() mutex = threading.Lock() def path(url): with open("./path

Python 多线程教程:并发与并行

Python 多线程教程:并发与并行 在批评Python的讨论中,常常说起Python多线程是多么的难用.还有人对 global interpreter lock(也被亲切的称为“GIL”)指指点点,说它阻碍了Python的多线程程序同时运行.因此,如果你是从其他语言(比如C++或Java)转过来的话,Python线程模块并不会像你想象的那样去运行.必须要说明的是,我们还是可以用Python写出能并发或并行的代码,并且能带来性能的显著提升,只要你能顾及到一些事情.如果你还没看过的话,我建议你看看

PHP 高级编程之多线程-消息队列

Home  |  Mirror  |  Search  |  杂文  |  ITEYE 博客  |  OSChina 博客  |  51CTO 博客  |  Linkedin PHP 高级编程之多线程 http://netkiller.github.io/journal/thread.php.html Mr. Neo Chen (netkiller), 陈景峰(BG7NYT) 中国广东省深圳市龙华新区民治街道溪山美地 518131 +86 13113668890 +86 755 29812080