关于flask线程安全的简单研究

  flask是python web开发比较主流的框架之一,也是我在工作中使用的主要开发框架。一直对其是如何保证线程安全的问题比较好奇,所以简单的探究了一番,由于只是简单查看了源码,并未深入细致研究,因此以下内容仅为个人理解,不保证正确性。

  首先是很多文章都说flask会为每一个request启动一个线程,每个request都在单独线程中处理,因此保证了线程安全。于是就做了一个简单的测试。首先是写一个简单的flask程序(只需要有最简单的功能用于测试即可),然后我们知道一个flask应用启动之后实际上是作为一个 WSGI application的,之后所有接收到的请求都会经由flask的wsgi_app(self, environ, start_response)方法去处理,所以就来看一下这个方法(注释已去掉)。

def wsgi_app(self, environ, start_response):
        ctx = self.request_context(environ)
        ctx.push()
        error = None
        try:
            try:
                response = self.full_dispatch_request()
            except Exception as e:
                error = e
                response = self.handle_exception(e)
            return response(environ, start_response)
        finally:
            if self.should_ignore_error(error):
                error = None
            ctx.auto_pop(error)

那么这个request_context又是什么东西呢?它是一个RequestContext对象,文档是这么说的:

The request context contains all request relevant information.  It is
created at the beginning of the request and pushed to the
`_request_ctx_stack` and removed at the end of it.  It will create the
URL adapter and request object for the WSGI environment provided.

说的很清楚,这个对象的上下文包含着request相关的信息。也就是说每一个请求到来之后,flask都会为它新建一个RequestContext对象,并且将这个对象push进全局变量_request_ctx_stack中,在push前还要检查_app_ctx_stack,如果_app_ctx_stack的栈顶元素不存在或是与当前的应用不一致,则首先push appcontext 到_app_ctx_stack中,再push requestcontext。源码如下:

def push(self):

        top = _request_ctx_stack.top
        if top is not None and top.preserved:
            top.pop(top._preserved_exc)

        # Before we push the request context we have to ensure that there
        # is an application context.
        app_ctx = _app_ctx_stack.top
        if app_ctx is None or app_ctx.app != self.app:
            app_ctx = self.app.app_context()
            app_ctx.push()
            self._implicit_app_ctx_stack.append(app_ctx)
        else:
            self._implicit_app_ctx_stack.append(None)
        if hasattr(sys, ‘exc_clear‘):
            sys.exc_clear()

        _request_ctx_stack.push(self)
        # Open the session at the moment that the request context is
        # available. This allows a custom open_session method to use the
        # request context (e.g. code that access database information
        # stored on `g` instead of the appcontext).
        self.session = self.app.open_session(self.request)
        if self.session is None:
            self.session = self.app.make_null_session()

通过上面的两步,每一个请求的应用上下文和请求上下文就被push到了全局变量_request_ctx_stack和_app_ctx_stack中。
  现在我们知道了_request_ctx_stack和_app_ctx_stack是何时被push的,每一个请求到来都会导致新的RequestContext和AppContext被建立并push,一旦请求处理完毕就被pop出去。而无论是_app_ctx_stack还是_request_ctx_stack都是一个LocalStack对象,这是werkzeug中的一个对象,看看它里边有什么:

class LocalStack(object):

    def __init__(self):
        self._local = Local()

    def __release_local__(self):
        self._local.__release_local__()

    def _get__ident_func__(self):
        return self._local.__ident_func__

    def _set__ident_func__(self, value):
        object.__setattr__(self._local, ‘__ident_func__‘, value)
    __ident_func__ = property(_get__ident_func__, _set__ident_func__)
    del _get__ident_func__, _set__ident_func__

    def __call__(self):
        def _lookup():
            rv = self.top
            if rv is None:
                raise RuntimeError(‘object unbound‘)
            return rv
        return LocalProxy(_lookup)

    def push(self, obj):
        """Pushes a new item to the stack"""
        rv = getattr(self._local, ‘stack‘, None)
        if rv is None:
            self._local.stack = rv = []
        rv.append(obj)
        return rv

    def pop(self):
        """Removes the topmost item from the stack, will return the
        old value or `None` if the stack was already empty.
        """
        stack = getattr(self._local, ‘stack‘, None)
        if stack is None:
            return None
        elif len(stack) == 1:
            release_local(self._local)
            return stack[-1]
        else:
            return stack.pop()

    @property
    def top(self):
        """The topmost item on the stack.  If the stack is empty,
        `None` is returned.
        """
        try:
            return self._local.stack[-1]
        except (AttributeError, IndexError):
            return None

