Tornado异步阻塞解决方案

在 tornado 中异步无阻塞的执行耗时任务



在 linux 上 tornado 是基于 epoll 的事件驱动框架,在网络事件上是无阻塞的。但是因为 tornado 自身是单线程的,所以如果我们在某一个时刻执行了一个耗时的任务,那么就会阻塞在这里,无法响应其他的任务请求,这个和 tornado 的高性能服务器称号不符,所以我们要想办法把耗时的任务转换为不阻塞主线程,让耗时的任务不影响对其他请求的响应。

在 python 3.2 上,增加了一个并行库 concurrent.futures,这个库提供了更简单的异步执行函数的方法。

如果是在 2.7 之类的 python 版本上,可以使用 pip install futures 来安装这个库。

关于这个库的具体使用,这里就不详细展开了,可以去看官方文档,需要注意的是,前两个例子是示例错误的用法,可能会产生死锁。

下面说说如何在 tornado 中结合使用 futures 库,最好的参考莫过于有文档+代码。正好, tornado 中解析 ip 使用的 dns 解析服务是多线程无阻塞的。(netutils.ThreadedResolver)

我们来看看它的实现,看看如何应用到我们的程序中来。

tornado 中使用多线程无阻塞来处理 dns 请求

# 删除了注释
class ThreadedResolver(ExecutorResolver):
    _threadpool = None
    _threadpool_pid = None
    def initialize(self, io_loop=None, num_threads=10):
        threadpool = ThreadedResolver._create_threadpool(num_threads)
        super(ThreadedResolver, self).initialize(
            io_loop=io_loop, executor=threadpool, close_executor=False)
    @classmethod
    def _create_threadpool(cls, num_threads):
        pid = os.getpid()
        if cls._threadpool_pid != pid:
            # Threads cannot survive after a fork, so if our pid isn‘t what it
            # was when we created the pool then delete it.
            cls._threadpool = None
        if cls._threadpool is None:
            from concurrent.futures import ThreadPoolExecutor
            cls._threadpool = ThreadPoolExecutor(num_threads)
            cls._threadpool_pid = pid
        return cls._threadpool

ThreadedResolver 是 ExecutorEesolver 的子类,看看它的是实现。

class ExecutorResolver(Resolver):
    def initialize(self, io_loop=None, executor=None, close_executor=True):
        self.io_loop = io_loop or IOLoop.current()
        if executor is not None:
            self.executor = executor
            self.close_executor = close_executor
        else:
            self.executor = dummy_executor
            self.close_executor = False
    def close(self):
        if self.close_executor:
            self.executor.shutdown()
        self.executor = None
    @run_on_executor
    def resolve(self, host, port, family=socket.AF_UNSPEC):
        addrinfo = socket.getaddrinfo(host, port, family, socket.SOCK_STREAM)
        results = []
        for family, socktype, proto, canonname, address in addrinfo:
            results.append((family, address))
        return results

从 ExecutorResolver 的实现可以看出来,它的关键参数是 ioloop 和 executor,干活的 resolve 函数被@run_on_executor 修饰,结合起来看 ThreadedResolver 的实现,那么这里的 executor 就是from concurrent.futures import ThreadPoolExecutor

再来看看 @run_on_executor 的实现。

run_on_executor 的实现在 concurrent.py 文件中,它的源码如下:

def run_on_executor(fn):
    @functools.wraps(fn)
    def wrapper(self, *args, **kwargs):
        callback = kwargs.pop("callback", None)
        future = self.executor.submit(fn, self, *args, **kwargs)
        if callback:
            self.io_loop.add_future(future,
                                    lambda future: callback(future.result()))
        return future
    return wrapper

关于 functions.wraps() 的介绍可以参考官方文档 functools — Higher-order functions and operations on callable objects

简单的说,这里对传递进来的函数进行了封装,并用 self.executor.submit() 对包装的函数进行了执行,并判断是否有回调,如果有,就加入到 ioloop 的 callback 里面。

