深入tornado中的TCPServer

1 梳理:

  应用层的下一层是传输层,而http协议一般是使用tcp的,所以实现tcp的重要性就不言而喻。

  由于tornado中实现了ioloop这个反应器以及iostream这个对连接的异步读写,所以tcp就很容易实现异步。

  在tornado的tcpserver文件中,实现了TCPServer这个类,他是一个单线程的,非阻塞的tcp 服务。

  为了与上层协议(在tornado中就是HTTPServer)交互,TCPServer提供了一个接口:handle_stream, 要求其子类必需实现该方法。

  TCPserver大体上实现了两种启动方式:单进程模式以及多进程模式(多进程模式需要Linux环境)。 因为多进程方式是单进程的复杂版本,所以讲了多进程那么单进程就很好理解了。

下面就开始吧

2 准备知识点

  因为多进程模式需要Linux环境,所以需要对Linux有个基本的了解

  在Linux中,创建一个子进程只需要调用fork()系统调用就可以了,fork调用会返回两次,子进程返回0,父进程返回子进程的pid。然后子进程和父进程继续执行fork调用之后的语句,子进程获得父进程数据空间,堆,栈的完全副本(也就是内存空间是独立的)。因为fork调用之后经常会执行exec,所以Linux一般采用写时复制(copy on write),父进程和子进程共享统一数据空间,只有当某个内存区域被修改时,才将该区域复制为副本。

  另外,尽管父进程打开的文件描述符都“复制”到了子进程,但由于父子进程的文件描述符指向同一个文件表项,所以不管是父进程或者是子进程对文件描述符进行修改,都会反映到子进程或者父进程中。所以可以这么说:父子进程共享文件描述符。

import os
import socket
import fcntl

def set_close_exec(fd):
    flags = fcntl.fcntl(fd, fcntl.F_GETFD)
    fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)

a = ‘你好‘
sk = socket.socket()
set_close_exec(sk.fileno())
sk.bind((‘127.0.0.1‘, 8888))
sk.listen(1)

def start_child():
    id = os.fork()
    if id == 0:
        print(‘I am child process (%s) and my parent is %s.‘ % (os.getpid(), os.getppid()))
        print(a)
        print(‘----------‘, sk.fileno())
        return
    else:
        print(‘I (%s) just created a child process (%s).‘ % (os.getpid(), id))

print(‘haha‘)
start_child()
print(‘done‘)

可以通过这段代码简单测验一下fork调用的特性

3 要开车了:

tornado多进程模式启动:

sockets = bind_sockets(8888)
tornado.process.fork_processes(0)
server = TCPServer()
server.add_sockets(sockets)
IOLoop.current().start()

  tornado的多进程处理分为以下几个步骤:

    1  首先创建套接字,然后绑定并监听

    2  执行fork调用,创建子进程(默认创建cpu个数的进程)。

      2.5 fork完成后,父进程与子进程就开始分工了,父进程负责管理子进程(包括当子进程异常退出时,重新fork一个子进程;关闭所有子进程),子进程则开始3、4、5步的操作

    3  启动tcpserver

    4  为所有套接字注册对应的事件以及处理函数   

    5  运行ioloop这个反应器

实际上也就是:

  每一个进程共享套接字(这实际上是个文件描述符),

  每一个子进程都有一个反应器,

  每一个子进程都在反应器上为相同的套接字注册了相同的事件以及相同的处理函数。

那么问题也就来了:

  当某个套接字上要建立连接,实际上每个子进程都能捕获到该事件并执行对应的处理函数,但到底是哪个子进程要执行该操作呢? 当一个进程处理完了该操作,其他子进程该如何做呢?

我们带着以上问题开始剖析:

1  首先创建套接字,然后绑定并监听: sockets = bind_sockets(8888)

bind_sockets()方法位于tornado.netutil文件中,下面来详细剖析一下该方法:

