线程、进程、协程和队列

一.线程、进程

1.简述

  • 进程是一个具有一定独立功能的程序关于某个数据集合的一次运行活动。它是操作系统动态执行的基本单元,通俗讲就是自定义一段程序的执行过程,即一个正在运行的程序。线程是进程的基本单位,又称为轻量级进程。 * 不同的进程在内存中会开辟独立的地址空间,默认进程之间的数据是不共享,线程是由进程创建,所以处在同一个进程中的所有线程都可以访问该进程所包含的地址空间,当然也包含存储在该空间中的所有资源。
  • 应用场景:

    IO密集型操作由于不占用CPU资源,所以一般使用线程来完成
    计算密集型操作靠cpu,所以一般使用进程来完成

  • 为什么使用多线程或多进程?
    多线程和多进程可以提供程序的并发处理能力。看下面需求:
    现在有10台主机,现在需要监控主机的存过状态,默认使用单线程,如下:
import time

st = time.time()           #程序开始时间
def f1(arg):
    time.sleep(2)            #假设ping一次需要2s
    print("ping %s主机中..." % arg)
host_List = [0,1,2,3,4,5,6,7,8,9,]   #假设列表中1233。。表示10个主机
for i in host_List:
    f1(i)

cost_time = time.time() - st
print(‘程序耗时:%s‘ % cost_time)

程序运行结果:

ping 0主机中...
ping 1主机中...
ping 2主机中...
ping 3主机中...
ping 4主机中...
ping 5主机中...
ping 6主机中...
ping 7主机中...
ping 8主机中...
ping 9主机中...
程序耗时:20.002294063568115

发现耗时20s,这仅仅是10台机器,如果100台呢,效率会非常低。假如用了多线程呢?

import threading
import time

st = time.time()           #程序开始时间
def f1(arg):
    time.sleep(2)            #假设ping一次需要2s
    print("ping %s主机中..." % arg)
host_List = [0,1,2,3,4,5,6,7,8,9,]   #假设列表中1233。。表示10个主机
for i in host_List:
    t = threading.Thread(target=f1, args=(i,))
    t.start()
t.join()
cost_time = time.time() - st
print(‘程序耗时:%s‘ % cost_time)
运行结果:
ping 0主机中...
ping 1主机中...
ping 5主机中...
ping 4主机中...
ping 2主机中...
ping 3主机中...
ping 7主机中...
ping 6主机中...
ping 8主机中...
ping 9主机中...
程序耗时:2.002915382385254

从结果中看出,10个机器启用10个线程并发去独立ping,这样耗时仅仅是单线程的耗时,效率大大提供。所以多进程多线程一般用来提高并发

2.线程进程的基本操作

