Tornado 高并发源码分析之四--- HTTPServer 与 TCPServer 对象

主要工作:

服务器启动的时候做的事:

1、把包含了各种配置信息的 application 对象封装到了 HttpServer 对象的 request_callback 字段中,等待被调用

2、TCPServer 通过 listen 方法启动端口监听, 封装_handle_connection回调函数,并注册到 IOLoop 中

服务器运行时做的事:

3、当有请求到来时,注册在 IOLoop 中的 _handle_connection 将会被调用, _handle_connection 方法将会调用handle_stream 方法。

4、handle_stream 方法是由 HTTPServer 重写TCPServer的方法,它将会创建 HTTP1ServerConnection对象和_ServerRequestAdapter 对象(这里只是创建好,并没有执行),并调用 HTTP1ServerConnection 对象的start_serving 方法

5、start_serving 方法创建创建HTTP1Connection 对象,并在方法 _server_request_loop 中异步yield conn.read_response(request_delegate) 接受请求发过来的数据, 这里传入的delegate就是在HTTPServer 中创建的_ServerRequestAdapter对象。

4、在异步接收的时候,_ServerRequestAdapter 负责将数据封装成 HTTPRequest 对象, 接收完毕之后,调用_ServerRequestAdapter的 finish 方法

5、在调用_ServerRequestAdapter 的 finish 方法时,数据就会调用 application 对象的 __call__ 方法, 这时就回到了 Application 类了(这里不解释,直接看Application 的 __call__ 方法)

1、TCPServer 作为工厂类,自身只做统一的端口绑定、监听、回调函数绑定的操作

2、HTTPServer 作为子类,实现数据接收类的创建,接受数据,最后封装成 HTTPRequest 对象,交给Application 对象

3、在整个接收数据的过程中,并不能分辨出 url 是什么,该匹配哪个 handler, 这件事是由 Application 对象来处理的

源码分析,省略部分源码,只粘贴出主要部分

 1 class HTTPServer(TCPServer, httputil.HTTPServerConnectionDelegate):
 2 #继承自TCPServer 和 httputil.HTTPServerConnectionDelegate
 3
 4 def __init__(self, request_callback, no_keep_alive=False, io_loop=None,
 5              xheaders=False, ssl_options=None, protocol=None,
 6              decompress_request=False,
 7              chunk_size=None, max_header_size=None,
 8              idle_connection_timeout=None, body_timeout=None,
 9              max_body_size=None, max_buffer_size=None):
10     self.request_callback = request_callback   #获取 application对象
11     self.no_keep_alive = no_keep_alive
12     self.xheaders = xheaders
13     self.protocol = protocol
14     self.conn_params = HTTP1ConnectionParameters(   #获取基本的http 参数
15         decompress=decompress_request,
16         chunk_size=chunk_size,
17         max_header_size=max_header_size,
18         header_timeout=idle_connection_timeout or 3600,
19         max_body_size=max_body_size,
20         body_timeout=body_timeout)
21     TCPServer.__init__(self, io_loop=io_loop, ssl_options=ssl_options,   传递HTTPServer 给TCPServer对象,初始化
22                        max_buffer_size=max_buffer_size,
23                        read_chunk_size=chunk_size)
24     self._connections = set()
25
26 def handle_stream(self, stream, address):
27  #请求到来时,真正的数据处理方法
28     context = _HTTPRequestContext(stream, address,
29                                   self.protocol)
30     conn = HTTP1ServerConnection(   #创建HTTP1ServerConnection对象,用来获取链接
31         stream, self.conn_params, context)
32     self._connections.add(conn)
33     conn.start_serving(self)   #启动获取数据服务, 传递自己为代理
34
35 def start_request(self, server_conn, request_conn): #将会在handle_stream添加进去的代理中执行
36     return _ServerRequestAdapter(self, request_conn)   #创建_ServerRequestAdapter对象, 开始接受封装数据

HTTPServer

 1 class _ServerRequestAdapter(httputil.HTTPMessageDelegate):
 2     """负责数据流整理,将数据最后封装成 HTTPRequest对象
 3     """
 4     def __init__(self, server, connection):
 5         self.server = server
 6         self.connection = connection
 7         self.request = None
 8         if isinstance(server.request_callback, httputil.HTTPServerConnectionDelegate):
 9             self.delegate = server.request_callback.start_request(connection)
