转一贴,今天实在写累了,也看累了--【Python异步非阻塞IO多路复用Select/Poll/Epoll使用】

下面这篇,原理理解了,

再结合 这一周来的心得体会,整个框架就差不多了。。。

http://www.haiyun.me/archives/1056.html

有许多封装好的异步非阻塞IO多路复用框架,底层在linux基于最新的epoll实现,为了更好的使用,了解其底层原理还是有必要的。
下面记录下分别基于Select/Poll/Epoll的echo server实现。
Python Select Server,可监控事件数量有限制:


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

#!/usr/bin/python

# -*- coding: utf-8 -*-

import select

import socket

import Queue

 

server = socket.socket(socket.AF_INET,socket.SOCK_STREAM)

server.setblocking(False)

server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR  , 1)

server_address= (‘192.168.1.5‘,8080)

server.bind(server_address)

server.listen(10)

 

#select轮询等待读socket集合

inputs = [server]

#select轮询等待写socket集合

outputs = []

message_queues = {}

#select超时时间

timeout = 20

 

while True:

    print "等待活动连接......"

    readable , writable , exceptional = select.select(inputs, outputs, inputs, timeout)

 

    if not (readable or writable or exceptional) :

        print "select超时无活动连接,重新select...... "

        continue;  

    #循环可读事件

    for s in readable :

        #如果是server监听的socket

        if s is server:

            #同意连接

            connection, client_address = s.accept()

            print "新连接: ", client_address

            connection.setblocking(0)

            #将连接加入到select可读事件队列

            inputs.append(connection)

            #新建连接为key的字典,写回读取到的消息

            message_queues[connection] = Queue.Queue()

        else:

            #不是本机监听就是客户端发来的消息

            data = s.recv(1024)

            if data :

                print "收到数据:" , data , "客户端:",s.getpeername()

                message_queues[s].put(data)

                if s not in outputs:

                    #将读取到的socket加入到可写事件队列

                    outputs.append(s)

            else:

                #空白消息,关闭连接

                print "关闭连接:", client_address

                if s in outputs :

                    outputs.remove(s)

                inputs.remove(s)

                s.close()

                del message_queues[s]

    for s in writable:

        try:

            msg = message_queues[s].get_nowait()

        except Queue.Empty:

            print "连接:" , s.getpeername() , ‘消息队列为空‘

            outputs.remove(s)

        else:

            print "发送数据:" , msg , "到", s.getpeername()

            s.send(msg)

     

    for s in exceptional:

        print "异常连接:", s.getpeername()

        inputs.remove(s)

        if s in outputs:

            outputs.remove(s)

        s.close()

        del message_queues[s]

Python Poll Server,Select升级版,无可监控事件数量限制,还是要轮询所有事件:


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

#!/usr/bin/python

# -*- coding: utf-8 -*-

import socket

import select

import Queue

 

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

server.setblocking(False)

server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

server_address = ("192.168.1.5", 8080)

server.bind(server_address)

server.listen(5)

print  "服务器启动成功,监听IP:" , server_address

message_queues = {}

#超时,毫秒

timeout = 5000

#监听哪些事件

READ_ONLY = ( select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR)

READ_WRITE = (READ_ONLY|select.POLLOUT)

#新建轮询事件对象

poller = select.poll()

#注册本机监听socket到等待可读事件事件集合

poller.register(server,READ_ONLY)

#文件描述符到socket映射

fd_to_socket = {server.fileno():server,}

while True:

    print "等待活动连接......"

    #轮询注册的事件集合

    events = poller.poll(timeout)

    if not events:

      print "poll超时,无活动连接,重新poll......"

      continue

    print "有" , len(events), "个新事件,开始处理......"

    for fd ,flag in events:

        s = fd_to_socket[fd]

        #可读事件

        if flag & (select.POLLIN | select.POLLPRI) :

            if s is server :

                #如果socket是监听的server代表有新连接

                connection , client_address = s.accept()

                print "新连接:" , client_address

                connection.setblocking(False)

                 

                fd_to_socket[connection.fileno()] = connection

                #加入到等待读事件集合

                poller.register(connection,READ_ONLY)

                message_queues[connection]  = Queue.Queue()

            else :

                #接收客户端发送的数据

                data = s.recv(1024)

                if data:

                    print "收到数据:" , data , "客户端:" , s.getpeername()

                    message_queues[s].put(data)

                    #修改读取到消息的连接到等待写事件集合

                    poller.modify(s,READ_WRITE)

                else :

                    # Close the connection

                    print "  closing" , s.getpeername()

                    # Stop listening for input on the connection

                    poller.unregister(s)

                    s.close()

                    del message_queues[s]

        #连接关闭事件

        elif flag & select.POLLHUP :

            print " Closing ", s.getpeername() ,"(HUP)"

            poller.unregister(s)

            s.close()

        #可写事件

        elif flag & select.POLLOUT :

            try:

                msg = message_queues[s].get_nowait()

            except Queue.Empty:

                print s.getpeername() , " queue empty"

                poller.modify(s,READ_ONLY)

            else :

                print "发送数据:" , data , "客户端:" , s.getpeername()

                s.send(msg)

        #异常事件

        elif flag & select.POLLERR:

            print "  exception on" , s.getpeername()

            poller.unregister(s)

            s.close()

            del message_queues[s]