创建

  • 线程

    • 创建方法
    import threading
    import time
    
    def f1(args):
      time.sleep(2)
      print(args)
    
    #方式1 直接使用thread模块进行创建
    for i in range(10):
      t = threading.Thread(target=f1,args=(123,))   #target是要执行的任务(函数),args是任务(函数)的参数
      t.start()
    
    #方式2 使用自定义类创建
    
    class Mythread(threading.Thread):
      def __init__(self,func,args):
          self.func = func
          self.args = args
          super(Mythread,self).__init__()
      def run(self):
          self.func(self.args)
    
    obj = Mythread(f1,123)
    obj.start()

    上述代码创建了10个“前台”线程,然后控制器就交给了CPU,CPU根据指定算法进行调度,分片执行指令

    • 线程的其他方法

      • start 线程准备就绪
      • setName 为线程设置名称
      • getName 获取线程名称
      • setDaemon 设置为后台线程或前台线程(默认),注意需要设置在start前 如果是后台线程,主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,均停止 如果是前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序停止
      • join 放在for循环内表示逐个执行每个线程,执行完毕后继续往下执行,该方法使得多线程变得无意义,放在for循环外,会阻塞主进程,这样主进程会等待线程执行完之后,再去继续执行下面的代码
      • run 线程被cpu调度后自动执行线程对象的run方法,这也是线程第二种创建方法的原理
    • 方法具体使用
      • 使用setDaemon

        默认不使用setDaemon 情况

      import threading
      import time
      
      def f1(args):
          time.sleep(2)
          print(args)
      
      print(‘-----start------‘)
      for i in range(10):
          t = threading.Thread(target=f1,args=(123,))
          t.setDaemon(False)        #默认为false 主线程等待子线程执行完之后再退出程序
          t.start()
      
      print(‘-------end------‘)
      执行效果:
      
      -----start------
      -------end------        end主线程执行完毕,等待子线程执行
      123     子线程执行结果
      123
      123
      123
      123
      123
      123
      123
      123
      123

      前台进程

      使用setDaemon 情况:

      import threading
      import time
      #方式1
      
      def f1(args):
          time.sleep(2)
          print(args)
      
      print(‘-----start------‘)
      for i in range(10):
          t = threading.Thread(target=f1,args=(123,))
          t.setDaemon(True)        #设置为True, 主线程不管子线程是否执行完成,直接退出程序
          t.start()
      
      print(‘-------end------‘)
      执行结果:
      
      -----start------
      -------end------  主线程不等待子线程,直接执行完之后退出

      后台进程

      • 使用join

        默认不使用join的话,子线程会并发执行

      import threading
      import time
      #方式1
      
      def f1(args):
          time.sleep(2)
          print("子线程:",args)
      
      print(‘-----start------‘)
      for i in range(10):
          t = threading.Thread(target=f1,args=(i,))
          t.start()
      
      print(‘-------end------‘)
      
      执行效果:
      
      -----start------
      -------end------
      子线程: 0
      子线程: 8
      子线程: 3
      子线程: 4
      子线程: 7
      子线程: 1
      子线程: 2
      子线程: 5
      子线程: 9
      子线程: 6

      不使用join

      阻塞子线程

      import threading
      import time
      
      def f1(args):
          time.sleep(2)
          print("子线程:",args)
      
      print(‘-----start------‘)
      for i in range(10):
          t = threading.Thread(target=f1,args=(i,))
          t.start()
          t.join()        #join会阻塞剩下的所有线程
          print(time.time())    #每个线程执行时的时间
      
      print(‘-------end------‘)
      
      执行效果:发现线程是单个依次执行
      
      -----start------
      子线程: 0
      1468933921.031646
      子线程: 1
      1468933923.037936
      子线程: 2
      1468933925.040694
      子线程: 3
      1468933927.044142
      子线程: 4
      1468933929.047844
      子线程: 5
      1468933931.049584
      子线程: 6
      1468933933.053723
      子线程: 7
      1468933935.056232
      子线程: 8
      1468933937.061142
      子线程: 9
      1468933939.065674
      -------end------

      在for循环内

      阻塞主线程

      import threading
      import time
      
      def f1(args):
          time.sleep(2)
          print("子线程:",args)
      
      print(‘-----start------‘)
      for i in range(10):
          t = threading.Thread(target=f1,args=(i,))
          t.start()
      
      t.join()  # join会阻塞剩下的所有线程
      print(‘-------end------‘)
      执行效果:
      
      -----start------
      子线程: 1
      子线程: 0
      子线程: 4
      子线程: 5
      子线程: 6
      子线程: 2
      子线程: 3
      子线程: 7
      子线程: 9
      -------end------
      子线程: 8    #到最后一个线程之后,不再阻塞主线程,而子线程运行需要2s,所以会先打印主线程end,在打印子线程

      在for循环外

  • 进程

    • 创建方法

    进程的创建方法和线程类似

    import multiprocessing
    import time
    
    def f1(args):
        time.sleep(2)
        print("进程:",args)
    
    #方法1
    
    for i in range(10):
        t = multiprocessing.Process(target=f1,args=(i,))
        t.start()
    
    #方法2:
    
    class Myprocess(multiprocessing.Process):
        def __init__(self,func,args):
            self.func = func
            self.args = args
            super(Myprocess,self).__init__()
        def run(self):
            self.func(self.args)
    • 其它方法 进程同样支持 join(),setDaemon(),run(),setName(),getName()等方法,和线程的使用一样,不再赘述
    • 方法使用

      参考线程使用

线程锁(Lock、RLock)

由于线程之间是进行随机调度,并且每个线程可能只执行n条执行之后,当多个线程同时修改同一条数据时可能会出现脏数据,所以,出现了线程锁 - 同一时刻允许一个线程执行操作。

  • 未使用线程锁:
import threading
import time

NUM = 10

def f1(arg):
    global NUM

    NUM -= 1            #让每个线程执行时,将NUM的值减去1
    time.sleep(2)
    print(NUM)

for i in range(10):
    t = threading.Thread(target=f1,args=(123,))
    t.start()

执行效果:发现每个线程同时都在操作NUM,最后打印的结果都是0

0
0
0
0
0
0
0
0
0
0
  • 使用线程锁

使用线程锁,当一个线程开始处理事务A时,先在事务A上把锁,然后开始
处理事务A,处理完程之后,再解锁。其他进程遇到线程锁,则处于等待中
直到有线程解锁了该事务

import threading
import time

NUM = 10

def f1(arg):
    global NUM
    arg.acquire()   #阻塞后面的线程

    NUM -= 1
    time.sleep(2)
    print(NUM)
    arg.release()       #放开后面的线程

lock = threading.Lock()
for i in range(10):
    t = threading.Thread(target=f1,args=(lock,))
    t.start()

执行效果:发现线程是逐步操作NUM的

9
8
7
6
5
4
3
2
1
0

此有别于join()方法,join是在线程从开始执行的时候,按照单线程依次执行,也就意味着所有的任务都是单线程执行,而线程锁是针对执行的任务进行上锁,解锁

  • Rlock和lock的区别

Rlock支持递归上锁,解锁,lock只支持单个上锁解锁

import threading
import time

NUM = 10

def f1(arg,lock):
    global NUM
    print(‘线程:‘,arg,‘执行1‘)
    lock.acquire()   #阻塞后面的线程
    NUM -= 1

    lock.acquire()    #继续上锁
    time.sleep(2)       #sleep 2秒
    print(‘线程:‘,arg,time.time())      #打印当前时间戳
    lock.release()          #解锁
    print(‘线程执行结果:‘,arg,NUM)

    lock.release()       #放开后面的线程
    print(123)