对比官方的 concurrent.futures.Executor 的接口,里面有个 submit() 方法,从头至尾看看ThreadedResolver 的实现,就是使用了 concurrent.futures.ThreadPoolExecutor 这个 Executor 的子类。

所以 tornado 中解析 dns 使用的多线程无阻塞的方法的实质就是使用了 concurrent.futures 提供的ThreadPoolExecutor 功能。


使用多线程无阻塞方法来执行耗时的任务

借鉴 tornado 的使用方法,在我们自己的程序中也使用这种方法来处理耗时的任务。

from tornado.concurrent import run_on_executor
from concurrent.futures import ThreadPoolExecutor
class LongTimeTask(tornado.web.RequestHandler):
    executor = ThreadPoolExecutor(10)
    @run_on_executor()
    def get(self, data):
        long_time_task(data)

上面就是一个基本的使用方法,下面展示一个使用 sleep() 来模拟耗时的完整程序。

#!/usr/bin/env python
#-*-coding:utf-8-*-
import tornado.ioloop
import tornado.web
import tornado.httpserver
from concurrent.futures import ThreadPoolExecutor
from tornado.concurrent import run_on_executor
import time
class App(tornado.web.Application):
    def __init__(self):
        handlers = [
            (r‘/‘, IndexHandler),
            (r‘/sleep/(\d+)‘, SleepHandler),
        ]
        settings = dict()
        tornado.web.Application.__init__(self, handlers, **settings)
class BaseHandler(tornado.web.RequestHandler):
    executor = ThreadPoolExecutor(10)
class IndexHandler(tornado.web.RequestHandler):
    def get(self):
        self.write("Hello, world %s" % time.time())