def bind_sockets(port, address=None, family=socket.AF_UNSPEC, backlog=_DEFAULT_BACKLOG, flags=None, reuse_port=False):
    """Creates listening sockets bound to the given port and address.

        Returns a list of socket objects (multiple sockets are returned if
        the given address maps to multiple IP addresses, which is most common
        for mixed IPv4 and IPv6 use).

        Address may be either an IP address or hostname.  If it‘s a hostname,
        the server will listen on all IP addresses associated with the
        name.  Address may be an empty string or None to listen on all
        available interfaces.  Family may be set to either `socket.AF_INET`
        or `socket.AF_INET6` to restrict to IPv4 or IPv6 addresses, otherwise
        both will be used if available.

        The ``backlog`` argument has the same meaning as for
        `socket.listen() <socket.socket.listen>`.

        ``flags`` is a bitmask of AI_* flags to `~socket.getaddrinfo`, like
        ``socket.AI_PASSIVE | socket.AI_NUMERICHOST``.

        ``reuse_port`` option sets ``SO_REUSEPORT`` option for every socket
        in the list. If your platform doesn‘t support this option ValueError will
        be raised.
    """
    if reuse_port and not hasattr(socket, "SO_REUSEPORT"):
        raise ValueError("the platform doesn‘t support SO_REUSEPORT")

    sockets = []
    if address == "":
        address = None
    # address family参数指定调用者期待返回的套接口地址结构的类型。它的值包括四种:AF_UNIX,AF_INET,AF_INET6和AF_UNSPEC。
    # AF_UNIX用于同一台机器上的进程间通信
    # 如果指定AF_INET,那么函数就不能返回任何IPV6相关的地址信息;如果仅指定了AF_INET6,则就不能返回任何IPV4地址信息。
    # AF_UNSPEC则意味着函数返回的是适用于指定主机名和服务名且适合任何协议族的地址。
    # 如果某个主机既有AAAA记录(IPV6)地址,同时又有A记录(IPV4)地址,那么AAAA记录将作为sockaddr_in6结构返回,而A记录则作为sockaddr_in结构返回
    if not socket.has_ipv6 and family == socket.AF_UNSPEC: # 如果系统不支持ipv6
        family = socket.AF_INET
    if flags is None:
        flags = socket.AI_PASSIVE
    bound_port = None
    for res in set(socket.getaddrinfo(address, port, family, socket.SOCK_STREAM, 0, flags)):
        af, socktype, proto, canonname, sockaddr = res
        if (sys.platform == ‘darwin‘ and address == ‘localhost‘ and af == socket.AF_INET6 and sockaddr[3] != 0):
            # Mac OS X在“localhost”的getaddrinfo结果中包含一个链接本地地址fe80 :: 1%lo0。
            # 但是,防火墙不了解这是一个本地地址,并且会提示访问。 所以跳过这些地址。
            continue
        try:
            sock = socket.socket(af, socktype, proto)
        except socket.error as e:
            # 如果协议不支持该地址
            if errno_from_exception(e) == errno.EAFNOSUPPORT:
                continue
            raise
        # 为 fd 设置 FD_CLOEXEC 标识
        set_close_exec(sock.fileno())
        if os.name != ‘nt‘: # 非windows
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        if reuse_port:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
        if af == socket.AF_INET6:
            # On linux, ipv6 sockets accept ipv4 too by default,
            # but this makes it impossible to bind to both
            # 0.0.0.0 in ipv4 and :: in ipv6.  On other systems,
            # separate sockets *must* be used to listen for both ipv4
            # and ipv6.  For consistency, always disable ipv4 on our
            # ipv6 sockets and use a separate ipv4 socket when needed.
            #
            # Python 2.x on windows doesn‘t have IPPROTO_IPV6.
            if hasattr(socket, "IPPROTO_IPV6"):
                sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)

        # 自动端口分配,端口=None
        # 应该绑定在IPv4和IPv6地址上的同一个端口上
        host, requested_port = sockaddr[:2]
        if requested_port == 0 and bound_port is not None:
            sockaddr = tuple([host, bound_port] + list(sockaddr[2:]))
        # 设置socket为非阻塞
        sock.setblocking(0)
        sock.bind(sockaddr)
        bound_port = sock.getsockname()[1]
        sock.listen(backlog)
        sockets.append(sock)
    return sockets

2  与 2.5: tornado.process.fork_processes(0)

