python小白-day8 线程、进程、协程

Python线程

线程是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。


1

2

3

4

5

6

7

8

9

10

11

12

13

#!/usr/bin/env python

import threading

import time

def show(arg):

    time.sleep(1)

    print(‘thread‘+str(arg))

for i in range(10):

    t = threading.Thread(target=show, args=(i,))

    t.start()

print(‘main thread stop‘)

上述代码创建了10个“前台”线程,然后控制器就交给了CPU,CPU根据指定算法进行调度,分片执行指令。

更多方法:

  • start            线程准备就绪,等待CPU调度
  • setName      为线程设置名称
  • getName      获取线程名称
  • setDaemon   设置为后台线程或前台线程(默认)
                       如果是后台线程,主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,均停止
                        如果是前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序停止
  • join              逐个执行每个线程,执行完毕后继续往下执行,该方法使得多线程变得无意义
  • run              线程被cpu调度后执行Thread类对象的run方法

Python GIL(Global Interpreter Lock) 

首先需要明确的一点是GIL并不是Python的特性,它是在实现Python解析器(CPython)时所引入的一个概念。就好比C++是一套语言(语法)标准,但是可以用不同的编译器来编译成可执行代码。有名的编译器例如GCC,INTEL C++,Visual C++等。Python也一样,同样一段代码可以通过CPython,PyPy,Psyco等不同的Python执行环境来执行。像其中的JPython就没有GIL。然而因为CPython是大部分环境下默认的Python执行环境。所以在很多人的概念里CPython就是Python,也就想当然的把GIL归结为Python语言的缺陷。所以这里要先明确一点:GIL并不是Python的特性,Python完全可以不依赖于GIL。

线程锁(互斥锁Mutex)

一个进程下可以启动多个线程,多个线程共享父进程的内存空间,也就意味着每个线程可以访问同一份数据,此时,如果2个线程同时要修改同一份数据,会出现什么状况?


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

import time

import threading

 

def addNum():

    global num #在每个线程中都获取这个全局变量

    print(‘--get num:‘,num )

    time.sleep(1)

    num  -=1 #对此公共变量进行-1操作

 

num = 100  #设定一个共享变量

thread_list = []

for i in range(100):

    t = threading.Thread(target=addNum)

    t.start()

    thread_list.append(t)

 

for t in thread_list: #等待所有线程执行完毕

    t.join()

 

 

print(‘final num:‘, num )

正常来讲,这个num结果应该是0, 但在python 2.7上多运行几次,会发现,最后打印出来的num结果不总是0,为什么每次运行的结果不一样呢? 哈,很简单,假设你有A,B两个线程,此时都 要对num 进行减1操作, 由于2个线程是并发同时运行的,所以2个线程很有可能同时拿走了num=100这个初始变量交给cpu去运算,当A线程去处完的结果是99,但此时B线程运算完的结果也是99,两个线程同时CPU运算的结果再赋值给num变量后,结果就都是99。那怎么办呢? 很简单,每个线程在要修改公共数据时,为了避免自己在还没改完的时候别人也来修改此数据,可以给这个数据加一把锁, 这样其它线程想修改此数据时就必须等待你修改完毕并把锁释放掉后才能再访问此数据。

*注:不要在3.x上运行,不知为什么,3.x上的结果总是正确的,可能是自动加了锁

加锁版本


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

import time

import threading

 

def addNum():

    global num #在每个线程中都获取这个全局变量

    print(‘--get num:‘,num )

    time.sleep(1)

    lock.acquire() #修改数据前加锁

    num  -=1 #对此公共变量进行-1操作

    lock.release() #修改后释放

 

num = 100  #设定一个共享变量

thread_list = []

lock = threading.Lock() #生成全局锁

for i in range(100):

    t = threading.Thread(target=addNum)

    t.start()

    thread_list.append(t)

 

for t in thread_list: #等待所有线程执行完毕

    t.join()

 

print(‘final num:‘, num )

RLock(递归锁)

说白了就是在一个大锁中还要再包含子锁


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

#!/usr/bin/env python

# -*- coding:utf-8 -*-

import threading,time

def run1():

    print("grab the first part data")

    lock.acquire()

    global num

    num +=1

    lock.release()

    return num

def run2():

    print("grab the second part data")

    lock.acquire()

    global  num2

    num2+=1

    lock.release()

    return num2

def run3():

    lock.acquire()

    res = run1()

    print(‘--------between run1 and run2-----‘)

    res2 = run2()

    lock.release()

    print(res,res2)