lock = threading.RLock()
for i in range(10):
    t = threading.Thread(target=f1,args=(i,lock,))
    t.start()

执行结果:

#print(‘线程:‘,arg,‘执行1‘)操作没有加锁,所以所有线程运行至此
线程: 0 执行1
线程: 1 执行1
线程: 2 执行1
线程: 3 执行1
线程: 4 执行1
线程: 5 执行1
线程: 6 执行1
线程: 7 执行1
线程: 8 执行1
线程: 9 执行1
#print(‘线程:‘,arg,time.time()) 和    print(‘线程执行结果:‘,arg,NUM) 在锁中,所以所有线程会先依次执行
线程: 0 1468937172.877005
线程执行结果: 0 9
123
线程: 1 1468937174.878281
线程执行结果: 1 8
123
线程: 2 1468937176.883292
线程执行结果: 2 7
123
线程: 3 1468937178.886487
线程执行结果: 3 6
123
线程: 4 1468937180.888464
线程执行结果: 4 5
123
线程: 5 1468937182.892491
线程执行结果: 5 4
123
线程: 6 1468937184.893167
线程执行结果: 6 3
123
线程: 7 1468937186.894769
线程执行结果: 7 2
123
线程: 8 1468937188.89809
线程执行结果: 8 1
123
线程: 9 1468937190.899442
线程执行结果: 9 0
123

信号量(Semaphore)

线程锁 同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据 ,比如厕所有3个坑,那最多只允许3个人上厕所,后面的人只能等里面有人出来了才能再进去。

import threading
import time
def f1(arg,lock):
    lock.acquire()
    print(‘线程:‘,arg)
    print(time.time())
    lock.release()
lock = threading.BoundedSemaphore(5)    #5表示最多同时运行5个线程

for i in range(10):
    t = threading.Thread(target=f1,args=(i,lock,))
    t.start()

执行结果:发现每5个线程的执行时间戳是一样的。小数点后微秒可忽略

线程: 0
1468937937.129999
线程: 1
1468937937.130123
线程: 2
1468937937.130278
线程: 3
1468937937.13044
线程: 4
1468937937.130581
线程: 5
1468937937.130719
线程: 6
1468937937.13086
线程: 7
1468937937.130994
线程: 8
1468937937.131154
线程: 9
1468937937.131289

事件(event)

python线程的事件用于主线程控制其他线程的执行,一个线程发送/传递事件,另外的线程等待事件的触发事件。主要提供了三个方法 set、wait、clear

事件处理的机制:全局定义了一个“Flag”,如果“Flag”值为 False,那么阻塞线程执行,如果“Flag”值为True,那么线程继续执行。
wait()方法:当事件标志为False时将阻塞线程,当事件标志为True时,什么也不做
set()方法:它设置事件标志为True,并且唤醒其他线程。条件锁对象保护程序修改事件标志状态的关键部分
clear()方法正好相反,它设置时间标志为False

import threading
import time

def f1(arg,e):
    print(‘线程:‘,arg)
    e.wait()            #阻塞线程
    print(‘线程继续执行:‘,arg)

event = threading.Event()

for i in range(3):
    t = threading.Thread(target=f1,args=(i,event))
    t.start()

event.clear()

res = input(‘>>‘)
if res == ‘1‘:
    event.set()   #放开 线程

执行结果:

线程: 0
线程: 1
线程: 2
>>1     #输入1,触发线程继续执行的信号
线程继续执行: 0
线程继续执行: 1
线程继续执行: 2

条件(Condition)

使得线程等待,条件是针对单个线程的,条件成立,则不再阻塞线程,
条件不成立,一直阻塞

import threading
import time
def condition_func():

    ret = False
    inp = input(‘>>>‘)
    if inp == ‘1‘:
        ret = True

    return ret

def run(n):
    con.acquire()
    con.wait_for(condition_func)   #当condition_func返回值为真时,触发线程继续运行
    print("run the thread: %s" %n)
    con.release()

if __name__ == ‘__main__‘:

    con = threading.Condition()
    for i in range(3):
        t = threading.Thread(target=run, args=(i,))
        t.start()

运行效果:

>>>1    #每次手动输入一个1,触发一个线程运行
run the thread: 0
>>>1
run the thread: 1
>>>1
run the thread: 2

Timer

定时器,指定n秒后执行某操作

from threading import Timer

def hello():
    print("hello, world")

t = Timer(1, hello)
t.start()  # after 1 seconds, "hello, world" will be printed

进程和线程一样,同样支持进程锁、信号量、事件、条件、timer用法一摸一样,可参考线程使用方法,不在赘述

3.进程之间数据共享

由于不同的进程会有各自的内存地址空间,所以进程之间的数据默认是不能共享的

  • 运行结果
from multiprocessing import Process
import time

li = []

def foo(i):
    li.append(i)
    print(‘say hi‘,li)

for i in range(10):
    p = Process(target=foo,args=(i,))
    p.start()

time.sleep(1)
print(‘ending‘,li)

运行结果:发现每个子进程都独立操作li列表

