Twisted是用python编写的事件驱动的网络框架,虽然Twisted从发布到现在已经有不少年头了,而且现在也出现了不少新的高性能异步I/O框架,比如说tornado,但是Twisted任然具有很好的学习价值。如果想要看Twisted的教程的话,Twisted有着非常好的教程Twisted introduction,这个是翻译
现在进入正题
我们通过一个简单的例子来开始我们的分析
from twisted.internet.protocol import ServerFactory, Protocol
class PoetryProtocol(Protocol):
def connectionMade(self):
self.transport.write(self.factory.poem)
self.transport.loseConnection()
class PoetryFactory(ServerFactory):
protocol = PoetryProtocol
def __init__(self, poem):
self.poem = poem
def main():
options, poetry_file = parse_args()
poem = open(poetry_file).read()
factory = PoetryFactory(poem)
from twisted.internet import reactor
port = reactor.listenTCP(options.port or 0, factory,
interface=options.iface)
print ‘Serving %s on %s.‘ % (poetry_file, port.getHost())
reactor.run()
排版需要,这里仅仅列出一部分代码,全部代码详见这里
这是一个非常简单的服务器,每当有客户端连接时,就向客户端发送一首诗歌的全部内容,然后断开连接,在这里我们仅仅关注reactor。reactor是事件循环管理器,用于注册,运行,销毁事件,以及当事件发生时调用回调函数。我们需要注意,reactor循环是在主进程中运行,也就是调用reactor.run()的进程中运行,一但循环开始运行,就会一直运行下去,知道调用reactor.stop()方法停止。在Twisted中,reactor是单例模式,当你首次导入reactor模块的时候就会创建它,接下来你在应用中的其他地方导入reactor时将返回第一次创建的对象
from twisted.internet import reactor
上面引入的方式是Twisted的默认方法,然我们来看看这段代码是如何实现单例模式的
# /twisted/internet/reactor.py
from __future__ import division, absolute_import
import sys
del sys.modules[‘twisted.internet.reactor‘]
from twisted.internet import default
default.install()
当第一次导入时,首先删除模块字典中的“twisted.internet.reactor”的值(如果它存在的话),然后安装默认的reactor。sys.modules是一个模块名和模块对象匹配的全局字典,当import一个模块时会检查这个字典,如果加载了只是将模块的名字加入到导入该模块的模块的命名空间中,如果没有加载就从sys.path目录中按照模块名称查找模块文件,然后将模块导入内存,将模块名和模块对象映射加入到字典中,在将名称导入到导入该模块的模块的命名空间中,那么default.py中代码为:
# /twisted/internet/default.py
from __future__ import division, absolute_import
__all__ = ["install"]
from twisted.python.runtime import platform
def _getInstallFunction(platform):
try:
if platform.isLinux():
try:
from twisted.internet.epollreactor import install
except ImportError:
from twisted.internet.pollreactor import install
elif platform.getType() == ‘posix‘ and not platform.isMacOSX():
from twisted.internet.pollreactor import install
else:
from twisted.internet.selectreactor import install
except ImportError:
from twisted.internet.selectreactor import install
return install
install = _getInstallFunction(platform)
这里会根据平台来选择相应的reactor,如果在linux下优先使用epollreactor,如果抛出异常那么使用pollreactor或者是selectreactor,如果是windows则使用selectreactor。我们在这里研究pollreactor
# /twisted/internet/pollreactor.py
def install():
"""Install the poll() reactor."""
p = PollReactor()
from twisted.internet.main import installReactor
installReactor(p)
# /twisted/internet/main.py
def installReactor(reactor):
"""
Install reactor C{reactor}.
@param reactor: An object that provides one or more IReactor* interfaces.
"""
# this stuff should be common to all reactors.
import twisted.internet
import sys
if ‘twisted.internet.reactor‘ in sys.modules:
raise error.ReactorAlreadyInstalledError("reactor already installed")
twisted.internet.reactor = reactor
sys.modules[‘twisted.internet.reactor‘] = reactor
在这里,将reactor赋值给twisted.internet.reactor对象,并且将reactor对象赋给模块字典的“twisted.internet.reactor”键,以后再导入reactor,就会导入这个单例了
# /twisted/internet/pollreactor.py
@implementer(IReactorFDSet)
class PollReactor(posixbase.PosixReactorBase, posixbase._PollLikeMixin):
_POLL_DISCONNECTED = (POLLHUP | POLLERR | POLLNVAL)
# POLLHUP 连接挂起
# POLLNVAL 非法请求:文件描述符无法打开
# POLLERR 连接出现错误
_POLL_IN = POLLIN # 代表有数据可读
_POLL_OUT = POLLOUT # 代表有数据可写,并且没有阻塞
def __init__(self):
"""
初始化polling对象,文件描述符追踪字典,以及基类
"""
self._poller = poll() # poll调用
self._selectables = {}
self._reads = {}
self._writes = {}
posixbase.PosixReactorBase.__init__(self)
def _updateRegistration(self, fd):
"""
更新polling对象对文件描述符状态的追踪
"""
try:
self._poller.unregister(fd)
# 移除被polling对象追踪的文件描述符
except KeyError:
pass
mask = 0
if fd in self._reads:
mask = mask | POLLIN
if fd in self._writes:
mask = mask | POLLOUT
if mask != 0:
self._poller.register(fd, mask)
else:
if fd in self._selectables:
del self._selectables[fd]
def _dictRemove(self, selectable, mdict):
try:
# the easy way
fd = selectable.fileno()
# 确保文件描述符是真实的
mdict[fd]
except:
for fd, fdes in self._selectables.items():
if selectable is fdes:
break
else:
return
if fd in mdict:
del mdict[fd]
self._updateRegistration(fd)
def addReader(self, reader):
fd = reader.fileno()
if fd not in self._reads:
self._selectables[fd] = reader
self._reads[fd] = 1
self._updateRegistration(fd)
def addWriter(self, writer):
"""Add a FileDescriptor for notification of data available to write.
"""
fd = writer.fileno()
if fd not in self._writes:
self._selectables[fd] = writer
self._writes[fd] = 1
self._updateRegistration(fd)
def removeReader(self, reader):
return self._dictRemove(reader, self._reads)
def removeWriter(self, writer):
return self._dictRemove(writer, self._writes)
def removeAll(self):
return self._removeAll(
[self._selectables[fd] for fd in self._reads],
[self._selectables[fd] for fd in self._writes])
# 这里是重点
def doPoll(self, timeout):
"""Poll the poller for new events."""
if timeout is not None:
timeout = int(timeout * 1000) # convert seconds to milliseconds
try:
l = self._poller.poll(timeout)
# 返回一组可能为空的文件描述符-事件二元组,文件描
# 述符代表当前有事件发生的socket对象,event代表
# 事件的种类,可能为上面定义的POLLIN,POLLOUT等
# 中的一种
except SelectError as e:
if e.args[0] == errno.EINTR:
# 系统调用被打断
return
else:
# 直接抛出异常
raise
_drdw = self._doReadOrWrite
for fd, event in l:
try:
selectable = self._selectables[fd]
except KeyError:
continue
log.callWithLogger(selectable, _drdw, selectable, fd, event)
doIteration = doPoll # 会被mainloop函数调用,实现事务监听循环
def getReaders(self):
return [self._selectables[fd] for fd in self._reads]
def getWriters(self):
return [self._selectables[fd] for fd in self._writes]
implementer表示PollReactor实现了IReactorFDSet的接口的方法:
/twisted/internet/interfaces.py
_doReadOrWrite方法的实现在pollreactor的基类_pollLikeMixin中:
# twisted/internet/posixbase.py
class _PollLikeMixin(object):
"""
Mixin for poll-like reactors.
Subclasses must define the following attributes::
- _POLL_DISCONNECTED - Bitmask for events indicating a connection was
lost.
- _POLL_IN - Bitmask for events indicating there is input to read.
- _POLL_OUT - Bitmask for events indicating output can be written.
Must be mixed in to a subclass of PosixReactorBase (for
_disconnectSelectable).
"""
def _doReadOrWrite(self, selectable, fd, event):
"""
文件描述符要可读,可写,能够完成工作并且能在必要时抛出异常
"""
why = None
inRead = False
if event & self._POLL_DISCONNECTED and not (event & self._POLL_IN):
# 处理断开的连接,只有当我们已经完成处理所有未决的输入时
if fd in self._reads:
# 表明不会再有读事件,即读取数据已经完毕,
# 并且有可能传输的另一边已经断开连接
inRead = True
why = CONNECTION_DONE
else:
# 如果我们没有从这个描述符中读取数据,
# 那么只有可能是一个错误的关闭
why = CONNECTION_LOST
else:
try:
if selectable.fileno() == -1:
# 表明这个socket已经被关闭
why = _NO_FILEDESC
else:
if event & self._POLL_IN:
# 处理读事件
why = selectable.doRead()
inRead = True
if not why and event & self._POLL_OUT:
# 处理写事件,读事件的eventmask是1,写事
# 件的eventmask是4,二者叠加是5,5的
# 话一般指连接关闭(这个只是我的个人理解,
# 我在做实验的时候发现连接关闭时响应事件
# 的eventmask为5),所以这里要检测只有
# 写事件出现,没有读事件出现
why = selectable.doWrite()
inRead = False
except:
# Any exception from application code gets logged and will
# cause us to disconnect the selectable.
why = sys.exc_info()[1]
log.err()
if why:
# 处理关闭的连接
self._disconnectSelectable(selectable, why, inRead)
_doReadOrWrite方法将根据各个socket发生的事件来调用响应的doRead,doWrite方法,或者关闭连接和报错
port = reactor.listenTCP(options.port or 0, factory,
interface=options.iface)
这里reactor监听了一个端口,这个方法在reactor的基类PosixReactorBase中实现:
# /twisted/internet/posixbase.py
@implementer(IReactorTCP, IReactorUDP, IReactorMulticast)
class PosixReactorBase(_SignalReactorMixin, _DisconnectSelectableMixin,ReactorBase):
def listenTCP(self, port, factory, backlog=50, interface=‘‘):
p = tcp.Port(port, factory, backlog, interface, self)
p.startListening()
return p
# /twisted/internet/tcp.py
@implementer(interfaces.IListeningPort)
class Port(base.BasePort, _SocketCloser):
def __init__(self, port, factory, backlog=50, interface=‘‘, reactor=None):
"""Initialize with a numeric port to listen on.
"""
base.BasePort.__init__(self, reactor=reactor)
self.port = port
self.factory = factory
self.backlog = backlog
if abstract.isIPv6Address(interface):
self.addressFamily = socket.AF_INET6
self._addressType = address.IPv6Address
self.interface = interface
def startListening(self):
"""创建和绑定socket,然后启动侦听"""
# 看看可否复用之前创建的socket
if self._preexistingSocket is None:
try:
skt = self.createInternetSocket()
if self.addressFamily == socket.AF_INET6:
addr = _resolveIPv6(self.interface, self.port)
else:
addr = (self.interface, self.port)
skt.bind(addr)
except socket.error as le:
raise CannotListenError(self.interface, self.port, le)
skt.listen(self.backlog)
else:
skt = self._preexistingSocket
self._preexistingSocket = None
self._shouldShutdown = False
# Make sure that if we listened on port 0, we update that to
# reflect what the OS actually assigned us.
self._realPortNumber = skt.getsockname()[1]
log.msg("%s starting on %s" % (
self._getLogPrefix(self.factory), self._realPortNumber))
# The order of the next 5 lines is kind of bizarre. If no one
# can explain it, perhaps we should re-arrange them.
self.factory.doStart() # 启动工厂
self.connected = True
self.socket = skt
self.fileno = self.socket.fileno
self.numberAccepts = 100
self.startReading()
# 将该对象添加到reactor的polling对象的跟踪列表中
在这里,listenTCP创建一个监听某个端口的socket,并且将其添加到reactor的polling对象的跟踪列表中,一旦有客户端访问该服务器,这个reactor就会监控到,并且处理它。listenTCP返回一个Port对象,当客户端有连接请求时,便会调用doRead方法:
# twisted/internet/tcp.py
def doRead(self):
try:
if platformType == "posix":
numAccepts = self.numberAccepts
else:
# win32下只能调用一次socket.accept方法
numAccepts = 1
for i in range(numAccepts):
if self.disconnecting:
return
try:
skt, addr = self.socket.accept()
# 获得客户端连接的socket
except socket.error as e:
if e.args[0] in (EWOULDBLOCK, EAGAIN):
# EWOULDBLOCK 操作阻塞
# EAGAIN 再次尝试
self.numberAccepts = i
break
elif e.args[0] == EPERM:
# 操作不允许
# Netfilter on Linux may have rejected the
# connection, but we get told to try to accept()
# anyway.
continue
elif e.args[0] in (EMFILE, ENOBUFS, ENFILE, ENOMEM, ECONNABORTED):
# EMFILE 过多的文件描述符
# ENOBUFS 缓存区不足
# ENFILE 文件表溢出
# Linux gives EMFILE when a process is not allowed
# to allocate any more file descriptors. *BSD and
# Win32 give (WSA)ENOBUFS. Linux can also give
# ENFILE if the system is out of inodes, or ENOMEM
# if there is insufficient memory to allocate a new
# dentry. ECONNABORTED is documented as possible on
# both Linux and Windows, but it is not clear
# whether there are actually any circumstances under
# which it can happen (one might expect it to be
# possible if a client sends a FIN or RST after the
# server sends a SYN|ACK but before application code
# calls accept(2), however at least on Linux this
# _seems_ to be short-circuited by syncookies.
log.msg("Could not accept new connection (%s)" % (
errorcode[e.args[0]],))
break
raise
fdesc._setCloseOnExec(skt.fileno())
protocol = self.factory.buildProtocol(self._buildAddr(addr))
if protocol is None:
skt.close()
continue
s = self.sessionno
self.sessionno = s+1
transport = self.transport(skt, protocol, addr, self, s, self.reactor)
protocol.makeConnection(transport)
else:
self.numberAccepts = self.numberAccepts+20
except:
log.deferr()
在doRead方法中,调用accept产生了用于接收客户端数据的套接字,将套接字与transport绑定,然后创建Protocol对象,然后把transport加入到reactor的读集合。
protocol = self.factory.buildProtocol(self._buildAddr(addr))
transport = self.transport(skt, protocol, addr, self, s, self.reactor)
protocol.makeConnection(transport)
factory对象(Factory以后再讲)调用buildProtocol方法创建了我们自定义的Protocol类对象,然后创建transport之后调用了protocol的makeConnection方法,改方法的实现在其父类BaseProtocol中:
# /twisted/internet/protocol.py
class BaseProtocol:
connected = 0
transport = None
def makeConnection(self, transport):
self.connected = 1
self.transport = transport
self.connectionMade()
def connectionMade(self):
"""Called when a connection is made.
This may be considered the initializer of the protocol, because
it is called when the connection is completed. For clients,
this is called once the connection to the server has been
established; for servers, this is called after an accept() call
stops blocking and a socket has been received. If you need to
send any greeting or initial message, do it here.
"""
在这里它调用了我们自定义的makeConnection方法,这样服务器端和客户端就可以进行数据传输了
当客户端有数据到来时,就会调用transport的doRead方法进行数据读取了。而Connection是transport实例的类的父类,它实现了doRead方法:
# /twisted/internet/tcp.py
@implementer(interfaces.ITCPTransport, interfaces.ISystemHandle)
class Connection(_TLSConnectionMixin, abstract.FileDescriptor, _SocketCloser,
_AbortingMixin):
def doRead(self):
try:
data = self.socket.recv(self.bufferSize)
except socket.error as se:
if se.args[0] == EWOULDBLOCK:
# 如果被阻塞直接返回
return
else:
# 断开连接
return main.CONNECTION_LOST
return self._dataReceived(data)
def _dataReceived(self, data):
if not data:
return main.CONNECTION_DONE
rval = self.protocol.dataReceived(data)
if rval is not None:
offender = self.protocol.dataReceived
warningFormat = (
‘Returning a value other than None from %(fqpn)s is ‘
‘deprecated since %(version)s.‘)
warningString = deprecate.getDeprecationWarningString(
offender, versions.Version(‘Twisted‘, 11, 0, 0),
format=warningFormat)
deprecate.warnAboutFunction(offender, warningString)
return rval
_dataReceived方法将调用我们重写的protocol的dataReceived方法处理数据
reactor.run()方法的是reactor的基类_SignalReactorMixin实现的:
class _SignalReactorMixin(object):
def startRunning(self, installSignalHandlers=True):
self._installSignalHandlers = installSignalHandlers
ReactorBase.startRunning(self)
def run(self, installSignalHandlers=True):
self.startRunning(installSignalHandlers=installSignalHandlers)
self.mainLoop()
def mainLoop(self):
while self._started:
try:
while self._started:
# Advance simulation time in delayed event
# processors.
self.runUntilCurrent()
t2 = self.timeout()
t = self.running and t2
self.doIteration(t)
except:
log.msg("Unexpected error in main loop.")
log.err()
else:
log.msg(‘Main loop terminated.‘)
pollreactor的基类PosixReactorBase有两个基类,分别是_SignalReactorMixin和ReactorBase,由于_SignalReactorMixin和ReactorBase都实现了startRunning方法,所以根据继承的mro顺序的话,会先调用_SignalReactorMixin的,这样的话需要在最后再调用ReactorBase的startRunning方法
在这里mainloop将启动主循环了,mainloop调用上面讲的doIterarion方法来监控一组描述符,一旦有事件准备好读写,就调用事件处理程序来处理。
以上就是对于这个简单例子的简要分析,从创建事件监听循环,到与客户端建立连接。有一些细节我并没有做出说明(因为我也是边阅读源码边写博客),如果有兴趣可以仔细阅读源码,本文难免存在疏漏和错误,欢迎读者给与指正。因为要考研,我的时间并不是很多,但我至少每个月会写一篇这样的文章
参考:
1.http://www.jianshu.com/p/26ae331b09b0
2.https://docs.python.org/2/library/errno.html?highlight=errno#module-errno
3.https://github.com/twisted/twisted/tree/trunk/twisted
4.https://docs.python.org/2/library/select.html?highlight=select#module-select