所用模块
asyncore
英文捉鸡点 这里
源码中可以看到其实本质上就对 select 以及 socket 的进一步封装
简单说明
Python的asyncore模块提供了以异步的方式写入套接字服务的客户端和服务器的基础结构。
主要包括
- asyncore.loop(…) - 用于循环监听网络事件。loop()函数负责检测一个字典,字典中保存dispatcher的实例。
- asyncore.dispatcher类 - 一个底层套接字对象的简单封装。这个类有少数由异步循环调用的,用来事件处理的函数。
- dispatcher类中的writable()和readable()在检测到一个socket可以写入或者数据到达的时候被调用,并返回一个bool值,决定是否调用handle_read或者handle_write。
- asyncore.dispatcher_with_send类 - 一个 dispatcher的子类,添加了简单的缓冲输出能力,对简单的客户端很有用。
可用方法
- handle_read():当socket有可读的数据的时候执行这个方法,可读的数据的判定条件就是看方法readable()返回为True还是False。即readable()返回为True的时候执行该方法。
- handle_write():当socket有可写的数据的时候执行这个方法,可写的数据的判定条件就是看方法writable()返回为True还是False。即writable()返回为True的时候执行该方法。
- handle_expt():当socket通信过程中出现OOB异常的时候执行该方法。
- handle_connect():当有客户端连接的时候,执行该方法进行处理。
- handle_close():可连接关闭的时候执行该方法。
- handle_error():当通信过程中出现异常并且没有在其他的地方进行处理的时候执行该方法。
- handle_accept():当作为server socket监听的时候,有客户端连接的时候,利用这个方法进行处理。
- readable():缓冲区是否有可读数据。
- writable():缓冲区是否有可写数据。
- create_socket(family, type):创建一个socket连接。
- connect(address):连接一个socket server。
- send(data):发送数据。
- recv(buffer_size):收取数据到内存缓冲中。
- listen(backlog):server socket开始监听。
- bind(address):server socket绑定某个地址和端口。
- accept():当有客户端连接的时候,执行该方法接受客户端连接。
- close():关闭socket。
- asyncore.loop([timeout[, use_poll[, map[, count]]]])
-
- 进入轮询循环直到所有打开的通道已被关闭或计数通过。
- 所有的参数都是可选的。
- count参数默认为None,只有当所有通道都被关闭时循环才会终止。
- timeout参数设置为select()或poll()调用设置超时,以秒为单位,默认为30秒。
- use_poll参数,如果为true ,则表示 poll()优先于select(),默认值为False。
- map是包含监控的channel的字典。channel关闭时会从map中删除。不指定map时会使用全局map。
- Channel(asyncore.dispatcher , asynchat.async_chat和其子类的实例)可以自由地混合在map上)。
- asyncore.dispatcher_with_send
- dispatcher的子类,增加了简单的缓冲输出,对于简单的客户端有用。
- 详细资料参考:asynchat.async_chat。
- class asyncore.file_dispatcher
- 封装了文件描述符或文件对象及映射参数(可选)供poll()和loop()函数使用的文件分发器。
- 它提供了文件对象或其他具备fileno()方法的对象,调用fileno()并传递到file_wrapper构造函数。
- 可用于UNIX。
- class asyncore.file_wrapper
- 接收整数文件描述符并调用os.dup()复制句柄,这样原句柄可以关闭,而文件包装器不受影响。
- 该类封装了大量方法以模拟socket给file_dispatcher类使用。
- 可用于UNIX。
asynchat
英捉鸡 , 这里
简单说明
该模块建立在asyncore
基础架构之上,简化了异步客户端和服务器,并且更容易处理元素被任意字符串终止或者长度可变的协议。
主要包括
- asynchat.async_chat类 - 这个类是asyncore.dispatcher的抽象子类。一般使用其collect_incoming_data()和found_terminator()方法。
- collect_incoming_data() - 接收数据。
- found_terminator() - 当输入数据流符合由 set_terminator() 设置的终止条件时被调用。
- set_terminator() - 设置终止条件。
- push() - 向通道压入数据以确保其传输。
聊天室开发
接口
本次项目开发所需要用到的模块和接口
asyncore
- dispacher
- loop
- handle_read
- handle_write
asynchat
- collect_incoming_data
- set_terminator
- tound_terminator
- push
- handle_close
流程
- 用户连接
- 登记用户
- 建立会话
- 处理用户消息
聊天室代码剖析
服务端
ChatServer 类 - 套接字处理
用于创建 server_socket 套接字
整体操作类似于 socket 的使用
import asynchat import asyncore # 定义端口 PORT = 6666 # 定义结束异常类 class EndSession(Exception): pass class ChatServer(asyncore.dispatcher): """ 聊天服务器 """ def __init__(self, port): asyncore.dispatcher.__init__(self) # 创建socket self.create_socket() # 设置 socket 为可重用 self.set_reuse_addr() # 监听端口 self.bind((‘‘, port)) self.listen(5) self.users = {} self.main_room = ChatRoom(self) def handle_accept(self): conn, addr = self.accept() ChatSession(self, conn)
ChatSession 类 - 会话处理
用于维护聊天室
重写了 collect_incoming_data 用于数据存放
以及 found_terminator 来进行结束标志
以及 handle_close 来进行结束操作
class ChatSession(asynchat.async_chat): """ 负责和客户端通信 """ def __init__(self, server, sock): asynchat.async_chat.__init__(self, sock) self.server = server self.set_terminator(b‘\n‘) self.data = [] self.name = None self.enter(LoginRoom(server)) def enter(self, room): # 从当前房间移除自身,然后添加到指定房间 try: cur = self.room except AttributeError: pass else: cur.remove(self) self.room = room room.add(self) def collect_incoming_data(self, data): # 接收客户端的数据 self.data.append(data.decode("utf-8")) def found_terminator(self): # 当客户端的一条数据结束时的处理 line = ‘‘.join(self.data) self.data = [] try: self.room.handle(self, line.encode("utf-8")) # 退出聊天室的处理 except EndSession: self.handle_close() def handle_close(self): # 当 session 关闭时,将进入 LogoutRoom asynchat.async_chat.handle_close(self) self.enter(LogoutRoom(self.server))
CommandHandler 类 - 命令处理
用于自定义协议, 类似于开发 httpserver 的时候的 协议格式定制处理
我们预设了4种命令分别由 其同名函数进行分发处理
- do_login 登录
- de_logout 登出
- do_say 发送消息
- do_look 查看在线用户
class CommandHandler: """ 命令处理类 """ def unknown(self, session, cmd): # 响应未知命令 # 通过 asynchat.async_chat.push 方法发送消息 session.push((‘Unknown command {} \n‘.format(cmd)).encode("utf-8")) def handle(self, session, line): line = line.decode() # 命令处理 if not line.strip(): return parts = line.split(‘ ‘, 1) cmd = parts[0] try: line = parts[1].strip() except IndexError: line = ‘‘ # 通过协议代码执行相应的方法 method = getattr(self, ‘do_‘ + cmd, None) try: method(session, line) except TypeError: self.unknown(session, cmd)
Room 类 - 初始 聊天室基类
Room 类继承了 CommandHandler 可以处理聊天室中的命令处理
主要用于维护一个存有所有用户的 sessions 列表以及 广播发送信息处理
class Room(CommandHandler): """ 包含多个用户的环境,负责基本的命令处理和广播 """ def __init__(self, server): self.server = server self.sessions = [] def add(self, session): # 一个用户进入房间 self.sessions.append(session) def remove(self, session): # 一个用户离开房间 self.sessions.remove(session) def broadcast(self, line): # 向所有的用户发送指定消息 # 使用 asynchat.asyn_chat.push 方法发送数据 for session in self.sessions: session.push(line) def do_logout(self, session, line): # 退出房间 raise EndSession
LoginRoom 类 - 用户登录处理
用户登录后需要广播一条信息 xxx 加入聊天室
class LoginRoom(Room): """ 处理登录用户 """ def add(self, session): # 用户连接成功的回应 Room.add(self, session) # 使用 asynchat.asyn_chat.push 方法发送数据 session.push(b‘Connect Success‘) def do_login(self, session, line): # 用户登录逻辑 name = line.strip() # 获取用户名称 if not name: session.push(b‘UserName Empty‘) # 检查是否有同名用户 elif name in self.server.users: session.push(b‘UserName Exist‘) # 用户名检查成功后,进入主聊天室 else: session.name = name session.enter(self.server.main_room)
Loginout 类 - 退出聊天室处理
class LogoutRoom(Room): """ 处理退出用户 """ def add(self, session): # 从服务器中移除 try: del self.server.users[session.name] except KeyError: pass
ChatRoom 类 - 聊天处理
class ChatRoom(Room): """ 聊天用的房间 """ def add(self, session): # 广播新用户进入 session.push(b‘Login Success‘) self.broadcast((session.name + ‘ has entered the room.\n‘).encode("utf-8")) self.server.users[session.name] = session Room.add(self, session) def remove(self, session): # 广播用户离开 Room.remove(self, session) self.broadcast((session.name + ‘ has left the room.\n‘).encode("utf-8")) def do_say(self, session, line): # 客户端发送消息 self.broadcast((session.name + ‘: ‘ + line + ‘\n‘).encode("utf-8")) def do_look(self, session, line): # 查看在线用户 session.push(b‘Online Users:\n‘) for other in self.sessions: session.push((other.name + ‘\n‘).encode("utf-8"))
mian - 主函数处理
if __name__ == ‘__main__‘: s = ChatServer(PORT) try: print("chat server run at ‘0.0.0.0:{0}‘".format(PORT)) asyncore.loop() except KeyboardInterrupt: print("chat server exit")
客户端
pass
流程梳理
初始状态
- socket 创建后 handle_accept 执行来调用了 ChatSession ,
- ChatSession 的 初始化方法中 执行了 enter 方法需要 LoginRoom 的实例化作为参数
- LoginRoom 继承自 Room , 且 没有自己定义 初始化方法因此, 利用 Room 进行初始化
- Room 初始化方法中创建了一个 sessions 列表, 此列表用于维护 用户会话
- enter 方法中执行了一个 add 方法, LoginRoom 和 其基类中的 Room 中都有 add 方法
- 根据 python 面向对象的定义, 执行的是 LoginRoom 中的 add , 此 add 方法中又再次执行了一个 Room.add
- 最终还是执行到了 Room 中的 add 方法, 即往 sessions 列表中加入了这个会话. 以上设计的是初始化方法
用户操作
- 初始的接口程序经由 found_terminator 进行发起 ( 官方解释如下 )
To build a functioning async_chat subclass your input methods collect_incoming_data() and found_terminator() must handle the data that the channel receives asynchronously. The methods are described below
- 然后又此方法分发在 CommandHandler 类中进行字符串的分解以及反射分别到 do_ 开头的4个方法进行分发执行
- do_login 相关的验证后, 分流到 ChatRoom 中进行相关的 do_say / do_look 操作
- do_say 经由 广播 ( Room.broadcast ) 进行想的操作
- do_logout 直接退出
- do_look 查看当前所有用户
原文地址:https://www.cnblogs.com/shijieli/p/10662419.html
时间: 2024-10-08 12:44:51