if __name__ == ‘__main__‘:

    num,num2 = 0,0

    lock = threading.RLock()

    for i in range(10):

        t = threading.Thread(target=run3)

        t.start()

while threading.active_count() != 1:

    print(threading.active_count())

else:

    print(‘----all threads done---‘)

    print(num,num2)


Semaphore(信号量)

互斥锁 同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据 ,比如厕所有3个坑,那最多只允许3个人上厕所,后面的人只能等里面有人出来了才能再进去。


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

#!/usr/bin/env python

# -*- coding:utf-8 -*-

import threading,time

def run(n):

    semaphore.acquire()

    time.sleep(1)

    print("run the thread: %s\n" %n)

    semaphore.release()

if __name__ == ‘__main__‘:

    num= 0

    semaphore  = threading.BoundedSemaphore(3) #最多允许5个线程同时运行

    for i in range(20):

        t = threading.Thread(target=run,args=(i,))

        t.start()

while threading.active_count() != 1:

    pass #print threading.active_count()

else:

    print(‘----all threads done---‘)

    print(num)

event

python线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法 set、wait、clear。

事件处理的机制:全局定义了一个“Flag”,如果“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,如果“Flag”值为True,那么event.wait 方法时便不再阻塞。

  • clear:将“Flag”设置为False
  • set:将“Flag”设置为True

通过Event来实现两个或多个线程间的交互,下面是一个红绿灯的例子,即起动一个线程做交通指挥灯,生成几个线程做车辆,车辆行驶按红灯停,绿灯行的规则。


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

#!/usr/bin/env python

# -*- coding:utf-8 -*-

import threading,time

import random

