Tornado异步非阻塞的使用以及原理

Tornado 和现在的主流 Web 服务器框架(包括大多数 Python 的框架)有着明显的区别:它是非阻塞式服务器,而且速度相当快。得利于其 非阻塞的方式和对 epoll 的运用,Tornado 每秒可以处理数以千计的连接,这意味着对于实时 Web 服务来说,Tornado 是一个理想的 Web 框架。

一、Tornado的两种模式使用

1.同步阻塞模式

由于doing中sleep10秒,此时其他连接将被阻塞,必须等这次请求完成后其他请求才能连接成功。

1 import tornado.ioloop
 2 import tornado.web
 3
 4
 5 class MainHandler(tornado.web.RequestHandler):
 6     def get(self):
 7         self.doing()
 8         self.write("Hello, world")
 9
10     def doing(self):
11         time.sleep(10)
12
13
14 application = tornado.web.Application([
15     (r"/index", MainHandler),
16 ])
17
18
19 if __name__ == "__main__":
20     application.listen(8888)
21     tornado.ioloop.IOLoop.instance().start()

同步阻塞

2.异步非阻塞模式

1、基本使用

装饰器 + Future 从而实现Tornado的异步非阻塞:

当发送GET请求时,由于方法被@gen.coroutine装饰且yield 一个 Future对象,那么Tornado会等待,等待用户向future对象中放置数据或者发送信号,如果获取到数据或信号之后,就开始执行doing方法。异步非阻塞体现在当在Tornaod等待用户向future对象中放置数据时,还可以处理其他请求。

注意:在等待用户向future对象中放置数据或信号时,此连接是不断开的。

 1 class AsyncHandler(tornado.web.RequestHandler):
 2     @gen.coroutine
 3     def get(self):
 4         future = Future()
 5         tornado.ioloop.IOLoop.current().add_timeout(time.time() + 5, self.doing)
 6         yield future
 7
 8
 9     def doing(self, *args, **kwargs):
10         self.write(‘async‘)
11         self.finish()

异步非阻塞

2、httpclient类库

当服务器接到的请求需要向第三方服务器发送请求才能解决时,Tornado提供了httpclient类库用于发送Http请求,其配合Tornado的异步非阻塞使用。

 1 1 import tornado.web
 2  2 from tornado import gen
 3  3 from tornado import httpclient
 4  4
 5  5 # 方式一:
 6  6 class AsyncHandler(tornado.web.RequestHandler):
 7  7     @gen.coroutine
 8  8     def get(self, *args, **kwargs):
 9  9         print(‘进入‘)
10 10         http = httpclient.AsyncHTTPClient()
11 11         data = yield http.fetch("http://www.google.com")
12 12         print(‘完事‘,data)
13 13         self.finish(‘6666‘)
14 14
15 15 # 方式二:
16 16 # class AsyncHandler(tornado.web.RequestHandler):
17 17 #     @gen.coroutine
18 18 #     def get(self):
19 19 #         print(‘进入‘)
20 20 #         http = httpclient.AsyncHTTPClient()
21 21 #         yield http.fetch("http://www.google.com", self.done)
22 22 #
23 23 #     def done(self, response):
24 24 #         print(‘完事‘)
25 25 #         self.finish(‘666‘)
26 26
27 27
28 28
29 29 application = tornado.web.Application([
30 30     (r"/async", AsyncHandler),
31 31 ])
32 32
33 33 if __name__ == "__main__":
34 34     application.listen(8888)
35 35     tornado.ioloop.IOLoop.instance().start() 

异步发送request请求

二、Tornado异步非阻塞的原理

1、普通同步阻塞服务器框架原理

