Twisted Source Code Analysis 3

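In the previous two sections, we used a simple example to take a brief look at Twisted's source code from the server side. In this section, we use another example to study the relevant Twisted source from the client side (the complete example is available here).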

Here is the simplified code:

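import sys

from twisted.internet.protocol import Protocol, ClientFactory


class PoetryProtocol(Protocol):

    poem = b''  # Python 3: data arrives as bytes

    def dataReceived(self, data):
        self.poem += data

    def connectionLost(self, reason):
        self.poemReceived(self.poem)

    def poemReceived(self, poem):
        self.factory.poem_finished(poem)

class PoetryClientFactory(ClientFactory):

    protocol = PoetryProtocol

    def poem_finished(self, poem=None):
        from twisted.internet import reactor  # reactor is not a module-level name here
        if poem is not None:
            print(poem.decode('utf-8', 'replace'))
        reactor.stop()

    def clientConnectionFailed(self, connector, reason):
        print('Failed to connect to:', connector.getDestination())
        self.poem_finished()

def parse_args():
    # Simplified stand-in for the full example's argument parser: expects "host:port"
    host, port = sys.argv[1].split(':')
    return host, int(port)

def main():
    address = parse_args() # parse the address
    factory = PoetryClientFactory()
    from twisted.internet import reactor
    host, port = address
    reactor.connectTCP(host, port, factory)
    reactor.run()

if __name__ == "__main__":
    main()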

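In main, the first four lines are much the same as in the examples from the previous two sections. The fifth line, reactor.connectTCP(host, port, factory), is different from the reactor.listenTCP call in the previous section. There we built a server, which has to listen for connections from any client. Here we are implementing a client, so we need to open a connection to the server. After that, we start the reactor's event loop.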
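The gap between connectTCP and listenTCP is the gap between a socket that actively connects and one that waits for connections. For comparison, here is a minimal blocking sketch that uses only Python's socket module (not Twisted) to do what PoetryProtocol does: it connects, accumulates data until the server closes the connection, and returns the result. get_poem is a hypothetical name. Twisted's version instead reacts to events inside reactor.run(), and it does not block.

```python
import socket


def get_poem(host, port):
    """Blocking analogue of PoetryProtocol: collect bytes until the server closes."""
    chunks = []
    with socket.create_connection((host, port)) as sock:
        while True:
            data = sock.recv(4096)
            if not data:
                # recv() returning b"" means the peer closed the connection;
                # in Twisted this is the point where connectionLost fires
                break
            # Each chunk plays the role of the data passed to dataReceived
            chunks.append(data)
    return b"".join(chunks)
```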

# /twisted/internet/posixbase.py

class PosixReactorBase(_SignalReactorMixin, _DisconnectSelectableMixin, ReactorBase):
    def connectTCP(self, host, port, factory, timeout=30, bindAddress=None):
        c = tcp.Connector(host, port, factory, timeout, bindAddress, self)
        c.connect()
        return c

The reactor's connectTCP method is implemented by its base class, PosixReactorBase. The method creates a Connector object, calls that object's connect method, and returns the object.

# /twisted/internet/tcp.py
class Connector(base.BaseConnector):
    _addressType = address.IPv4Address

    def __init__(self, host, port, factory, timeout, bindAddress, reactor=None):
        if isinstance(port, _portNameType):
            try:
                port = socket.getservbyname(port, 'tcp')
            except socket.error as e:
                raise error.ServiceNameUnknownError(string="%s (%r)" % (e, port))
        self.host, self.port = host, port
        if abstract.isIPv6Address(host):
            self._addressType = address.IPv6Address
        self.bindAddress = bindAddress
        base.BaseConnector.__init__(self, factory, timeout, reactor)

    def _makeTransport(self):
        """Create a Client object to serve as this Connector's transport"""
        return Client(self.host, self.port, self.bindAddress, self, self.reactor)
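Connector.__init__ normalizes its arguments before any connection is attempted. A string port is treated as a service name and looked up with socket.getservbyname, and an IPv6 literal host switches the address type. The standard-library sketch below shows both checks. normalize_port, is_ipv6_literal, and this ServiceNameUnknownError class are hypothetical names. is_ipv6_literal is a simplified check that may not match abstract.isIPv6Address in every edge case, for example scoped addresses with a % suffix.

```python
import socket


class ServiceNameUnknownError(Exception):
    """Hypothetical stand-in for twisted.internet.error.ServiceNameUnknownError."""


def normalize_port(port):
    # As in Connector.__init__: a string port is a service name such as "http"
    if isinstance(port, str):
        try:
            port = socket.getservbyname(port, "tcp")
        except OSError as e:
            raise ServiceNameUnknownError("%s (%r)" % (e, port))
    return port


def is_ipv6_literal(host):
    # Simplified IPv6 check: inet_pton rejects anything that is not an IPv6 literal
    try:
        socket.inet_pton(socket.AF_INET6, host)
        return True
    except (OSError, ValueError):
        return False
```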

# /twisted/internet/base.py
@implementer(IConnector)
class BaseConnector:
    """Basic implementation of connector.
    State can be: "connecting", "connected", "disconnected"
    """
    timeoutID = None
    factoryStarted = 0

    def __init__(self, factory, timeout, reactor):
        self.state = "disconnected"
        self.reactor = reactor
        self.factory = factory
        self.timeout = timeout

    ...

    def connect(self):
        """Start connection to remote server."""
        if self.state != "disconnected":
            raise RuntimeError("can't connect in this state")

        self.state = "connecting"
        if not self.factoryStarted:
            self.factory.doStart()
            self.factoryStarted = 1
        self.transport = transport = self._makeTransport()
        if self.timeout is not None:
            self.timeoutID = self.reactor.callLater(self.timeout, transport.failIfNotConnected, error.TimeoutError())
        # Call the factory's startedConnecting method, which users may override; it is called when the connection attempt starts, not when it succeeds
        self.factory.startedConnecting(self)

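In the connect method, _makeTransport creates a Client object and stores it in the Connector's transport attribute. As we saw in the previous two sections, the transport handles the low-level data transfer, and this Client object represents the connection to the server.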
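The bookkeeping in BaseConnector.connect can be modeled without any networking: the state check, starting the factory only once, arming a timeout, and notifying startedConnecting. ToyFactory and ToyConnector below are hypothetical stand-ins. call_later is any function shaped like (delay, name) that records what would have been scheduled; the real code passes transport.failIfNotConnected to reactor.callLater.

```python
class ToyFactory:
    def __init__(self):
        self.start_count = 0
        self.started_connecting = 0

    def doStart(self):
        self.start_count += 1

    def startedConnecting(self, connector):
        self.started_connecting += 1


class ToyConnector:
    def __init__(self, factory, timeout, call_later):
        self.state = "disconnected"
        self.factory = factory
        self.timeout = timeout
        self.call_later = call_later
        self.factoryStarted = False
        self.timeoutID = None

    def _makeTransport(self):
        return object()  # placeholder for the Client transport

    def connect(self):
        if self.state != "disconnected":
            raise RuntimeError("can't connect in this state")
        self.state = "connecting"
        if not self.factoryStarted:
            # doStart runs only on the first connection attempt
            self.factory.doStart()
            self.factoryStarted = True
        self.transport = self._makeTransport()
        if self.timeout is not None:
            # Record the timeout that would fire failIfNotConnected
            self.timeoutID = self.call_later(self.timeout, "failIfNotConnected")
        self.factory.startedConnecting(self)
```

Connecting twice without first returning to "disconnected" raises RuntimeError. Reconnecting after a disconnect does not start the factory a second time.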

# /twisted/internet/tcp.py
class Client(_BaseTCPClient, BaseClient):
    """
    A transport for a TCP protocol; either TCPv4 or TCPv6.
    Do not create these directly; use L{IReactorTCP.connectTCP}.
    """

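Client is an extended Connection class. Its bases are _BaseTCPClient and BaseClient, and BaseClient in turn derives from _BaseBaseClient, _TLSClientMixin, and Connection. Because _BaseTCPClient comes first among Client's bases, Client's constructor is _BaseTCPClient's constructor.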

# /twisted/internet/tcp.py
class _BaseTCPClient(object):
    """
    Code shared with other (non-POSIX) reactors for management of outgoing TCP
    connections (both TCPv4 and TCPv6).
    """

    _addressType = address.IPv4Address

    def __init__(self, host, port, bindAddress, connector, reactor=None):
        # BaseClient.__init__ is invoked later
        self.connector = connector
        self.addr = (host, port)

        whenDone = self.resolveAddress # method that resolves the address
        err = None
        skt = None

        if abstract.isIPAddress(host):
            self._requiresResolution = False
        elif abstract.isIPv6Address(host):
            self._requiresResolution = False
            self.addr = _resolveIPv6(host, port)
            self.addressFamily = socket.AF_INET6
            self._addressType = address.IPv6Address
        else:
            # host is not an IP address literal, so it must be resolved first
            self._requiresResolution = True
        try:
            skt = self.createInternetSocket()
            # createInternetSocket is implemented by BaseClient and creates a socket
        except socket.error as se:
            err = error.ConnectBindError(se.args[0], se.args[1])
            whenDone = None
        if whenDone and bindAddress is not None:
            # a local bind address was requested
            try:
                if abstract.isIPv6Address(bindAddress[0]):
                    bindinfo = _resolveIPv6(*bindAddress)
                else:
                    bindinfo = bindAddress
                skt.bind(bindinfo)
            except socket.error as se:
                err = error.ConnectBindError(se.args[0], se.args[1])
                whenDone = None
        self._finishInit(whenDone, skt, err, reactor)

_finishInit is implemented by the base class _BaseBaseClient:

# /twisted/internet/tcp.py

class _BaseBaseClient(object):
    def _finishInit(self, whenDone, skt, error, reactor):
        """
        Called by subclasses to continue initializing the socket connection
        """
        if whenDone:
            self._commonConnection.__init__(self, skt, None, reactor)
            # _commonConnection is the Connection class;
            # this explicitly calls Connection's constructor
            reactor.callLater(0, whenDone)
            # whenDone runs on a later reactor iteration, i.e. once the reactor is running
        else:
            reactor.callLater(0, self.failIfNotConnected, error)

    def resolveAddress(self):
        """
        If the host name is not an IP address, resolve it to one
        """
        if self._requiresResolution:
            d = self.reactor.resolve(self.addr[0])
            d.addCallback(lambda n: (n,) + self.addr[1:])
            d.addCallbacks(self._setRealAddress,
                           self.failIfNotConnected)
        else:
            self._setRealAddress(self.addr)

    def _setRealAddress(self, address):
        self.realAddress = address
        self.doConnect()

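_finishInit completes the initialization of the Client object, and resolveAddress takes care of the address. If the host needs resolution, reactor.resolve converts the host name to an IP address and returns a Deferred. When the lookup completes, the first callback turns the result into an (ip, port) tuple, and then _setRealAddress is called (or failIfNotConnected, if the lookup fails). If the host is already an IP address, _setRealAddress is called directly. We will study Deferreds later. _setRealAddress calls BaseClient's doConnect method to start the connection.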
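A toy reactor can illustrate the ordering that _finishInit sets up: the constructor returns first, and address resolution and doConnect happen later inside the reactor loop. ToyReactor and ToyClient are hypothetical simplifications. callLater ignores its delay and only queues the call. Resolution is a plain synchronous function instead of the Deferred that Twisted's reactor.resolve returns.

```python
class ToyReactor:
    """Hypothetical minimal reactor: callLater queues calls, run() drains them in order."""

    def __init__(self):
        self.pending = []

    def callLater(self, delay, fn, *args):
        # The delay is ignored in this toy; calls run in FIFO order
        self.pending.append((fn, args))

    def run(self):
        while self.pending:
            fn, args = self.pending.pop(0)
            fn(*args)


class ToyClient:
    def __init__(self, host, port, reactor, resolve):
        self.addr = (host, port)
        self.resolve = resolve  # hypothetical synchronous resolver
        self.log = ["constructed"]
        # Like _finishInit: defer resolveAddress until the reactor runs
        reactor.callLater(0, self.resolveAddress)

    def resolveAddress(self):
        ip = self.resolve(self.addr[0])
        # Same transformation as the callback: (n,) + self.addr[1:]
        self._setRealAddress((ip,) + self.addr[1:])

    def _setRealAddress(self, address):
        self.realAddress = address
        self.log.append(("doConnect", address))
```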

# /twisted/internet/tcp.py

class BaseClient(_BaseBaseClient, _TLSClientMixin, Connection):
    def doConnect(self):
        self.doWrite = self.doConnect
        self.doRead = self.doConnect
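        # Point doWrite and doRead at doConnect: while the connection is being established,
        # any readable/writable event re-enters doConnect instead of the normal data handlers,
        # which would behave unpredictably on a socket that is not yet connected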
        if not hasattr(self, "connector"):
            return

        err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR) # check whether the socket has a pending error
        if err:
            self.failIfNotConnected(error.getConnectError((err, strerror(err))))
            return

        # doConnect gets called twice.  The first time we actually need to
        # start the connection attempt.  The second time we don't really
        # want to (SO_ERROR above will have taken care of any errors, and if
        # it reported none, the mere fact that doConnect was called again is
        # sufficient to indicate that the connection has succeeded), but it
        # is not /particularly/ detrimental to do so.  This should get
        # cleaned up some day, though.
        try:
            connectResult = self.socket.connect_ex(self.realAddress)
        except socket.error as se:
            connectResult = se.args[0]
        if connectResult:
            # EISCONN: the socket is already connected (seen on the second call)
            if connectResult == EISCONN:
                pass
            # on Windows EINVAL means sometimes that we should keep trying:
            # http://msdn.microsoft.com/library/default.asp?url=/library/en-us/winsock/winsock/connect_2.asp
            # connection still in progress (EWOULDBLOCK, EINPROGRESS, EALREADY), or EINVAL on Windows
            elif ((connectResult in (EWOULDBLOCK, EINPROGRESS, EALREADY)) or
                  (connectResult == EINVAL and platformType == "win32")):
                # keep watching for read and write events; the reactor will call doConnect again
                self.startReading()
                self.startWriting()
                return
            else:
                self.failIfNotConnected(error.getConnectError((connectResult, strerror(connectResult))))
                return

        # If I have reached this point without raising or returning, that means
        # that the socket is connected.
        del self.doWrite
        del self.doRead
        # we first stop and then start, to reset any references to the old doRead
        self.stopReading()
        self.stopWriting()
        self._connectDone()

socket.connect_ex is similar to socket.connect, but when an error occurs, connect_ex returns an error code instead of raising an exception. failIfNotConnected is implemented in _BaseBaseClient:
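The pattern doConnect relies on can be tried out with the standard library alone: a non-blocking connect_ex, followed by a check of SO_ERROR once the socket becomes writable. The sketch below is a hypothetical helper, nonblocking_connect, that waits with select instead of a Twisted reactor and assumes a POSIX system. It returns 0 on success or an errno value on failure, the same convention connect_ex uses.

```python
import errno
import select
import socket


def nonblocking_connect(address, timeout=5.0):
    """Return 0 if the TCP connection succeeds, otherwise an errno value."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setblocking(False)
    try:
        # connect_ex reports problems as a return code instead of raising
        result = sock.connect_ex(address)
        if result in (errno.EINPROGRESS, errno.EWOULDBLOCK, errno.EALREADY):
            # The attempt is finished once the socket becomes writable
            _, writable, _ = select.select([], [sock], [], timeout)
            if not writable:
                return errno.ETIMEDOUT
            # SO_ERROR tells whether the finished attempt succeeded (0) or failed
            result = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
        return result
    finally:
        sock.close()
```

In doConnect, the wait is not a select call inside the method. The method returns after startReading/startWriting, and the reactor calls it again when the socket becomes ready.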

def failIfNotConnected(self, err):
    """
    Called when the connection attempt fails: performs cleanup, stops watching
    the socket for read and write events, and calls connectionFailed.
    """
    if (self.connected or self.disconnected or
        not hasattr(self, "connector")):
        return

    self._stopReadingAndWriting() # stop watching this object for read and write events
    try:
        self._closeSocket(True)
        # close the socket and do some cleanup
    except AttributeError:
        pass
    else:
        self._collectSocketDetails()
    self.connector.connectionFailed(failure.Failure(err))
    # connector.connectionFailed calls the factory's clientConnectionFailed method and does some cleanup
    del self.connector
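The guard at the top of failIfNotConnected, together with del self.connector at the end, makes the method safe to call more than once. That matters because both the timeout scheduled in BaseConnector.connect and the error checks in doConnect may try to report a failure. The toy classes below are hypothetical, not Twisted's, and isolate that pattern. Boolean flags stand in for the reactor registration and the socket.

```python
class ToyConnector:
    def __init__(self):
        self.failures = []

    def connectionFailed(self, reason):
        self.failures.append(reason)


class ToyClient:
    def __init__(self, connector):
        self.connector = connector
        self.connected = False
        self.disconnected = False
        self.watching = True     # stands in for read/write registration with the reactor
        self.socket_open = True  # stands in for the real socket

    def failIfNotConnected(self, err):
        # No-op if connected, already torn down, or already reported as failed
        if self.connected or self.disconnected or not hasattr(self, "connector"):
            return
        self.watching = False     # like _stopReadingAndWriting()
        self.socket_open = False  # like _closeSocket(True)
        self.connector.connectionFailed(err)
        # Removing the attribute makes every later call hit the guard above
        del self.connector
```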

failIfNotConnected reliably performs the cleanup when the client's connection attempt fails. When doConnect runs to the end, it calls _connectDone:

def _connectDone(self):
    self.protocol = self.connector.buildProtocol(self.getPeer())
    # create the protocol object
    self.connected = 1
    logPrefix = self._getLogPrefix(self.protocol)
    self.logstr = "%s,client" % logPrefix
    if self.protocol is None:
        self.protocol = Protocol()
        # But dispose of the connection quickly.
        self.loseConnection() # drop the connection
    else:
        self.startReading()
        self.protocol.makeConnection(self)

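In this method we create the Protocol object. Its class is user-defined and implements the actual service. Compared with the Server class we met in the previous section on the server side (which is essentially the Connection class), Client adds many more connection checks and much more error handling. Some of these we have not covered and will return to when studying other parts. This extra care matters a great deal for a client. A server handles connections from many clients, so when an unexpected error occurs, it can only do some handling, close that connection, and go back to serving the other clients. Its error handling therefore need not be as strict as a client's. A client is different. It must first make sure the connection succeeds. If it does not, the client must shut down and clean up correctly and report the error to the user, and whenever an error occurs, the resources it holds must be released properly. That is why Client is more complex than Server. At the same time, Client needs the same basic data-transfer functionality that Server provides, which is why Client, like Server, inherits from Connection (see the BaseClient definition above).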
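The two branches of _connectDone can be isolated in a toy model. In the sketch below, Protocol and ToyTransport are simplified stand-ins, not Twisted's classes. The events list records which branch ran instead of touching a real socket. If buildProtocol returns None, a throwaway Protocol is installed and the connection is dropped. Otherwise, reading starts and makeConnection hands the transport to the protocol.

```python
class Protocol:
    transport = None

    def makeConnection(self, transport):
        self.transport = transport


class ToyTransport:
    def __init__(self, build_protocol):
        self.build_protocol = build_protocol  # stands in for connector.buildProtocol
        self.events = []

    def _connectDone(self):
        self.protocol = self.build_protocol()
        self.connected = 1
        if self.protocol is None:
            # The factory declined: install a dummy protocol, then drop the connection
            self.protocol = Protocol()
            self.events.append("loseConnection")
        else:
            self.events.append("startReading")
            self.protocol.makeConnection(self)
```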



So far we have analyzed some key components on the client side. There is much more to the client than this, however, and plenty more worth analyzing.

Time: 2024-10-15 04:39:40
