在前两节中,我们通过一个简单的例子从服务器端的角度简要的研究了twisted的源码。在本节,我们将通过另外一个例子,从客户端的角度研究twisted的相关源码,完整例子在这里
下面是简化的代码:
class PoetryProtocol(Protocol):
poem = ‘‘
def dataReceived(self, data):
self.poem += data
def connectionLost(self, reason):
self.poemReceived(self.poem)
def poemReceived(self, poem):
self.factory.poem_finished(poem)
class PoetryClientFactory(ClientFactory):
protocol = PoetryProtocol
def poem_finished(self, poem=None):
print poem
reactor.stop()
def clientConnectionFailed(self, connector, reason):
print ‘Failed to connect to:‘, connector.getDestination()
self.poem_finished()
def main():
address = parse_args() # 解析地址
factory = PoetryClientFactory()
from twisted.internet import reactor
host, port = address
reactor.connectTCP(host, port, factory)
reactor.run()
if __name__ == "__main__":
main()
我们可以看到,在main函数中,前四行代码与上两节的例子相差不多,但是第五行代码reactor.connectTCP(host, port, factory)
却和上一节的reactor.listenTCP
不一样,在上一节中,我们构建的是一个服务,服务器需要监听所有客户端的连接。而在本节中,我们要实现的是一个客户端,所以需要建立对于服务器端的连接请求。然后启动reactor的事件监听循环。
# /twisted/internet/posixbase.py
class PosixReactorBase(_SignalReactorMixin, _DisconnectSelectableMixin,ReactorBase):
def connectTCP(self, host, port, factory, timeout=30, bindAddress=None):
c = tcp.Connector(host, port, factory, timeout, bindAddress, self)
c.connect()
return c
reactor的connectTCP方法是由reactor的基类PosixReactorBase实现的,该方法创建了一个Connector对象,并且调用了该对象的connect方法,并且返回该对象。
# /twisted/internet/tcp.py
class Connector(base.BaseConnector):
_addressType = address.IPv4Address
def __init__(self, host, port, factory, timeout, bindAddress, reactor=None):
if isinstance(port, _portNameType):
try:
port = socket.getservbyname(port, ‘tcp‘)
except socket.error as e:
raise error.ServiceNameUnknownError(string="%s (%r)" % (e, port))
self.host, self.port = host, port
if abstract.isIPv6Address(host):
self._addressType = address.IPv6Address
self.bindAddress = bindAddress
base.BaseConnector.__init__(self, factory, timeout, reactor)
def _makeTransport(self):
"""创建Client对象绑定到Connector对象"""
return Client(self.host, self.port, self.bindAddress, self, self.reactor)
# /twisted/internet/base.py
@implementer(IConnector)
class BaseConnector:
"""Basic implementation of connector.
State can be: "connecting", "connected", "disconnected"
"""
timeoutID = None
factoryStarted = 0
def __init__(self, factory, timeout, reactor):
self.state = "disconnected"
self.reactor = reactor
self.factory = factory
self.timeout = timeout
...
def connect(self):
"""Start connection to remote server."""
if self.state != "disconnected":
raise RuntimeError("can‘t connect in this state")
self.state = "connecting"
if not self.factoryStarted:
self.factory.doStart()
self.factoryStarted = 1
self.transport = transport = self._makeTransport()
if self.timeout is not None:
self.timeoutID = self.reactor.callLater(self.timeout, transport.failIfNotConnected, error.TimeoutError())
# 调用factory对象的startedConnecting方法,该方法由用户重载,当连接建立时调用
self.factory.startedConnecting(self)
在connect方法中,调用_makeTransport方法创建了一个Client对象绑定到Connector对象的transport属性上,有前两节我们知道,transport负责底层的数据传输工作,而这个Client对象就是建立到服务器端的连接
# /twisted/internet/tcp.py
class Client(_BaseTCPClient, BaseClient):
"""
A transport for a TCP protocol; either TCPv4 or TCPv6.
Do not create these directly; use L{IReactorTCP.connectTCP}.
"""
Client类是一个拓展的Connection类,它的基类是_BaseTCPClient和BaseClient,而BaseClient的基类是_BaseBaseClient和Connection,Client的构造函数是_BaseTCPClient的构造函数
# /twisted/internet/tcp.py
class _BaseTCPClient(object):
"""
Code shared with other (non-POSIX) reactors for management of outgoing TCP
connections (both TCPv4 and TCPv6).
"""
_addressType = address.IPv4Address
def __init__(self, host, port, bindAddress, connector, reactor=None):
# BaseClient.__init__ is invoked later
self.connector = connector
self.addr = (host, port)
whenDone = self.resolveAddress # IP地址处理方法
err = None
skt = None
if abstract.isIPAddress(host):
self._requiresResolution = False
elif abstract.isIPv6Address(host):
self._requiresResolution = False
self.addr = _resolveIPv6(host, port)
self.addressFamily = socket.AF_INET6
self._addressType = address.IPv6Address
else:
# 如果host参数不是IP地址格式,那么该属性设为True
self._requiresResolution = True
try:
skt = self.createInternetSocket()
# createInternetSocket方法是BaseClient实现的,创建一个socket
except socket.error as se:
err = error.ConnectBindError(se.args[0], se.args[1])
whenDone = None
if whenDone and bindAddress is not None:
# 如果需要绑定地址
try:
if abstract.isIPv6Address(bindAddress[0]):
bindinfo = _resolveIPv6(*bindAddress)
else:
bindinfo = bindAddress
skt.bind(bindinfo)
except socket.error as se:
err = error.ConnectBindError(se.args[0], se.args[1])
whenDone = None
self._finishInit(whenDone, skt, err, reactor)
_finishInit是由基类_BaseBaseClient实现的
# /twisted/internet/tcp.py
class _BaseBaseClient(object):
def _finishInit(self, whenDone, skt, error, reactor):
"""
由子类调用来继续socket连接的初始化工作
"""
if whenDone:
self._commonConnection.__init__(self, skt, None, reactor)
# _commonConnection就是Connection类
# 调用Connection类的构造函数
reactor.callLater(0, whenDone)
# 当reactor启动之后回调whenDone方法
else:
reactor.callLater(0, self.failIfNotConnected, error)
def resolveAddress(self):
"""
如果主机名不是Ip地址的格式,那么就将其转化为Ip地址格式
"""
if self._requiresResolution:
d = self.reactor.resolve(self.addr[0])
d.addCallback(lambda n: (n,) + self.addr[1:])
d.addCallbacks(self._setRealAddress,
self.failIfNotConnected)
else:
self._setRealAddress(self.addr)
def _setRealAddress(self, address):
self.realAddress = address
self.doConnect()
_finishInit进一步完成了Client对象的初始化工作,resolveAddress解决IP地址的问题, reactor.resolve函数负责将将主机名转化为IP地址,该方法返回了一个已经激活的deferred,将立即回调_setRealAddress方法,关于deferred我们放到以后再研究。_setRealAddress中会调用Baseclient的doConnect方法初始化连接。
# /twisted/internet/tcp.py
class BaseClient(_BaseBaseClient, _TLSClientMixin, Connection):
def doConnect(self):
self.doWrite = self.doConnect
self.doRead = self.doConnect
# 将doWrite和doRead置为doConnect方法,防止如果同时又该对象的doRead方法调用使得客户端出现不可预测的情况
if not hasattr(self, "connector"):
return
err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR) # 检查套接字是否有错误产生
if err:
self.failIfNotConnected(error.getConnectError((err, strerror(err))))
return
# doConnect gets called twice. The first time we actually need to
# start the connection attempt. The second time we don‘t really
# want to (SO_ERROR above will have taken care of any errors, and if
# it reported none, the mere fact that doConnect was called again is
# sufficient to indicate that the connection has succeeded), but it
# is not /particularly/ detrimental to do so. This should get
# cleaned up some day, though.
try:
connectResult = self.socket.connect_ex(self.realAddress)
except socket.error as se:
connectResult = se.args[0]
if connectResult:
# socket正处于连接状态
if connectResult == EISCONN:
pass
# on Windows EINVAL means sometimes that we should keep trying:
# http://msdn.microsoft.com/library/default.asp?url=/library/en-us/winsock/winsock/connect_2.asp
# socket处于阻塞状态或者非法参数的情况
elif ((connectResult in (EWOULDBLOCK, EINPROGRESS, EALREADY)) or
(connectResult == EINVAL and platformType == "win32")):
# select继续读和写事件的监听
self.startReading()
self.startWriting()
return
else:
self.failIfNotConnected(error.getConnectError((connectResult, strerror(connectResult))))
return
# If I have reached this point without raising or returning, that means
# that the socket is connected.
del self.doWrite
del self.doRead
# we first stop and then start, to reset any references to the old doRead
self.stopReading()
self.stopWriting()
self._connectDone()
socket.connect_ex和socket.connect很像,但是当错误发生时,connect_ex将返回错误码,而不是抛出异常。failIfNotConnected是在_BaseBaseClient实现的
def failIfNotConnected(self, err):
"""
当连接失败是调用,来进行清理工作,调用connectionFailed方法,阻止对该socket的读和写事件的监听
"""
if (self.connected or self.disconnected or
not hasattr(self, "connector")):
return
self._stopReadingAndWriting() # 移除对该对象的读和写事件的监听
try:
self._closeSocket(True)
# 关闭socket,并进行一些清理工作
except AttributeError:
pass
else:
self._collectSocketDetails()
self.connector.connectionFailed(failure.Failure(err))
# 调用connector的connectionFailed方法,该方法会回调factory对象的clientConnectionFailed方法,并进行一些清理工作
del self.connector
failIfNotConnected方法可靠的完成了当客户端连接失败是的清理工作,当doConnect方法执行到最后时,会调用_connectDone方法:
def _connectDone(self):
self.protocol = self.connector.buildProtocol(self.getPeer())
# 创建一个protocol对象
self.connected = 1
logPrefix = self._getLogPrefix(self.protocol)
self.logstr = "%s,client" % logPrefix
if self.protocol is None:
self.protocol = Protocol()
# But dispose of the connection quickly.
self.loseConnection() # 断开连接
else:
self.startReading()
self.protocol.makeConnection(self)
在该方法中,我们创建了Protocol对象,该类由用户自定义,来实现具体的服务。从Client类中,我们可以看到,相比于在上一节我们研究服务器端的时候遇到的Server类(其实就是Connection类),Client比Server多了很多对于连接的检查和错误处理(有一些我们并没有分析到,以后再研究其他部分时会再讲),这对于客户端而言是很重要的。因为服务器端要为许多客户端的连接提供服务,所以如果有一些意外的错误出现,服务器在进行一定的处理之后只能关掉连接然后恢复为其他客户端连接服务,所以服务器端对于错误处理并不如客户端那样严格。而客户端则不一样,首先客户端要确保连接能够成功,如果不成功也要给出正确的关闭和清理措施,然后将错误报告给用户,当遇到错误时也要保证占用的资源能正确释放,所以Client要比Server更加复杂,同时Client也要具有Server提供的基本的传输数据的功能,所以在这里Server是Client的基类
以上,我们对客户端部分的一些关键组件进行了分析,但是客户端要研究的不仅仅只有这么一点,还有很多内容值得我们继续分析