class SleepHandler(BaseHandler):
    @run_on_executor
    def get(self, n):
        time.sleep(float(n))
        self._callback()
    def _callback(self):
        self.write("after sleep, now I‘m back %s" % time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
if __name__ == "__main__":
    app = App()
    server = tornado.httpserver.HTTPServer(app, xheaders=True)
    server.listen(8888)
    tornado.ioloop.IOLoop.instance().start()

此时先调用 127.0.0.1:8888/sleep/10 不会阻塞 127.0.0.1:8888/ 了。

以上,就是完整的在 tornado 中利用多线程来执行耗时的任务。


结语

epoll 的好处确实很多,事件就绪通知后,上层任务函数执行任务,如果任务本身需要较耗时,那么就可以考虑这个方法了,
当然也有其他的方法,比如使用 celery 来调度执行耗时太多的任务,比如频繁的需要写入数据到不同的文件中,我公司的一个项目中,需要把数据写入四千多个文件中,每天产生几亿条数据,就是使用了 tornado + redis + celery 的方法来高效的执行写文件任务。

完。

时间: 2024-10-19 00:12:33

Tornado异步阻塞解决方案的相关文章

tornado的非异步阻塞模式

[优化tornado阻塞任务的三个选择] 1.优化阻塞的任务,使其执行时间更快.经常由于是一个DB的慢查询,或者复杂的上层模板导致的,这个时候首要的是加速这些任务,而不是优化复杂的webserver.可以提升99%的效率. 2.开启一个单独的线程或者进程执行耗时任务.这意味着对于IOLoop来说,可以开启另一个线程(或进程)处理off-loading任务,这样它就可以再接收其他请求了,而不是阻塞住. 3.使用异步的驱动或者库函数来执行任务,例如gevent , motor. [例子1] impo

初始Tornado异步非阻塞

Tornado  异步非阻塞 from tornado import gen class MainHandler(tornado.web.RequestHandler): @gen.coroutine  #关键点 def get(self): futrue =Future()#关键点 #阻塞内容,必须写这个格式,time.sleep不好使 #tornado.ioloop.IOLoop.current().add_timeout(time.time()+10,self.doing) #关键点 se

Tornado异步非阻塞的使用以及原理

Tornado 和现在的主流 Web 服务器框架(包括大多数 Python 的框架)有着明显的区别:它是非阻塞式服务器,而且速度相当快.得利于其 非阻塞的方式和对 epoll 的运用,Tornado 每秒可以处理数以千计的连接,这意味着对于实时 Web 服务来说,Tornado 是一个理想的 Web 框架. 一.Tornado的两种模式使用 1.同步阻塞模式 由于doing中sleep10秒,此时其他连接将被阻塞,必须等这次请求完成后其他请求才能连接成功. 1 import tornado.io

(转)同步异步/阻塞非阻塞 和 5种linux网络通信模型

会阻塞的函数:connect, accept,send/recv/sendto/recvfrom等读写函数. 不会阻塞的函数:bind, listen,socket, closesocket. linux网络通信模型有: 阻塞IO模型(同步),非阻塞IO模型(拷贝同步),IO复用模型(多线程同步),信号驱动IO模型((拷贝同步),异步IO模型(异步). node.js对同步/异步,阻塞非阻塞的解释: 线程在执行中如果遇到磁盘读写或网络通信(统称为I/O 操作),通常要耗费较长的时间,这时 操作系

同步 异步 阻塞 非阻塞 的概念

转自:http://blog.chinaunix.net/uid-26000296-id-3754543.html 简介: Linux? 中最常用的输入/输出(I/O)模型是同步 I/O.在这个模型中,当请求发出之后,应用程序就会阻塞,直到请求满足为止.这是很好的一种解决方案,因为调用应用程序在等待 I/O 请求完成时不需要使用任何中央处理单元(CPU).但是在某些情况中,I/O 请求可能需要与其他进程产生交叠(并行).可移植操作系统接口(POSIX)异步 I/O(AIO)应用程序接口(API)

IO模型介绍 以及同步异步阻塞非阻塞的区别

阻塞:用户进程访问数据时,如果未完成IO,等待IO操作完成或者进行系统调用来判断IO是否完成非阻塞:用户进程访问数据时,会马上返回一个状态值,无论是否完成 同步:用户进程发起IO(就绪判断)后,轮询内核状态异步:用户进程发起IO后,可以做其他事情,等待内核通知 介绍一下IO模型 网络IO模型和文件IO模型是一样的,上图是IO的5种模型,包括阻塞IO.非阻塞IO.多路复用IO.信号驱动的IO.异步IO. 一次IO包括两个过程,内核数据准备 .把数据从内核空间copy到用户空间. 1.阻塞IO(re

同步/异步-阻塞/非阻塞

(A)同步和异步,是针对 调用结果是如何返回给调用者来说的,即调用的结果是调用者主动去获取的(比如一直等待recvfrom或者设置超时等待select),则为同步,而调用结果是被调用者在完成之后通知调用者的,则为异步(比如windows的IOCP).(B)阻塞和非阻塞,是针对调用者所在线程是否在调用之后主动挂起来说的,即如果在线程中调用者发出调用之后,再被调用这返回之前,该线程主动挂起,则为阻塞,若线程不主动挂起,而继续向下执行,则为非阻塞. 这样,在网络IO中,同步异步,阻塞非阻塞,就可以形成

Winform中子线程访问界面控件时被阻塞解决方案

public partial class WebData_Import : Form { //声明用于访问主界面的委托类型 public delegate void deleGetOrderdata(string info); //声明访问主界面委托类型的变量 public deleGetOrderdata OptGetOrderData; int CompanyID = 0; public WebData_Import() { InitializeComponent(); cmbCompany

epoll IO多路复用(异步阻塞AIO)

epoll的异步阻塞(AIO): 用户线程创建epoll后,其实是内核线程负责扫描 fd 列表(在网络服务器上可以是socket,socket在创建后返回的也是文件描述符),并填充事件链表.但是,并不会主动通知用户线程,没有一个回调函数调用之前注册好的函数,还是需要用户线程不停的轮询,所以epoll有人也叫他 伪AIO. epoll的三个函数: #include <sys/epoll.h> int epoll_create(int size); int epoll_ctl(int epfd,