python 多线程并发threading & 任务队列Queue

https://docs.python.org/3.7/library/concurrency.html
python程序默认是单线程的,也就是说在前一句语句执行完之前后面的语句不能继续执行
先感受一下线程,一般情况下:

def testa():
    sleep(1)
    print "a"

def testb():
    sleep(1)
    print "b"

testa()
testb()#先隔出一秒打印出a,再过一秒打出b

但是如果用了threading的话:

ta = threading.Thread(target=testa)
tb = threading.Thread(target=testb)
for t in [ta,tb]:
    t.start()
for t in [ta,tb]:
    t.join()
print "DONE"

#输出是ab或者ba(紧贴着的)然后空一行再来DONE的结果。

得到这样的结果是因为这样的,在start之后,ta首先开始跑,但是主线程(脚本本身)没有等其完成就继续开始下一轮循环,然后tb也开始了,在之后的一段时间里,ta和tb两条线程(分别代表了testa和testb这两个过程)共同执行。相对于一个个迭代而言,这样做无疑是大大提高了运行的速度。

  Thread类为线程的抽象类,其构造方法的参数target指向一个函数对象,即该线程的具体操作。此外还可以有args=<tuple>来给target函数传参数。需要注意的是当传任何一个序列进去的话Thread会自动把它分解成单个单个的元素然后分解传给target函数。我估计在定义的时候肯定是*args了。

  join方法是个很tricky的东西,至今还不是很清楚地懂这是个什么玩意儿。join([timeout])方法阻塞了主线程,直到调用此方法的子线程完成之后主线程才继续往下运行。(之前我糊里糊涂地把join就紧紧接在start后面写了,如果这么写了的话那么多线程在速度上就毫无优势,和单线程一样了= =)。而像上面这个示例一样,先一个遍历把所有线程 都启动起来,再用一个遍历把所有线程都join一遍似乎是比较通行的做法。

关于线程锁

  多线程程序涉及到一个问题,那就是当不同线程要对同一个资源进行修改或利用时会出现混乱,所以有必要引入线程锁。

  可以通过Thread.Lock类来创建简单的线程锁。lock = threading.Lock()即可。在某线程start之前,让lock.acquire(),且lock在acquire()之后不能再acquire,否则会报错。当线程结束后调用lock.release()来释放锁就好了。一般而言,有锁的多线程场景可以提升一部分效率,但在写文件等时机下会有阻塞等待的情况。相比之下,无所多线程场景可以进一步提升效率,但是可能会引起读写冲突等问题,所以要慎用。一定要确认各个线程间没有共同的资源之类的问题后再实行无锁多线程。

  ●  以上的包装线程的方式是一种面向过程的方法,下面介绍一下如何面向对象地来抽象线程

  面向对象地抽象线程需要自定义一个类继承Thread类。比如自定义class MyThread(Thread)。这个类的一个实例就是代表了一个线程,然后通过重载这个类中的run方法(是run,不是start!!但start的动作确实就是调用run)来执行具体的操作。此时锁可以作为一个构造方法的参数,将一个锁传进不同的实例中以实现线程锁控制。比如:

#方法二:从Thread继承,并重写run()
class MyThread(threading.Thread):
    def __init__(self,arg):
        super(MyThread, self).__init__()#注意:一定要显式的调用父类的初始化函数。
        self.arg=arg
    def run(self):#定义每个线程要运行的函数
        time.sleep(1)
        print ‘the arg is:%s\r‘ % self.arg

for i in xrange(4):
    t =MyThread(i)
    t.start()

print ‘main thread end!‘

Thread类还有以下的一些方法,自定义的类也可以调用

    getName()

    setName(...)  //其实Thread类在构造方法中有一个name参数,可以为相应的线程取一个名字。这两个方法就是相关这个名字属性的

    isAlive()  一个线程从start()开始到run()结束的过程中没有异常,则其实alive的。

    setDaemon(True/False)  是否设置一个线程为守护线程。当你设置一个线程为守护线程之后,程序不会等待这个线程结束再退出程序,可参考http://blog.csdn.net/u012063703/article/details/51601579

  ●  除了Thread类,threading中还有以下一些属性,简单介绍一下:

    Timer类,Timer(int,target=func)  和Thread类类似,只不过它在int秒过后才以target指定的函数开始线程运行

    currentThread()  获得当前线程对象

    activeCount()  获得当前活动的线程总个数

    enumerate()  获得所有活动线程的列表

    settrace(func)  设置一跟踪函数,在run执行前执行

    setprofile(func)  设置一跟踪函数,在run执行完毕之后执行