可以看到,这个对象的几乎所有重要属性在_local这一属性中,它是一个Local对象,很有意思,如果看一下Local的构造器,会发现其中包含有重要属性__ident_func__,

def __init__(self):    object.__setattr__(self, ‘__storage__‘, {})    object.__setattr__(self, ‘__ident_func__‘, get_ident)

这一属性由get_ident方法提供,这个方法的作用是提供当前线程的id,用于区别同时存在的多个线程Return a non-zero integer that uniquely identifies the current thread

amongst other threads that exist simultaneously.

  到此为止,可见作为一个全局变量_request_ctx_stack和_app_ctx_stack应该都是只有一个线程去处理,没有发现哪里有可以为每个请求都开启一个线程的代码,实际测试一下,可以发现确实所有的请求都只运行在一个线程上(其中1为_request_ctx_stack,2为_app_ctx_stack,可见无论多少请求都是同一个对象)。
这下就有趣了,传说中的每个请求一个线程果然没有出现,那么flask的线程安全是如何保证的呢?如果把每次请求到来时附带的environ(wsgi_app方法参数中的environ)打印看看的话就会发现,每个environ都携带了请求相关的全部上下文信息,在请求到来的时候通过附带的environ重建context,并push到栈中,那后尽快处理完该请求,处理完毕后将其pop出去。在这种情况下,如果在高并发状态下,应该是无法保证线程安全的(这个结论不敢保证肯定正确,但至少我没有发现有什么机制可以在此情景下保证线程安全,既没有锁,也没有看到阻塞。严谨说这里应该做一个高并发测试来验证结论,但是懒的写脚本去测试了,有兴趣的朋友可以自己试试,如果结论不对,欢迎指正)。  那么很多文章说的每个请求一个线程到底是在哪里建立的呢?这就要去仔细看一下flask.app的run方法了:
    def run(self, host=None, port=None, debug=None, **options):

        from werkzeug.serving import run_simple
        if host is None:
            host = ‘127.0.0.1‘
        if port is None:
            server_name = self.config[‘SERVER_NAME‘]
            if server_name and ‘:‘ in server_name:
                port = int(server_name.rsplit(‘:‘, 1)[1])
            else:
                port = 5000
        if debug is not None:
            self.debug = bool(debug)
        options.setdefault(‘use_reloader‘, self.debug)
        options.setdefault(‘use_debugger‘, self.debug)
        try:
            run_simple(host, port, self, **options)
        finally:
            # reset the first request information if the development server
            # reset normally.  This makes it possible to restart the server
            # without reloader and that stuff from an interactive shell.
            self._got_first_request = False

