Python线程
线程是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。
1
2
3
4
5
6
7
8
9
10
11
12
13
|
#!/usr/bin/env python
import threading
import time
def show(arg):
time.sleep( 1 )
print ( ‘thread‘ + str (arg))
for i in range ( 10 ):
t = threading.Thread(target = show, args = (i,))
t.start()
print ( ‘main thread stop‘ )
|
上述代码创建了10个“前台”线程,然后控制器就交给了CPU,CPU根据指定算法进行调度,分片执行指令。
更多方法:
- start 线程准备就绪,等待CPU调度
- setName 为线程设置名称
- getName 获取线程名称
- setDaemon 设置为后台线程或前台线程(默认)
如果是后台线程,主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,均停止
如果是前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序停止
- join 逐个执行每个线程,执行完毕后继续往下执行,该方法使得多线程变得无意义
- run 线程被cpu调度后执行Thread类对象的run方法
Python GIL(Global Interpreter Lock)
首先需要明确的一点是GIL
并不是Python的特性,它是在实现Python解析器(CPython)时所引入的一个概念。就好比C++是一套语言(语法)标准,但是可以用不同的编译器来编译成可执行代码。有名的编译器例如GCC,INTEL C++,Visual C++等。Python也一样,同样一段代码可以通过CPython,PyPy,Psyco等不同的Python执行环境来执行。像其中的JPython就没有GIL。然而因为CPython是大部分环境下默认的Python执行环境。所以在很多人的概念里CPython就是Python,也就想当然的把GIL
归结为Python语言的缺陷。所以这里要先明确一点:GIL并不是Python的特性,Python完全可以不依赖于GIL。
线程锁(互斥锁Mutex)
一个进程下可以启动多个线程,多个线程共享父进程的内存空间,也就意味着每个线程可以访问同一份数据,此时,如果2个线程同时要修改同一份数据,会出现什么状况?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
import time
import threading
def addNum():
global num #在每个线程中都获取这个全局变量
print ( ‘--get num:‘ ,num )
time.sleep( 1 )
num - = 1 #对此公共变量进行-1操作
num = 100 #设定一个共享变量
thread_list = []
for i in range ( 100 ):
t = threading.Thread(target = addNum)
t.start()
thread_list.append(t)
for t in thread_list: #等待所有线程执行完毕
t.join()
print ( ‘final num:‘ , num )
|
正常来讲,这个num结果应该是0, 但在python 2.7上多运行几次,会发现,最后打印出来的num结果不总是0,为什么每次运行的结果不一样呢? 哈,很简单,假设你有A,B两个线程,此时都 要对num 进行减1操作, 由于2个线程是并发同时运行的,所以2个线程很有可能同时拿走了num=100这个初始变量交给cpu去运算,当A线程去处完的结果是99,但此时B线程运算完的结果也是99,两个线程同时CPU运算的结果再赋值给num变量后,结果就都是99。那怎么办呢? 很简单,每个线程在要修改公共数据时,为了避免自己在还没改完的时候别人也来修改此数据,可以给这个数据加一把锁, 这样其它线程想修改此数据时就必须等待你修改完毕并把锁释放掉后才能再访问此数据。
*注:不要在3.x上运行,不知为什么,3.x上的结果总是正确的,可能是自动加了锁
加锁版本
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
import time
import threading
def addNum():
global num #在每个线程中都获取这个全局变量
print ( ‘--get num:‘ ,num )
time.sleep( 1 )
lock.acquire() #修改数据前加锁
num - = 1 #对此公共变量进行-1操作
lock.release() #修改后释放
num = 100 #设定一个共享变量
thread_list = []
lock = threading.Lock() #生成全局锁
for i in range ( 100 ):
t = threading.Thread(target = addNum)
t.start()
thread_list.append(t)
for t in thread_list: #等待所有线程执行完毕
t.join()
print ( ‘final num:‘ , num )
|
RLock(递归锁)
说白了就是在一个大锁中还要再包含子锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading,time
def run1():
print ( "grab the first part data" )
lock.acquire()
global num
num + = 1
lock.release()
return num
def run2():
print ( "grab the second part data" )
lock.acquire()
global num2
num2 + = 1
lock.release()
return num2
def run3():
lock.acquire()
res = run1()
print ( ‘--------between run1 and run2-----‘ )
res2 = run2()
lock.release()
print (res,res2)
if __name__ = = ‘__main__‘ :
num,num2 = 0 , 0
lock = threading.RLock()
for i in range ( 10 ):
t = threading.Thread(target = run3)
t.start()
while threading.active_count() ! = 1 :
print (threading.active_count())
else :
print ( ‘----all threads done---‘ )
print (num,num2)
|
Semaphore(信号量)
互斥锁 同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据 ,比如厕所有3个坑,那最多只允许3个人上厕所,后面的人只能等里面有人出来了才能再进去。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading,time
def run(n):
semaphore.acquire()
time.sleep( 1 )
print ( "run the thread: %s\n" % n)
semaphore.release()
if __name__ = = ‘__main__‘ :
num = 0
semaphore = threading.BoundedSemaphore( 3 ) #最多允许5个线程同时运行
for i in range ( 20 ):
t = threading.Thread(target = run,args = (i,))
t.start()
while threading.active_count() ! = 1 :
pass #print threading.active_count()
else :
print ( ‘----all threads done---‘ )
print (num)
|
event
python线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法 set、wait、clear。
事件处理的机制:全局定义了一个“Flag”,如果“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,如果“Flag”值为True,那么event.wait 方法时便不再阻塞。
- clear:将“Flag”设置为False
- set:将“Flag”设置为True
通过Event来实现两个或多个线程间的交互,下面是一个红绿灯的例子,即起动一个线程做交通指挥灯,生成几个线程做车辆,车辆行驶按红灯停,绿灯行的规则。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
|
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading,time
import random
def light():
if not event.isSet():
event. set () #wait就不阻塞 #绿灯状态
count = 0
while True :
if count < 10 :
print ( ‘\033[42;1m--green light on---\033[0m‘ )
elif count < 13 :
print ( ‘\033[43;1m--yellow light on---\033[0m‘ )
elif count < 20 :
if event.isSet():
event.clear()
print ( ‘\033[41;1m--red light on---\033[0m‘ )
else :
count = 0
event. set () #打开绿灯
time.sleep( 1 )
count + = 1
def car(n): #no bug version
while 1 :
time.sleep( 1 )
if event.isSet(): #绿灯
print ( "car [%s] is running.." % n)
else :
print ( "car [%s] is waiting for the red light.." % n)
event.wait()
def car2(n):
while 1 :
time.sleep(random.randrange( 10 ))
if event.isSet(): #绿灯
print ( "car [%s] is running.." % n)
else :
print ( "car [%s] is waiting for the red light.." % n)
if __name__ = = ‘__main__‘ :
event = threading.Event()
Light = threading.Thread(target = light)
Light.start()
for i in range ( 3 ):
t = threading.Thread(target = car,args = (i,))
t.start()
|
queue队列
Python中对队列和线程的操作,需要使用模块:Queue 和 threading。其中,Queue模块中提供了同步的、线程安全的队列类,包括FIFO(先入先出)队列Queue,LIFO(后入先出)队列LifoQueue,和优先级队列PriorityQueue。这些队列都实现了锁原语,能够在多线程中直接使用。可以使用队列来实现线程间的同步。
生产者消费者模型
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
#!/usr/bin/env python
import queue
import time
import threading
q = queue.Queue()
def consumer(num):
while True :
time.sleep( 1 )
print ( ‘consumer %s get task:%s‘ % (num,q.get()))
q.task_done()
def producer(num):
count = 1
while True :
print ( ‘producer %s produced a new task:%s‘ % (num,count))
q.put(count)
count + = 1
q.join()
print ( ‘all tasks has been consumer by consumers‘ )
c1 = threading.Thread(target = consumer,args = [ 1 ,])
c2 = threading.Thread(target = consumer,args = [ 2 ,])
c3 = threading.Thread(target = consumer,args = [ 3 ,])
p1 = threading.Thread(target = producer,args = [ ‘hetan‘ ,])
p2 = threading.Thread(target = producer,args = [ ‘liuyao‘ ,])
p3 = threading.Thread(target = producer,args = [ ‘xxxx‘ ,])
c1.start()
c2.start()
c3.start()
p1.start()
p2.start()
p3.start()
|
Python 进程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from multiprocessing import Process
import time
def f(name):
time.sleep( 2 )
print ( ‘hello‘ , name)
if __name__ = = ‘__main__‘ :
p = Process(target = f, args = ( ‘bob‘ ,))
p2 = Process(target = f, args = ( ‘bob‘ ,))
p.start()
p2.start()
p.join()
|
进程间通讯
不同进程间内存是不共享的,要想实现两个进程间的数据交换,可以用以下方法:
Queues
使用方法跟threading里的queue差不多
1
2
3
4
5
6
7
8
9
10
11
12
|
#!/usr/bin/env python
from multiprocessing import Process, Queue
def f(q):
q.put([ 42 , None , ‘hello‘ ])
if __name__ = = ‘__main__‘ :
q = Queue()
p = Process(target = f, args = (q,))
p.start()
print (q.get()) # prints "[42, None, ‘hello‘]"
p.join()
|
Pipes
1
2
3
4
5
6
7
8
9
10
11
12
|
from multiprocessing import Process, Pipe
def f(conn):
conn.send([ 42 , None , ‘hello‘ ])
conn.close()
if __name__ = = ‘__main__‘ :
parent_conn, child_conn = Pipe()
p = Process(target = f, args = (child_conn,))
p.start()
print (parent_conn.recv()) # prints "[42, None, ‘hello‘]"
p.join()
|
Managers
A manager returned by Manager()
will support types list
, dict
, Namespace
, Lock
, RLock
, Semaphore
, BoundedSemaphore
, Condition
, Event
, Barrier
, Queue
, Value
and Array
.
例如:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
from multiprocessing import Process, Manager
def f(d, l):
d[ 1 ] = ‘1‘
d[ ‘2‘ ] = 2
d[ 0.25 ] = None
l.append( 1 )
print (l)
if __name__ = = ‘__main__‘ :
with Manager() as manager:
d = manager. dict ()
l = manager. list ( range ( 5 ))
p_list = []
for i in range ( 10 ):
p = Process(target = f, args = (d, l))
p.start()
p_list.append(p)
for res in p_list:
res.join()
print (d)
print (l)
|
进程同步
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
from multiprocessing import Process, Lock
def f(l, i):
l.acquire()
try :
print ( ‘hello world‘ , i)
finally :
l.release()
if __name__ = = ‘__main__‘ :
lock = Lock()
for num in range ( 10 ):
Process(target = f, args = (lock, num)).start()
|
进程池
进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。
进程池中有两个方法:
- apply 同步,一般不用
- apply_async 异步,一般用这个
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
from multiprocessing import Process,Pool,freeze_support
import time
def Foo(i):
time.sleep( 2 )
return i + 100
def Bar(arg):
print ( ‘-->exec done:‘ ,arg)
if __name__ = = ‘__main__‘ :
freeze_support() #windows系统执行需加上,否则会报错
pool = Pool( 5 )
for i in range ( 10 ):
pool.apply_async(func = Foo, args = (i,),callback = Bar)
#pool.apply(func=Foo, args=(i,))
print ( ‘end‘ )
pool.close()
pool.join() #进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。
|
Python 协程
协程
协程,又称微线程,纤程。英文名Coroutine。一句话说明什么是协程:协程是一种用户态的轻量级线程。
协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。因此:
协程能保留上一次调用时的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置。
协程的好处:
- 无需线程上下文切换的开销
- 无需原子操作锁定及同步的开销
- 方便切换控制流,简化编程模型
- 高并发+高扩展性+低成本:一个CPU支持上万的协程都不是问题。所以很适合用于高并发处理。
缺点:
- 无法利用多核资源:协程的本质是个单线程,它不能同时将 单个CPU 的多个核用上,协程需要和进程配合才能运行在多CPU上.当然我们日常所编写的绝大部分应用都没有这个必要,除非是cpu密集型应用。
- 进行阻塞(Blocking)操作(如IO时)会阻塞掉整个程序
使用yield实现协程操作例子
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
import time
def consumer(name):
print ( "--->starting eating baozi..." )
while True :
new_baozi = yield
print ( "[%s] is eating baozi %s" % (name,new_baozi))
time.sleep( 1 )
def producer():
r = con.__next__()
r = con2.__next__()
n = 0
while n < 5 :
n + = 1
con.send(n)
con2.send(n)
print ( "\033[32;1m[producer]\033[0m is making baozi %s" % n )
if __name__ = = ‘__main__‘ :
con = consumer( "hetan" )
con2 = consumer( "liuyao" )
p = producer()
|
Greenlet
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
#!/usr/bin/env python
from greenlet import greenlet
def test1():
print ( 12 )
gr2.switch()
print ( 34 )
gr2.switch()
def test2():
print ( 56 )
gr1.switch()
print ( 78 )
gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()
|
Gevent
Gevent 是一个第三方库,可以轻松通过gevent实现并发同步或异步编程,在gevent中用到的主要模式是Greenlet, 它是以C扩展模块形式接入Python的轻量级协程。 Greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
import gevent
def foo():
print ( ‘Running in foo‘ )
gevent.sleep( 0 )
print ( ‘Explicit context switch to foo again‘ )
def bar():
print ( ‘Explicit context to bar‘ )
gevent.sleep( 0 )
print ( ‘Implicit context switch back to bar‘ )
gevent.joinall([
gevent.spawn(foo),
gevent.spawn(bar),
])
|
同步与异步的性能区别
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
import gevent
def task(pid):
"""
Some non-deterministic task
"""
gevent.sleep( 0.5 )
print ( ‘Task %s done‘ % pid)
def synchronous():
for i in range ( 1 , 10 ):
task(i)
def asynchronous():
threads = [gevent.spawn(task, i) for i in range ( 10 )]
gevent.joinall(threads)
print ( ‘Synchronous:‘ )
synchronous()
print ( ‘Asynchronous:‘ )
asynchronous()
|
上面程序的重要部分是将task函数封装到Greenlet内部线程的gevent.spawn
。 初始化的greenlet列表存放在数组threads
中,此数组被传给gevent.joinall
函数,后者阻塞当前流程,并执行所有给定的greenlet。执行流程只会在 所有greenlet执行完后才会继续向下走。
遇到IO阻塞时会自动切换任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
from gevent import monkey; monkey.patch_all()
import gevent
from urllib.request import urlopen
def f(url):
print ( ‘GET: %s‘ % url)
resp = urlopen(url)
data = resp.read()
print ( ‘%d bytes received from %s.‘ % ( len (data), url))
gevent.joinall([
gevent.spawn(f, ‘https://www.python.org/‘ ),
gevent.spawn(f, ‘https://www.yahoo.com/‘ ),
gevent.spawn(f, ‘https://github.com/‘ ),
])
|
通过gevent实现单线程下的多socket并发
server端:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
#!/usr/bin/env python
import gevent
import time
import sys
from gevent import socket,monkey
import socket
monkey.patch_all()
def server(port):
s = socket.socket()
s.bind(( ‘0.0.0.0‘ ,port))
s.listen( 300 )
while True :
cli,addr = s.accept()
gevent.spawn(handle_request,cli)
def handle_request(s):
try :
while True :
data = s.recv( 1024 )
print ( ‘recv‘ ,data.decode())
s.send(data)
if not data:
s.shutdown(socket.SHUT_WR)
except Exception as e:
print (e)
finally :
s.close()
if __name__ = = ‘__main__‘ :
server( 8000 )
|
client端:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
#!/usr/bin/env python
import socket
Host = ‘localhost‘
port = 8000
s = socket.socket()
s.connect((Host,port))
while True :
msg = bytes( input ( ‘>>‘ ),encoding = ‘utf8‘ )
s.send(msg)
data = s.recv( 1024 )
print ( ‘recevied‘ , repr (data.decode()))
s.close()
|
论事件驱动与异步IO
事件驱动编程是一种编程范式,这里程序的执行流由外部事件来决定。它的特点是包含一个事件循环,当外部事件发生时使用回调机制来触发相应的处理。另外两种常见的编程范式是(单线程)同步以及多线程编程。
让我们用例子来比较和对比一下单线程、多线程以及事件驱动编程模型。下图展示了随着时间的推移,这三种模式下程序所做的工作。这个程序有3个任务需要完成,每个任务都在等待I/O操作时阻塞自身。阻塞在I/O操作上所花费的时间已经用灰色框标示出来了。
在单线程同步模型中,任务按照顺序执行。如果某个任务因为I/O而阻塞,其他所有的任务都必须等待,直到它完成之后它们才能依次执行。这种明确的执行顺序和串行化处理的行为是很容易推断得出的。如果任务之间并没有互相依赖的关系,但仍然需要互相等待的话这就使得程序不必要的降低了运行速度。
在多线程版本中,这3个任务分别在独立的线程中执行。这些线程由操作系统来管理,在多处理器系统上可以并行处理,或者在单处理器系统上交错执行。这使得当某个线程阻塞在某个资源的同时其他线程得以继续执行。与完成类似功能的同步程序相比,这种方式更有效率,但程序员必须写代码来保护共享资源,防止其被多个线程同时访问。多线程程序更加难以推断,因为这类程序不得不通过线程同步机制如锁、可重入函数、线程局部存储或者其他机制来处理线程安全问题,如果实现不当就会导致出现微妙且令人痛不欲生的bug。
在事件驱动版本的程序中,3个任务交错执行,但仍然在一个单独的线程控制中。当处理I/O或者其他昂贵的操作时,注册一个回调到事件循环中,然后当I/O操作完成时继续执行。回调描述了该如何处理某个事件。事件循环轮询所有的事件,当事件到来时将它们分配给等待处理事件的回调函数。这种方式让程序尽可能的得以执行而不需要用到额外的线程。事件驱动型程序比多线程程序更容易推断出行为,因为程序员不需要关心线程安全问题。
当我们面对如下的环境时,事件驱动模型通常是一个好的选择:
- 程序中有许多任务,而且…
- 任务之间高度独立(因此它们不需要互相通信,或者等待彼此)而且…
- 在等待事件到来时,某些任务会阻塞。
当应用程序需要在任务间共享可变的数据时,这也是一个不错的选择,因为这里不需要采用同步处理。
网络应用程序通常都有上述这些特点,这使得它们能够很好的契合事件驱动编程模型。
首先列一下,sellect、poll、epoll三者的区别
select
select最早于1983年出现在4.2BSD中,它通过一个select()系统调用来监视多个文件描述符的数组,当select()返回后,该数组中就绪的文件描述符便会被内核修改标志位,使得进程可以获得这些文件描述符从而进行后续的读写操作。
select目前几乎在所有的平台上支持,其良好跨平台支持也是它的一个优点,事实上从现在看来,这也是它所剩不多的优点之一。
select的一个缺点在于单个进程能够监视的文件描述符的数量存在最大限制,在Linux上一般为1024,不过可以通过修改宏定义甚至重新编译内核的方式提升这一限制。
另外,select()所维护的存储大量文件描述符的数据结构,随着文件描述符数量的增大,其复制的开销也线性增长。同时,由于网络响应时间的延迟使得大量TCP连接处于非活跃状态,但调用select()会对所有socket进行一次线性扫描,所以这也浪费了一定的开销。
poll
poll在1986年诞生于System V Release 3,它和select在本质上没有多大差别,但是poll没有最大文件描述符数量的限制。
poll和select同样存在一个缺点就是,包含大量文件描述符的数组被整体复制于用户态和内核的地址空间之间,而不论这些文件描述符是否就绪,它的开销随着文件描述符数量的增加而线性增大。
另外,select()和poll()将就绪的文件描述符告诉进程后,如果进程没有对其进行IO操作,那么下次调用select()和poll()的时候将再次报告这些文件描述符,所以它们一般不会丢失就绪的消息,这种方式称为水平触发(Level Triggered)。
epoll
直到Linux2.6才出现了由内核直接支持的实现方法,那就是epoll,它几乎具备了之前所说的一切优点,被公认为Linux2.6下性能最好的多路I/O就绪通知方法。
epoll可以同时支持水平触发和边缘触发(Edge Triggered,只告诉进程哪些文件描述符刚刚变为就绪状态,它只说一遍,如果我们没有采取行动,那么它将不会再次告知,这种方式称为边缘触发),理论上边缘触发的性能要更高一些,但是代码实现相当复杂。
epoll同样只告知那些就绪的文件描述符,而且当我们调用epoll_wait()获得就绪文件描述符时,返回的不是实际的描述符,而是一个代表就绪描述符数量的值,你只需要去epoll指定的一个数组中依次取得相应数量的文件描述符即可,这里也使用了内存映射(mmap)技术,这样便彻底省掉了这些文件描述符在系统调用时复制的开销。
另一个本质的改进在于epoll采用基于事件的就绪通知方式。在select/poll中,进程只有在调用一定的方法后,内核才对所有监视的文件描述符进行扫描,而epoll事先通过epoll_ctl()来注册一个文件描述符,一旦基于某个文件描述符就绪时,内核会采用类似callback的回调机制,迅速激活这个文件描述符,当进程调用epoll_wait()时便得到通知。
Python select
Python的select()方法直接调用操作系统的IO接口,它监控sockets,open files, and pipes(所有带fileno()方法的文件句柄)何时变成readable 和writeable, 或者通信错误,select()使得同时监控多个连接变的简单,并且这比写一个长循环来等待和监控多客户端连接要高效,因为select直接通过操作系统提供的C的网络接口进行操作,而不是通过Python的解释器。
注意:Using Python’s file objects with select() works for Unix, but is not supported under Windows.
接下来通过echo server例子要以了解select 是如何通过单进程实现同时处理多个非阻塞的socket连接的:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
import select
import socket
import sys
import Queue
# Create a TCP/IP socket
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setblocking( 0 )
# Bind the socket to the port
server_address = ( ‘localhost‘ , 10000 )
print >>sys.stderr, ‘starting up on %s port %s‘ % server_address
server.bind(server_address)
# Listen for incoming connections
server.listen( 5 )
|
select()方法接收并监控3个通信列表, 第一个是所有的输入的data,就是指外部发过来的数据,第2个是监控和接收所有要发出去的data(outgoing data),第3个监控错误信息,接下来我们需要创建2个列表来包含输入和输出信息来传给select().
1
2
3
4
5
|
# Sockets from which we expect to read
inputs = [ server ]
# Sockets to which we expect to write
outputs = [ ]
|
所有客户端的进来的连接和数据将会被server的主循环程序放在上面的list中处理,我们现在的server端需要等待连接可写(writable)之后才能过来,然后接收数据并返回(因此不是在接收到数据之后就立刻返回),因为每个连接要把输入或输出的数据先缓存到queue里,然后再由select取出来再发出去。
1
2
|
# Outgoing message queues (socket:Queue)
message_queues = {}
|
下面是此程序的主循环,调用select()时会阻塞和等待直到新的连接和数据进来
1
2
3
4
5
|
while inputs:
# Wait for at least one of the sockets to be ready for processing
print >>sys.stderr, ‘\nwaiting for the next event‘
readable, writable, exceptional = select.select(inputs, outputs, inputs)
|
当你把inputs,outputs,exceptional(这里跟inputs共用)传给select()后,它返回3个新的list,我们上面将他们分别赋值为readable,writable,exceptional, 所有在readable list中的socket连接代表有数据可接收(recv),所有在writable list中的存放着你可以对其进行发送(send)操作的socket连接,当连接通信出现error时会把error写到exceptional列表中。
Readable list 中的socket 可以有3种可能状态,第一种是如果这个socket是main "server" socket,它负责监听客户端的连接,如果这个main server socket出现在readable里,那代表这是server端已经ready来接收一个新的连接进来了,为了让这个main server能同时处理多个连接,在下面的代码里,我们把这个main server的socket设置为非阻塞模式。
1
2
3
4
5
6
7
8
9
10
11
12
|
# Handle inputs
for s in readable:
if s is server:
# A "readable" server socket is ready to accept a connection
connection, client_address = s.accept()
print >>sys.stderr, ‘new connection from‘ , client_address
connection.setblocking( 0 )
inputs.append(connection)
# Give the connection a queue for data we want to send
message_queues[connection] = Queue.Queue()
|
第二种情况是这个socket是已经建立了的连接,它把数据发了过来,这个时候你就可以通过recv()来接收它发过来的数据,然后把接收到的数据放到queue里,这样你就可以把接收到的数据再传回给客户端了。
1
2
3
4
5
6
7
8
9
|
else :
data = s.recv( 1024 )
if data:
# A readable client socket has data
print >>sys.stderr, ‘received "%s" from %s‘ % (data, s.getpeername())
message_queues[s].put(data)
# Add output channel for response
if s not in outputs:
outputs.append(s)
|
第三种情况就是这个客户端已经断开了,所以你再通过recv()接收到的数据就为空了,所以这个时候你就可以把这个跟客户端的连接关闭了。
1
2
3
4
5
6
7
8
9
10
11
|
else :
# Interpret empty result as closed connection
print >>sys.stderr, ‘closing‘ , client_address, ‘after reading no data‘
# Stop listening for input on the connection
if s in outputs:
outputs.remove(s) #既然客户端都断开了,我就不用再给它返回数据了,所以这时候如果这个客户端的连接对象还在outputs列表中,就把它删掉
inputs.remove(s) #inputs中也删除掉
s.close() #把这个连接关闭掉
# Remove message queue
del message_queues[s]
|
对于writable list中的socket,也有几种状态,如果这个客户端连接在跟它对应的queue里有数据,就把这个数据取出来再发回给这个客户端,否则就把这个连接从output list中移除,这样下一次循环select()调用时检测到outputs list中没有这个连接,那就会认为这个连接还处于非活动状态
1
2
3
4
5
6
7
8
9
10
11
|
# Handle outputs
for s in writable:
try :
next_msg = message_queues[s].get_nowait()
except Queue.Empty:
# No messages waiting so stop checking for writability.
print >>sys.stderr, ‘output queue for‘ , s.getpeername(), ‘is empty‘
outputs.remove(s)
else :
print >>sys.stderr, ‘sending "%s" to %s‘ % (next_msg, s.getpeername())
s.send(next_msg)
|
最后,如果在跟某个socket连接通信过程中出了错误,就把这个连接对象在inputs\outputs\message_queue中都删除,再把连接关闭掉
1
2
3
4
5
6
7
8
9
10
11
|
# Handle "exceptional conditions"
for s in exceptional:
print >>sys.stderr, ‘handling exceptional condition for‘ , s.getpeername()
# Stop listening for input on the connection
inputs.remove(s)
if s in outputs:
outputs.remove(s)
s.close()
# Remove message queue
del message_queues[s]
|
最后服务器端的完整代码如下:
客户端
下面的这个是客户端程序展示了如何通过select()对socket进行管理并与多个连接同时进行交互:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
import socket
import sys
messages = [ ‘This is the message. ‘ ,
‘It will be sent ‘ ,
‘in parts.‘ ,
]
server_address = ( ‘localhost‘ , 10000 )
# Create a TCP/IP socket
socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM),
socket.socket(socket.AF_INET, socket.SOCK_STREAM),
]
# Connect the socket to the port where the server is listening
print >>sys.stderr, ‘connecting to %s port %s‘ % server_address
for s in socks:
s.connect(server_address)
|
接下来通过循环通过每个socket连接给server发送和接收数据。
1
2
3
4
5
6
7
8
9
10
11
12
13
|
for message in messages:
# Send messages on both sockets
for s in socks:
print >>sys.stderr, ‘%s: sending "%s"‘ % (s.getsockname(), message)
s.send(message)
# Read responses on both sockets
for s in socks:
data = s.recv( 1024 )
print >>sys.stderr, ‘%s: received "%s"‘ % (s.getsockname(), data)
if not data:
print >>sys.stderr, ‘closing socket‘ , s.getsockname()
|
客户端完整代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
__author__ = ‘jieli‘
import socket
import sys
messages = [ ‘This is the message. ‘ ,
‘It will be sent ‘ ,
‘in parts.‘ ,
]
server_address = ( ‘localhost‘ , 10000 )
# Create a TCP/IP socket
socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM),
socket.socket(socket.AF_INET, socket.SOCK_STREAM),
]
# Connect the socket to the port where the server is listening
print >>sys.stderr, ‘connecting to %s port %s‘ % server_address
for s in socks:
s.connect(server_address)
for message in messages:
# Send messages on both sockets
for s in socks:
print >>sys.stderr, ‘%s: sending "%s"‘ % (s.getsockname(), message)
s.send(message)
# Read responses on both sockets
for s in socks:
data = s.recv( 1024 )
print >>sys.stderr, ‘%s: received "%s"‘ % (s.getsockname(), data)
if not data:
print >>sys.stderr, ‘closing socket‘ , s.getsockname()
s.close()
|
selectors模块(将select模块封装,调用更简洁)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
import selectors
import socket
sel = selectors.DefaultSelector()
def accept(sock, mask):
conn, addr = sock.accept() # Should be ready
print ( ‘accepted‘ , conn, ‘from‘ , addr)
conn.setblocking( False )
sel.register(conn, selectors.EVENT_READ, read)
def read(conn, mask):
data = conn.recv( 1000 ) # Should be ready
if data:
print ( ‘echoing‘ , repr (data), ‘to‘ , conn)
conn.send(data) # Hope it won‘t block
else :
print ( ‘closing‘ , conn)
sel.unregister(conn)
conn.close()
sock = socket.socket()
sock.bind(( ‘localhost‘ , 10000 ))
sock.listen( 100 )
sock.setblocking( False )
sel.register(sock, selectors.EVENT_READ, accept)
while True :
events = sel.select()
for key, mask in events:
callback = key.data
callback(key.fileobj, mask)
|
来自为知笔记(Wiz)
时间: 2024-10-19 06:30:28