Queue用于建立和操作队列,常和threading类一起用来建立一个简单的线程队列。

  首先,队列有很多种,根据进出顺序来分类,可以分成

    Queue.Queue(maxsize)  FIFO(先进先出队列)

    Queue.LifoQueue(maxsize)  LIFO(先进后出队列)

    Queue.PriorityQueue(maxsize)  为优先度越低的越先出来

    如果设置的maxsize小于1,则表示队列的长度无限长

  FIFO是常用的队列,其一些常用的方法有:

    Queue.qsize()  返回队列大小

    Queue.empty()  判断队列是否为空

    Queue.full()  判断队列是否满了

    Queue.get([block[,timeout]])  从队列头删除并返回一个item,block默认为True,表示当队列为空却去get的时候会阻塞线程,等待直到有有item出现为止来get出这个item。如果是False的话表明当队列为空你却去get的时候,会引发异常。在block为True的情况下可以再设置timeout参数。表示当队列为空,get阻塞timeout指定的秒数之后还没有get到的话就引发Full异常。

    Queue.put(...[,block[,timeout]])  向队尾插入一个item,同样若block=True的话队列满时就阻塞等待有空位出来再put,block=False时引发异常。同get的timeout,put的timeout是在block为True的时候进行超时设置的参数。

    Queue.task_done()  从场景上来说,处理完一个get出来的item之后,调用task_done将向队列发出一个信号,表示本任务已经完成

    Queue.join()  监视所有item并阻塞主线程,直到所有item都调用了task_done之后主线程才继续向下执行。这么做的好处在于,假如一个线程开始处理最后一个任务,它从任务队列中拿走最后一个任务,此时任务队列就空了但最后那个线程还没处理完。当调用了join之后,主线程就不会因为队列空了而擅自结束,而是等待最后那个线程处理完成了。

  结合threading和Queue可以构建出一个简单的生产者-消费者模型,比如:

import threading
    import Queue
    import time
    class worker(threading.Thread):
     def __init__(self,queue):
      threading.Thread.__init__(self)
      self.queue=queue
      self.thread_stop=False  

     def run(self):
      while not self.thread_stop:
       print("thread%d %s: waiting for tast" %(self.ident,self.name))
       try:
        task=q.get(block=True, timeout=20)#接收消息
       except Queue.Empty:
        print("Nothing to do!i will go home!")
        self.thread_stop=True
        break
       print("task recv:%s ,task No:%d" % (task[0],task[1]))
       print("i am working")
       time.sleep(3)
       print("work finished!")
       q.task_done()#完成一个任务
       res=q.qsize()#判断消息队列大小
       if res>0:
        print("fuck!There are still %d tasks to do" % (res))  

     def stop(self):
      self.thread_stop = True  

    if __name__ == "__main__":
     q=Queue.Queue(3)
     worker=worker(q)
     worker.start()
     q.put(["produce one cup!",1], block=True, timeout=None)#产生任务消息
     q.put(["produce one desk!",2], block=True, timeout=None)
     q.put(["produce one apple!",3], block=True, timeout=None)
     q.put(["produce one banana!",4], block=True, timeout=None)
     q.put(["produce one bag!",5], block=True, timeout=None)
     print("***************leader:wait for finish!")
     q.join()#等待所有任务完成
     print("***************leader:all task finished!")  

输出是这样的

thread139958685849344 Thread-1: waiting for tast 1
    task recv:produce one cup! ,task No:1
    i am working
    work finished!
    fuck!There are still 3 tasks to do
    thread139958685849344 Thread-1: waiting for tast 1
    task recv:produce one desk! ,task No:2
    i am workingleader:wait for finish!
    work finished!
    fuck!There are still 3 tasks to do
    thread139958685849344 Thread-1: waiting for tast 1
    task recv:produce one apple! ,task No:3
    i am working
    work finished!
    fuck!There are still 2 tasks to do
    thread139958685849344 Thread-1: waiting for tast 1
    task recv:produce one banana! ,task No:4
    i am working
    work finished!
    fuck!There are still 1 tasks to do
    thread139958685849344 Thread-1: waiting for tast 1
    task recv:produce one bag! ,task No:5
    i am working
    work finished!
    thread139958685849344 Thread-1: waiting for tast 1
     ***************leader:all task finished!
    Nothing to do!i will go home!

上例中并没有性能的提升(毕竟还是只有一个线程在跑)。线程队列的意义并不是进一步提高运行效率,而是使线程的并发更加有组织。可以看到,在增加了线程队列之后,程序对于线程的并发数量就有了控制。新线程想要加入队列开始执行,必须等一个既存的线程完成之后才可以。举个例子,比如

for i in range(x):
  t = MyThread(queue)
  t.start()

x在这里是个变量,我们不知道这个循环会触发多少线程并发,如果多的话就会很冒险。但是有了队列之后,把一个队列作为所有线程构建线程对象时的一个参数,让线程必须按照这个队列规定的大小来执行的话,就不担心过多线程带来的危险了。