这个方法实际上是对werkzeug的run_simple方法的简单包装。而run_simple方法则有趣的多(这一段把注释也贴上)
def run_simple(hostname, port, application, use_reloader=False,
               use_debugger=False, use_evalex=True,
               extra_files=None, reloader_interval=1,
               reloader_type=‘auto‘, threaded=False,
               processes=1, request_handler=None, static_files=None,
               passthrough_errors=False, ssl_context=None):
    """Start a WSGI application. Optional features include a reloader,
    multithreading and fork support.

    This function has a command-line interface too::

        python -m werkzeug.serving --help

    .. versionadded:: 0.5
       `static_files` was added to simplify serving of static files as well
       as `passthrough_errors`.

    .. versionadded:: 0.6
       support for SSL was added.

    .. versionadded:: 0.8
       Added support for automatically loading a SSL context from certificate
       file and private key.

    .. versionadded:: 0.9
       Added command-line interface.

    .. versionadded:: 0.10
       Improved the reloader and added support for changing the backend
       through the `reloader_type` parameter.  See :ref:`reloader`
       for more information.

    :param hostname: The host for the application.  eg: ``‘localhost‘``
    :param port: The port for the server.  eg: ``8080``
    :param application: the WSGI application to execute
    :param use_reloader: should the server automatically restart the python
                         process if modules were changed?
    :param use_debugger: should the werkzeug debugging system be used?
    :param use_evalex: should the exception evaluation feature be enabled?
    :param extra_files: a list of files the reloader should watch
                        additionally to the modules.  For example configuration
                        files.
    :param reloader_interval: the interval for the reloader in seconds.
    :param reloader_type: the type of reloader to use.  The default is
                          auto detection.  Valid values are ``‘stat‘`` and
                          ``‘watchdog‘``. See :ref:`reloader` for more
                          information.
    :param threaded: should the process handle each request in a separate
                     thread?
    :param processes: if greater than 1 then handle each request in a new process
                      up to this maximum number of concurrent processes.
    :param request_handler: optional parameter that can be used to replace
                            the default one.  You can use this to replace it
                            with a different
                            :class:`~BaseHTTPServer.BaseHTTPRequestHandler`
                            subclass.
    :param static_files: a dict of paths for static files.  This works exactly
                         like :class:`SharedDataMiddleware`, it‘s actually
                         just wrapping the application in that middleware before
                         serving.
    :param passthrough_errors: set this to `True` to disable the error catching.
                               This means that the server will die on errors but
                               it can be useful to hook debuggers in (pdb etc.)
    :param ssl_context: an SSL context for the connection. Either an
                        :class:`ssl.SSLContext`, a tuple in the form
                        ``(cert_file, pkey_file)``, the string ``‘adhoc‘`` if
                        the server should automatically create one, or ``None``
                        to disable SSL (which is the default).
    """
    if use_debugger:
        from werkzeug.debug import DebuggedApplication
        application = DebuggedApplication(application, use_evalex)
    if static_files:
        from werkzeug.wsgi import SharedDataMiddleware
        application = SharedDataMiddleware(application, static_files)

    def log_startup(sock):
        display_hostname = hostname not in (‘‘, ‘*‘) and hostname or ‘localhost‘
        if ‘:‘ in display_hostname:
            display_hostname = ‘[%s]‘ % display_hostname
        quit_msg = ‘(Press CTRL+C to quit)‘
        port = sock.getsockname()[1]
        _log(‘info‘, ‘ * Running on %s://%s:%d/ %s‘,
             ssl_context is None and ‘http‘ or ‘https‘,
             display_hostname, port, quit_msg)

    def inner():
        try:
            fd = int(os.environ[‘WERKZEUG_SERVER_FD‘])
        except (LookupError, ValueError):
            fd = None
        srv = make_server(hostname, port, application, threaded,
                          processes, request_handler,
                          passthrough_errors, ssl_context,
                          fd=fd)
        if fd is None:
            log_startup(srv.socket)
        srv.serve_forever()

    if use_reloader:
        # If we‘re not running already in the subprocess that is the
        # reloader we want to open up a socket early to make sure the
        # port is actually available.
        if os.environ.get(‘WERKZEUG_RUN_MAIN‘) != ‘true‘:
            if port == 0 and not can_open_by_fd:
                raise ValueError(‘Cannot bind to a random port with enabled ‘
                                 ‘reloader if the Python interpreter does ‘
                                 ‘not support socket opening by fd.‘)

            # Create and destroy a socket so that any exceptions are
            # raised before we spawn a separate Python interpreter and
            # lose this ability.
            address_family = select_ip_version(hostname, port)
            s = socket.socket(address_family, socket.SOCK_STREAM)
            s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            s.bind((hostname, port))
            if hasattr(s, ‘set_inheritable‘):
                s.set_inheritable(True)

            # If we can open the socket by file descriptor, then we can just
            # reuse this one and our socket will survive the restarts.
            if can_open_by_fd:
                os.environ[‘WERKZEUG_SERVER_FD‘] = str(s.fileno())
                s.listen(LISTEN_QUEUE)
                log_startup(s)
            else:
                s.close()

        from ._reloader import run_with_reloader
        run_with_reloader(inner, extra_files, reloader_interval,
                          reloader_type)
    else:
        inner()