通过select与socket我们可以开发一个微型的框架,使用select实现IO多路复用监听本地服务端socket。当有客户端发送请求时,select监听的本地socket发生变化,通过socket.accept()得到客户端发送来的conn(也是一个socket),并将conn也添加到select监听列表里。当客户端通过conn发送数据时,服务端select监听列表的conn发生变化,我们将conn发送的数据(请求数据)接收保存并处理得到request_header与request_body,然后可以根据request_header中的url来匹配本地路由中的url,然后得到对应的view函数,然后将view的返回值(一般为字符串)通过conn发送回请求客户端,然后将conn关闭,并且移除select监听列表中的conn,这样一次网络IO请求便算结束。

  1 import socket
  2 import select
  3
  4 class HttpRequest(object):
  5     """
  6     用户封装用户请求信息
  7     """
  8     def __init__(self, content):
  9         """
 10
 11         :param content:用户发送的请求数据:请求头和请求体
 12         """
 13         self.content = content
 14
 15         self.header_bytes = bytes()
 16         self.body_bytes = bytes()
 17
 18         self.header_dict = {}
 19
 20         self.method = ""
 21         self.url = ""
 22         self.protocol = ""
 23
 24         self.initialize()
 25         self.initialize_headers()
 26
 27     def initialize(self):
 28
 29         temp = self.content.split(b‘\r\n\r\n‘, 1)
 30         if len(temp) == 1:
 31             self.header_bytes += temp
 32         else:
 33             h, b = temp
 34             self.header_bytes += h
 35             self.body_bytes += b
 36
 37     @property
 38     def header_str(self):
 39         return str(self.header_bytes, encoding=‘utf-8‘)
 40
 41     def initialize_headers(self):
 42         headers = self.header_str.split(‘\r\n‘)
 43         first_line = headers[0].split(‘ ‘)
 44         if len(first_line) == 3:
 45             self.method, self.url, self.protocol = headers[0].split(‘ ‘)
 46             for line in headers:
 47                 kv = line.split(‘:‘)
 48                 if len(kv) == 2:
 49                     k, v = kv
 50                     self.header_dict[k] = v
 51
 52 # class Future(object):
 53 #     def __init__(self):
 54 #         self.result = None
 55
 56 def main(request):
 57     return "main"
 58
 59 def index(request):
 60     return "indexasdfasdfasdf"
 61
 62
 63 routers = [
 64     (‘/main/‘,main),
 65     (‘/index/‘,index),
 66 ]
 67
 68 def run():
 69     sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 70     sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
 71     sock.bind(("127.0.0.1", 9999,))
 72     sock.setblocking(False)
 73     sock.listen(128)
 74
 75     inputs = []
 76     inputs.append(sock)
 77     while True:
 78         rlist,wlist,elist = select.select(inputs,[],[],0.05)
 79         for r in rlist:
 80             if r == sock:
 81                 """新请求到来"""
 82                 conn,addr = sock.accept()
 83                 conn.setblocking(False)
 84                 inputs.append(conn)
 85             else:
 86                 """客户端发来数据"""
 87                 data = b""
 88                 while True:
 89                     try:
 90                         chunk = r.recv(1024)
 91                         data = data + chunk
 92                     except Exception as e:
 93                         chunk = None
 94                     if not chunk:
 95                         break
 96                 # data进行处理:请求头和请求体
 97                 request = HttpRequest(data)
 98                 # 1. 请求头中获取url
 99                 # 2. 去路由中匹配,获取指定的函数
100                 # 3. 执行函数,获取返回值
101                 # 4. 将返回值 r.sendall(b‘alskdjalksdjf;asfd‘)
102                 import re
103                 flag = False
104                 func = None
105                 for route in routers:
106                     if re.match(route[0],request.url):
107                         flag = True
108                         func = route[1]
109                         break
110                 if flag:
111                     result = func(request)
112                     r.sendall(bytes(result,encoding=‘utf-8‘))
113                 else:
114                     r.sendall(b"404")
115
116                 inputs.remove(r)
117                 r.close()
118
119 if __name__ == ‘__main__‘:
120     run()

自定义同步阻塞框架

2、Tornado异步非阻塞实现原理