时间: 2024-10-28 20:29:28

python 多线程并发threading & 任务队列Queue的相关文章

Python多线程(threading)学习总结

注:此文除了例子和使用心得是自己写的,很多都是Python核心编程中的原文.原文文风应该能看出来,就不每个地方单独表明出处了. 线程(有时被称为轻量级进程)跟进程有些相似,不同的是,所有的线程运行在同一个进程中,共享相同的运行环境.它们可以想像成是在主进程或"主线程"中并行运行的"迷你进程". 线程有开始,顺序执行和结束三部分.它有一个自己的指令指针,记录自己运行到什么地方.线程的运行可能被抢占(中断),或暂时的被挂起(也叫睡眠),让其它的线程运行,这叫做让步.一个

Python 多线程 -thread threading Queue- 简单学习

在实际工作过程中,会出现需要并发的做一些事情,例如一台机器测到几千台机器的网络连通性,如果你单线程一台一台测的话,会花费很多的事情,不具有实时性,更不能在变化的时候立刻感知当时网络的状况,这时多线程就是一个很好地选择.python已经给我们封装好了多线程库thread和threading. thread:比较底层的模块 threading:Higher-level threading interface ps:建议使用threading模块 - 高级别的threading模块更为先进,对线程的支

python多线程并发

单线程执行 python的内置模块提供了两个内置模块:thread和threading,thread是源生模块,threading是扩展模块,在thread的基础上进行了封装及改进.所以只需要使用threading这个模块就能完成并发的测试 实例 创建并启动一个单线程 import threading def myTestFunc(): print("我是一个函数") t = threading.Thread(target=myTestFunc) # 创建一个线程 t.start()

Python多线程(3)——Queue模块

Queue模块支持先进先出(FIFO)队列,支持多线程的访问,包括一个主要的类型(Queue)和两个异常类(exception classes). Python 2 中的Queue模块在Python 3中更名为 queue. Queue对象的创建 可以通过实例化Queue类型获得队列对象: q = Queue.Queue(maxsize=0) 创建新的队列,参数 maxsize 的含义是: 如果 maxsize > 0:当 q 中的元素达到 maxsize 个时,队列就满了,此时再有一个线程希望

python多线程与threading模块

python多线程与_thread模块 中介绍了线程的基本概念以及_thread模块的简单示例.然而,_thread模块过于简单,使得我们无法用它来准确地控制线程,本文介绍threading模块,它提供了更强大的多线程管理方案. threading模块的对象 Thread 表示一个执行线程的对象 Lock 锁原语 RLock 可重入锁对象,使单一线程可以再次获得已持有的锁(递归锁) Condition 条件变量对象,使得一个线程等待另一个线程满足特定条件 Event 条件变量的通用版本,任意数量

python多线程--优先级队列(Queue)

Python的Queue模块中提供了同步的.线程安全的队列类,包括FIFO(先入先出)队列Queue,LIFO(后入先出)队列LifoQueue,和优先级队列PriorityQueue.这些队列都实现了锁原语,能够在多线程中直接使用.可以使用队列来实现线程间的同步. Queue模块中的常用方法: Queue.qsize() 返回队列的大小 Queue.empty() 如果队列为空,返回True,反之False Queue.full() 如果队列满了,返回True,反之False Queue.fu

Python多线程的threading Event

Python threading模块提供Event对象用于线程间通信.它提供了一组.拆除.等待用于线程间通信的其他方法. event它是沟通中最简单的一个过程之中,一个线程产生一个信号,号.Python 通过threading.Event()产生一个event对象.event对象维护一个内部标志(标志初始值为False),通过set()将其置为True.wait(timeout)则用于堵塞线程直至Flag被set(或者超时,可选的),isSet()用于查询标志位是否为True,Clear()则用

python多线程之 threading模块详解

1.threading.Thread对象[创建线程的主要对象]: 方法:start():启动线程   run():启动线程后自动调用的方法 join([timeout]):等待到被调用的线程终止   is_alive():返回线程活动状态 属性:name:线程名   ident:线程ID号   daemon:后台标志 2.创建线程的方法: 1:实例化threading.Thread对象 ths = threading.Thread( target = None, name = None, arg

Python多线程,threading的用法

虫师的文章: 需要注意的是: threads = [ ] t1 = threading.Thread(target=music,args=(u'爱情买卖',)) threads.append(t1) t2 = threading.Thread(target=move,args=(u'阿凡达',)) threads.append(t2) 上面的 args = 的是元组而不是简单的括号... 链接为: www.cnblogs.com/fnng/p/3670789.html www.cnblogs.c