say hi [0]
say hi [1]
say hi [2]
say hi [3]
say hi [4]
say hi [5]
say hi [6]
say hi [7]
say hi [8]
say hi [9]
ending []
  • 实现进程之间数据共享

    • 方法1:

    from multiprocessing import Process
    from multiprocessing import Array
    import time
    
    li = Array(‘i‘,10)
    
    #i 的类型
    ‘‘‘
    ‘c‘: ctypes.c_char, ‘u‘: ctypes.c_wchar,
    ‘b‘: ctypes.c_byte, ‘B‘: ctypes.c_ubyte,
    ‘h‘: ctypes.c_short, ‘H‘: ctypes.c_ushort,
    ‘i‘: ctypes.c_int, ‘I‘: ctypes.c_uint,
    ‘l‘: ctypes.c_long, ‘L‘: ctypes.c_ulong,
    ‘f‘: ctypes.c_float, ‘d‘: ctypes.c_double
    ‘‘‘
    def foo(i,q):
        q[i] = i + 100
        print(‘say hi‘,i)
    
    for i in range(10):
        p = Process(target=foo,args=(i,li,))
        p.start()
        p.join()
    
    time.sleep(4)
    for i in li:
        print(i)

    Array方法

    • 方法2

    from multiprocessing import Process
    from multiprocessing import Manager
    import multiprocessing
    import time
    
    obj = Manager()
    li = obj.dict()
    
    def foo(i,q):
        q[i] = i + 100
        print(‘say hi‘,i)
    
    for i in range(10):
        p = Process(target=foo,args=(i,li,))
        p.start()
        p.join()
    
    time.sleep(2)
    print(li)

    Dict方法

    • 方法3:

from multiprocessing import Process
from multiprocessing import queues
import multiprocessing
import time

li = queues.Queue(ctx=multiprocessing)

def foo(i,q):
    q.put(i)
    print(‘say hi‘,i)

for i in range(10):
    p = Process(target=foo,args=(i,li,))
    p.start()
    p.join()

time.sleep(4)
print(li.qsize())

queue方法

4.python内部队列Queue

