I. How Python Multiprocessing and Multithreading Work
1. Python's Global Interpreter Lock (GIL)
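a) Python's Global Interpreter Lock (GIL) is a mutex that prevents multiple native threads from executing Python bytecode at the same time. The lock is necessary because CPython's memory management, which relies on reference counting, is not thread-safe: without it, concurrent reference-count updates could leak memory or free objects that are still in use. Each Python process has exactly one GIL, so Python threads run concurrently but not in parallel. The interpreter executes one thread at a time, and the next thread runs only after the current one releases the GIL, so the threads take turns.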
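b) In Python 2.x, the running thread releases the GIL when it blocks on an I/O operation or when its tick count reaches 100. Python 3.2 and later drop the tick counter in favor of a timer: a thread holding the GIL is asked to release it once it has run for the switch interval (5 ms by default) while another thread is waiting. Blocking I/O still releases the GIL, as before.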
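The Python 3 timer described above can be inspected and adjusted through the sys module. The following is a minimal sketch, assuming CPython 3.2 or later; the 10 ms value is only an illustration, not a recommended setting.

```python
import sys

# Read the interval after which a running thread is asked to give up the GIL.
default_interval = sys.getswitchinterval()
print("default switch interval: %.3f s" % default_interval)

# Illustrative value only: a longer interval means fewer forced thread switches.
sys.setswitchinterval(0.01)
changed_interval = sys.getswitchinterval()
print("changed switch interval: %.3f s" % changed_interval)

# Restore the original setting so the rest of the program is unaffected.
sys.setswitchinterval(default_interval)
```

The interval only bounds how long a thread may keep the GIL while others wait; it does not make threads run in parallel.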
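2. When to Use Multiple Processes vs. Multiple Threads
a) CPU-bound (compute-intensive)
CPU-bound means that the CPU, not the disk or memory, is the bottleneck: while the program runs, CPU usage is close to 100% and disk and memory I/O activity is low. In this situation threads rarely block on I/O, so the GIL changes hands only through forced switches (the tick counter reaching its threshold in Python 2.x, the switch interval expiring in Python 3.x). The threads keep releasing and re-acquiring the lock, which costs CPU time, and only one of them executes bytecode at any moment, so extra threads bring no speedup. For CPU-bound programs, multiple processes, each with its own interpreter and its own GIL, work much better than multiple threads.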
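The effect can be observed with a small timing comparison. This is a sketch under stated assumptions: count_primes, run_sequential and run_threaded are hypothetical helpers written for this illustration, the workload size is arbitrary, and the timings depend on the machine, so no expected numbers are given.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def count_primes(limit):
    """Pure-Python CPU-bound work: count the primes below limit by trial division."""
    count = 0
    for n in range(2, limit):
        if all(n % d for d in range(2, int(n ** 0.5) + 1)):
            count += 1
    return count


def run_sequential(limits):
    """Run every job one after another in the calling thread."""
    start = time.perf_counter()
    results = [count_primes(limit) for limit in limits]
    return results, time.perf_counter() - start


def run_threaded(limits):
    """Run one job per thread; with the GIL only one thread computes at a time."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=len(limits)) as pool:
        results = list(pool.map(count_primes, limits))
    return results, time.perf_counter() - start


limits = [20000] * 4  # hypothetical workload: four identical jobs
seq_results, seq_time = run_sequential(limits)
thr_results, thr_time = run_threaded(limits)
print("sequential: %.2f s, 4 threads: %.2f s" % (seq_time, thr_time))
```

On a standard CPython build the two timings typically come out close to each other, which is the behavior described above. Swapping ThreadPoolExecutor for concurrent.futures.ProcessPoolExecutor, with the driver code placed under an if __name__ == "__main__": guard, is one way to spread the same jobs across processes.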
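b) I/O-bound (I/O-intensive)
I/O-bound means that disk or memory access, not the CPU, is the bottleneck: the program spends most of its time waiting for reads and writes, and CPU usage stays low. A single-threaded program sits idle during every I/O wait, which wastes CPU time. With multiple threads, when thread A blocks on I/O it releases the GIL and the interpreter switches to thread B, so the waits overlap and the program finishes sooner. Multithreading therefore has a real advantage for I/O-bound Python programs.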
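A rough illustration of the overlap, assuming that time.sleep is an acceptable stand-in for a blocking read (it releases the GIL while waiting, as blocking I/O does). The helper names fake_io, run_sequential and run_threaded and the 0.2 s delay are made up for this sketch.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_io(task_id, delay=0.2):
    """Stand-in for a blocking read: sleeping releases the GIL while waiting."""
    time.sleep(delay)
    return task_id


def run_sequential(task_ids):
    """Wait for each simulated I/O call in turn."""
    start = time.perf_counter()
    results = [fake_io(t) for t in task_ids]
    return results, time.perf_counter() - start


def run_threaded(task_ids, workers=4):
    """Issue the simulated I/O calls from a pool of threads so the waits overlap."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=workers) as pool:
        results = list(pool.map(fake_io, task_ids))
    return results, time.perf_counter() - start


task_ids = list(range(4))
seq_results, seq_time = run_sequential(task_ids)
thr_results, thr_time = run_threaded(task_ids)
print("sequential: %.2f s, 4 threads: %.2f s" % (seq_time, thr_time))
```

The sequential run has to wait for the four delays one after another, while the threaded run waits for them at the same time, so its elapsed time should be close to a single delay.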
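II. A Python Multiprocess, Multithread Programming Template
1. The template has two main advantages:
a) The number of processes and threads can be set to suit the type of workload, which lets you tune execution efficiency.
b) It prints task progress in real time without noticeably slowing the program down.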
    #!/usr/bin/env python3
    # -*- coding: UTF-8 -*-
    __author__ = "阿辉枫情"
    __date__ = "$2017-5-12 21:49:51$"

    import os
    import time
    import threading
    from multiprocessing import Process, Manager, Lock

    # ------------------------------------------------
    # Tunable global parameters -- Start.
    tasklist = []      # task list

    PROCESS_COUNT = 2  # number of worker processes
    THREAD_COUNT = 4   # number of threads per worker process
    # Tunable global parameters -- End.
    # ------------------------------------------------


    class HandleTask(threading.Thread):
        """Worker thread that runs one slice of the task list."""

        def __init__(self, proid, prolock, thrid, thrlock, tasklist, tasknum, schedule):
            super().__init__()
            self.proid = proid
            self.prolock = prolock
            self.thrid = thrid
            self.thrlock = thrlock
            self.tasklist = tasklist
            self.tasknum = tasknum
            self.sch = schedule
            self.pid = os.getpid()

        def run(self):
            # Hold both locks so output from different workers does not interleave.
            with self.prolock, self.thrlock:
                print("The Thread [%s:%s] tasklist number:[%s]" % (self.proid, self.thrid, len(self.tasklist)))

            for (element, ) in self.tasklist:
                # Task body starts here.
                # print(element)
                time.sleep(1)
                # Task body ends here.

                # "+= 1" on the shared value is a read followed by a write, so guard it.
                with self.prolock, self.thrlock:
                    self.sch.value += 1


    def Thread_Handle(proid, prolock, tasklist, tasknum, schedule):
        global THREAD_COUNT
        lock = threading.Lock()
        WorksThread = []
        # Integer division: every thread gets this many tasks, the last one also takes the remainder.
        thread_task_number = len(tasklist) // THREAD_COUNT
        if thread_task_number == 0:
            # Fewer tasks than threads: one thread per task.
            THREAD_COUNT = len(tasklist)
            thread_task_number = 1

        for i in range(THREAD_COUNT):
            if i != THREAD_COUNT - 1:
                source_list = tasklist[i * thread_task_number: (i + 1) * thread_task_number]
            else:
                source_list = tasklist[i * thread_task_number:]
            Work = HandleTask(proid, prolock, i, lock, source_list, tasknum, schedule)
            Work.start()
            WorksThread.append(Work)

        for Work in WorksThread:
            Work.join()


    def Process_Handle(tasklist, tasknum):
        global PROCESS_COUNT
        lock = Lock()
        # Shared progress counter; keep a reference to the manager while it is in use.
        manager = Manager()
        schedule = manager.Value('i', 0)
        WorksProcess = []
        # Split the tasks evenly across processes; the last process takes the remainder.
        process_task_num = len(tasklist) // PROCESS_COUNT
        if process_task_num == 0:
            PROCESS_COUNT = len(tasklist)
            process_task_num = 1

        for i in range(PROCESS_COUNT):
            if i != PROCESS_COUNT - 1:
                source_list = tasklist[i * process_task_num: (i + 1) * process_task_num]
            else:
                source_list = tasklist[i * process_task_num:]
            Work = Process(target=Thread_Handle, args=(i, lock, source_list, tasknum, schedule))
            Work.start()
            WorksProcess.append(Work)
        # Extra process that prints task progress.
        Work = Process(target=Displays, args=(lock, tasknum, schedule))
        Work.start()
        WorksProcess.append(Work)
        for Work in WorksProcess:
            Work.join()
        del WorksProcess


    def Displays(prolock, tasknum, schedule, delaytime=None):
        if delaytime is None:
            delaytime = 1
        # Poll the shared counter until every task has been counted.
        while (tasknum - schedule.value):
            time.sleep(delaytime)
            print("Completed:[%s] , Remaining:[%s]" % (schedule.value, tasknum - schedule.value))


    def main():
        # Print the main process ID.
        print("The Main Process ID:[%s]" % os.getpid())
        # Build the test tasks.
        for i in range(1, 101):
            tasklist.append((i, ))
        Process_Handle(tasklist, len(tasklist))


    if __name__ == '__main__':

        print("The Program start time:", time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
        start = time.time()
        main()
        print("The Program end time:", time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), "[%s]" % (time.time() - start))
        input("Please enter any key to end!!!")
2. Test Results
    The Program start time: 2017-05-12 18:15:12
    The Main Process ID:[9752]
    The Thread [0:0] tasklist number:[12]
    The Thread [1:0] tasklist number:[12]
    The Thread [1:1] tasklist number:[12]
    The Thread [0:1] tasklist number:[12]
    The Thread [1:2] tasklist number:[12]
    The Thread [0:2] tasklist number:[12]
    The Thread [0:3] tasklist number:[14]
    The Thread [1:3] tasklist number:[14]
    Completed:[0] , Remaining:[100]
    Completed:[8] , Remaining:[92]
    Completed:[16] , Remaining:[84]
    Completed:[25] , Remaining:[75]
    Completed:[34] , Remaining:[66]
    Completed:[42] , Remaining:[57]
    Completed:[51] , Remaining:[49]
    Completed:[59] , Remaining:[41]
    Completed:[67] , Remaining:[32]
    Completed:[76] , Remaining:[24]
    Completed:[85] , Remaining:[15]
    Completed:[93] , Remaining:[7]
    Completed:[98] , Remaining:[2]
    Completed:[100] , Remaining:[0]
    The Program end time: 2017-05-12 18:15:27 [15.007999897]
    Please enter any key to end!!!
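The per-thread task counts in this output (12, 12, 12, 14) follow from how the template slices the task list: integer division sets the slice size, and the last worker also takes the remainder. The sketch below isolates that logic. split_tasks is a hypothetical helper name introduced for illustration and does not appear in the template.

```python
def split_tasks(tasks, workers):
    """Hypothetical helper: contiguous slices, one per worker; the last takes the remainder."""
    chunk = len(tasks) // workers
    if chunk == 0:
        # Fewer tasks than workers: use one worker per task.
        workers = len(tasks)
        chunk = 1
    slices = []
    for i in range(workers):
        if i != workers - 1:
            slices.append(tasks[i * chunk:(i + 1) * chunk])
        else:
            slices.append(tasks[i * chunk:])
    return slices


tasks = list(range(1, 101))                   # the 100 test tasks
per_process = split_tasks(tasks, 2)           # PROCESS_COUNT = 2
per_thread = split_tasks(per_process[0], 4)   # THREAD_COUNT = 4
print([len(s) for s in per_process])  # [50, 50]
print([len(s) for s in per_thread])   # [12, 12, 12, 14]
```

Because each task sleeps for one second, the thread holding 14 tasks needs roughly 14 seconds, which is consistent with the recorded total of about 15 seconds.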