深入tornado中的ioLoop

本文所剖析的tornado源码版本为4.4.2

ioloop是tornado的关键,是他的最底层。

ioloop就是对I/O多路复用的封装,它实现了一个单例,将这个单例保存在IOLoop._instance中

ioloop实现了Reactor模型,将所有要处理的I/O事件注册到一个中心I/O多路复用器上,同时主线程/进程阻塞在多路复用器上;一旦有I/O事件到来或是准备就绪(文件描述符或socket可读、写),多路复用器返回并将事先注册的相应I/O事件分发到对应的处理器中。

另外,ioloop还被用来集中运行回调函数以及集中处理定时任务。

一 准备知识:

  1 首先我们要了解Reactor模型

  2 其次,我们要了解I/O多路复用,由于本文假设系统为Linux,所以要了解epoll以及Python中的select模块

  3 IOLoop类是Configurable类的子类,而Configurable类是一个工厂类,讲解在这

二  创建IOLoop实例

来看IOLoop,它的父类是Configurable类,也就是说:IOLoop是一个直属配置子类

class IOLoop(Configurable):
    ......

这里就要结合Configurable类进行讲解:

 Configurable中的__new__方法

1 首先实例化一个该直属配置子类的‘执行类对象‘,也就是调用该类的configurable_default方法并返回赋值给impl:

    @classmethod
    def configurable_default(cls):
        if hasattr(select, "epoll"):     # 因为我们假设我们的系统为Linux,且支持epoll,所以这里为True
            from tornado.platform.epoll import EPollIOLoop
            return EPollIOLoop
        if hasattr(select, "kqueue"):
            # Python 2.6+ on BSD or Mac
            from tornado.platform.kqueue import KQueueIOLoop
            return KQueueIOLoop
        from tornado.platform.select import SelectIOLoop
        return SelectIOLoop

2 也就是impl是EPollIOLoop类对象,然后实例化该对象,运行其initialize方法

class EPollIOLoop(PollIOLoop):  # 该类只有这么短短的几句,可见主要的方法是在其父类PollIOLoop中实现。
    def initialize(self, **kwargs):
        super(EPollIOLoop, self).initialize(impl=select.epoll(), **kwargs) # 执行了父类PollIOLoop的initialize方法,并将select.epoll()传入

  来看一看PollIOLoop.initialize(EPollIOLoop(),impl=select.epoll())干了些啥:

class PollIOLoop(IOLoop):  # 从属配置子类

    def initialize(self, impl, time_func=None, **kwargs):
        super(PollIOLoop, self).initialize(**kwargs)                # 调用IOLoop的initialize方法
        self._impl = impl                               # self._impl = select.epoll()
        if hasattr(self._impl, ‘fileno‘):               # 文件描述符的close_on_exec属性
            set_close_exec(self._impl.fileno())
        self.time_func = time_func or time.time
        self._handlers = {}                             # 文件描述符对应的fileno()作为key,(文件描述符对象,处理函数)作为value
        self._events = {}                               # 用来存储epoll_obj.poll()返回的事件,也就是哪个fd发生了什么事件{(fd1, event1), (fd2, event2)……}
        self._callbacks = []
        self._callback_lock = threading.Lock()          # 添加线程锁
        self._timeouts = []                             # 存储定时任务
        self._cancellations = 0
        self._running = False
        self._stopped = False
        self._closing = False
        self._thread_ident = None                       # 获得当前线程标识符
        self._blocking_signal_threshold = None
        self._timeout_counter = itertools.count()

        # Create a pipe that we send bogus data to when we want to wake
        # the I/O loop when it is idle
        self._waker = Waker()
        self.add_handler(self._waker.fileno(),
                         lambda fd, events: self._waker.consume(),
                         self.READ)

  首先调用了IOLoop.initialize(self,**kwargs)方法:

    def initialize(self, make_current=None):
        if make_current is None:
            if IOLoop.current(instance=False) is None:
                self.make_current()
        elif make_current:
            if IOLoop.current(instance=False) is not None:
                raise RuntimeError("current IOLoop already exists")
            self.make_current()
    @staticmethod
    def current(instance=True):
        current = getattr(IOLoop._current, "instance", None)
        if current is None and instance:
            return IOLoop.instance()
        return current

    def make_current(self):
        IOLoop._current.instance = self

我们可以看到IOLoop.initialize()主要是对线程做了一些支持和操作。

3 返回该实例

三 剖析PollIOLoop

1 处理I/O事件以及其对应handler的相关属性以及方法

使用self._handlers用来存储fd与handler的对应关系,文件描述符对应的fileno()作为key,元组(文件描述符对象,处理函数)作为value

  self._events 用来存储epoll_obj.poll()返回的事件,也就是哪个fd发生了什么事件{(fd1, event1), (fd2, event2)……}

add_handler方法用来添加handler

  update_handle方法用来更新handler