队列(queue)是一种具有先进先出特征的线性数据结构,元素的增加只能在一端进行,元素的删除只能在另一端进行。能够增加元素的队列一端称为队尾,可以删除元素的队列一端则称为队首。python内部支持一套轻量级queue队列

  • queue队列的方法:

    • Queue(maxsize=0) 先进先出队列,maxsize表示队列元素数量,0表示无限
    • LifoQueue(maxsize=0) 后进先出队列
    • PriorityQueue(maxsize=0) 优先级队列,优先级值越小,优先级越高
    • deque(maxsize=0) 双向队列
    • empty() 判断队列是否为空,为空时返回True,否则为False
    • full() 判断队列是否已满,满时返回True,否则为False
    • put(item,[block[,timeout]] 在队尾插入一个项目。参数item为必需的,为插入项目的值;第二个block为可选参数,默认为True,表示当前队列满时,put()方法就使调用线程暂停,直到空出一个数据单元。如果block为False,put方法将引发Full异常
    • get() 从队头删除并返回一个项目。可选参数为block,默认为True。如果队列为空且block为True,get()就使调用线程暂停,直至有项目可用。如果队列为空且block为False,队列将引发Empty异常。
    • qsize() 返回队列长度
    • clear() 清空队列
    • join() 等到队列为空(即队列中所有的项都被取走,处理完毕),再执行别的操作
    • task_done() 在完成一项工作之后,Queue.task_done()函数向任务已经完成的队列发送一个信号. 注意:在多线程下,注意task_done的位置,每次执行task_done(),unfinished_tasks就减1,应该在一切搞定后,再执行task_done.

队列支持下面的四种情况:

  • 先进先出队列
import queue

q = queue.Queue(4)  #创建队列  容量为4

q.put(123)      #往队列中插值
q.put(431)

print(q.maxsize)    #队列容量
print(q.qsize())    #队列目前元素的容量

print(q.get())    #队列取值
print(q.get())

执行效果:

4
2
123
431

先进先出原则第一次存放的是123,第二次存放的是431,那么我们在获取值得时候,第一次获取的就是123,第二次就是431
如果队列满之后,再put 或者队列为空时,再get,进程就就挂在哪里,put会等待,直到队列中有空间之后才能put成功,get会等待,直到队列中有元素之后,才能获取到值,如果不需要等待,可以通过设置block=False来抛出异常,使用try捕捉异常

import queue

q = queue.Queue(5)

for i in range(8):
    try:
        q.put(i,block=False)
        print(i,‘已提交队列‘)
    except:
        print(‘队列已满‘)

for i in range(8):
    try:
        res = q.get(block=False)
        print(‘从队列取出:‘,res)
    except:
        print(‘队列已空‘)

效果:

0 已提交队列
1 已提交队列
2 已提交队列
3 已提交队列
4 已提交队列
队列已满
队列已满
队列已满
从队列取出: 0
从队列取出: 1
从队列取出: 2
从队列取出: 3
从队列取出: 4
队列已空
队列已空
队列已空

  • 后进先出
import queue
q = queue.LifoQueue()
q.put(123)
q.put(456)
print(q.get())
print(q.get())
输出结果:

456
123
  • 根据优先级处理
import queue
q = queue.PriorityQueue()   #根据优先级处理
q.put((2,"alex1"))
q.put((1,"alex2"))
q.put((3,"alex3"))
print(q.get())
print(q.get())
print(q.get())
输出结果:

(1, ‘alex2‘)
(2, ‘alex1‘)
(3, ‘alex3‘)
  • 双向队列
q = queue.deque()          #双向队列
q.append((123))         #右边进
q.append(234)
print(q.pop())          #右边出,后进先出
print(q.pop())

q.appendleft(555)       #左边进
q.appendleft(666)
print(q.popleft())         #左边出,后进先出
print(q.popleft())
输出效果:

234
123
666
555

5.支持并发的两种模型

生产者消费者模型

生产者消费者模型是通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度

  • 为什么要使用生产者和消费者模式

在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。

  • 什么是生产者消费者模式

生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

这个阻塞队列就是用来给生产者和消费者解耦的

  • 实现:
import queue
import threading
import time

q = queue.Queue(30)   #创建一个队列,用户生产者和消费者通讯

#模拟订单创建处理

def product(arg):        #生产者  创建订单
    while True:
        q.put("订单" + str(arg))
        print(arg,"创建订单")

def cost(arg):          #消费者,处理订单
    while True:
        print(arg , "处理:" ,q.get())
        time.sleep(2)    #sleep 2秒表示 消费者处理需要2s

#创建生产者线程
for i in range(3):
    t = threading.Thread(target=product,args=(i,))
    t.start()

#创建消费者线程
for c in range(10):
    t = threading.Thread(target=cost,args=(c,))
    t.start()

效果:

0 创建订单
0 创建订单
0 创建订单
0 创建订单
0 创建订单
0 创建订单
0 创建订单
0 创建订单
0 创建订单
0 创建订单
0 创建订单
0 创建订单
0 创建订单
0 创建订单
0 创建订单
0 创建订单
0 创建订单
0 创建订单
0 创建订单
0 创建订单
0 创建订单
0 创建订单
0 创建订单
0 创建订单
0 创建订单
0 创建订单
0 创建订单
0 创建订单
0 创建订单
1 创建订单
0 处理: 订单0
1 创建订单
1 处理: 订单0
0 创建订单
2 处理: 订单0
2 创建订单
3 处理: 订单0
1 创建订单
4 处理: 订单0
0 创建订单
5 处理: 订单0
2 创建订单
6 处理: 订单0
1 创建订单
7 处理: 订单0
0 创建订单
8 处理: 订单0
2 创建订单
9 处理: 订单0
1 创建订单
1 处理: 订单0
2 处理: 订单0
2 创建订单
2 创建订单
6 处理: 订单0
1 创建订单
3 处理: 订单0
4 处理: 订单0
5 处理: 订单0
8 处理: 订单0
2 创建订单
2 创建订单
2 创建订单
2 创建订单
0 处理: 订单0
7 处理: 订单0
2 创建订单
2 创建订单
9 处理: 订单0
1 创建订单
1 处理: 订单0
3 处理: 订单0
5 处理: 订单0
8 处理: 订单0
0 处理: 订单0
2 创建订单
6 处理: 订单0
4 处理: 订单0
2 处理: 订单0
2 创建订单
2 创建订单
2 创建订单
2 创建订单
2 创建订单
9 处理: 订单0
0 创建订单
0 创建订单
0 创建订单
7 处理: 订单1
0 创建订单
3 处理: 订单1
1 处理: 订单0
0 处理: 订单2
4 处理: 订单1
5 处理: 订单0
7 处理: 订单2
1 创建订单
1 创建订单
1 创建订单
1 创建订单
1 创建订单
1 创建订单
8 处理: 订单1
6 处理: 订单0
9 处理: 订单2
1 创建订单
1 创建订单
1 创建订单
2 处理: 订单1
2 创建订单
0 处理: 订单2
4 处理: 订单2
5 处理: 订单1
8 处理: 订单2
2 创建订单
2 创建订单
2 创建订单
2 创建订单
7 处理: 订单2
2 创建订单
6 处理: 订单2
2 创建订单
1 处理: 订单2
0 创建订单
9 处理: 订单2
2 处理: 订单2
3 处理: 订单1
2 创建订单
2 创建订单
2 创建订单

订阅者模型

待讲

6.线程池和进程池

线程池

提高并发并不是线程越多越好,每个系统对于线程的数量都有一个临界值,线程数量超过该临界值后,反而会降低系统性能。线程的上下文切换,遇到大量线程,也就很耗时,所以线程池的定义就是定义一组线程,用于处理当前的事务,线程处理完当前事务后,在继续处理其它事务。当事务超过线程池的处理能力,事务则等待出现空闲线程。线程池的线程数量也是可以根据系统性能调节额
python中没有线程池的机制,即使是python3中提供了该机制,也很low,所以进程池一般需要自己定义
* 简单实现线程池

利用队列,事先将创建的线程放在队列中,有事务需要执行时,从队列中取出一个线程进行执行,执行完之后自动再往队列中添加一个线程,实现队列中的线程 终止一个,创建一个

import queue
import threading
import time

class Threadpoll:

    def __init__(self,maxsize=5):
        self.maxsize = maxsize         #线程池的线程最大数量
        self._q = queue.Queue(self.maxsize)     #线程队列
        for i in range(self.maxsize):       #根据队列容量,添加满线程
            self.put_thread()               #调用线程put到队列的方法

    def get_thread(self):           #获取队列中的线程
        return self._q.get()

    def put_thread(self):           #往队列中put线程.put的值为线程的对象
        self._q.put(threading.Thread)

pool = Threadpoll(10)     #创建一个线程池,最大线程数量为10

def task(arg):          #执行的任务函数
    time.sleep(3)
    print(arg)
    pool.put_thread()    #执行完毕,线程已经停止,需要重新往队列中put一个线程

for i in range(1000):
    t = pool.get_thread()   #获取一个线程
    obj = t(target=task,args=(i,))      #指定线程执行的任务
    obj.start()     #开启线程

Low版

  • 升级版线程池 简单的线程池,只能实现可控的线程数量,实现处理多个事物,但是其中还是存在很多问题,如1.线程不重用,线程执行完之后,线程就死掉了,最终被垃圾回收机制处理,我们需要重新创建线程数量来填补队列。2.线程数量是固定的,当事务数量小于线程数量时,多数线程处于等待状态,造成线程浪费。下面将完美实现线程池

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
#pyversion:python3.5
#owner:fuzj

import queue
import threading
import contextlib
import time

StopEvent = object()          #创建一个空的事务对象

class ThreadPool(object):
    ‘‘‘线程池类‘‘‘

    def __init__(self, max_num, max_task_num = None):
        if max_task_num:                        #队列中事务个数容量
            self.q = queue.Queue(max_task_num)
        else:                               #默认为不限制容量
            self.q = queue.Queue()
        self.max_num = max_num              #线程池线程的最大数量
        self.cancel = False             #线程执行完后是否终止线程
        self.terminal = False           #是否立即终止线程
        self.generate_list = []     #存放正在运行的线程
        self.free_list = []         #存放空闲的线程

    def run(self, func, args, callback=None):
        """
        线程池执行一个任务
        :param func: 任务函数
        :param args: 任务函数所需参数
        :param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数1、任务函数执行状态;2、任务函数返回值(默认为None,即:不执行回调函数)
        :return: 如果线程池已经终止,则返回True否则None
        """
        if self.cancel:
            return
        if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:    #判断是否创建新的线程:当没有空闲线程和当前正在运行的线程总数量小于线程池的容量
            self.generate_thread()      #调用创建线程的方法
        w = (func, args, callback,)     #将执行事务(函数).参数和回调函数 组合为元组
        self.q.put(w)               #将事务放到队列中,等待线程处理

    def generate_thread(self):
        """
        创建线程的方法
        """
        t = threading.Thread(target=self.call)     #创建线程,并调用call方法
        t.start()

    def call(self):
        """
        循环去获取任务函数并执行任务函数
        """
        current_thread = threading.currentThread            #当前运行的线程
        self.generate_list.append(current_thread)           #将当前运行的线程放入运行列表中

        event = self.q.get()            #从队列中获取事务
        while event != StopEvent:       #循环从队列中取事务,并判断每次取的是否为空事务

            func, arguments, callback = event    #func 为事务名.arguments为参数,callback为回调函数
            try:                        #捕捉异常
                result = func(*arguments)           #执行事务(函数)
                success = True          #事务执行成功
            except Exception as e:          #事务执行出现异常
                success = False             #事务执行失败
                result = None           #结果为None

            if callback is not None:        #判断回调函数是否有实际内容
                try:
                    callback(success, result)       #将线程的执行结果和成功与否传参,执行回调函数
                except Exception as e:              #捕捉异常
                    pass

            with self.worker_state(self.free_list, current_thread):     #执行worker_state方法,记录等待线程
                if self.terminal:
                    event = StopEvent
                else:
                    event = self.q.get()
        else:

            self.generate_list.remove(current_thread)

    def close(self):
        """
        执行完所有的任务后,所有线程停止
        """
        self.cancel = True          #表示线程执行完则终止
        full_size = len(self.generate_list)         #取出正在运行的线程数量n
        while full_size:
            self.q.put(StopEvent)               #在队列中put n个空事务
            full_size -= 1

    def terminate(self):
        """
        无论是否还有任务,终止线程
        """
        self.terminal = True        #强制执行线程终止

        while self.generate_list:       #根据现在正在运行的线程列表,往队列中put空事务,有多少个线程put多少个空事务
            self.q.put(StopEvent)

        self.q.empty()      #判断队列是否为空

    @contextlib.contextmanager
    def worker_state(self, state_list, worker_thread):
        """
        用于记录线程中正在等待的线程数
        """
        state_list.append(worker_thread)            #将空闲的线程放到空闲的列表中
        try:
            yield state_list
        finally:
            state_list.remove(worker_thread)        #当线程被调用之后,从空闲的线程列表中移除

# How to use

pool = ThreadPool(5)        #实例化线程池,线程最大数量为5

def callback(status, result):
    # status, execute action status
    # result, execute action return value
    pass

#执行的事务
def action(i):
    time.sleep(1)
    print(i)

for i in range(30):             #创建30个事务,然后调用线程池的线程执行
    ret = pool.run(action, (i,), callback)

time.sleep(5)
print(len(pool.generate_list), len(pool.free_list))
print(len(pool.generate_list), len(pool.free_list))
#pool.close()
pool.terminate()

终极版

进程池

python中提哦那个了进程池的概念,可以直接使用

  • apply
from  multiprocessing import Process,Pool
import time

def f1(arg):
    time.sleep(2)
    print(arg+100,time.time())

pool = Pool(5)    #创建进程池
for i in range(8):
    pool.apply(func=f1, args=(i,))     #给进程指定任务
print(‘end‘)

输出效果:发现进程是依次执行,没有并发的效果

100 1469023658.397264
101 1469023660.400479
102 1469023662.40297
103 1469023664.408063
104 1469023666.410736
105 1469023668.413439
106 1469023670.414941
107 1469023672.417192
end
  • apply_sync
from  multiprocessing import Process,Pool
import time

def f1(arg):
    time.sleep(2)
    print(arg+100,time.time())

pool = Pool(5)    #创建进程池
for i in range(8):
    pool.apply_async(func=f1, args=(i,))     #给进程指定任务

print(‘end‘)
time.sleep(2)      #主进程等代2s

执行效果:发现事务没有执行完,主进程终止了子进程

end
100 1469023994.189458
102 1469023994.189459
101 1469023994.189458
103 1469023994.189688
104 1469023994.190085

改进;

from  multiprocessing import Process,Pool
import time

def f1(arg):
    time.sleep(2)
    print(arg+100,time.time())

pool = Pool(5)    #创建进程池

for i in range(8):
    pool.apply_async(func=f1, args=(i,))     #给进程指定任务

print(‘end‘)
pool.close()        #所有任务执行完毕后基础

time.sleep(2)
#pool.terminate()    #子进程立即终止
pool.join()  #进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。

print(123)

执行效果:

end
100 1469024133.797895
102 1469024133.797907
101 1469024133.797895
104 1469024133.7983
103 1469024133.797907
107 1469024135.800982
106 1469024135.800983
105 1469024135.801028
123
二.协程

1.简介

线程和进程的操作是由程序触发系统接口,最后的执行者是系统;协程的操作则是程序员。

协程存在的意义:对于多线程应用,CPU通过切片的方式来切换线程间的执行,线程切换时需要耗时(保存状态,下次继续)。协程,则只使用一个线程,在一个线程中规定某个代码块执行顺序。
协成的原理:利用一个线程,分解一个线程成为多个微线程,注意此是从程序级别来分解的
协程的适用场景:当程序中存在大量不需要CPU的操作时(IO),适用于协程;

2.如何实现

greenlet和gevent需要手动安装模块。直接安装gevent默认会把greenlet装上
* 基于底层greenlet实现

from greenlet import greenlet

def test1():
    print(12)
    gr2.switch()
    print(34)
    gr2.switch()

def test2():
    print(56)
    gr1.switch()
    print(78)

gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()

执行过程:解释器从上倒下读代码后,读到gr1.switch()时,开始执行gr1对应的test1函数,test1函数执行完print(12)后遇到gr2.swith(),会自动执行gr2的test2,test2函数中执行完print(56)遇到gr1.switch(),会继续执行test1的 print(34),最后遇到gr2.switch(),会执行test2的print(78)
输出效果:

12
56
34
78
  • 使用gevent实现
import gevent

def foo():
    print(‘Running in foo‘)
    gevent.sleep(0)
    print(‘Explicit context switch to foo again‘)

def bar():
    print(‘Explicit context to bar‘)
    gevent.sleep(0)
    print(‘Implicit context switch back to bar‘)

gevent.joinall([
    gevent.spawn(foo),
    gevent.spawn(bar),
])

效果:

Running in foo
Explicit context to bar
Explicit context switch to foo again
Implicit context switch back to bar
  • 举例
from gevent import monkey; monkey.patch_all()
import gevent
import requests

def f(url):
    print(‘GET: %s‘ % url)
    resp = requests.get(url)
    data = resp.text
    print(‘%d bytes received from %s.‘ % (len(data), url))

gevent.joinall([
        gevent.spawn(f, ‘https://www.python.org/‘),
        gevent.spawn(f, ‘https://www.yahoo.com/‘),
        gevent.spawn(f, ‘https://github.com/‘),
])

效果:

GET: https://www.python.org/
GET: https://www.yahoo.com/
GET: https://github.com/
431218 bytes received from https://www.yahoo.com/.
25529 bytes received from https://github.com/.
47394 bytes received from https://www.python.org/.
时间: 2024-11-06 12:48:06

线程、进程、协程和队列的相关文章

11 线程进程协程

1 线程 1.1 基本应用 1.1.1 标准线程(常用) import threading def f1(arg): print(arg) t = threading.Thread(target=f1, args=(123,)) t.start() 1.1.2 自定义线程 自定义线程类既threading.Thread流程,自定义run方法 import threading class MyThread(threading.Thread): #自定义类,继承threading.Thread类 d

15.python并发编程(线程--进程--协程)

一.进程:1.定义:进程最小的资源单位,本质就是一个程序在一个数据集上的一次动态执行(运行)的过程2.组成:进程一般由程序,数据集,进程控制三部分组成:(1)程序:用来描述进程要完成哪些功能以及如何完成(2)数据集:是程序在执行过程中所需要使用的一切资源(3)进程控制块:用来记录进程外部特征,描述进程的执行变化过程,系统可以利用它来控制和管理进程,它是系统感知进程存在的唯一标志.3.进程的作用:是想完成多任务并发,进程之间的内存地址是相互独立的二.线程:1.定义:最小的执行单位,线程的出现是为了

Python的线程&amp;进程&amp;协程[0] -&gt; 基本概念

基本概念 / Basic Concept 0 简介与动机 / Why Multi-Thread/Multi-Process/Coroutine 在多线程(multithreaded, MT)编程出现之前,计算机程序的执行是由单个步骤序列组成的,该序列在主机的CPU中按照同步顺序执行.即无论任务多少,是否包含子任务,都要按照顺序方式进行. 然而,假定子任务之间相互独立,没有因果关系,若能使这些独立的任务同时运行,则这种并行处理方式可以显著提高整个任务的性能,这便是多线程编程. 而对于Python而

线程 进程 协程

一.什么是线程? 线程是操作系统能够进行运算调度的最小单位(程序执行流的最小单元).它被包含在进程之中,是进程中的实际运作单位.一条线程指的是一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务. 一个标准的线程由线程ID,当前指令指针(PC),寄存器集合和堆栈组成.另外,线程是进程的中的一个实体,是被系统独立调度和分派的基本单位,线程自己不拥有系统资源,只拥有一点儿在运行中必不可少的资源,但它可与同属一个进程的其它线程共享进程所拥有的全部资源.一个线程可以创建和撤销另一

第九天 线程 进程 协程 队列

详细链接http://www.cnblogs.com/alex3714/articles/5230609.html 1.线程:包含在进程中,是操作系统运算调度的最小单位,是一串指令的集合,直接与cpu交互 2进程:进程是一个程序各种资源的集合.操作系统通过管理这个集合进而运行程序,进程本身并不执行,进程通过调用线程来调度cpu. 3.不同点: 一个线程可以控制和操作同一进程里的其他线程,但是进程只能操作子进程 创建新线程很简单,但是创建一个子进程需要对父进程进行拷贝 线程共享内存,进程的内存是独

Python的线程&amp;进程&amp;协程[0] -&gt; 线程 -&gt; 多线程的建立与使用

常用的多线程功能实现 目录 生成线程的三种方法 单线程与多线程对比 守护线程的设置 1 生成线程的三种方法 三种方式分别为: 创建一个Thread实例,传给它一个函数 创建一个Thread实例,传给它一个可调用的类实例 派生Thread的子类,并创建子类的实例 # There are three ways to create a thread # The first is create a thread instance, and pass a function # The second one

Python的线程&amp;进程&amp;协程[0] -&gt; 线程 -&gt; 多线程锁的使用

锁与信号量 目录 添加线程锁 锁的本质 互斥锁与可重入锁 死锁的产生 锁的上下文管理 信号量与有界信号量 1 添加线程锁 由于多线程对资源的抢占顺序不同,可能会产生冲突,通过添加线程锁来对共有资源进行控制. 1 import atexit 2 from random import randrange 3 from threading import Thread, Lock, current_thread # or currentThread 4 from time import ctime, s

Python的线程&amp;进程&amp;协程[1] -&gt; 线程 -&gt; 多线程的控制方式

多线程的控制方式 目录 唤醒单个线程等待 唤醒多个线程等待 条件函数等待 事件触发标志 函数延迟启动 设置线程障碍 1 唤醒单个线程等待 Condition类相当于一把高级的锁,可以进行一些复杂的线程同步控制.一般Condition内部都有一把内置的锁对象(默认为RLock),对于Condition的使用主要有以下步骤: 建立两个线程对象,及Condition对象; 线程1首先获取Condition的锁权限,acquire(); 线程1执行需要完成的任务后,调用等待wait(),此时,线程1会阻

Python的线程&amp;进程&amp;协程[2] -&gt; 进程 -&gt; 多进程的基本使用

多进程的基本使用 1 subprocess 常用函数示例 首先定义一个子进程调用的程序,用于打印一个输出语句,并获取命令行参数 1 import sys 2 print('Called_Function.py called, Hello world.') 3 try: 4 print('Got para', sys.argv[1:]) 5 except: 6 pass 再定义主函数,即父进程,分别测试 run() / call() / check_call() / getstatusoutput

Python 中的进程、线程、协程、同步、异步、回调

进程和线程究竟是什么东西?传统网络服务模型是如何工作的?协程和线程的关系和区别有哪些?IO过程在什么时间发生? 在刚刚结束的 PyCon2014 上海站,来自七牛云存储的 Python 高级工程师许智翔带来了关于 Python 的分享<Python中的进程.线程.协程.同步.异步.回调>. 一.上下文切换技术 简述 在进一步之前,让我们先回顾一下各种上下文切换技术. 不过首先说明一点术语.当我们说"上下文"的时候,指的是程序在执行中的一个状态.通常我们会用调用栈来表示这个状