tornado通过装饰器 + Future 从而实现异步非阻塞。在view中yield一个future对象,然后再在发送相应数据前判断view函数返回来的数据类型,如果是字符串类型直接返回,如果是future对象,则将返回来的future对象添加到async_request_dict中,先不给客户端返回响应数据(此时可以处理其他客户端的连接请求),等future对象的result有值时再返回,还可以设置超时时间,在规定的时间过后返回响应数据。  !! 关键是future对象,future对象里有result属性,默认为None,当result有值时再返回数据。

  1 import socket
  2 import select
  3 import time
  4
  5 class HttpRequest(object):
  6     """
  7     用户封装用户请求信息
  8     """
  9     def __init__(self, content):
 10         """
 11
 12         :param content:用户发送的请求数据:请求头和请求体
 13         """
 14         self.content = content
 15
 16         self.header_bytes = bytes()
 17         self.body_bytes = bytes()
 18
 19         self.header_dict = {}
 20
 21         self.method = ""
 22         self.url = ""
 23         self.protocol = ""
 24
 25         self.initialize()
 26         self.initialize_headers()
 27
 28     def initialize(self):
 29
 30         temp = self.content.split(b‘\r\n\r\n‘, 1)
 31         if len(temp) == 1:
 32             self.header_bytes += temp
 33         else:
 34             h, b = temp
 35             self.header_bytes += h
 36             self.body_bytes += b
 37
 38     @property
 39     def header_str(self):
 40         return str(self.header_bytes, encoding=‘utf-8‘)
 41
 42     def initialize_headers(self):
 43         headers = self.header_str.split(‘\r\n‘)
 44         first_line = headers[0].split(‘ ‘)
 45         if len(first_line) == 3:
 46             self.method, self.url, self.protocol = headers[0].split(‘ ‘)
 47             for line in headers:
 48                 kv = line.split(‘:‘)
 49                 if len(kv) == 2:
 50                     k, v = kv
 51                     self.header_dict[k] = v
 52
 53 class Future(object):
 54     def __init__(self,timeout=0):
 55         self.result = None
 56         self.timeout = timeout
 57         self.start = time.time()
 58 def main(request):
 59     f = Future(5)
 60     return f
 61
 62 def index(request):
 63     return "indexasdfasdfasdf"
 64
 65
 66 routers = [
 67     (‘/main/‘,main),
 68     (‘/index/‘,index),
 69 ]
 70
 71 def run():
 72     sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 73     sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
 74     sock.bind(("127.0.0.1", 9999,))
 75     sock.setblocking(False)
 76     sock.listen(128)
 77
 78     inputs = []
 79     inputs.append(sock)
 80
 81     async_request_dict = {
 82         # ‘socket‘: futrue
 83     }
 84
 85     while True:
 86         rlist,wlist,elist = select.select(inputs,[],[],0.05)
 87         for r in rlist:
 88             if r == sock:
 89                 """新请求到来"""
 90                 conn,addr = sock.accept()
 91                 conn.setblocking(False)
 92                 inputs.append(conn)
 93             else:
 94                 """客户端发来数据"""
 95                 data = b""
 96                 while True:
 97                     try:
 98                         chunk = r.recv(1024)
 99                         data = data + chunk
100                     except Exception as e:
101                         chunk = None
102                     if not chunk:
103                         break
104                 # data进行处理:请求头和请求体
105                 request = HttpRequest(data)
106                 # 1. 请求头中获取url
107                 # 2. 去路由中匹配,获取指定的函数
108                 # 3. 执行函数,获取返回值
109                 # 4. 将返回值 r.sendall(b‘alskdjalksdjf;asfd‘)
110                 import re
111                 flag = False
112                 func = None
113                 for route in routers:
114                     if re.match(route[0],request.url):
115                         flag = True
116                         func = route[1]
117                         break
118                 if flag:
119                     result = func(request)
120                     if isinstance(result,Future):
121                         async_request_dict[r] = result
122                     else:
123                         r.sendall(bytes(result,encoding=‘utf-8‘))
124                         inputs.remove(r)
125                         r.close()
126                 else:
127                     r.sendall(b"404")
128                     inputs.remove(r)
129                     r.close()
130
131         for conn in async_request_dict.keys():
132             future = async_request_dict[conn]
133             start = future.start
134             timeout = future.timeout
135             ctime = time.time()
136             if (start + timeout) <= ctime :
137                 future.result = b"timeout"
138             if future.result:
139                 conn.sendall(future.result)
140                 conn.close()
141                 del async_request_dict[conn]
142                 inputs.remove(conn)
143
144 if __name__ == ‘__main__‘:
145     run()

自定义异步非阻塞框架

  
  

原文地址:https://www.cnblogs.com/dominik/p/9991158.html

时间: 2024-07-28 15:57:43

Tornado异步非阻塞的使用以及原理的相关文章

初始Tornado异步非阻塞

Tornado  异步非阻塞 from tornado import gen class MainHandler(tornado.web.RequestHandler): @gen.coroutine  #关键点 def get(self): futrue =Future()#关键点 #阻塞内容,必须写这个格式,time.sleep不好使 #tornado.ioloop.IOLoop.current().add_timeout(time.time()+10,self.doing) #关键点 se