默认情况下会执行inner方法,inner方法创建了一个server并启动,这样一个flask应用算是真正的启动了。那么秘密就在make_server里了
def make_server(host=None, port=None, app=None, threaded=False, processes=1,
                request_handler=None, passthrough_errors=False,
                ssl_context=None, fd=None):
    """Create a new server instance that is either threaded, or forks
    or just processes one request after another.
    """
    if threaded and processes > 1:
        raise ValueError("cannot have a multithreaded and "
                         "multi process server.")
    elif threaded:
        return ThreadedWSGIServer(host, port, app, request_handler,
                                  passthrough_errors, ssl_context, fd=fd)
    elif processes > 1:
        return ForkingWSGIServer(host, port, app, processes, request_handler,
                                 passthrough_errors, ssl_context, fd=fd)
    else:
        return BaseWSGIServer(host, port, app, request_handler,
                              passthrough_errors, ssl_context, fd=fd)

好了,这一下我们一直以来的疑问就找到答案了,原来一个flask应用的server并非只有一种类型,它是可以设定的,默认情况下创建的是一个 BaseWSGIServer,如果指定了threaded参数就启动一个ThreadedWSGIServer,如果设定的processes>1则启动一个ForkingWSGIServer。事已至此,后面的事情就是追本溯源了:
class ThreadedWSGIServer(ThreadingMixIn, BaseWSGIServer):

    """A WSGI server that does threading."""
    multithread = True
 
ThreadedWSGIServer是ThreadingMixIn和BaseWSGIServer的子类,
class ThreadingMixIn:
    """Mix-in class to handle each request in a new thread."""

    # Decides how threads will act upon termination of the
    # main process
    daemon_threads = False

    def process_request_thread(self, request, client_address):
        """Same as in BaseServer but as a thread.

        In addition, exception handling is done here.

        """
        try:
            self.finish_request(request, client_address)
            self.shutdown_request(request)
        except:
            self.handle_error(request, client_address)
            self.shutdown_request(request)

    def process_request(self, request, client_address):
        """Start a new thread to process the request."""
        t = threading.Thread(target = self.process_request_thread,
                             args = (request, client_address))
        t.daemon = self.daemon_threads
        t.start()

源码写的太明白了,原来是ThreadingMixIn的实例以多线程的方式去处理每一个请求,这样对开发者来说,只有在启动app时将threaded参数设定为True,flask才会真正以多线程的方式去处理每一个请求。  实际去测试一下,发现将threaded设置没True后,果然打印出的每一个_request_ctx_stack都变成不同的对象了。
时间: 2024-08-10 21:20:35

关于flask线程安全的简单研究的相关文章

进程与线程的一些简单理解

一.进程与线程的定义 进程是执行中的程序:程序是一段描述指令的文本,是一个静态的概念,把这段指令运行起来,每次运行就得到了一个进程,进程是动态的概念:操作系统会为进程分配资源. 线程是进程中一段实际执行的代码:它也是一个动态的概念:操作系统调度和分派线程,为线程分配CPU时间片,使其执行. 二.进程与线程的比较 1.从操作系统角度而言 进程是资源分配的基本单位:也就是说操作系统会为不同的进程分配不同的资源,如Window句柄.文件系统句柄.内核对象.虚拟内存等,我们可以说操作系统把Window句

线程池的简单实现

几个基本的线程函数: //线程操纵函数//创建:   int pthread_create(pthread_t *tidp, const pthread_attr_t *attr, (void*)(*start_rtn)(void *), void *arg);//终止自身    void pthread_exit(void *retval);//终止其他:   int pthread_cancel(pthread_t tid); 发送终止信号后目标线程不一定终止,要调用join函数等待//阻塞

一个线程池的简单的实现