Python Epoll Server,基于回调的事件通知模式,轻松管理大量连接:


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

#!/usr/bin/python

# -*- coding: utf-8 -*-

import socket, select

import Queue

serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

server_address = ("192.168.1.5", 8080)

serversocket.bind(server_address)

serversocket.listen(1)

print  "服务器启动成功,监听IP:" , server_address

serversocket.setblocking(0)

timeout = 10

#新建epoll事件对象,后续要监控的事件添加到其中

epoll = select.epoll()

#添加服务器监听fd到等待读事件集合

epoll.register(serversocket.fileno(), select.EPOLLIN)

message_queues = {}

fd_to_socket = {serversocket.fileno():serversocket,}

while True:

  print "等待活动连接......"

  #轮询注册的事件集合

  events = epoll.poll(timeout)

  if not events:

     print "epoll超时无活动连接,重新轮询......"

     continue

  print "有" , len(events), "个新事件,开始处理......"

  for fd, event in events:

     socket = fd_to_socket[fd]

     #可读事件

     if event & select.EPOLLIN:

         #如果活动socket为服务器所监听,有新连接

         if socket == serversocket:

            connection, address = serversocket.accept()

            print "新连接:" , address

            connection.setblocking(0)

            #注册新连接fd到待读事件集合

            epoll.register(connection.fileno(), select.EPOLLIN)

            fd_to_socket[connection.fileno()] = connection

            message_queues[connection]  = Queue.Queue()

         #否则为客户端发送的数据

         else:

            data = socket.recv(1024)

            if data:

               print "收到数据:" , data , "客户端:" , socket.getpeername()

               message_queues[socket].put(data)

               #修改读取到消息的连接到等待写事件集合

               epoll.modify(fd, select.EPOLLOUT)

     #可写事件

     elif event & select.EPOLLOUT:

        try:

           msg = message_queues[socket].get_nowait()

        except Queue.Empty:

           print socket.getpeername() , " queue empty"

           epoll.modify(fd, select.EPOLLIN)

        else :

           print "发送数据:" , data , "客户端:" , socket.getpeername()

           socket.send(msg)

     #关闭事件

     elif event & select.EPOLLHUP:

        epoll.unregister(fd)

        fd_to_socket[fd].close()

        del fd_to_socket[fd]

epoll.unregister(serversocket.fileno())

epoll.close()

serversocket.close()

时间: 2024-10-10 16:53:16

转一贴,今天实在写累了,也看累了--【Python异步非阻塞IO多路复用Select/Poll/Epoll使用】的相关文章

一起来写web server 06 -- 单线程非阻塞IO版本

阻塞IO的效率是在是低下,你如果要写高性能的web server的话,你必须使用非阻塞IO. 非阻塞IO的读写 在谈到非阻塞IO之前,必须先谈一谈阻塞IO,在网络编程中,我们假设有一个监听套接字的sockfd,这个sockfd是你调用了socket,listen, bind这一大票的函数得到的一个文件描述符.其实它默认就是阻塞的,具体的表现是: 使用accept函数监听sockfd时,如果没有连接到来,这个函数会一直阻塞在那里. 对sockfd调用recv函数的时候,如果对方还没有发送数据过来,

调侃985_不是我写的,我仅看过,呵呵

1 东方不败是清华, 朝上有人好提拔. (清华大学) 2 最敢放炮是北大, 媒介政坛爱自夸. (北京大学) 3 顾影自怜复旦花, 坐吃山空穷人家. (复旦大学) 4 来者不拒是浙大, 大杂烩里要数他. (浙江大学) 5 咸鱼翻身上交大, 拽着主席抬身价. (上海交通大学) 6 金玉其外是南大, 炮制论文网上挖. (南京大学) 7 党国大学中山大, 海外校友最庞大. (中山大学) 8 自以为是中科大, 崇洋媚外海外爬. (中国科技大学) 9 昙花一现华科大, 怨天尤人没身价. (华中科技大学) 1