def light():

    if not event.isSet():

        event.set() #wait就不阻塞 #绿灯状态

    count = 0

    while True:

        if count < 10:

            print(‘\033[42;1m--green light on---\033[0m‘)

        elif count <13:

            print(‘\033[43;1m--yellow light on---\033[0m‘)

        elif count <20:

            if event.isSet():

                event.clear()

            print(‘\033[41;1m--red light on---\033[0m‘)

        else:

            count = 0

            event.set() #打开绿灯

        time.sleep(1)

        count +=1

def car(n): #no bug version

    while 1:

        time.sleep(1)

        if  event.isSet(): #绿灯

            print("car [%s] is running.." % n)

        else:

            print("car [%s] is waiting for the red light.." %n)

            event.wait()

def car2(n):

    while 1:

        time.sleep(random.randrange(10))

        if  event.isSet(): #绿灯

            print("car [%s] is running.." % n)

        else:

            print("car [%s] is waiting for the red light.." %n)

if __name__ == ‘__main__‘:

    event = threading.Event()

    Light = threading.Thread(target=light)

    Light.start()

    for i in range(3):

        t = threading.Thread(target=car,args=(i,))

        t.start()

queue队列

Python中对队列和线程的操作,需要使用模块:Queue 和 threading。其中,Queue模块中提供了同步的、线程安全的队列类,包括FIFO(先入先出)队列Queue,LIFO(后入先出)队列LifoQueue,和优先级队列PriorityQueue。这些队列都实现了锁原语,能够在多线程中直接使用。可以使用队列来实现线程间的同步。

生产者消费者模型


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

#!/usr/bin/env python

import queue

import time

import threading

q = queue.Queue()

def consumer(num):

    while True:

        time.sleep(1)

        print(‘consumer %s get task:%s‘%(num,q.get()))

        q.task_done()

def producer(num):

    count = 1

    while True:

        print(‘producer %s produced a new task:%s‘%(num,count))

        q.put(count)

        count += 1

        q.join()

        print(‘all tasks has been consumer by consumers‘)

c1 = threading.Thread(target=consumer,args=[1,])

c2 = threading.Thread(target=consumer,args=[2,])

c3 = threading.Thread(target=consumer,args=[3,])

p1 = threading.Thread(target=producer,args=[‘hetan‘,])

p2 = threading.Thread(target=producer,args=[‘liuyao‘,])

p3 = threading.Thread(target=producer,args=[‘xxxx‘,])

c1.start()

c2.start()

c3.start()

p1.start()

p2.start()

p3.start()


Python 进程


1

2

3

4

5

6

7

8

9

10

11

12

13

14

#!/usr/bin/env python

# -*- coding:utf-8 -*-

from multiprocessing import Process

import time

def f(name):

    time.sleep(2)

    print(‘hello‘, name)

if __name__ == ‘__main__‘:

    p = Process(target=f, args=(‘bob‘,))

    p2 = Process(target=f, args=(‘bob‘,))

    p.start()

    p2.start()

    p.join()

进程间通讯  

不同进程间内存是不共享的,要想实现两个进程间的数据交换,可以用以下方法:

Queues

使用方法跟threading里的queue差不多


1

2

3

4

5

6

7

8

9

10

11

12

#!/usr/bin/env python

from multiprocessing import Process, Queue

def f(q):

    q.put([42, None, ‘hello‘])

if __name__ == ‘__main__‘:

    q = Queue()

    p = Process(target=f, args=(q,))

    p.start()

    print(q.get())    # prints "[42, None, ‘hello‘]"

    p.join()


Pipes


1

2

3

4

5

6

7

8

9

10

11

12

from multiprocessing import Process, Pipe

def f(conn):

    conn.send([42, None, ‘hello‘])

    conn.close()

if __name__ == ‘__main__‘:

    parent_conn, child_conn = Pipe()

    p = Process(target=f, args=(child_conn,))

    p.start()

    print(parent_conn.recv())   # prints "[42, None, ‘hello‘]"

    p.join()


Managers

A manager returned by Manager() will support types listdictNamespaceLockRLockSemaphoreBoundedSemaphoreConditionEventBarrierQueueValue and Array.

例如:


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

from multiprocessing import Process, Manager

def f(d, l):

    d[1] = ‘1‘

    d[‘2‘] = 2

    d[0.25] = None

    l.append(1)

    print(l)

if __name__ == ‘__main__‘:

    with Manager() as manager:

        d = manager.dict()

        l = manager.list(range(5))

        p_list = []

        for i in range(10):

            p = Process(target=f, args=(d, l))

            p.start()

            p_list.append(p)

        for res in p_list:

            res.join()

        print(d)

        print(l)


进程同步


1

2

3

4

5

6

7

8

9

10

11

12

13

14

from multiprocessing import Process, Lock

 

def f(l, i):

    l.acquire()

    try:

        print(‘hello world‘, i)

    finally:

        l.release()

 

if __name__ == ‘__main__‘:

    lock = Lock()

 

    for num in range(10):

        Process(target=f, args=(lock, num)).start()


进程池  

进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。

进程池中有两个方法:

  • apply              同步,一般不用
  • apply_async   异步,一般用这个

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

from  multiprocessing import Process,Pool,freeze_support

import time

 

def Foo(i):

    time.sleep(2)

    return i+100

 

def Bar(arg):

    print(‘-->exec done:‘,arg)

if __name__ == ‘__main__‘:

    freeze_support()   #windows系统执行需加上,否则会报错

 

    pool = Pool(5)

 

    for i in range(10):

        pool.apply_async(func=Foo, args=(i,),callback=Bar)

        #pool.apply(func=Foo, args=(i,))

 

    print(‘end‘)

    pool.close()

    pool.join()#进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。

Python 协程

协程

协程,又称微线程,纤程。英文名Coroutine。一句话说明什么是协程:协程是一种用户态的轻量级线程

协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。因此:

协程能保留上一次调用时的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置。

协程的好处:

  • 无需线程上下文切换的开销
  • 无需原子操作锁定及同步的开销
  • 方便切换控制流,简化编程模型
  • 高并发+高扩展性+低成本:一个CPU支持上万的协程都不是问题。所以很适合用于高并发处理。

缺点:

  • 无法利用多核资源:协程的本质是个单线程,它不能同时将 单个CPU 的多个核用上,协程需要和进程配合才能运行在多CPU上.当然我们日常所编写的绝大部分应用都没有这个必要,除非是cpu密集型应用。
  • 进行阻塞(Blocking)操作(如IO时)会阻塞掉整个程序

使用yield实现协程操作例子


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

import time

def consumer(name):

    print("--->starting eating baozi...")

    while True:

        new_baozi = yield

        print("[%s] is eating baozi %s" % (name,new_baozi))

        time.sleep(1)

def producer():

    r = con.__next__()

    r = con2.__next__()

    n = 0

    while n < 5:

        n +=1

        con.send(n)

        con2.send(n)

        print("\033[32;1m[producer]\033[0m is making baozi %s" %n )

if __name__ == ‘__main__‘:

    con = consumer("hetan")

    con2 = consumer("liuyao")

    p = producer()

Greenlet


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

#!/usr/bin/env python

from greenlet import greenlet

def test1():

    print(12)

    gr2.switch()

    print(34)

    gr2.switch()

def test2():

    print(56)

    gr1.switch()

    print(78)

gr1 = greenlet(test1)

gr2 = greenlet(test2)

gr1.switch()

Gevent

Gevent 是一个第三方库,可以轻松通过gevent实现并发同步或异步编程,在gevent中用到的主要模式是Greenlet, 它是以C扩展模块形式接入Python的轻量级协程。 Greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度。


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

import gevent

def foo():

    print(‘Running in foo‘)

    gevent.sleep(0)

    print(‘Explicit context switch to foo again‘)

def bar():

    print(‘Explicit context to bar‘)

    gevent.sleep(0)

    print(‘Implicit context switch back to bar‘)

gevent.joinall([

    gevent.spawn(foo),

    gevent.spawn(bar),

])

同步与异步的性能区别 


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

import gevent

def task(pid):

    """

    Some non-deterministic task

    """

    gevent.sleep(0.5)

    print(‘Task %s done‘ % pid)

def synchronous():

    for i in range(1,10):

        task(i)

def asynchronous():

    threads = [gevent.spawn(task, i) for i in range(10)]

    gevent.joinall(threads)

print(‘Synchronous:‘)

synchronous()

print(‘Asynchronous:‘)

asynchronous()

上面程序的重要部分是将task函数封装到Greenlet内部线程的gevent.spawn。 初始化的greenlet列表存放在数组threads中,此数组被传给gevent.joinall 函数,后者阻塞当前流程,并执行所有给定的greenlet。执行流程只会在 所有greenlet执行完后才会继续向下走。  

遇到IO阻塞时会自动切换任务


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

from gevent import monkey; monkey.patch_all()

import gevent

from  urllib.request import urlopen

def f(url):

    print(‘GET: %s‘ % url)

    resp = urlopen(url)

    data = resp.read()

    print(‘%d bytes received from %s.‘ % (len(data), url))

gevent.joinall([

        gevent.spawn(f, https://www.python.org/),

        gevent.spawn(f, https://www.yahoo.com/),

        gevent.spawn(f, https://github.com/),

])

通过gevent实现单线程下的多socket并发

server端:


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

#!/usr/bin/env python

import gevent

import time

import sys

from gevent import socket,monkey

import socket

monkey.patch_all()

def server(port):

    s = socket.socket()

    s.bind((‘0.0.0.0‘,port))

    s.listen(300)

    while True:

        cli,addr = s.accept()

        gevent.spawn(handle_request,cli)

def handle_request(s):

    try:

        while True:

            data = s.recv(1024)

            print(‘recv‘,data.decode())

            s.send(data)

            if not data:

                s.shutdown(socket.SHUT_WR)

    except Exception as e:

        print(e)

    finally:

        s.close()

if __name__ == ‘__main__‘:

    server(8000)

client端:


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

#!/usr/bin/env python

import socket

Host = ‘localhost‘

port = 8000

s = socket.socket()

s.connect((Host,port))

while True:

    msg = bytes(input(‘>>‘),encoding=‘utf8‘)

    s.send(msg)

    data = s.recv(1024)

    print(‘recevied‘,repr(data.decode()))

s.close()

论事件驱动与异步IO

事件驱动编程是一种编程范式,这里程序的执行流由外部事件来决定。它的特点是包含一个事件循环,当外部事件发生时使用回调机制来触发相应的处理。另外两种常见的编程范式是(单线程)同步以及多线程编程。

让我们用例子来比较和对比一下单线程、多线程以及事件驱动编程模型。下图展示了随着时间的推移,这三种模式下程序所做的工作。这个程序有3个任务需要完成,每个任务都在等待I/O操作时阻塞自身。阻塞在I/O操作上所花费的时间已经用灰色框标示出来了。


在单线程同步模型中,任务按照顺序执行。如果某个任务因为I/O而阻塞,其他所有的任务都必须等待,直到它完成之后它们才能依次执行。这种明确的执行顺序和串行化处理的行为是很容易推断得出的。如果任务之间并没有互相依赖的关系,但仍然需要互相等待的话这就使得程序不必要的降低了运行速度。

在多线程版本中,这3个任务分别在独立的线程中执行。这些线程由操作系统来管理,在多处理器系统上可以并行处理,或者在单处理器系统上交错执行。这使得当某个线程阻塞在某个资源的同时其他线程得以继续执行。与完成类似功能的同步程序相比,这种方式更有效率,但程序员必须写代码来保护共享资源,防止其被多个线程同时访问。多线程程序更加难以推断,因为这类程序不得不通过线程同步机制如锁、可重入函数、线程局部存储或者其他机制来处理线程安全问题,如果实现不当就会导致出现微妙且令人痛不欲生的bug。

在事件驱动版本的程序中,3个任务交错执行,但仍然在一个单独的线程控制中。当处理I/O或者其他昂贵的操作时,注册一个回调到事件循环中,然后当I/O操作完成时继续执行。回调描述了该如何处理某个事件。事件循环轮询所有的事件,当事件到来时将它们分配给等待处理事件的回调函数。这种方式让程序尽可能的得以执行而不需要用到额外的线程。事件驱动型程序比多线程程序更容易推断出行为,因为程序员不需要关心线程安全问题。

当我们面对如下的环境时,事件驱动模型通常是一个好的选择:

  1. 程序中有许多任务,而且…
  2. 任务之间高度独立(因此它们不需要互相通信,或者等待彼此)而且…
  3. 在等待事件到来时,某些任务会阻塞。

当应用程序需要在任务间共享可变的数据时,这也是一个不错的选择,因为这里不需要采用同步处理。

网络应用程序通常都有上述这些特点,这使得它们能够很好的契合事件驱动编程模型。

首先列一下,sellect、poll、epoll三者的区别 
select 
select最早于1983年出现在4.2BSD中,它通过一个select()系统调用来监视多个文件描述符的数组,当select()返回后,该数组中就绪的文件描述符便会被内核修改标志位,使得进程可以获得这些文件描述符从而进行后续的读写操作。

select目前几乎在所有的平台上支持,其良好跨平台支持也是它的一个优点,事实上从现在看来,这也是它所剩不多的优点之一。

select的一个缺点在于单个进程能够监视的文件描述符的数量存在最大限制,在Linux上一般为1024,不过可以通过修改宏定义甚至重新编译内核的方式提升这一限制。

另外,select()所维护的存储大量文件描述符的数据结构,随着文件描述符数量的增大,其复制的开销也线性增长。同时,由于网络响应时间的延迟使得大量TCP连接处于非活跃状态,但调用select()会对所有socket进行一次线性扫描,所以这也浪费了一定的开销。

poll 
poll在1986年诞生于System V Release 3,它和select在本质上没有多大差别,但是poll没有最大文件描述符数量的限制。

poll和select同样存在一个缺点就是,包含大量文件描述符的数组被整体复制于用户态和内核的地址空间之间,而不论这些文件描述符是否就绪,它的开销随着文件描述符数量的增加而线性增大。

另外,select()和poll()将就绪的文件描述符告诉进程后,如果进程没有对其进行IO操作,那么下次调用select()和poll()的时候将再次报告这些文件描述符,所以它们一般不会丢失就绪的消息,这种方式称为水平触发(Level Triggered)。

epoll 
直到Linux2.6才出现了由内核直接支持的实现方法,那就是epoll,它几乎具备了之前所说的一切优点,被公认为Linux2.6下性能最好的多路I/O就绪通知方法。

epoll可以同时支持水平触发和边缘触发(Edge Triggered,只告诉进程哪些文件描述符刚刚变为就绪状态,它只说一遍,如果我们没有采取行动,那么它将不会再次告知,这种方式称为边缘触发),理论上边缘触发的性能要更高一些,但是代码实现相当复杂。

epoll同样只告知那些就绪的文件描述符,而且当我们调用epoll_wait()获得就绪文件描述符时,返回的不是实际的描述符,而是一个代表就绪描述符数量的值,你只需要去epoll指定的一个数组中依次取得相应数量的文件描述符即可,这里也使用了内存映射(mmap)技术,这样便彻底省掉了这些文件描述符在系统调用时复制的开销。

另一个本质的改进在于epoll采用基于事件的就绪通知方式。在select/poll中,进程只有在调用一定的方法后,内核才对所有监视的文件描述符进行扫描,而epoll事先通过epoll_ctl()来注册一个文件描述符,一旦基于某个文件描述符就绪时,内核会采用类似callback的回调机制,迅速激活这个文件描述符,当进程调用epoll_wait()时便得到通知。

Python select 

Python的select()方法直接调用操作系统的IO接口,它监控sockets,open files, and pipes(所有带fileno()方法的文件句柄)何时变成readable 和writeable, 或者通信错误,select()使得同时监控多个连接变的简单,并且这比写一个长循环来等待和监控多客户端连接要高效,因为select直接通过操作系统提供的C的网络接口进行操作,而不是通过Python的解释器。

注意:Using Python’s file objects with select() works for Unix, but is not supported under Windows.

接下来通过echo server例子要以了解select 是如何通过单进程实现同时处理多个非阻塞的socket连接的:


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

import select

import socket

import sys

import Queue

 

# Create a TCP/IP socket

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

server.setblocking(0)

 

# Bind the socket to the port

server_address = (‘localhost‘, 10000)

print >>sys.stderr, ‘starting up on %s port %s‘ % server_address

server.bind(server_address)

 

# Listen for incoming connections

server.listen(5)

select()方法接收并监控3个通信列表, 第一个是所有的输入的data,就是指外部发过来的数据,第2个是监控和接收所有要发出去的data(outgoing data),第3个监控错误信息,接下来我们需要创建2个列表来包含输入和输出信息来传给select().


1

2

3

4

5

# Sockets from which we expect to read

inputs = [ server ]

# Sockets to which we expect to write

outputs = [ ]

所有客户端的进来的连接和数据将会被server的主循环程序放在上面的list中处理,我们现在的server端需要等待连接可写(writable)之后才能过来,然后接收数据并返回(因此不是在接收到数据之后就立刻返回),因为每个连接要把输入或输出的数据先缓存到queue里,然后再由select取出来再发出去。


1

2

# Outgoing message queues (socket:Queue)

message_queues = {}

下面是此程序的主循环,调用select()时会阻塞和等待直到新的连接和数据进来


1

2

3

4

5

while inputs:

    # Wait for at least one of the sockets to be ready for processing

    print >>sys.stderr, ‘\nwaiting for the next event‘

    readable, writable, exceptional = select.select(inputs, outputs, inputs)

当你把inputs,outputs,exceptional(这里跟inputs共用)传给select()后,它返回3个新的list,我们上面将他们分别赋值为readable,writable,exceptional, 所有在readable list中的socket连接代表有数据可接收(recv),所有在writable list中的存放着你可以对其进行发送(send)操作的socket连接,当连接通信出现error时会把error写到exceptional列表中。

Readable list 中的socket 可以有3种可能状态,第一种是如果这个socket是main "server" socket,它负责监听客户端的连接,如果这个main server socket出现在readable里,那代表这是server端已经ready来接收一个新的连接进来了,为了让这个main server能同时处理多个连接,在下面的代码里,我们把这个main server的socket设置为非阻塞模式。


1

2

3

4

5

6

7

8

9

10

11

12

# Handle inputs

for s in readable:

 

    if s is server:

        # A "readable" server socket is ready to accept a connection

        connection, client_address = s.accept()

        print >>sys.stderr, ‘new connection from‘, client_address

        connection.setblocking(0)

        inputs.append(connection)

 

        # Give the connection a queue for data we want to send

        message_queues[connection] = Queue.Queue()

第二种情况是这个socket是已经建立了的连接,它把数据发了过来,这个时候你就可以通过recv()来接收它发过来的数据,然后把接收到的数据放到queue里,这样你就可以把接收到的数据再传回给客户端了。


1

2

3

4

5

6

7

8

9

else:

     data = s.recv(1024)

     if data:

         # A readable client socket has data

         print >>sys.stderr, ‘received "%s" from %s‘ % (data, s.getpeername())

         message_queues[s].put(data)

         # Add output channel for response

         if s not in outputs:

             outputs.append(s)

第三种情况就是这个客户端已经断开了,所以你再通过recv()接收到的数据就为空了,所以这个时候你就可以把这个跟客户端的连接关闭了。


1

2

3

4

5

6

7

8

9

10

11

else:

    # Interpret empty result as closed connection

    print >>sys.stderr, ‘closing‘, client_address, ‘after reading no data‘

    # Stop listening for input on the connection

    if s in outputs:

        outputs.remove(s)  #既然客户端都断开了,我就不用再给它返回数据了,所以这时候如果这个客户端的连接对象还在outputs列表中,就把它删掉

    inputs.remove(s)    #inputs中也删除掉

    s.close()           #把这个连接关闭掉

 

    # Remove message queue

    del message_queues[s]

对于writable list中的socket,也有几种状态,如果这个客户端连接在跟它对应的queue里有数据,就把这个数据取出来再发回给这个客户端,否则就把这个连接从output list中移除,这样下一次循环select()调用时检测到outputs list中没有这个连接,那就会认为这个连接还处于非活动状态


1

2

3

4

5

6

7

8

9

10

11

# Handle outputs

for s in writable:

    try:

        next_msg = message_queues[s].get_nowait()

    except Queue.Empty:

        # No messages waiting so stop checking for writability.

        print >>sys.stderr, ‘output queue for‘, s.getpeername(), ‘is empty‘

        outputs.remove(s)

    else:

        print >>sys.stderr, ‘sending "%s" to %s‘ % (next_msg, s.getpeername())

        s.send(next_msg)

最后,如果在跟某个socket连接通信过程中出了错误,就把这个连接对象在inputs\outputs\message_queue中都删除,再把连接关闭掉


1

2

3

4

5

6

7

8

9

10

11

# Handle "exceptional conditions"

for s in exceptional:

    print >>sys.stderr, ‘handling exceptional condition for‘, s.getpeername()

    # Stop listening for input on the connection

    inputs.remove(s)

    if s in outputs:

        outputs.remove(s)

    s.close()

 

    # Remove message queue

    del message_queues[s]

最后服务器端的完整代码如下




客户端

下面的这个是客户端程序展示了如何通过select()对socket进行管理并与多个连接同时进行交互:


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

import socket

import sys

 

messages = [ ‘This is the message. ‘,

             ‘It will be sent ‘,

             ‘in parts.‘,

             ]

server_address = (‘localhost‘, 10000)

 

# Create a TCP/IP socket

socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM),

          socket.socket(socket.AF_INET, socket.SOCK_STREAM),

          ]

 

# Connect the socket to the port where the server is listening

print >>sys.stderr, ‘connecting to %s port %s‘ % server_address

for s in socks:

    s.connect(server_address)

接下来通过循环通过每个socket连接给server发送和接收数据。


1

2

3

4

5

6

7

8

9

10

11

12

13

for message in messages:

 

    # Send messages on both sockets

    for s in socks:

        print >>sys.stderr, ‘%s: sending "%s"‘ % (s.getsockname(), message)

        s.send(message)

 

    # Read responses on both sockets

    for s in socks:

        data = s.recv(1024)

        print >>sys.stderr, ‘%s: received "%s"‘ % (s.getsockname(), data)

        if not data:

            print >>sys.stderr, ‘closing socket‘, s.getsockname()

客户端完整代码如下:


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

__author__ = ‘jieli‘

import socket

import sys

 

messages = [ ‘This is the message. ‘,

             ‘It will be sent ‘,

             ‘in parts.‘,

             ]

server_address = (‘localhost‘, 10000)

 

# Create a TCP/IP socket

socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM),

          socket.socket(socket.AF_INET, socket.SOCK_STREAM),

          ]

 

# Connect the socket to the port where the server is listening

print >>sys.stderr, ‘connecting to %s port %s‘ % server_address

for s in socks:

    s.connect(server_address)

 

for message in messages:

 

    # Send messages on both sockets

    for s in socks:

        print >>sys.stderr, ‘%s: sending "%s"‘ % (s.getsockname(), message)

        s.send(message)

 

    # Read responses on both sockets

    for s in socks:

        data = s.recv(1024)

        print >>sys.stderr, ‘%s: received "%s"‘ % (s.getsockname(), data)

        if not data:

            print >>sys.stderr, ‘closing socket‘, s.getsockname()

            s.close()

selectors模块(将select模块封装,调用更简洁)


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

import selectors

import socket

 

sel = selectors.DefaultSelector()

 

def accept(sock, mask):

    conn, addr = sock.accept()  # Should be ready

    print(‘accepted‘, conn, ‘from‘, addr)

    conn.setblocking(False)

    sel.register(conn, selectors.EVENT_READ, read)

 

def read(conn, mask):

    data = conn.recv(1000# Should be ready

    if data:

        print(‘echoing‘, repr(data), ‘to‘, conn)

        conn.send(data)  # Hope it won‘t block

    else:

        print(‘closing‘, conn)

        sel.unregister(conn)

        conn.close()

 

sock = socket.socket()

sock.bind((‘localhost‘, 10000))

sock.listen(100)

sock.setblocking(False)

sel.register(sock, selectors.EVENT_READ, accept)

 

while True:

    events = sel.select()

    for key, mask in events:

        callback = key.data

        callback(key.fileobj, mask)

来自为知笔记(Wiz)

时间: 2024-10-19 06:30:28

python小白-day8 线程、进程、协程的相关文章

15.python并发编程(线程--进程--协程)

一.进程:1.定义:进程最小的资源单位,本质就是一个程序在一个数据集上的一次动态执行(运行)的过程2.组成:进程一般由程序,数据集,进程控制三部分组成:(1)程序:用来描述进程要完成哪些功能以及如何完成(2)数据集:是程序在执行过程中所需要使用的一切资源(3)进程控制块:用来记录进程外部特征,描述进程的执行变化过程,系统可以利用它来控制和管理进程,它是系统感知进程存在的唯一标志.3.进程的作用:是想完成多任务并发,进程之间的内存地址是相互独立的二.线程:1.定义:最小的执行单位,线程的出现是为了

Python并发编程(线程队列,协程,Greenlet,Gevent)

线程队列 线程之间的通信我们列表行不行呢,当然行,那么队列和列表有什么区别呢? queue队列 :使用import queue,用法与进程Queue一样 queue is especially useful in threaded programming when information must be exchanged safely between multiple threads. class queue.Queue(maxsize=0) #先进先出 import queue #不需要通过

Python的线程&amp;进程&amp;协程[0] -&gt; 基本概念

基本概念 / Basic Concept 0 简介与动机 / Why Multi-Thread/Multi-Process/Coroutine 在多线程(multithreaded, MT)编程出现之前,计算机程序的执行是由单个步骤序列组成的,该序列在主机的CPU中按照同步顺序执行.即无论任务多少,是否包含子任务,都要按照顺序方式进行. 然而,假定子任务之间相互独立,没有因果关系,若能使这些独立的任务同时运行,则这种并行处理方式可以显著提高整个任务的性能,这便是多线程编程. 而对于Python而

4月28日 python学习总结 线程与协程

一. 异步与回调机制 问题: 1.任务的返回值不能得到及时的处理,必须等到所有任务都运行完毕才能统一进行处理 2.解析的过程是串行执行的,如果解析一次需要花费2s,解析9次则需要花费18s 解决一: (线程实现异步,回调解析结果) from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor from threading import current_thread import requests import os i

11 线程进程协程

1 线程 1.1 基本应用 1.1.1 标准线程(常用) import threading def f1(arg): print(arg) t = threading.Thread(target=f1, args=(123,)) t.start() 1.1.2 自定义线程 自定义线程类既threading.Thread流程,自定义run方法 import threading class MyThread(threading.Thread): #自定义类,继承threading.Thread类 d

线程 进程 协程

一.什么是线程? 线程是操作系统能够进行运算调度的最小单位(程序执行流的最小单元).它被包含在进程之中,是进程中的实际运作单位.一条线程指的是一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务. 一个标准的线程由线程ID,当前指令指针(PC),寄存器集合和堆栈组成.另外,线程是进程的中的一个实体,是被系统独立调度和分派的基本单位,线程自己不拥有系统资源,只拥有一点儿在运行中必不可少的资源,但它可与同属一个进程的其它线程共享进程所拥有的全部资源.一个线程可以创建和撤销另一

Python的线程&amp;进程&amp;协程[0] -&gt; 线程 -&gt; 多线程的建立与使用

常用的多线程功能实现 目录 生成线程的三种方法 单线程与多线程对比 守护线程的设置 1 生成线程的三种方法 三种方式分别为: 创建一个Thread实例,传给它一个函数 创建一个Thread实例,传给它一个可调用的类实例 派生Thread的子类,并创建子类的实例 # There are three ways to create a thread # The first is create a thread instance, and pass a function # The second one

Python的线程&amp;进程&amp;协程[0] -&gt; 线程 -&gt; 多线程锁的使用

锁与信号量 目录 添加线程锁 锁的本质 互斥锁与可重入锁 死锁的产生 锁的上下文管理 信号量与有界信号量 1 添加线程锁 由于多线程对资源的抢占顺序不同,可能会产生冲突,通过添加线程锁来对共有资源进行控制. 1 import atexit 2 from random import randrange 3 from threading import Thread, Lock, current_thread # or currentThread 4 from time import ctime, s

Python的线程&amp;进程&amp;协程[1] -&gt; 线程 -&gt; 多线程的控制方式

多线程的控制方式 目录 唤醒单个线程等待 唤醒多个线程等待 条件函数等待 事件触发标志 函数延迟启动 设置线程障碍 1 唤醒单个线程等待 Condition类相当于一把高级的锁,可以进行一些复杂的线程同步控制.一般Condition内部都有一把内置的锁对象(默认为RLock),对于Condition的使用主要有以下步骤: 建立两个线程对象,及Condition对象; 线程1首先获取Condition的锁权限,acquire(); 线程1执行需要完成的任务后,调用等待wait(),此时,线程1会阻