Tornado的异步非阻塞

阻塞和非阻塞Web框架 只有Tornado和Node.js是异步非阻塞的,其他所有的web框架都是阻塞式的. Tornado阻塞和非阻塞两种模式都支持. 阻塞式: 代表:Django.Flask.Tornado.Bottle 一个请求到来未处理完成,后续请求则一直等待. 解决方案:多线程或多进程. 异步非阻塞(存在IO请求): 代表:Tornado(默认单进程/单线程) Tornado的阻塞模式示例 from tornado import ioloop from tornado.web impo

200行自定义异步非阻塞Web框架

Python的Web框架中Tornado以异步非阻塞而闻名.本篇将使用200行代码完成一个微型异步非阻塞Web框架:Snow. 一.源码 本文基于非阻塞的Socket以及IO多路复用从而实现异步非阻塞的Web框架,其中便是众多异步非阻塞Web框架内部原理. #!/usr/bin/env python # -*- coding:utf-8 -*- import re import socket import select import time class HttpResponse(object)

Tornado----自定义异步非阻塞Web框架:Snow

Python的Web框架中Tornado以异步非阻塞而闻名.本篇将使用200行代码完成一个微型异步非阻塞Web框架:Snow. 一.源码 本文基于非阻塞的Socket以及IO多路复用从而实现异步非阻塞的Web框架,其中便是众多异步非阻塞Web框架内部原理. 1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 import re 4 import socket 5 import select 6 import time 7 8 9 class Ht

使用tornado让你的请求异步非阻塞

http://www.dongwm.com/archives/shi-yong-tornadorang-ni-de-qing-qiu-yi-bu-fei-zu-sai/?utm_source=tuicool&utm_medium=referral 前言 也许有同学很迷惑:tornado不是标榜异步非阻塞解决10K问题的嘛?但是我却发现不是torando不好,而是你用错了.比如最近发现一个事情:某网站打开页面很慢,服务器cpu/内存都正常.网络状态也良好. 后来发现,打开页面会有很多请求后端数据库

异步非阻塞IO的Python Web框架--Tornado

Tornado的全称是Torado Web Server,从名字上就可知它可用作Web服务器,但同时它也是一个Python Web的开发框架.最初是在FriendFeed公司的网站上使用,FaceBook收购之后便进行了开源. 作为Web框架,是一个轻量级的Web框架,类似于另一个Python web 框架Web.py,其拥有异步非阻塞IO的处理方式. 作为Web服务器,Tornado有较为出色的抗负载能力,官方用nginx反向代理的方式部署Tornado和其它Python web应用框架进行对

利用tornado使请求实现异步非阻塞

基本IO模型 网上搜了很多关于同步异步,阻塞非阻塞的说法,理解还是不能很透彻,有必要买书看下. 参考:使用异步 I/O 大大提高应用程序的性能 怎样理解阻塞非阻塞与同步异步的区别? 同步和异步:主要关注消息通信机制(重点在B?). 同步:A调用B,B处理直到获得结果,才返回给A. 异步:A调用B,B直接返回.无需等待结果,B通过状态,通知等来通知A或回调函数来处理. 阻塞和非阻塞:主要关注程序等待(重点在A?). 阻塞:A调用B,A被挂起直到B返回结果给A,A继续执行. 非阻塞:A调用B,A不会

爬虫必备—性能相关(异步非阻塞)

在编写爬虫时,性能的消耗主要在IO请求中,当单进程单线程模式下请求URL时必然会引起等待,从而使得请求整体变慢. 1. 同步执行 1 import requests 2 3 def fetch_async(url): 4 response = requests.get(url) 5 return response 6 7 8 url_list = ['http://www.github.com', 'http://www.bing.com'] 9 10 for url in url_list:

转一贴,今天实在写累了,也看累了--【Python异步非阻塞IO多路复用Select/Poll/Epoll使用】

下面这篇,原理理解了, 再结合 这一周来的心得体会,整个框架就差不多了... http://www.haiyun.me/archives/1056.html 有许多封装好的异步非阻塞IO多路复用框架,底层在linux基于最新的epoll实现,为了更好的使用,了解其底层原理还是有必要的.下面记录下分别基于Select/Poll/Epoll的echo server实现.Python Select Server,可监控事件数量有限制: 1 2 3 4 5 6 7 8 9 10 11 12 13 14