一个印度人写的文章,看完惊出一身冷汗,肯定包括你!

我自己也有亲身体会,我们身边人看书的的确不多了.当然也不是说我多么喜欢读书了.个人也是没事喜欢看看电影了,单身狗就不没有逛街这个娱乐了.个人有空常去上海图书馆看书,这里看书的人很多,但是大部分是学生 和 老人,或者是父母陪同孩子了. 近日,一名印度工程师所写<令人忧虑,不阅读的中国人>红遍网络.他说,或许不应过分苛责.但我只是忧虑,如果就此疏远了灵魂,未来的中国可能会为此付出代价. 没事陪朋友到图书馆走一趟,而不是电影院,不是大街上! 文如下: 我在从飞往上海的飞机上.正是长途飞行中的睡眠时间

网页内容只是写给访客看的吗?

内容是网站优化的核心和灵魂.作为一名SEO人员,如果你网站的设计理念只是"网页内容只是写给访客看",那么只能说明你的网站建设思路太老套了,那是老一代的建站思路. 网站建设之初是有运营人员和网站编辑主动去创建内容,来吸引用户.当网站达到一定人气后,则要通过运营策略去刺激用户不断地创作更多的内容,通过不断循环带来新用户,以生产更多的内容.所以对新一代的SEO网站建设的理念是:网页内容不仅要给访客看,更重要的是要写给搜索引擎看.那么,网页内容的编辑需要注意些什么呢? 首先,写给访客看,需要注

用markdown写博客,看这一篇就够了,附markdown文件分享

0. 前言 为什么用markdown写博客? 在写博客的过程中,最大的"痛点"在于写作中总是被"格式"之类的困扰,无法专注于内容写作: 在线写博客,会被网络或者编辑器本身所拖累: 本文介绍的markdown写作,可以使你专注于博客内容本身,写好即成稿: 更重要的是,博客内容及相关素材保存在本地,可以几乎不用修改就可以发布在不同的平台,类似java程序的"一次编写,到处运行". 记得上次用markdown写博客,尽管我有markdown使用经验,但

关于写博客,看博客

每次看到很多人坚持写那么多的好博客,我都会产生怎么自己不写博客的想法.于是乎,在激情的驱使下就会随手写几段文字,然后就不了了之了. 今天又看了cici珵的博客(北大cs本科,博士,美女),再次萌生了写自己的博客的想法. (似乎又要开始写自己这个人怎么怎么样,兴趣广泛,但是做事情不够努力:目标很多,但是不能坚持去做,这些对个人的分析了.反正每次真正着手做一件事情之前总是想分析一下之前怎么怎么做的不好.) 关于cici珵,是昨天刷知乎,看到覃超(覃超大魔王)的知乎专栏文章,关于楼教主从google离

写了一个预约东南大学体育场馆的python脚本,目前刚刚实现功能,后续会继续完善

看到git上有人写了一个自动预约的脚本,正好前段时间在学python爬虫和脚本,索性也写了一个,大佬直接略过. 目前没有做图形化,账号和预约信息也是手动输入的,我也只写了羽毛球和乒乓球,其实就是一个属性的值.嫌麻烦的同学可以写一个文本文件,保存这些信息,然后倒入到脚本里,每次稍作修改就可以了.如果基友固定的话,也可以写死在脚本里. 本篇博客只说下博主写脚本的时候遇到的问题和心路历程,只是想要代码的可以直接忽略下文,代码已经上传github:https://github.com/CooperXxx

RESTful转载,多看几遍就理解了写点自己的看法和理解

要理解资源路由就要理解什么是RESTful.如果一个架构符合REST(即Representational State Transfer的缩写,意为表现层状态转化)原则,就称它为RESTful架构. REST提出了一些设计概念和准则: 1.网络上的所有事物都被抽象为资源(resource):2.每个资源对应一个唯一的资源标识(resource identifier):3.通过通用的连接器接口(generic connector interface)对资源进行操作:4.对资源的各种操作不会改变资源标

310实验室OTL问题----将写好的C++文件转换成Python文件,并将数据可视化

如图:文件夹 第一处:optimizer文件夹下的:optimizer.h文件中添加你所写代码的头文件  #include <OTL/Optimizer/Reference-NSGA-II/Reference-NSGA-II.h> 第二处:在Switch.h文件中添加 代码,格式请模仿文件中的内容 #define EXPORT_Reference_NSGA_II 第三处:因为我们一般编码都是以实数形式编码的,所以修改optimizer.real 下的文件Optimizer.h 文件的内容,格式