本节内容
操作系统发展史介绍
进程、与线程区别
python GIL全局解释器锁
线程
语法
join
线程锁之Lock\Rlock\信号量
将线程变为守护进程
Event事件
queue队列
生产者消费者模型
Queue队列
开发一个线程池
进程
语法
进程间通讯
进程池
操作系统发展史
1.手工操作(无操作系统)
程序员将对应于程序和数据的已穿孔的纸带(或卡片)装入输入机,然后启动输入机把程序和数据输入计算机内存,接着通过控制台开关启动程序针对数据运行;计算完毕,打印机输出计算结果;用户取走结果并卸下纸带(或卡片)后,才让下一个用户上机。
手工操作方式两个特点:
(1)用户独占全机。不会出现因资源已被其他用户占用而等待的现象,但资源的利用率低。
(2)CPU 等待手工操作。CPU的利用不充分。
2.批处理系统
批处理系统:加载在计算机上的一个系统软件,在它的控制下,计算机能够自动地、成批地处理一个或多个用户的作业(这作业包括程序、数据和命令)。
3.联机批处理系统
首先出现的是联机批处理系统,即作业的输入/输出由CPU来处理。
主机与输入机之间增加一个存储设备——磁带,在运行于主机上的监督程序的自动控制下,计算机可自动完成:成批地把输入机上的用户作业读入磁带,依次把磁带上的用户作业读入主机内存并执行并把计算结果向输出机输出。完成了上一批作业后,监督程序又从输入机上输入另一批作业,保存在磁带上,并按上述步骤重复处理。
4.脱机批处理系统
为克服与缓解高速主机与慢速外设的矛盾,提高CPU的利用率,又引入了脱机批处理系统,即输入/输出脱离主机控制。
这种方式的显著特征是:增加一台不与主机直接相连而专门用于与输入/输出设备打交道的卫星机。
其功能是:
(1)从输入机上读取用户作业并放到输入磁带上。
(2)从输出磁带上读取执行结果并传给输出机。
这样,主机不是直接与慢速的输入/输出设备打交道,而是与速度相对较快的磁带机发生关系,有效缓解了主机与设备的矛盾。主机与卫星机可并行工作,二者分工明确,可以充分发挥主机的高速计算能力。
脱机批处理系统:20世纪60年代应用十分广泛,它极大缓解了人机矛盾及主机与外设的矛盾。IBM-7090/7094:配备的监督程序就是脱机批处理系统,是现代操作系统的原型。
不足:每次主机内存中仅存放一道作业,每当它运行期间发出输入/输出(I/O)请求后,高速的CPU便处于等待低速的I/O完成状态,致使CPU空闲。
为改善CPU的利用率,又引入了多道程序系统。
5.多道程序系统
多道程序设计技术
所谓多道程序设计技术,就是指允许多个程序同时进入内存并运行。即同时把多个程序放入内存,并允许它们交替在CPU中运行,它们共享系统中的各种硬、软件资源。当一道程序因I/O请求而暂停运行时,CPU便立即转去运行另一道程序。
多道程序设计技术不仅使CPU得到充分利用,同时改善I/O设备和内存的利用率,从而提高了整个系统的资源利用率和系统吞吐量(单位时间内处理作业(程序)的个数),最终提高了整个系统的效率。
单处理机系统中多道程序运行时的特点:
(1)多道:计算机内存中同时存放几道相互独立的程序;
(2)宏观上并行:同时进入系统的几道程序都处于运行过程中,即它们先后开始了各自的运行,但都未运行完毕;
(3)微观上串行:实际上,各道程序轮流地用CPU,并交替运行。
多道程序系统的出现,标志着操作系统渐趋成熟的阶段,先后出现了作业调度管理、处理机管理、存储器管理、外部设备管理、文件系统管理等功能。
6.1分时系统
由于CPU速度不断提高和采用分时技术,一台计算机可同时连接多个用户终端,而每个用户可在自己的终端上联机使用计算机,好象自己独占机器一样。
分时技术:把处理机的运行时间分成很短的时间片,按时间片轮流把处理机分配给各联机作业使用。
若某个作业在分配给它的时间片内不能完成其计算,则该作业暂时中断,把处理机让给另一作业使用,等待下一轮时再继续其运行。由于计算机速度很快,作业运行轮转得很快,给每个用户的印象是,好象他独占了一台计算机。而每个用户可以通过自己的终端向系统发出各种操作控制命令,在充分的人机交互情况下,完成作业的运行。
具有上述特征的计算机系统称为分时系统,它允许多个用户同时联机使用计算机。
特点:
(1)多路性。若干个用户同时使用一台计算机。微观上看是各用户轮流使用计算机;宏观上看是各用户并行工作。
(2)交互性。用户可根据系统对请求的响应结果,进一步向系统提出新的请求。这种能使用户与系统进行人机对话的工作方式,明显地有别于批处理系统,因而,分时系统又被称为交互式系统。
(3)独立性。用户之间可以相互独立操作,互不干扰。系统保证各用户程序运行的完整性,不会发生相互混淆或破坏现象。
(4)及时性。系统可对用户的输入及时作出响应。分时系统性能的主要指标之一是响应时间,它是指:从终端发出命令到系统予以应答所需的时间。
分时系统的主要目标:对用户响应的及时性,即不至于用户等待每一个命令的处理时间过长。
分时系统可以同时接纳数十个甚至上百个用户,由于内存空间有限,往往采用对换(又称交换)方式的存储方法。即将未“轮到”的作业放入磁盘,一旦“轮到”,再将其调入内存;而时间片用完后,又将作业存回磁盘(俗称“滚进”、“滚出“法),使同一存储区域轮流为多个用户服务。
多用户分时系统是当今计算机操作系统中最普遍使用的一类操作系统。
6.2实时系统
虽然多道批处理系统和分时系统能获得较令人满意的资源利用率和系统响应时间,但却不能满足实时控制与实时信息处理两个应用领域的需求。于是就产生了实时系统,即系统能够及时响应随机发生的外部事件,并在严格的时间范围内完成对该事件的处理。
实时系统在一个特定的应用中常作为一种控制设备来使用。
实时系统可分成两类:
(1)实时控制系统。当用于飞机飞行、导弹发射等的自动控制时,要求计算机能尽快处理测量系统测得的数据,及时地对飞机或导弹进行控制,或将有关信息通过显示终端提供给决策人员。当用于轧钢、石化等工业生产过程控制时,也要求计算机能及时处理由各类传感器送来的数据,然后控制相应的执行机构。
(2)实时信息处理系统。当用于预定飞机票、查询有关航班、航线、票价等事宜时,或当用于银行系统、情报检索系统时,都要求计算机能对终端设备发来的服务请求及时予以正确的回答。此类对响应及时性的要求稍弱于第一类。
实时操作系统的主要特点:
(1)及时响应。每一个信息接收、分析处理和发送的过程必须在严格的时间限制内完成。
(2)高可靠性。需采取冗余措施,双机系统前后台工作,也包括必要的保密措施等。
进程与线程
什么是进程(process)?
程序并不能单独运行,只有将程序装载到内存中,系统为它分配资源才能运行,而这种执行的程序就称之为进程。程序和进程的区别就在于:程序是指令的集合,它是进程运行的静态描述文本;进程是程序的一次执行活动,属于动态概念。
在多道编程中,我们允许多个程序同时加载到内存中,在操作系统的调度下,可以实现并发地执行。这是这样的设计,大大提高了CPU的利用率。进程的出现让每个用户感觉到自己独享CPU,因此,进程就是为了在CPU上实现多道编程而提出的。
有了进程为什么还要线程?
进程有很多优点,它提供了多道编程,让我们感觉我们每个人都拥有自己的CPU和其他资源,可以提高计算机的利用率。很多人就不理解了,既然进程这么优秀,为什么还要线程呢?其实,仔细观察就会发现进程还是有很多缺陷的,主要体现在两点上:
- 进程只能在一个时间干一件事,如果想同时干两件事或多件事,进程就无能为力了。
- 进程在执行的过程中如果阻塞,例如等待输入,整个进程就会挂起,即使进程中有些工作不依赖于输入的数据,也将无法执行。
什么是线程(thread)?
线程是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务
进程与线程的区别?
- Threads share the address space of the process that created it; processes have their own address space.
- Threads have direct access to the data segment of its process; processes have their own copy of the data segment of the parent process.
- Threads can directly communicate with other threads of its process; processes must use interprocess communication to communicate with sibling processes.
- New threads are easily created; new processes require duplication of the parent process.
- Threads can exercise considerable control over threads of the same process; processes can only exercise control over child processes.
- Changes to the main thread (cancellation, priority change, etc.) may affect the behavior of the other threads of the process; changes to the parent process does not affect child processes.
Python GIL(Global Interpreter Lock)
In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)
上面的核心意思就是,无论你启多少个线程,你有多少个cpu, Python在执行的时候会淡定的在同一时刻只允许一个线程运行
首先需要明确的一点是GIL
并不是Python的特性,它是在实现Python解析器(CPython)时所引入的一个概念。就好比C++是一套语言(语法)标准,但是可以用不同的编译器来编译成可执行代码。有名的编译器例如GCC,INTEL C++,Visual C++等。Python也一样,同样一段代码可以通过CPython,PyPy,Psyco等不同的Python执行环境来执行。像其中的JPython就没有GIL。然而因为CPython是大部分环境下默认的Python执行环境。所以在很多人的概念里CPython就是Python,也就想当然的把GIL
归结为Python语言的缺陷。所以这里要先明确一点:GIL并不是Python的特性,Python完全可以不依赖于GIL
这篇文章透彻的剖析了GIL对python多线程的影响,强烈推荐看一下:http://www.dabeaz.com/python/UnderstandingGIL.pdf
Python threading模块
线程有2种调用方式,如下:
直接调用
import
threading
import
time
def
sayhi(num):
#定义每个线程要运行的函数
print
(
"running on number:%s"
%
num)
time.sleep(
3
)
if
__name__
=
=
‘__main__‘
:
t1
=
threading.Thread(target
=
sayhi,args
=
(
1
,))
#生成一个线程实例
t2
=
threading.Thread(target
=
sayhi,args
=
(
2
,))
#生成另一个线程实例
t1.start()
#启动线程
t2.start()
#启动另一个线程
print
(t1.getName())
#获取线程名
print
(t2.getName())
继承式调用
import
threading
import
time
class
MyThread(threading.Thread):
def
__init__(
self
,num):
threading.Thread.__init__(
self
)
self
.num
=
num
def
run(
self
):
#定义每个线程要运行的函数
print
(
"running on number:%s"
%
self
.num)
time.sleep(
3
)
if
__name__
=
=
‘__main__‘
:
t1
=
MyThread(
1
)
t2
=
MyThread(
2
)
t1.start()
t2.start()
Join & Daemon
#_*_coding:utf-8_*_
__author__
=
‘Alex Li‘
import
time
import
threading
def
run(n):
print
(
‘[%s]------running----\n‘
%
n)
time.sleep(
2
)
print
(
‘--done--‘
)
def
main():
for
i
in
range
(
5
):
t
=
threading.Thread(target
=
run,args
=
[i,])
t.start()
t.join(
1
)
print
(
‘starting thread‘
, t.getName())
m
=
threading.Thread(target
=
main,args
=
[])
m.setDaemon(
True
)
#将main线程设置为Daemon线程,它做为程序主线程的守护线程,当主线程退出时,m线程也会退出,由m启动的其它子线程会同时退出,不管是否执行完任务
m.start()
m.join(timeout
=
2
)
print
(
"---main thread done----"
)
Note:Daemon threads are abruptly stopped at shutdown. Their resources (such as open files, database transactions, etc.) may not be released properly. If you want your threads to stop gracefully, make them non-daemonic and use a suitable signalling mechanism such as an Event
.
线程锁(互斥锁Mutex)
一个进程下可以启动多个线程,多个线程共享父进程的内存空间,也就意味着每个线程可以访问同一份数据,此时,如果2个线程同时要修改同一份数据,会出现什么状况?
import
time
import
threading
def
addNum():
global
num
#在每个线程中都获取这个全局变量
print
(
‘--get num:‘
,num )
time.sleep(
1
)
num
-
=
1
#对此公共变量进行-1操作
num
=
100
#设定一个共享变量
thread_list
=
[]
for
i
in
range
(
100
):
t
=
threading.Thread(target
=
addNum)
t.start()
thread_list.append(t)
for
t
in
thread_list:
#等待所有线程执行完毕
t.join()
print
(
‘final num:‘
, num )
正常来讲,这个num结果应该是0, 但在python 2.7上多运行几次,会发现,最后打印出来的num结果不总是0,为什么每次运行的结果不一样呢? 哈,很简单,假设你有A,B两个线程,此时都 要对num 进行减1操作, 由于2个线程是并发同时运行的,所以2个线程很有可能同时拿走了num=100这个初始变量交给cpu去运算,当A线程去处完的结果是99,但此时B线程运算完的结果也是99,两个线程同时CPU运算的结果再赋值给num变量后,结果就都是99。那怎么办呢? 很简单,每个线程在要修改公共数据时,为了避免自己在还没改完的时候别人也来修改此数据,可以给这个数据加一把锁, 这样其它线程想修改此数据时就必须等待你修改完毕并把锁释放掉后才能再访问此数据。
*注:不要在3.x上运行,不知为什么,3.x上的结果总是正确的,可能是自动加了锁
加锁版本
import
time
import
threading
def
addNum():
global
num
#在每个线程中都获取这个全局变量
print
(
‘--get num:‘
,num )
time.sleep(
1
)
lock.acquire()
#修改数据前加锁
num
-
=
1
#对此公共变量进行-1操作
lock.release()
#修改后释放
num
=
100
#设定一个共享变量
thread_list
=
[]
lock
=
threading.Lock()
#生成全局锁
for
i
in
range
(
100
):
t
=
threading.Thread(target
=
addNum)
t.start()
thread_list.append(t)
for
t
in
thread_list:
#等待所有线程执行完毕
t.join()
print
(
‘final num:‘
, num )
GIL VS Lock
既然用户程序已经自己有锁了,那为什么C python还需要GIL呢?加入GIL主要的原因是为了降低程序的开发的复杂度,比如现在的你写python不需要关心内存回收的问题,因为Python解释器帮你自动定期进行内存回收,你可以理解为python解释器里有一个独立的线程,每过一段时间它起wake up做一次全局轮询看看哪些内存数据是可以被清空的,此时你自己的程序 里的线程和 py解释器自己的线程是并发运行的,假设你的线程删除了一个变量,py解释器的垃圾回收线程在清空这个变量的过程中的clearing时刻,可能一个其它线程正好又重新给这个还没来及得清空的内存空间赋值了,结果就有可能新赋值的数据被删除了,为了解决类似的问题,python解释器简单粗暴的加了锁,即当一个线程运行时,其它人都不能动,这样就解决了上述的问题, 这可以说是Python早期版本的遗留问题。
RLock(递归锁)
说白了就是在一个大锁中还要再包含子锁
import
threading,time
def
run1():
print
(
"grab the first part data"
)
lock.acquire()
global
num
num
+
=
1
lock.release()
return
num
def
run2():
print
(
"grab the second part data"
)
lock.acquire()
global
num2
num2
+
=
1
lock.release()
return
num2
def
run3():
lock.acquire()
res
=
run1()
print
(
‘--------between run1 and run2-----‘
)
res2
=
run2()
lock.release()
print
(res,res2)
if
__name__
=
=
‘__main__‘
:
num,num2
=
0
,
0
lock
=
threading.RLock()
for
i
in
range
(
10
):
t
=
threading.Thread(target
=
run3)
t.start()
while
threading.active_count() !
=
1
:
print
(threading.active_count())
else
:
print
(
‘----all threads done---‘
)
print
(num,num2)
Semaphore(信号量)
互斥锁 同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据 。
import
threading,time
def
run(n):
semaphore.acquire()
time.sleep(
1
)
print
(
"run the thread: %s\n"
%
n)
semaphore.release()
if
__name__
=
=
‘__main__‘
:
num
=
0
semaphore
=
threading.BoundedSemaphore(
5
)
#最多允许5个线程同时运行
for
i
in
range
(
20
):
t
=
threading.Thread(target
=
run,args
=
(i,))
t.start()
while
threading.active_count() !
=
1
:
pass
#print threading.active_count()
else
:
print
(
‘----all threads done---‘
)
print
(num)
Timer
This class represents an action that should be run only after a certain amount of time has passed
Timers are started, as with threads, by calling their start()
method. The timer can be stopped (before its action has begun) by calling thecancel()
method. The interval the timer will wait before executing its action may not be exactly the same as the interval specified by the user.
def
hello():
print
(
"hello, world"
)
t
=
Timer(
30.0
, hello)
t.start()
# after 30 seconds, "hello, world" will be printed
Events
An event is a simple synchronization object;
the event represents an internal flag, and threads
can wait for the flag to be set, or set or clear the flag themselves.
event = threading.Event()
# a client thread can wait for the flag to be set
event.wait()
# a server thread can set or reset it
event.set()
event.clear()
If the flag is set, the wait method doesn’t do anything.
If the flag is cleared, wait will block until it becomes set again.
Any number of threads may wait for the same event.
通过Event来实现两个或多个线程间的交互,下面是一个红绿灯的例子,即起动一个线程做交通指挥灯,生成几个线程做车辆,车辆行驶按红灯停,绿灯行的规则。
import
threading,time
import
random
def
light():
if
not
event.isSet():
event.
set
()
#wait就不阻塞 #绿灯状态
count
=
0
while
True
:
if
count <
10
:
print
(
‘\033[42;1m--green light on---\033[0m‘
)
elif
count <
13
:
print
(
‘\033[43;1m--yellow light on---\033[0m‘
)
elif
count <
20
:
if
event.isSet():
event.clear()
print
(
‘\033[41;1m--red light on---\033[0m‘
)
else
:
count
=
0
event.
set
()
#打开绿灯
time.sleep(
1
)
count
+
=
1
def
car(n):
while
1
:
time.sleep(random.randrange(
10
))
if
event.isSet():
#绿灯
print
(
"car [%s] is running.."
%
n)
else
:
print
(
"car [%s] is waiting for the red light.."
%
n)
if
__name__
=
=
‘__main__‘
:
event
=
threading.Event()
Light
=
threading.Thread(target
=
light)
Light.start()
for
i
in
range
(
3
):
t
=
threading.Thread(target
=
car,args
=
(i,))
t.start()
这里还有一个event使用的例子,员工进公司门要刷卡, 我们这里设置一个线程是“门”, 再设置几个线程为“员工”,员工看到门没打开,就刷卡,刷完卡,门开了,员工就可以通过。
1 #_*_coding:utf-8_*_ 2 __author__ = ‘Alex Li‘ 3 import threading 4 import time 5 import random 6 7 def door(): 8 door_open_time_counter = 0 9 while True: 10 if door_swiping_event.is_set(): 11 print("\033[32;1mdoor opening....\033[0m") 12 door_open_time_counter +=1 13 14 else: 15 print("\033[31;1mdoor closed...., swipe to open.\033[0m") 16 door_open_time_counter = 0 #清空计时器 17 door_swiping_event.wait() 18 19 20 if door_open_time_counter > 3:#门开了已经3s了,该关了 21 door_swiping_event.clear() 22 23 time.sleep(0.5) 24 25 26 def staff(n): 27 28 print("staff [%s] is comming..." % n ) 29 while True: 30 if door_swiping_event.is_set(): 31 print("\033[34;1mdoor is opened, passing.....\033[0m") 32 break 33 else: 34 print("staff [%s] sees door got closed, swipping the card....." % n) 35 print(door_swiping_event.set()) 36 door_swiping_event.set() 37 print("after set ",door_swiping_event.set()) 38 time.sleep(0.5) 39 door_swiping_event = threading.Event() #设置事件 40 41 42 door_thread = threading.Thread(target=door) 43 door_thread.start() 44 45 46 47 for i in range(5): 48 p = threading.Thread(target=staff,args=(i,)) 49 time.sleep(random.randrange(3)) 50 p.start()
queue队列
queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.
- class
queue.
Queue
(maxsize=0) #先入先出
- class
queue.
LifoQueue
(maxsize=0) #last in fisrt out - class
queue.
PriorityQueue
(maxsize=0) #存储数据时可设置优先级的队列
- exception
queue.
Empty
- exception
queue.
Full
Queue.
qsize
()
Queue.
empty
() #return True if empty
Queue.
full
() # return True if full
Queue.
put
(item, block=True, timeout=None)
Queue.
get_nowait
()
Queue.
task_done
()
Queue.
join
() block直到queue被消费完毕
生产者消费者模型
在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。
为什么要使用生产者和消费者模式
在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。
什么是生产者消费者模式
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
下面来学习一个最基本的生产者消费者模型的例子
import
threading
import
queue
def
producer():
for
i
in
range
(
10
):
q.put(
"骨头 %s"
%
i )
print
(
"开始等待所有的骨头被取走..."
)
q.join()
print
(
"所有的骨头被取完了..."
)
def
consumer(n):
while
q.qsize() >
0
:
print
(
"%s 取到"
%
n , q.get())
q.task_done()
#告知这个任务执行完了
q
=
queue.Queue()
p
=
threading.Thread(target
=
producer,)
p.start()
c1
=
consumer(
"李闯"
)
import
time,random
import
queue,threading
q
=
queue.Queue()
def
Producer(name):
count
=
0
while
count <
20
:
time.sleep(random.randrange(
3
))
q.put(count)
print
(
‘Producer %s has produced %s baozi..‘
%
(name, count))
count
+
=
1
def
Consumer(name):
count
=
0
while
count <
20
:
time.sleep(random.randrange(
4
))
if
not
q.empty():
data
=
q.get()
print
(data)
print
(
‘\033[32;1mConsumer %s has eat %s baozi...\033[0m‘
%
(name, data))
else
:
print
(
"-----no baozi anymore----"
)
count
+
=
1
p1
=
threading.Thread(target
=
Producer, args
=
(
‘A‘
,))
c1
=
threading.Thread(target
=
Consumer, args
=
(
‘B‘
,))
p1.start()
c1.start()
多进程multiprocessing
multiprocessing
is a package that supports spawning processes using an API similar to the threading
module. The multiprocessing
package offers both local and remote concurrency, effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads. Due to this, the multiprocessing
module allows the programmer to fully leverage multiple processors on a given machine. It runs on both Unix and Windows.
from
multiprocessing
import
Process
import
time
def
f(name):
time.sleep(
2
)
print
(
‘hello‘
, name)
if
__name__
=
=
‘__main__‘
:
p
=
Process(target
=
f, args
=
(
‘bob‘
,))
p.start()
p.join()
-------------------------------------------------
from
multiprocessing
import
Process
import
os
def
info(title):
print
(title)
print
(
‘module name:‘
, __name__)
print
(
‘parent process:‘
, os.getppid())
print
(
‘process id:‘
, os.getpid())
print
(
"\n\n"
)
def
f(name):
info(
‘\033[31;1mfunction f\033[0m‘
)
print
(
‘hello‘
, name)
if
__name__
=
=
‘__main__‘
:
info(
‘\033[32;1mmain process line\033[0m‘
)
p
=
Process(target
=
f, args
=
(
‘bob‘
,))
p.start()
p.join()
进程间通讯
不同进程间内存是不共享的,要想实现两个进程间的数据交换,可以用以下方法:
1.Queues
使用方法跟threading里的queue差不多
from
multiprocessing
import
Process, Queue
def
f(q):
q.put([
42
,
None
,
‘hello‘
])
if
__name__
=
=
‘__main__‘
:
q
=
Queue()
p
=
Process(target
=
f, args
=
(q,))
p.start()
print
(q.get())
# prints "[42, None, ‘hello‘]"
p.join()
2.Pipes
The Pipe()
function returns a pair of connection objects connected by a pipe which by default is duplex (two-way). For example:
from
multiprocessing
import
Process, Pipe
def
f(conn):
conn.send([
42
,
None
,
‘hello‘
])
conn.close()
if
__name__
=
=
‘__main__‘
:
parent_conn, child_conn
=
Pipe()
p
=
Process(target
=
f, args
=
(child_conn,))
p.start()
print
(parent_conn.recv())
# prints "[42, None, ‘hello‘]"
p.join()
3.Managers
A manager object returned by Manager()
controls a server process which holds Python objects and allows other processes to manipulate them using proxies.
A manager returned by Manager()
will support types list
, dict
, Namespace
, Lock
, RLock
, Semaphore
, BoundedSemaphore
, Condition
, Event
, Barrier
, Queue
, Value
and Array
. For example,
from
multiprocessing
import
Process, Manager
def
f(d, l):
d[
1
]
=
‘1‘
d[
‘2‘
]
=
2
d[
0.25
]
=
None
l.append(
1
)
print
(l)
if
__name__
=
=
‘__main__‘
:
with Manager() as manager:
d
=
manager.
dict
()
l
=
manager.
list
(
range
(
5
))
p_list
=
[]
for
i
in
range
(
10
):
p
=
Process(target
=
f, args
=
(d, l))
p.start()
p_list.append(p)
for
res
in
p_list:
res.join()
print
(d)
print
(l)
进程同步
Without using the lock output from the different processes is liable to get all mixed up.
from
multiprocessing
import
Process, Lock
def
f(l, i):
l.acquire()
try
:
print
(
‘hello world‘
, i)
finally
:
l.release()
if
__name__
=
=
‘__main__‘
:
lock
=
Lock()
for
num
in
range
(
10
):
Process(target
=
f, args
=
(lock, num)).start()
进程池
进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。
进程池中有两个方法:
- apply
- apply_async
from
multiprocessing
import
Process,Pool
import
time
def
Foo(i):
time.sleep(
2
)
return
i
+
100
def
Bar(arg):
print
(
‘-->exec done:‘
,arg)
pool
=
Pool(
5
)
for
i
in
range
(
10
):
pool.apply_async(func
=
Foo, args
=
(i,),callback
=
Bar)
#pool.apply(func=Foo, args=(i,))
print
(
‘end‘
)
pool.close()
pool.join()
#进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。