def fork_processes(num_processes, max_restarts=100):
    # 第一个参数表示启动多少个子进程,第二个参数表示当子进程由于某些情况结束,父进程可以重新启动一个子进程的次数
    global _task_id
    assert _task_id is None
    # 默认生成的子进程的个数等于cpu个数
    if num_processes is None or num_processes <= 0:
        num_processes = cpu_count()
    # 不允许在未完成创建子进程之前启动ioloop
    if ioloop.IOLoop.initialized():
        raise RuntimeError("Cannot run in multiple processes: IOLoop instance "
                           "has already been initialized. You cannot call "
                           "IOLoop.instance() before calling start_processes()")
    gen_log.info("Starting %d processes", num_processes)
    children = {}

    def start_child(i):
        pid = os.fork()
        if pid == 0: # 表示子进程
            # child process
            _reseed_random()
            global _task_id
            _task_id = i
            return i  # 子进程会直接退出该函数(也就是start_child)的执行,并返回i的值
        else:
            # 父进程则会将子进程pid与子进程对应的i值进行映射,然后返回None
            children[pid] = i
            return None

    for i in range(num_processes):
        id = start_child(i)
        # 因为子进程继承了父进程的执行流,并且子进程不应fork子进程,而是应该由父进程进行对子进程的创建和管理等操作
        # 所以子进程的执行流需要跳出fork_processes这个函数
        if id is not None:
            return id

    num_restarts = 0
    # 只有父进程的执行流才能到达这里,父进程的执行流会一直处于该循环中,直到tornado服务主动关闭
    while children:
        try:
            # 等待任何一个子进程结束,返回一个tuple,包括子进程的进程ID和退出状态信息
            pid, status = os.wait()
        except OSError as e:
            # 当阻塞于某个慢系统调用的一个进程捕获某个信号且相应信号处理函数返回时,该系统调用可能返回一个EINTR错误
            if errno_from_exception(e) == errno.EINTR:
                continue
            raise
        if pid not in children:
            continue
        id = children.pop(pid)
        if os.WIFSIGNALED(status): # 如果进程由于信号而退出,则返回True,否则返回False
            gen_log.warning("child %d (pid %d) killed by signal %d, restarting",
                            id, pid, os.WTERMSIG(status))
        elif os.WEXITSTATUS(status) != 0: # 如果WIFEXITED(status)返回True,则返回一个整数,该整数是exit()调用的参数。否则返回值是未定义的
            gen_log.warning("child %d (pid %d) exited with status %d, restarting",
                            id, pid, os.WEXITSTATUS(status))
        else:
            gen_log.info("child %d (pid %d) exited normally", id, pid)
            continue
        num_restarts += 1
        if num_restarts > max_restarts:
            raise RuntimeError("Too many child restarts, giving up")
        # 新启动一个子进程
        new_id = start_child(id)
        # 保证子进程执行流离开fork_processes函数
        if new_id is not None:
            return new_id
    # 当所有的子进程都完全退出,这时候我们需要结束父进程
    # 如果我们仅仅是结束fork_processes函数的执行,那么父进程的执行流可能会启动ioloop
    sys.exit(0)

3  启动tcpserver: server = TCPServer()

这一步很简单,仅仅是一些初始化的操作

def __init__(self, io_loop=None, ssl_options=None, max_buffer_size=None, read_chunk_size=None):
        self.io_loop = io_loop
        self.ssl_options = ssl_options
        self._sockets = {}  # fd -> socket object    用来存储文件描述符与socket对象的映射关系
        self._pending_sockets = []
        self._started = False
        self.max_buffer_size = max_buffer_size    # 最大缓冲长度
        self.read_chunk_size = read_chunk_size    # 每次读的chunk大小

        # 校验ssl选项.
        if self.ssl_options is not None and isinstance(self.ssl_options, dict):
            # Only certfile is required: it can contain both keys
            if ‘certfile‘ not in self.ssl_options:
                raise KeyError(‘missing key "certfile" in ssl_options‘)

            if not os.path.exists(self.ssl_options[‘certfile‘]):
                raise ValueError(‘certfile "%s" does not exist‘ % self.ssl_options[‘certfile‘])
            if (‘keyfile‘ in self.ssl_options and not os.path.exists(self.ssl_options[‘keyfile‘])):
                raise ValueError(‘keyfile "%s" does not exist‘ % self.ssl_options[‘keyfile‘])

4  为所有套接字注册对应的事件以及处理函数: server.add_sockets(sockets)

首先来看add_sockets()方法

def add_sockets(self, sockets):
        if self.io_loop is None:
            self.io_loop = IOLoop.current()    # 获取IOLoop实例对象

        for sock in sockets:
            self._sockets[sock.fileno()] = sock
            add_accept_handler(sock, self._handle_connection, io_loop=self.io_loop)