线程池实现: 用于执行大量相对短暂的任务 当任务增加的时候能够动态的增加线程池中线程的数量直到达到一个阈值. 当任务执行完毕的时候,能够动态的销毁线程池中的线程 该线程池的实现本质上也是生产者与消费模型的应用.生产者线程向任务队列中添加任务,一旦队列有任务到来,如果有等待线程就唤醒来执行任务,如果没有等待线程并且线程数没有达到阈值,就创建新线程来执行任务. contion.h #ifndef _CONDITION_H_ #define _CONDITION_H_ #include <pthrea

进程与线程的一个简单解释(转)

进程(process)和线程(thread)是操作系统的基本概念,但是它们比较抽象,不容易掌握. 最近,我读到一篇材料,发现有一个很好的类比,可以把它们解释地清晰易懂. 1. 计算机的核心是CPU,它承担了所有的计算任务.它就像一座工厂,时刻在运行. 2. 假定工厂的电力有限,一次只能供给一个车间使用.也就是说,一个车间开工的时候,其他车间都必须停工.背后的含义就是,单个CPU一次只能运行一个任务. 3. 进程就好比工厂的车间,它代表CPU所能处理的单个任务.任一时刻,CPU总是运行一个进程,其

进程和线程的一个简单形象的解释

转眼暑假一过,2015年的校招即将开启大幕,身为计算机专业的朋友们,在面试中是不是经常会被问到一个问题:进程和线程的区别,今日偶然看到阮一峰的博客,他用一个很好的类比把他们解释的清晰易懂,会不会突然让大家有种豁然开朗的感觉呢? 进程(process)和线程(thread)是操作系统的基本概念,但是它们比较抽象,不容易掌握,但下面的类比解释,是不是让你明白很多呢? 1.计算机的核心是CPU,它承担了所有的计算任务.它就像一座工厂,时刻在运行. 2.假定工厂的电力有限,一次只能供给一个车间使用.也就

通过flask实现web页面简单的增删改查

通过flask实现web页面简单的增删改查 # 1.后台程序falsk_web01.py #coding:utf-8 from flask import Flask,render_template,request,redirect import fileutils # 引入file_dict用户列表 fileutils.file_read() app = Flask(__name__) @app.route('/') def index(): return render_template('lo

进程与线程的一个简单解释

作者: 阮一峰 日期: 2013年4月24日 进程(process)和线程(thread)是操作系统的基本概念,但是它们比较抽象,不容易掌握. 最近,我读到一篇材料,发现有一个很好的类比,可以把它们解释地清晰易懂. 1. 计算机的核心是CPU,它承担了所有的计算任务.它就像一座工厂,时刻在运行. 2. 假定工厂的电力有限,一次只能供给一个车间使用.也就是说,一个车间开工的时候,其他车间都必须停工.背后的含义就是,单个CPU一次只能运行一个任务. 3. 进程就好比工厂的车间,它代表CPU所能处理的

利用JAVA线程安全队列简单实现读者写者问题。

常见的操作系统教科书中,会使用互斥锁来实现读者线程和写者线程的同步问题,但是在JDK5推出线程安全队列之后,将该问题变得异常简单. java.util.concurrent.ConcurrentLinkedQueue 是线程安全的非阻塞队列,其实很容易想到,非阻塞队列当线程需要等待的时候,则不会阻塞等待,而是直接根据情况返回. java.util.concurrent.LinkedBlockingQueue 是线程安全的阻塞队列,该队列能够在很多情况下对线程进行阻塞,比如队列为空时调用take(

线程,临界区的研究

线程是进程的一条执行路径,它包含独立的堆栈和CPU寄存器状态,每个线程共享所有的进程资源,包括打开的文件.信号标识及动态分配的内存等.一个进程内的所有线程使用同一个地址空间,而这些线程的执行由系统调度程序控制,调度程序决定哪个线程可执行以及什么时候执行线程.线程有优先级别,优先权较低的线程必须等到优先权较高的线程执行完后再执行.在多处理器的机器上,调度程序可将多个线程放到不同的处理器上去运行,这样可使处理器任务平衡,并提高系统的运行效率. 多线程编程在Win32方式下和MFC类库支持下的原理是一