remove_handler方法用来移除handler

    def add_handler(self, fd, handler, events):
        # 向epoll中注册事件 , 并在self._handlers[fd]中为该文件描述符添加相应处理函数
        fd, obj = self.split_fd(fd)   # fd.fileno(),fd
        self._handlers[fd] = (obj, stack_context.wrap(handler))
        self._impl.register(fd, events | self.ERROR)

    def update_handler(self, fd, events):
        fd, obj = self.split_fd(fd)
        self._impl.modify(fd, events | self.ERROR)

    def remove_handler(self, fd):
        fd, obj = self.split_fd(fd)
        self._handlers.pop(fd, None)
        self._events.pop(fd, None)
        try:
            self._impl.unregister(fd)
        except Exception:
            gen_log.debug("Error deleting fd from IOLoop", exc_info=True)

2 处理回调函数的相关属性以及方法

  self._callbacks用来存储回调函数

  add_callback方法用来直接添加回调函数

  add_future方法用来间接的添加回调函数,future对象详解在这

    def add_callback(self, callback, *args, **kwargs):
        # 因为Python的GIL的限制,导致Python线程并不算高效。加上tornado实现了多进程 + 协程的模式,所以我们略过源码中的部分线程相关的一些操作
        if self._closing:
            return
        self._callbacks.append(functools.partial(stack_context.wrap(callback), *args, **kwargs))
    def add_future(self, future, callback):
        # 为future对象添加经过包装后的回调函数,该回调函数会在future对象被set_done后添加至_callbacks中
        assert is_future(future)
        callback = stack_context.wrap(callback)
        future.add_done_callback(
            lambda future: self.add_callback(callback, future))

3 处理定时任务的相关属性以及方法

  self._timeouts用来存储定时任务

  self.add_timeout用来添加定时任务(self.call_later   self.call_at都是间接调用了该方法)

def add_timeout(self, deadline, callback, *args, **kwargs):
        """
            ``deadline``可能是一个数字,表示相对于当前时间的时间(与“IOLoop.time”通常为“time.time”相同的大小),或者是datetime.timedelta对象。
            自从Tornado 4.0以来,`call_later`是一个比较方便的替代方案,因为它不需要timedelta对象。

        """
        if isinstance(deadline, numbers.Real):
            return self.call_at(deadline, callback, *args, **kwargs)
        elif isinstance(deadline, datetime.timedelta):
            return self.call_at(self.time() + timedelta_to_seconds(deadline),
                                callback, *args, **kwargs)
        else:
            raise TypeError("Unsupported deadline %r" % deadline)

4 启动io多路复用器

  启动也一般就意味着开始循环,那么循环什么呢?

    1 运行回调函数

    2 运行时间已到的定时任务

    3 当某个文件描述法发生事件时,运行该事件对应的handler

  使用start方法启动ioloop,看一下其简化版(去除线程相关,以及一些相对不重要的细节):

def start(self):
        try:
            while True:
                callbacks = self._callbacks
                self._callbacks = []
                due_timeouts = []
                # 将时间已到的定时任务放置到due_timeouts中,过程省略
                for callback in callbacks:          # 执行callback
                    self._run_callback(callback)
                for timeout in due_timeouts:        # 执行定时任务
                    if timeout.callback is not None:
                        self._run_callback(timeout.callback)
                callbacks = callback = due_timeouts = timeout = None    # 释放内存
                # 根据情况设置poll_timeout的值,过程省略
                if not self._running:    # 终止ioloop运行时,在执行完了callback后结束循环
                    breaktry:
                    event_pairs = self._impl.poll(poll_timeout)
                except Exception as e:
                    if errno_from_exception(e) == errno.EINTR:  # 系统调用被信号处理函数中断,进行下一次循环
                        continue
                    else:
                        raise
                self._events.update(event_pairs)
                while self._events:
                    fd, events = self._events.popitem()             # 获取一个fd以及对应事件
                    try:
                        fd_obj, handler_func = self._handlers[fd]   # 获取该fd对应的事件处理函数
                        handler_func(fd_obj, events)                # 运行该事件处理函数
                    except (OSError, IOError) as e:
                        if errno_from_exception(e) == errno.EPIPE:     # 当客户端关闭连接时会产生EPIPE错误
                            pass
                        # 其他异常处理已经省略
                fd_obj = handler_func = None       # 释放内存空间          

 start完整版

5 关闭io多路复用器

def close(self, all_fds=False):
        with self._callback_lock:
            self._closing = True
        self.remove_handler(self._waker.fileno())
        if all_fds:    # 该参数若为True,则表示会关闭所有文件描述符
            for fd, handler in self._handlers.values():
                self.close_fd(fd)
        self._waker.close()
        self._impl.close()
        self._callbacks = None
        self._timeouts = None

四 参考 

  https://zhu327.github.io/2016/06/14/tornado%E4%BB%A3%E7%A0%81%E9%98%85%E8%AF%BB%E7%AC%94%E8%AE%B0-ioloop/
  https://www.zhihu.com/question/20021164 
  http://stackoverflow.com/questions/12179271/meaning-of-classmethod-and-staticmethod-for-beginner/12179752#12179752
  http://blog.csdn.net/benkaoya/article/details/17262053

