Twisted Source Code Analysis 1

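Twisted is an event-driven networking framework written in Python. It has been around for quite a few years, and many newer high-performance asynchronous I/O frameworks such as Tornado have appeared since, but Twisted is still very much worth studying. If you are looking for a tutorial, there is an excellent one, Twisted Introduction, and here is a translation of it.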



Now, on to the main topic.

Let's start the analysis with a simple example.

from twisted.internet.protocol import ServerFactory, Protocol

class PoetryProtocol(Protocol):

    def connectionMade(self):
        self.transport.write(self.factory.poem)
        self.transport.loseConnection()

class PoetryFactory(ServerFactory):

    protocol = PoetryProtocol

    def __init__(self, poem):
        self.poem = poem

def main():
    options, poetry_file = parse_args()

    poem = open(poetry_file, 'rb').read()  # bytes: transport.write() expects bytes

    factory = PoetryFactory(poem)

    from twisted.internet import reactor

    port = reactor.listenTCP(options.port or 0, factory,
                             interface=options.iface)

    print('Serving %s on %s.' % (poetry_file, port.getHost()))

    reactor.run()

For layout reasons only part of the code is listed here (for example, parse_args is not shown); see here for the full code.

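This is a very simple server: whenever a client connects, it sends the client the full text of a poem and then closes the connection. Here we focus only on the reactor. The reactor is the event loop manager: it registers, runs and removes events, and calls the callback functions when events occur. Note that the reactor loop runs in the thread that calls reactor.run() (normally the main thread), and once the loop has started it keeps running until reactor.stop() is called. In Twisted the reactor is a singleton: it is created the first time you import the reactor module, and importing reactor anywhere else in the application returns the object created the first time.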

from twisted.internet import reactor

Importing the reactor as above gives you Twisted's default reactor. Let's look at how this code implements the singleton:

# /twisted/internet/reactor.py
from __future__ import division, absolute_import

import sys
del sys.modules['twisted.internet.reactor']
from twisted.internet import default
default.install()

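On the first import, the module deletes the "twisted.internet.reactor" entry from the module dictionary (while a module is being imported, Python has already placed the partially initialized module object in sys.modules, so the entry exists) and then installs the default reactor. sys.modules is a global dictionary that maps module names to module objects. When a module is imported, Python checks this dictionary first: if the module is already loaded, the name is simply bound in the namespace of the importing module; otherwise Python searches the sys.path directories for the module by name, loads it into memory, adds the name-to-module entry to the dictionary, and then binds the name in the importing module's namespace. The code in default.py is: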

# /twisted/internet/default.py

from __future__ import division, absolute_import

__all__ = ["install"]

from twisted.python.runtime import platform

def _getInstallFunction(platform):
    try:
        if platform.isLinux():
            try:
                from twisted.internet.epollreactor import install
            except ImportError:
                from twisted.internet.pollreactor import install
        elif platform.getType() == 'posix' and not platform.isMacOSX():
            from twisted.internet.pollreactor import install
        else:
            from twisted.internet.selectreactor import install
    except ImportError:
        from twisted.internet.selectreactor import install
    return install

install = _getInstallFunction(platform)

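The reactor is chosen according to the platform. On Linux, epollreactor is preferred, and if it cannot be imported pollreactor is used. On other POSIX systems except macOS, pollreactor is used. On macOS, Windows and everything else, selectreactor is used, and it is also the fallback whenever any of the imports above fails. Here we will study pollreactor.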
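To make the selection order concrete, here is a minimal sketch. It assumes that checking which functions the standard `select` module provides is a fair stand-in for "can this reactor module be imported"; the name `choose_multiplexer` and its parameters are hypothetical, and Twisted itself imports `epollreactor`, `pollreactor` or `selectreactor` instead of returning a name.

```python
import os
import select
import sys


def choose_multiplexer(os_name=os.name, sys_platform=sys.platform, mod=select):
    """Hypothetical helper mirroring the preference order of _getInstallFunction."""
    # Linux: prefer epoll, fall back to poll if epoll is missing.
    if sys_platform.startswith("linux"):
        if hasattr(mod, "epoll"):
            return "epoll"
        if hasattr(mod, "poll"):
            return "poll"
    # Other POSIX systems, except macOS: use poll.
    elif os_name == "posix" and sys_platform != "darwin":
        if hasattr(mod, "poll"):
            return "poll"
    # macOS, Windows, or any failed check above: select() is always there.
    return "select"
```

Taking `os_name`, `sys_platform` and the module as parameters is only there so every branch can be exercised on any machine.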

# /twisted/internet/pollreactor.py

def install():
    """Install the poll() reactor."""
    p = PollReactor()
    from twisted.internet.main import installReactor
    installReactor(p)

# /twisted/internet/main.py

def installReactor(reactor):
    """
    Install reactor C{reactor}.
    @param reactor: An object that provides one or more IReactor* interfaces.
    """
    # this stuff should be common to all reactors.
    import twisted.internet
    import sys
    if 'twisted.internet.reactor' in sys.modules:
        raise error.ReactorAlreadyInstalledError("reactor already installed")
    twisted.internet.reactor = reactor
    sys.modules['twisted.internet.reactor'] = reactor

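Here the reactor object is assigned to the reactor attribute of the twisted.internet package and also stored in the module dictionary under the key "twisted.internet.reactor", so every later import of the reactor returns this singleton.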
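The same trick can be reproduced without Twisted. The sketch below assumes a made-up module name `toy_reactor`, a hypothetical `ToyReactor` class and a local stand-in for the error class. Like `installReactor`, it refuses to install twice and then stores an object, not a module, in `sys.modules`, so later imports hand back that object.

```python
import sys


class ReactorAlreadyInstalledError(Exception):
    """Local stand-in for Twisted's error.ReactorAlreadyInstalledError."""


class ToyReactor:
    """Hypothetical reactor object; any object can be installed this way."""

    def run(self):
        return "running"


def install_reactor(reactor, name="toy_reactor"):
    # Like installReactor(): refuse to install a second reactor.
    if name in sys.modules:
        raise ReactorAlreadyInstalledError("reactor already installed")
    # Store the object itself under the module name. The import system
    # checks sys.modules first, so "import toy_reactor" now returns it.
    sys.modules[name] = reactor
    return reactor
```

After `install_reactor(ToyReactor())`, a plain `import toy_reactor` anywhere in the program returns the installed instance, which is the singleton behaviour described above.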

# /twisted/internet/pollreactor.py

@implementer(IReactorFDSet)
class PollReactor(posixbase.PosixReactorBase, posixbase._PollLikeMixin):

    _POLL_DISCONNECTED = (POLLHUP | POLLERR | POLLNVAL)
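    # POLLHUP: the connection was hung up
    # POLLNVAL: invalid request: the file descriptor is not open
    # POLLERR: an error occurred on the connection
    _POLL_IN = POLLIN # there is data to read
    _POLL_OUT = POLLOUT # data can be written without blocking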

    def __init__(self):
        """
        Initialize the polling object, the file-descriptor tracking dictionaries, and the base class.
        """
        self._poller = poll() # a select.poll() object
        self._selectables = {}
        self._reads = {}
        self._writes = {}
        posixbase.PosixReactorBase.__init__(self)

    def _updateRegistration(self, fd):
        """
        Update the polling object's tracking of the state of this file descriptor.
        """
        try:
            self._poller.unregister(fd)
            # stop the polling object from tracking this fd (KeyError if it was not registered)
        except KeyError:
            pass

        mask = 0
        if fd in self._reads:
            mask = mask | POLLIN
        if fd in self._writes:
            mask = mask | POLLOUT
        if mask != 0:
            self._poller.register(fd, mask)
        else:
            if fd in self._selectables:
                del self._selectables[fd]

    def _dictRemove(self, selectable, mdict):
        try:
            # the easy way
            fd = selectable.fileno()
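            # check that this fd really is in mdict; if not (or if fileno()
            # fails), fall back to searching _selectables below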
            mdict[fd]
        except:
            for fd, fdes in self._selectables.items():
                if selectable is fdes:
                    break
            else:
                return
        if fd in mdict:
            del mdict[fd]
            self._updateRegistration(fd)

    def addReader(self, reader):
        fd = reader.fileno()
        if fd not in self._reads:
            self._selectables[fd] = reader
            self._reads[fd] =  1
            self._updateRegistration(fd)

    def addWriter(self, writer):
        """Add a FileDescriptor for notification of data available to write.
        """
        fd = writer.fileno()
        if fd not in self._writes:
            self._selectables[fd] = writer
            self._writes[fd] =  1
            self._updateRegistration(fd)

    def removeReader(self, reader):
        return self._dictRemove(reader, self._reads)

    def removeWriter(self, writer):
        return self._dictRemove(writer, self._writes)

    def removeAll(self):
        return self._removeAll(
            [self._selectables[fd] for fd in self._reads],
            [self._selectables[fd] for fd in self._writes])

    # this is the key part
    def doPoll(self, timeout):
        """Poll the poller for new events."""
        if timeout is not None:
            timeout = int(timeout * 1000) # convert seconds to milliseconds

        try:
            l = self._poller.poll(timeout)
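            # returns a possibly empty list of (fd, event) pairs: fd is a
            # file descriptor (socket) with a pending event, and event is
            # a bitmask that may combine flags such as POLLIN and POLLOUT
            # defined above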
        except SelectError as e:
            if e.args[0] == errno.EINTR:
                # the system call was interrupted (e.g. by a signal)
                return
            else:
                # anything else: re-raise
                raise
        _drdw = self._doReadOrWrite
        for fd, event in l:
            try:
                selectable = self._selectables[fd]
            except KeyError:
                continue
            log.callWithLogger(selectable, _drdw, selectable, fd, event)

    doIteration = doPoll # called by mainLoop on every pass of the event loop

    def getReaders(self):
        return [self._selectables[fd] for fd in self._reads]

    def getWriters(self):
        return [self._selectables[fd] for fd in self._writes]

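The @implementer decorator declares that PollReactor provides the IReactorFDSet interface, i.e. that it implements addReader, addWriter, removeReader, removeWriter, removeAll, getReaders and getWriters. The interface is defined in: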

/twisted/internet/interfaces.py
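The interface source is not reproduced here. As a rough approximation only, the methods IReactorFDSet asks for (the same ones PollReactor defines above) can be written as an `abc` base class. Twisted actually uses `zope.interface`, which, unlike `abc`, does not stop you from instantiating a class that forgets a method. `FDSetLike` and `DictFDSet` are hypothetical names, and `DictFDSet` only does the bookkeeping, with no poll object behind it.

```python
from abc import ABC, abstractmethod


class FDSetLike(ABC):
    """Approximation of the IReactorFDSet method list (not Twisted's code)."""

    @abstractmethod
    def addReader(self, reader): ...

    @abstractmethod
    def addWriter(self, writer): ...

    @abstractmethod
    def removeReader(self, reader): ...

    @abstractmethod
    def removeWriter(self, writer): ...

    @abstractmethod
    def removeAll(self): ...

    @abstractmethod
    def getReaders(self): ...

    @abstractmethod
    def getWriters(self): ...


class DictFDSet(FDSetLike):
    """Bookkeeping only, modelled on PollReactor's _reads/_writes/_selectables."""

    def __init__(self):
        self._selectables = {}   # fd -> object that owns the fd
        self._reads = set()      # fds watched for reading
        self._writes = set()     # fds watched for writing

    def _forget_if_unused(self, fd):
        # Like _updateRegistration: drop the fd once nothing watches it.
        if fd not in self._reads and fd not in self._writes:
            self._selectables.pop(fd, None)

    def addReader(self, reader):
        fd = reader.fileno()
        self._selectables[fd] = reader
        self._reads.add(fd)

    def addWriter(self, writer):
        fd = writer.fileno()
        self._selectables[fd] = writer
        self._writes.add(fd)

    def removeReader(self, reader):
        fd = reader.fileno()
        self._reads.discard(fd)
        self._forget_if_unused(fd)

    def removeWriter(self, writer):
        fd = writer.fileno()
        self._writes.discard(fd)
        self._forget_if_unused(fd)

    def removeAll(self):
        # Each removed object is returned once, even if it was in both sets.
        removed = [self._selectables[fd] for fd in sorted(self._reads | self._writes)]
        self._reads.clear()
        self._writes.clear()
        self._selectables.clear()
        return removed

    def getReaders(self):
        return [self._selectables[fd] for fd in self._reads]

    def getWriters(self):
        return [self._selectables[fd] for fd in self._writes]
```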

_doReadOrWrite is implemented in _PollLikeMixin, one of PollReactor's base classes:

# twisted/internet/posixbase.py

class _PollLikeMixin(object):
    """
    Mixin for poll-like reactors.
    Subclasses must define the following attributes::
      - _POLL_DISCONNECTED - Bitmask for events indicating a connection was
        lost.
      - _POLL_IN - Bitmask for events indicating there is input to read.
      - _POLL_OUT - Bitmask for events indicating output can be written.
    Must be mixed in to a subclass of PosixReactorBase (for
    _disconnectSelectable).
    """

    def _doReadOrWrite(self, selectable, fd, event):
        """
        The file descriptor is readable or writable: do the work, and raise errors if necessary.
        """
        why = None
        inRead = False
        if event & self._POLL_DISCONNECTED and not (event & self._POLL_IN):
            # Handle a disconnection, but only once all pending input has been processed
            if fd in self._reads:
                # no more read events are pending, i.e. all data has been
                # read, and the other end has probably closed the connection
                inRead = True
                why = CONNECTION_DONE
            else:
                # if we were not reading from this descriptor, this can
                # only be an unclean (erroneous) close
                why = CONNECTION_LOST
        else:
            try:
                if selectable.fileno() == -1:
                    # the socket has already been closed
                    why = _NO_FILEDESC
                else:
                    if event & self._POLL_IN:
                        # handle the read event
                        why = selectable.doRead()
                        inRead = True
                    if not why and event & self._POLL_OUT:
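                        # Handle the write event. On Linux POLLIN is 1 and
                        # POLLOUT is 4, so a mask of 5 means readable and
                        # writable at once (in my experiments a closed
                        # connection produced 5). The "not why" check means
                        # doWrite() only runs if doRead() did not already
                        # return a reason to disconnect.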
                        why = selectable.doWrite()
                        inRead = False
            except:
                # Any exception from application code gets logged and will
                # cause us to disconnect the selectable.
                why = sys.exc_info()[1]
                log.err()
        if why:
            # disconnect the selectable
            self._disconnectSelectable(selectable, why, inRead)

Depending on the events reported for each socket, _doReadOrWrite calls the corresponding doRead or doWrite method, or closes the connection and logs the error.
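A stripped-down version of the doPoll/_doReadOrWrite pair may help. This sketch assumes a POSIX system (`select.poll` does not exist on Windows), handles only reads, collapses the CONNECTION_DONE/CONNECTION_LOST distinction into plain strings, and uses a hypothetical `Reader` class in place of Twisted's selectables:

```python
import select
import socket

# Same "connection is gone" mask as PollReactor._POLL_DISCONNECTED.
DISCONNECTED = select.POLLHUP | select.POLLERR | select.POLLNVAL


class Reader:
    """Hypothetical selectable: collects bytes until the peer closes."""

    def __init__(self, sock):
        self.sock = sock
        self.data = b""

    def fileno(self):
        return self.sock.fileno()

    def doRead(self):
        chunk = self.sock.recv(4096)
        if not chunk:
            return "done"        # plays the role of CONNECTION_DONE
        self.data += chunk
        return None              # falsy: keep the connection open


def poll_once(poller, selectables, timeout_ms):
    """One doPoll-style pass: poll, then dispatch every (fd, event) pair."""
    closed = []
    for fd, event in poller.poll(timeout_ms):
        selectable = selectables.get(fd)
        if selectable is None:   # fd was removed in the meantime
            continue
        if event & DISCONNECTED and not event & select.POLLIN:
            why = "lost"         # hang-up/error with no input left to read
        elif event & select.POLLIN:
            why = selectable.doRead()
        else:
            why = None
        if why:                  # like _disconnectSelectable: stop watching
            poller.unregister(fd)
            del selectables[fd]
            closed.append((selectable, why))
    return closed
```

With a `socket.socketpair()`, registering one end for `POLLIN` and sending `b"hello"` from the other end, `poll_once` reads the data; after the sending end is closed, the next call reports `(reader, "done")`, because end-of-file shows up as a readable socket whose `recv()` returns `b""`.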

port = reactor.listenTCP(options.port or 0, factory,
                             interface=options.iface)

Here the reactor listens on a port. listenTCP is implemented in the reactor's base class PosixReactorBase:

# /twisted/internet/posixbase.py
@implementer(IReactorTCP, IReactorUDP, IReactorMulticast)
class PosixReactorBase(_SignalReactorMixin, _DisconnectSelectableMixin,ReactorBase):

    def listenTCP(self, port, factory, backlog=50, interface=''):
        p = tcp.Port(port, factory, backlog, interface, self)
        p.startListening()
        return p

# /twisted/internet/tcp.py
@implementer(interfaces.IListeningPort)
class Port(base.BasePort, _SocketCloser):
    def __init__(self, port, factory, backlog=50, interface='', reactor=None):
        """Initialize with a numeric port to listen on.
        """
        base.BasePort.__init__(self, reactor=reactor)
        self.port = port
        self.factory = factory
        self.backlog = backlog
        if abstract.isIPv6Address(interface):
            self.addressFamily = socket.AF_INET6
            self._addressType = address.IPv6Address
        self.interface = interface

    def startListening(self):
        """Create and bind the socket, then start listening."""
        # Reuse a pre-existing socket if one was handed to us
        if self._preexistingSocket is None:
            try:
                skt = self.createInternetSocket()
                if self.addressFamily == socket.AF_INET6:
                    addr = _resolveIPv6(self.interface, self.port)
                else:
                    addr = (self.interface, self.port)
                skt.bind(addr)
            except socket.error as le:
                raise CannotListenError(self.interface, self.port, le)
            skt.listen(self.backlog)
        else:
            skt = self._preexistingSocket
            self._preexistingSocket = None
            self._shouldShutdown = False

        # Make sure that if we listened on port 0, we update that to
        # reflect what the OS actually assigned us.
        self._realPortNumber = skt.getsockname()[1]

        log.msg("%s starting on %s" % (
                self._getLogPrefix(self.factory), self._realPortNumber))

        # The order of the next 5 lines is kind of bizarre.  If no one
        # can explain it, perhaps we should re-arrange them.
        self.factory.doStart() # start the factory
        self.connected = True
        self.socket = skt
        self.fileno = self.socket.fileno
        self.numberAccepts = 100

        self.startReading()
        # registers this port with the reactor as a reader (reactor.addReader)

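Here listenTCP creates a socket listening on the given port and registers it with the reactor's polling object, so as soon as a client connects to the server, the reactor notices and handles it. listenTCP returns a Port object, and whenever a client requests a connection, the Port's doRead method is called: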

# twisted/internet/tcp.py

    def doRead(self):
        try:
            if platformType == "posix":
                numAccepts = self.numberAccepts
            else:
                # on win32, only one socket.accept() call per doRead
                numAccepts = 1
            for i in range(numAccepts):
                if self.disconnecting:
                    return
                try:
                    skt, addr = self.socket.accept()
                    # the socket for the new client connection
                except socket.error as e:
                    if e.args[0] in (EWOULDBLOCK, EAGAIN):
                        # EWOULDBLOCK: the operation would block
                        # EAGAIN: resource temporarily unavailable, try again
                        self.numberAccepts = i
                        break
                    elif e.args[0] == EPERM:
                        # operation not permitted
                        # Netfilter on Linux may have rejected the
                        # connection, but we get told to try to accept()
                        # anyway.
                        continue
                    elif e.args[0] in (EMFILE, ENOBUFS, ENFILE, ENOMEM, ECONNABORTED):
                        # EMFILE: too many open files in this process
                        # ENOBUFS: no buffer space available
                        # ENFILE: system-wide file table overflow

                        # Linux gives EMFILE when a process is not allowed
                        # to allocate any more file descriptors.  *BSD and
                        # Win32 give (WSA)ENOBUFS.  Linux can also give
                        # ENFILE if the system is out of inodes, or ENOMEM
                        # if there is insufficient memory to allocate a new
                        # dentry.  ECONNABORTED is documented as possible on
                        # both Linux and Windows, but it is not clear
                        # whether there are actually any circumstances under
                        # which it can happen (one might expect it to be
                        # possible if a client sends a FIN or RST after the
                        # server sends a SYN|ACK but before application code
                        # calls accept(2), however at least on Linux this
                        # _seems_ to be short-circuited by syncookies.

                        log.msg("Could not accept new connection (%s)" % (
                            errorcode[e.args[0]],))
                        break
                    raise

                fdesc._setCloseOnExec(skt.fileno())
                protocol = self.factory.buildProtocol(self._buildAddr(addr))
                if protocol is None:
                    skt.close()
                    continue
                s = self.sessionno
                self.sessionno = s+1
                transport = self.transport(skt, protocol, addr, self, s, self.reactor)
                protocol.makeConnection(transport)
            else:
                self.numberAccepts = self.numberAccepts+20
        except:
            log.deferr()

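In doRead, accept() produces a new socket for talking to the client. A Protocol object is then created through the factory, a transport wrapping the socket and the protocol is created, the protocol is connected to that transport, and the transport is added to the reactor's read set.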
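The listen/accept half can be sketched with the plain `socket` module. This is an approximation, not Twisted's code: `listen_tcp` and `accept_pending` are hypothetical helpers, the socket is IPv4 only, POSIX errno values are assumed, and `build_protocol` stands in for `factory.buildProtocol`. It shows two details from above: binding to port 0 and reading the real port back with `getsockname()`, and accepting in a loop until the non-blocking `accept()` fails with EWOULDBLOCK/EAGAIN.

```python
import errno
import select
import socket


def listen_tcp(port=0, interface="127.0.0.1", backlog=50):
    """Roughly Port.startListening: create, bind, listen, go non-blocking."""
    skt = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    skt.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    skt.bind((interface, port))
    skt.listen(backlog)
    skt.setblocking(False)
    # With port 0 the OS picks a free port; read back what it assigned.
    return skt, skt.getsockname()[1]


def accept_pending(listening, build_protocol, max_accepts=100):
    """Roughly Port.doRead: accept queued connections until none are left."""
    accepted = []
    for _ in range(max_accepts):
        try:
            conn, addr = listening.accept()
        except OSError as e:
            if e.errno in (errno.EWOULDBLOCK, errno.EAGAIN):
                break                    # queue drained: wait for the next event
            raise
        protocol = build_protocol(addr)  # stands in for factory.buildProtocol
        if protocol is None:             # the factory refused the connection
            conn.close()
            continue
        accepted.append((conn, protocol))
    return accepted


if __name__ == "__main__":
    server, port = listen_tcp()
    client = socket.create_connection(("127.0.0.1", port))
    # A reactor would call doRead once poll reports the socket readable.
    select.select([server], [], [], 5)
    for conn, proto in accept_pending(server, lambda addr: "protocol for %s:%d" % addr):
        print(proto)
        conn.close()
    client.close()
    server.close()
```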

protocol = self.factory.buildProtocol(self._buildAddr(addr))
transport = self.transport(skt, protocol, addr, self, s, self.reactor)
protocol.makeConnection(transport)

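The factory object (factories will be covered later) calls buildProtocol to create an instance of our custom Protocol class; then a transport is created and the protocol's makeConnection method is called. That method is implemented in the parent class BaseProtocol: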

# /twisted/internet/protocol.py
class BaseProtocol:
    connected = 0
    transport = None

    def makeConnection(self, transport):
        self.connected = 1
        self.transport = transport
        self.connectionMade()

    def connectionMade(self):
        """Called when a connection is made.
        This may be considered the initializer of the protocol, because
        it is called when the connection is completed.  For clients,
        this is called once the connection to the server has been
        established; for servers, this is called after an accept() call
        stops blocking and a socket has been received.  If you need to
        send any greeting or initial message, do it here.
        """

makeConnection stores the transport and then calls the connectionMade method we defined, after which the server and the client can exchange data.
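The hand-off from factory to protocol can be tried without a network. The classes below are hypothetical, pared-down copies: `buildProtocol` follows the usual Twisted pattern of instantiating `self.protocol` and setting `p.factory = self`, `BaseProtocol.makeConnection` copies the excerpt above, and `FakeTransport` simply records what would have been written to the socket.

```python
class BaseProtocol:
    """Pared-down copy of the makeConnection logic shown above."""
    connected = 0
    transport = None

    def makeConnection(self, transport):
        self.connected = 1
        self.transport = transport
        self.connectionMade()

    def connectionMade(self):
        pass


class Factory:
    protocol = None

    def buildProtocol(self, addr):
        # Instantiate the protocol class and give it a back-reference.
        p = self.protocol()
        p.factory = self
        return p


class FakeTransport:
    """Hypothetical transport that records writes instead of sending them."""

    def __init__(self):
        self.written = []
        self.disconnecting = False

    def write(self, data):
        self.written.append(data)

    def loseConnection(self):
        self.disconnecting = True


class PoetryProtocol(BaseProtocol):
    def connectionMade(self):
        self.transport.write(self.factory.poem)
        self.transport.loseConnection()


class PoetryFactory(Factory):
    protocol = PoetryProtocol

    def __init__(self, poem):
        self.poem = poem
```

Building a protocol from `PoetryFactory(b"...")` and calling `makeConnection(FakeTransport())` on it runs `connectionMade`, so the transport ends up holding the poem and marked as disconnecting, which is what the poetry server does for each client.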

When data arrives from the client, the transport's doRead method is called to read it. Connection, a parent class of the transport's class, implements doRead:

# /twisted/internet/tcp.py
@implementer(interfaces.ITCPTransport, interfaces.ISystemHandle)
class Connection(_TLSConnectionMixin, abstract.FileDescriptor, _SocketCloser,
                 _AbortingMixin):
    def doRead(self):
        try:
            data = self.socket.recv(self.bufferSize)
        except socket.error as se:
            if se.args[0] == EWOULDBLOCK:
                # the read would block: just return
                return
            else:
                # any other error: the connection was lost
                return main.CONNECTION_LOST

        return self._dataReceived(data)

    def _dataReceived(self, data):
        if not data:
            return main.CONNECTION_DONE
        rval = self.protocol.dataReceived(data)
        if rval is not None:
            offender = self.protocol.dataReceived
            warningFormat = (
                'Returning a value other than None from %(fqpn)s is '
                'deprecated since %(version)s.')
            warningString = deprecate.getDeprecationWarningString(
                offender, versions.Version('Twisted', 11, 0, 0),
                format=warningFormat)
            deprecate.warnAboutFunction(offender, warningString)
        return rval

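_dataReceived calls the dataReceived method we override in our protocol to process the data. An empty read means the peer closed the connection, in which case CONNECTION_DONE is returned instead.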
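The read path of `Connection` can be mimicked with a socket pair. In this sketch `ToyConnection`, `EchoProtocol` and the two string constants are hypothetical stand-ins for Twisted's `Connection`, a user protocol and `main.CONNECTION_DONE`/`main.CONNECTION_LOST`, and the buffer size is arbitrary. It shows the three outcomes: no data yet, data handed to `dataReceived`, and `b""` meaning a clean close.

```python
import socket

# Hypothetical stand-ins for main.CONNECTION_DONE / main.CONNECTION_LOST.
CONNECTION_DONE = "connection done"
CONNECTION_LOST = "connection lost"


class EchoProtocol:
    """Hypothetical user protocol that just remembers what it received."""

    def __init__(self):
        self.received = []

    def dataReceived(self, data):
        self.received.append(data)


class ToyConnection:
    bufferSize = 65536  # arbitrary size for this sketch

    def __init__(self, sock, protocol):
        self.socket = sock
        self.protocol = protocol

    def doRead(self):
        try:
            data = self.socket.recv(self.bufferSize)
        except BlockingIOError:
            return None             # EWOULDBLOCK: nothing to read after all
        except OSError:
            return CONNECTION_LOST  # any other socket error
        return self._dataReceived(data)

    def _dataReceived(self, data):
        if not data:
            return CONNECTION_DONE  # b"" means the peer closed cleanly
        self.protocol.dataReceived(data)
        return None
```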

reactor.run() is implemented by _SignalReactorMixin, one of the reactor's base classes:

class _SignalReactorMixin(object):
    def startRunning(self, installSignalHandlers=True):
        self._installSignalHandlers = installSignalHandlers
        ReactorBase.startRunning(self)

    def run(self, installSignalHandlers=True):
        self.startRunning(installSignalHandlers=installSignalHandlers)
        self.mainLoop()

    def mainLoop(self):
        while self._started:
            try:
                while self._started:
                    # Advance simulation time in delayed event
                    # processors.
                    self.runUntilCurrent()
                    t2 = self.timeout()
                    t = self.running and t2
                    self.doIteration(t)
            except:
                log.msg("Unexpected error in main loop.")
                log.err()
            else:
                log.msg('Main loop terminated.')

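PosixReactorBase, the base class of PollReactor, has three base classes: _SignalReactorMixin, _DisconnectSelectableMixin and ReactorBase. Both _SignalReactorMixin and ReactorBase implement startRunning, and in the MRO _SignalReactorMixin comes first, so its startRunning is the one that gets called; that is why it has to call ReactorBase.startRunning explicitly at the end.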
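The method resolution order can be checked with empty stand-in classes. These classes only borrow Twisted's names and record which `startRunning` runs; they are not the real implementations.

```python
class ReactorBase:
    def startRunning(self):
        self.calls.append("ReactorBase.startRunning")
        self._started = True


class _SignalReactorMixin:
    def startRunning(self, installSignalHandlers=True):
        self.calls.append("_SignalReactorMixin.startRunning")
        self._installSignalHandlers = installSignalHandlers
        # Named explicitly, as in the Twisted excerpt: no super() call.
        ReactorBase.startRunning(self)


class _DisconnectSelectableMixin:
    pass  # defines no startRunning, so attribute lookup passes over it


class PosixReactorBase(_SignalReactorMixin, _DisconnectSelectableMixin, ReactorBase):
    def __init__(self):
        self.calls = []  # which startRunning methods ran, in order
```

Because lookup stops at the first class in `PosixReactorBase.__mro__` that defines `startRunning`, `ReactorBase.startRunning` would never run unless the mixin called it. A cooperative `super().startRunning()` would also reach it here, but the Twisted code uses the explicit form.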

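mainLoop now starts the main loop. On each pass it runs any timed calls that are due (runUntilCurrent), works out how long it may block (timeout), and calls the doIteration method described above to watch the set of file descriptors; as soon as an event is ready for reading or writing, the corresponding handler is called.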
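The shape of `mainLoop` (run due timed calls, compute a timeout, block in `doIteration`) can be shown with a toy reactor that only supports timed calls. `TimerOnlyReactor` and its `callLater`, `runUntilCurrent`, `timeout` and `doIteration` are a hypothetical simplification that borrows Twisted's method names; a real reactor would poll its file descriptors in `doIteration` instead of sleeping.

```python
import heapq
import itertools
import time


class TimerOnlyReactor:
    """Hypothetical mini reactor with timed calls but no file descriptors."""

    def __init__(self):
        self._pending = []               # heap of (when, seq, func, args)
        self._seq = itertools.count()    # tie-breaker so funcs are never compared
        self._started = False

    def callLater(self, delay, func, *args):
        when = time.monotonic() + delay
        heapq.heappush(self._pending, (when, next(self._seq), func, args))

    def stop(self):
        self._started = False

    def runUntilCurrent(self):
        # Run every timed call whose deadline has passed.
        now = time.monotonic()
        while self._pending and self._pending[0][0] <= now:
            _, _, func, args = heapq.heappop(self._pending)
            func(*args)

    def timeout(self):
        # How long doIteration may block: until the next deadline,
        # or None ("no limit") when nothing is scheduled.
        if not self._pending:
            return None
        return max(0.0, self._pending[0][0] - time.monotonic())

    def doIteration(self, t):
        # A real reactor polls file descriptors for up to t seconds here.
        # This toy just sleeps, capping None so it never blocks forever.
        time.sleep(0.05 if t is None else t)

    def run(self):
        self._started = True
        while self._started:
            self.runUntilCurrent()
            t2 = self.timeout()
            t = t2 if self._started else 0   # like "self.running and t2"
            self.doIteration(t)
```

Scheduling `order.append` calls at 0.02 s and 0.01 s plus `stop` at 0.03 s and then calling `run()` executes the callbacks in deadline order and returns once `stop` has run.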

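That concludes this brief analysis of the simple example, from creating the event loop to establishing a connection with a client. I have not explained some details (I am reading the source as I write this post); if you are interested, read the source carefully. This article surely contains omissions and mistakes, and corrections from readers are welcome. I am preparing for the graduate entrance exam, so my time is limited, but I will write at least one article like this every month.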



References:

1. http://www.jianshu.com/p/26ae331b09b0

2. https://docs.python.org/2/library/errno.html?highlight=errno#module-errno

3. https://github.com/twisted/twisted/tree/trunk/twisted

4. https://docs.python.org/2/library/select.html?highlight=select#module-select

Date: 2024-10-07 21:56:21