10             self._chunks = None
11         else:
12             self.delegate = None
13             self._chunks = []
14
15     def headers_received(self, start_line, headers):   #接受请求头部数据
16         if self.server.xheaders:
17             self.connection.context._apply_xheaders(headers)
18         if self.delegate is None:
19             self.request = httputil.HTTPServerRequest(  #创建HTTPServerRequest对象
20                 connection=self.connection, start_line=start_line,
21                 headers=headers)
22         else:
23             return self.delegate.headers_received(start_line, headers)
24
25     def data_received(self, chunk):  #不停的接受数据,放到_chunks里面
26         if self.delegate is None:
27             self._chunks.append(chunk)
28         else:
29             return self.delegate.data_received(chunk)
30
31     def finish(self):  #request_callback本身就是一个application对象,在加上括号,那就是执行这个对象,也是就是调用application 的 __call__方法,并传入了封装好的request对象
32         if self.delegate is None:
33             self.request.body = b‘‘.join(self._chunks)
34             self.request._parse_body()
35             self.server.request_callback(self.request)
36         else:
37             self.delegate.finish()
38         self._cleanup()

_ServerRequestAdapter

 1 TCPServer 启动一个 Socket 服务端口监听有三中方式
 2 1. `listen`: 单进程启动一个监听
 3         server = TCPServer()
 4         server.listen(8888)
 5         IOLoop.instance().start()
 6
 7 2. `bind`/`start`: 多进程启动一个监听,取决于start(1)的进程数
 8         server = TCPServer()
 9         server.bind(8888)
10         server.start(0)  # Forks multiple sub-processes
11         IOLoop.instance().start()
12
13 3. `add_sockets`: 另外一种方式启动多进程
14         sockets = bind_sockets(8888)
15         tornado.process.fork_processes(0)
16         server = TCPServer()
17         server.add_sockets(sockets)
18         IOLoop.instance().start()
19
20 class TCPServer(object):
21
22 def listen(self, port, address=""):   #单进程启动
23     sockets = bind_sockets(port, address=address)    #绑定 socket 监听端口
24     self.add_sockets(sockets)      #添加监听
25
26 def add_sockets(self, sockets):
27     if self.io_loop is None:
28         self.io_loop = IOLoop.current()  #获取 IOLoop 单例
29     for sock in sockets:
30         self._sockets[sock.fileno()] = sock
31  #将封装好的 self._handle_connection 回调函数与sock对象一起绑定到IOLoop 中, 这一步很重要,_handle_connection 是封装之后的回调函数,在 IOLoop 中会回调它
32         add_accept_handler(sock, self._handle_connection,   io_loop=self.io_loop)  #add_accept_handler方法封装在netutil 文件中
33
34 def _handle_connection(self, connection, address):
35  #创建IOStream对象,并封装回调函数, 这个方法并不会在这里执行,只是在这里封装好之后,被当做参数传递与IOLoop 绑定 .......
36     try:
37         if self.ssl_options is not None:   #https 相关
38             stream = SSLIOStream(connection, io_loop=self.io_loop,
39                                  max_buffer_size=self.max_buffer_size,
40                                  read_chunk_size=self.read_chunk_size)
41         else:
42             stream = IOStream(connection, io_loop=self.io_loop,    #创建一个 IOStream, 主要用来读取HTTP 数据流
43                               max_buffer_size=self.max_buffer_size,
44                               read_chunk_size=self.read_chunk_size)
45  #调用继承至 TCPServer的类的handle_stream方法(也就是HTTPServer 的handle_stream 方法),传入IOStream和address,
46         self.handle_stream(stream, address)
47     except Exception:
48         app_log.error("Error in connection callback", exc_info=True)
49
50 #*******下面两个方法是用于多进程的方式启动 TCPServer 可以不看********
51 def bind(self, port, address=None, family=socket.AF_UNSPEC, backlog=128):
52         #绑定socket
53     sockets = bind_sockets(port, address=address, family=family,
54                            backlog=backlog)
55     if self._started:
56         self.add_sockets(sockets)
57     else:
58         self._pending_sockets.extend(sockets)
59
60 def start(self, num_processes=1):
61 #多进程方式启动TCPServer,也就是一个 tornado 运行时,启动多个进程,同时监听一个端口,一般不推荐这么使用,而是采用 Supervisor 启动多个tornado进程,分别监听不同的端口
62     assert not self._started
63     self._started = True
64     if num_processes != 1:
65         process.fork_processes(num_processes)   #fork 子进程
66     sockets = self._pending_sockets
67     self._pending_sockets = []
68     self.add_sockets(sockets)