时间: 2024-10-19 07:23:31

深入tornado中的ioLoop的相关文章

在tornado中使用celery实现异步任务处理之一

一.简介 tornado-celery是用于Tornado web框架的非阻塞 celery客户端. 通过tornado-celery可以将耗时任务加入到任务队列中处理, 在celery中创建任务,tornado中就可以像调用AsyncHttpClient一样调用这些任务. ? Celery中两个基本的概念:Broker.Backend Broker : 其实就是一开始说的 消息队列 ,用来发送和接受消息. Broker有几个方案可供选择:RabbitMQ,Redis,数据库等 Backend:

深入tornado中的协程

tornado使用了单进程(当然也可以多进程) + 协程 + I/O多路复用的机制,解决了C10K中因为过多的线程(进程)的上下文切换 而导致的cpu资源的浪费. tornado中的I/O多路复用前面已经讲过了.本文不做详细解释. 来看一下tornado中的协程模块:tornado.gen: tornado.gen是根据生成器(generator)实现的,用来更加简单的实现异步. 先来说一下tornado.gen.coroutine的实现思路: 我们知道generator中的yield语句可以使

Python Tornado框架(ioloop对象分析)

网上都说nginx和lighthttpd是高性能web服务器,而tornado也是著名的高抗负载应用,它们间有什么相似处呢?上节提到的ioloop对象是如何循环的呢?往下看. 首先关于TCP服务器的开发上节已经提过,很明显那个三段式的示例是个效率很低的(因为只有一个连接被端开新连接才能被接受).要想开发高性能的服务器,就得在这accept上下功夫. 首先,新连接的到来一般是经典的三次握手,只有当服务器收到一个SYN时才说明有一个新连接(还没建立),这时监听fd是可读的可以调用accept,此前服

深入tornado中的TCPServer

1 梳理: 应用层的下一层是传输层,而http协议一般是使用tcp的,所以实现tcp的重要性就不言而喻. 由于tornado中实现了ioloop这个反应器以及iostream这个对连接的异步读写,所以tcp就很容易实现异步. 在tornado的tcpserver文件中,实现了TCPServer这个类,他是一个单线程的,非阻塞的tcp 服务. 为了与上层协议(在tornado中就是HTTPServer)交互,TCPServer提供了一个接口:handle_stream, 要求其子类必需实现该方法.

在 tornado 中异步无阻塞的执行耗时任务

在 linux 上 tornado 是基于 epoll 的事件驱动框架,在网络事件上是无阻塞的.但是因为 tornado 自身是单线程的,所以如果我们在某一个时刻执行了一个耗时的任务,那么就会阻塞在这里,无法响应其他的任务请求,这个和 tornado 的高性能服务器称号不符,所以我们要想办法把耗时的任务转换为不阻塞主线程,让耗时的任务不影响对其他请求的响应. 在 python 3.2 上,增加了一个并行库 concurrent.futures,这个库提供了更简单的异步执行函数的方法. 如果是在

tornado中使用torndb,连接数过高的问题

问题背景 最近新的产品开发中,使用了到了Tornado和mysql数据库.但在基本框架完成之后,我在开发时候发现了一个很奇怪的现象,我在测试时,发现数据库返回不了结果,于是我在mysql中输入show processlist,发现连接数很高,我就将目标锁定在程序代码和torndb上了. 探索原因 当我kill掉tornado的进程时候,连接数就会被关闭,所以果断判断会不会是torndb的原因,导致我连接没有断开.于是我猜想会不会是在tornado服务挂掉的时候,mysql的连接没关.而且我想到我

Python设计模式中单例模式的实现及在Tornado中的应用

单例模式的实现方式 将类实例绑定到类变量上 class Singleton(object): _instance = None def new(cls, *args): if not isinstance(cls._instance, cls): cls._instance = super(Singleton, cls).__new__(cls, *args) return cls._instance 但是子类在继承后可以重写__new__以失去单例特性 class D(Singleton):

Tornado中异步框架的使用

tornado的同步框架与其他web框架相同都是处理先来的请求,如果先来的请求阻塞,那么后面的请求也会处理不了.一直处于等待过程中.但是请求一旦得到响应,那么: 请求发送过来后,将需要的本站资源直接返回给客户端 请求发送过来后,本站没有需要的资源,从其它站点获取过来,再返回给客户端 一.Tornado中的同步框架 1.本站资源直接返回 import tornado.web import time class LoginHandler(tornado.web.RequestHandler): de

深入tornado中的IOStream

IOStream对tornado的高效起了很大的作用,他封装了socket的非阻塞IO的读写操作.大体上可以这么说,当连接建立后,服务端与客户端的请求响应都是基于IOStream的,也就是说:IOStream是用来处理连接的. 接下来说一下有关接收请求的大体流程: 当连接建立,服务器端会产生一个对应该连接的socket,同时将该socket封装至IOStream实例中(这代表着IOStream的初始化). 我们知道tornado是基于IO多路复用的(就拿epoll来说),此时将socket进行r