其中调用了add_accept_handler()方法,并将自身的_handle_connection()方法作为参数传入(在这里我们不讲解这个方法,因为涉及的东西有点多)。

def add_accept_handler(sock, callback, io_loop=None):
    if io_loop is None: # 获取IOLoop实例对象
        io_loop = IOLoop.current()

    def accept_handler(fd, events):
        # 我们处理回调时可能会有许多的连接等待建立; 为了防止其他任务的饥饿,我们必须限制我们一次接受的连接数。
        # 理想情况下,我们接受在处理回调过程中等待的连接数,但此可能会对负载产生不利影响。
        # 相反,我们使用listen backlog作为我们可以合理接受的连接数的。
        for i in xrange(_DEFAULT_BACKLOG): # _DEFAULT_BACKLOG默认为128
            try:
                connection, address = sock.accept()
            except socket.error as e:
                # _ERRNO_WOULDBLOCK 表示我们已经接受了所有可用的连接。
                if errno_from_exception(e) in _ERRNO_WOULDBLOCK:
                    return
                # ECONNABORTED表示有一个连接,在他处于等待被服务端accept的时候主动关闭了。
                if errno_from_exception(e) == errno.ECONNABORTED:
                    continue
                raise
            callback(connection, address)
    io_loop.add_handler(sock, accept_handler, IOLoop.READ) # 为socket注册handler:当发生READ事件时运行accept_handler函数。

5  运行ioloop这个反应器: IOLoop.current().start()

这一步在之前的文章中已经介绍了,详细请看:这里

这里只简单的讲解一下start()方法:

def start(self):
        try:
            while True:
                callbacks = self._callbacks
                self._callbacks = []
                due_timeouts = []
                # 将时间已到的定时任务放置到due_timeouts中,过程省略
                for callback in callbacks:          # 执行callback
                    self._run_callback(callback)
                for timeout in due_timeouts:        # 执行定时任务
                    if timeout.callback is not None:
                        self._run_callback(timeout.callback)
                callbacks = callback = due_timeouts = timeout = None    # 释放内存
                # 根据情况设置poll_timeout的值,过程省略
                if not self._running:    # 终止ioloop运行时,在执行完了callback后结束循环
                    breaktry:
                    event_pairs = self._impl.poll(poll_timeout)
                except Exception as e:
                    if errno_from_exception(e) == errno.EINTR:  # 系统调用被信号处理函数中断,进行下一次循环
                        continue
                    else:
                        raise
                self._events.update(event_pairs)
                while self._events:
                    fd, events = self._events.popitem()             # 获取一个fd以及对应事件
                    try:
                        fd_obj, handler_func = self._handlers[fd]   # 获取该fd对应的事件处理函数
                        handler_func(fd_obj, events)                # 运行该事件处理函数
                    except (OSError, IOError) as e:
                        if errno_from_exception(e) == errno.EPIPE:     # 当客户端关闭连接时会产生EPIPE错误
                            pass
                        # 其他异常处理已经省略
                fd_obj = handler_func = None       # 释放内存空间    

另外的:

  TCPServer还有几个其他的方法,但很多调用了bind_sockets, add_accept_handler,  fork_processes这几个方法,当理解了这几个方法后TCPServer的其他方法就显得很简单了。所以本文并不做讨论

参考

  os模块: http://www.cnblogs.com/now-fighting/p/3534185.html

  Linux异常: http://blog.csdn.net/a8039974/article/details/25830705

  Linux多进程: UNIX环境高级编程

  tornado多进程分析: http://www.nowamagic.net/academy/detail/13321081

    http://strawhatfy.github.io/2015/10/14/tornado.tcpserver/

    https://www.linuxzen.com/tornado-duo-jin-cheng-shi-xian-fen-xi.html

  

时间: 2024-08-02 07:00:19

深入tornado中的TCPServer的相关文章

在tornado中使用celery实现异步任务处理之一

一.简介 tornado-celery是用于Tornado web框架的非阻塞 celery客户端. 通过tornado-celery可以将耗时任务加入到任务队列中处理, 在celery中创建任务,tornado中就可以像调用AsyncHttpClient一样调用这些任务. ? Celery中两个基本的概念:Broker.Backend Broker : 其实就是一开始说的 消息队列 ,用来发送和接受消息. Broker有几个方案可供选择:RabbitMQ,Redis,数据库等 Backend:

深入tornado中的协程

tornado使用了单进程(当然也可以多进程) + 协程 + I/O多路复用的机制,解决了C10K中因为过多的线程(进程)的上下文切换 而导致的cpu资源的浪费. tornado中的I/O多路复用前面已经讲过了.本文不做详细解释. 来看一下tornado中的协程模块:tornado.gen: tornado.gen是根据生成器(generator)实现的,用来更加简单的实现异步. 先来说一下tornado.gen.coroutine的实现思路: 我们知道generator中的yield语句可以使

在 tornado 中异步无阻塞的执行耗时任务

在 linux 上 tornado 是基于 epoll 的事件驱动框架,在网络事件上是无阻塞的.但是因为 tornado 自身是单线程的,所以如果我们在某一个时刻执行了一个耗时的任务,那么就会阻塞在这里,无法响应其他的任务请求,这个和 tornado 的高性能服务器称号不符,所以我们要想办法把耗时的任务转换为不阻塞主线程,让耗时的任务不影响对其他请求的响应. 在 python 3.2 上,增加了一个并行库 concurrent.futures,这个库提供了更简单的异步执行函数的方法. 如果是在

tornado中使用torndb,连接数过高的问题

问题背景 最近新的产品开发中,使用了到了Tornado和mysql数据库.但在基本框架完成之后,我在开发时候发现了一个很奇怪的现象,我在测试时,发现数据库返回不了结果,于是我在mysql中输入show processlist,发现连接数很高,我就将目标锁定在程序代码和torndb上了. 探索原因 当我kill掉tornado的进程时候,连接数就会被关闭,所以果断判断会不会是torndb的原因,导致我连接没有断开.于是我猜想会不会是在tornado服务挂掉的时候,mysql的连接没关.而且我想到我

Python设计模式中单例模式的实现及在Tornado中的应用

单例模式的实现方式 将类实例绑定到类变量上 class Singleton(object): _instance = None def new(cls, *args): if not isinstance(cls._instance, cls): cls._instance = super(Singleton, cls).__new__(cls, *args) return cls._instance 但是子类在继承后可以重写__new__以失去单例特性 class D(Singleton):

Tornado中异步框架的使用

tornado的同步框架与其他web框架相同都是处理先来的请求,如果先来的请求阻塞,那么后面的请求也会处理不了.一直处于等待过程中.但是请求一旦得到响应,那么: 请求发送过来后,将需要的本站资源直接返回给客户端 请求发送过来后,本站没有需要的资源,从其它站点获取过来,再返回给客户端 一.Tornado中的同步框架 1.本站资源直接返回 import tornado.web import time class LoginHandler(tornado.web.RequestHandler): de

深入tornado中的IOStream

IOStream对tornado的高效起了很大的作用,他封装了socket的非阻塞IO的读写操作.大体上可以这么说,当连接建立后,服务端与客户端的请求响应都是基于IOStream的,也就是说:IOStream是用来处理连接的. 接下来说一下有关接收请求的大体流程: 当连接建立,服务器端会产生一个对应该连接的socket,同时将该socket封装至IOStream实例中(这代表着IOStream的初始化). 我们知道tornado是基于IO多路复用的(就拿epoll来说),此时将socket进行r

深入tornado中的ioLoop

本文所剖析的tornado源码版本为4.4.2 ioloop是tornado的关键,是他的最底层. ioloop就是对I/O多路复用的封装,它实现了一个单例,将这个单例保存在IOLoop._instance中 ioloop实现了Reactor模型,将所有要处理的I/O事件注册到一个中心I/O多路复用器上,同时主线程/进程阻塞在多路复用器上:一旦有I/O事件到来或是准备就绪(文件描述符或socket可读.写),多路复用器返回并将事先注册的相应I/O事件分发到对应的处理器中. 另外,ioloop还被

深入tornado中的Configurable

Configurable十分重要! 位于tornado.util文件中,它是一个工厂类. 我们暂且称这个类为 配置类 . 我们暂且约定:该类的子类称之为 直属配置子类 , 该类的孙类.重孙类--称之为 从属配置子类.就像这样: class Configurable(object): # 配置类 pass class IOLoop(Configurable): # 直属配置子类 pass class PollIOLoop(IOLoop): # 从属配置子类 pass 这个配置类类似于java中的接