TCPServer

时间: 2024-11-07 22:42:09

Tornado 高并发源码分析之四--- HTTPServer 与 TCPServer 对象的相关文章

Tornado 高并发源码分析之三--- Application 对象

Application 对象主要工作: 服务器启动时: 1.在新建一个app的时候,根据设置好的 URL 和回调函数 Handler 封装成URLSpec 对象 服务器运行时: 2.在请求到来,将 HTTPServer 封装好的HTTPRequest 传入_RequestDispatcher对象,_RequestDispatcher对象根据传入的 HTTPRequest 使用URLSpec解析匹 match 正则匹配找到对应的 RequestHandler ,执行它的 _execute 方法 A

Tornado 高并发源码分析之一---启动一个web服务

前言: 启动一个tornado 服务器基本代码 1 class HomeHandler(tornado.web.RequestHandler): #创建 RequesHandler 对象,处理接收到的 http 请求 2 def get(self): 3 entries = self.db.query("SELECT * FROM entries ORDER BY published DESC LIMIT 5") 4 if not entries: 5 self.redirect(&q

Tornado 高并发源码分析之二---Tornado启动和请求处理流程

Tornado 服务器启动流程 因为Tornado 里使用了很多传类的方式,也就是delegate,之所以要这么做,其实和 iOS 开发那样,也很多的 delegate, 如此来实现高度解耦,但是比较绕,所以建议: 1.先浏览一遍启动流程,再看源码 2.在看一遍请求到来时的处理流程,再看源码 备注: 流程图是xmind 编辑的,好像这里无法上传源文件,所以只能把图片下载下来看了,会没那么清晰

Tornado 高并发源码分析之六---异步编程的几种实现方式

方式一:通过线程池或者进程池 导入库futures是python3自带的库,如果是python2,需要pip安装future这个库 备注:进程池和线程池写法相同 1 from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor 2 from tornado.concurrent import run_on_executor 3 4 def doing(s): 5 print('xiumian--{}'.format(

zg手册 之 python2.7.7源码分析(3)-- list 对象和 dict 对象

list 对象 list 对象的定义 list对象内部是使用数组实现,在数组中存储的是指针,指向要保存的对象. allocated是list中数组的大小,ob_size是当前已经使用的数组大小. typedef struct {     // 可变长对象中有 ob_size,保存当前已经使用的数组大小     PyObject_VAR_HEAD     PyObject **ob_item; // 数组的指针     Py_ssize_t allocated; // 分配的数组长度 } PyLi

libevent源码分析:http-server例子

http-server例子是libevent提供的一个简单web服务器,实现了对静态网页的处理功能. 1 /* 2 * gcc -g -o http-server http-server.c -levent 3 */ 4 #include <stdio.h> 5 #include <stdlib.h> 6 #include <string.h> 7 8 #include <sys/types.h> 9 #include <sys/stat.h>

Spark源码分析之四:Stage提交

各位看官,上一篇<Spark源码分析之Stage划分>详细讲述了Spark中Stage的划分,下面,我们进入第三个阶段--Stage提交. Stage提交阶段的主要目的就一个,就是将每个Stage生成一组Task,即TaskSet,其处理流程如下图所示: 与Stage划分阶段一样,我们还是从handleJobSubmitted()方法入手,在Stage划分阶段,包括最好的ResultStage和前面的若干ShuffleMapStage均已生成,那么顺理成章的下一步便是Stage的提交.在han

jQuery.buildFragment源码分析以及在构造jQuery对象的作用

这个方法在jQuery源码中比较靠后的位置出现,主要用于两处.1是构造jQuery对象的时候使用 2.是为DOM操作提供底层支持,这也就是为什么先学习它的原因.之前的随笔已经分析过jQuery的构造函数了,也提到了有12个分支,其中有一个分支就是通过jQuery.buildFragment方法来处理的,什么情况呢?就是在处理复杂html标签的时候,例如$('<div>123</div>')这样的形式,在构造函数内部通过ret变量判断是不是简单标签,如果是就调用js的createEl

Tornado Etag实现源码分析

Etag(URL的Entity Tag): 对于具体Etag是什么,请求流程,实现原理,这里不进行介绍,可以参考下面链接: http://www.oschina.net/question/234345_42536?sort=time https://zh.wikipedia.org/wiki/HTTP_ETag Tornado实现分析: 先从Tornado处理一个请求的调用顺序开始看(摘自文档:http://www.tornadoweb.cn/documentation): 程序为每一个请求创建