使用内置的 select 模块构造单线程异步服务器
关键:
poll(),生成器
服务器模块:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Constants and helpers shared by the aphorism servers and clients.

Wire protocol, as used by the helpers below: a request is a question that
ends in b'?', and every reply ends in b'.'.
"""

import argparse
import socket
import time  # not used in this module; kept from the original import list

# Maps each known question to its answer.
aphorisms = {
    b'Beautiful is better than?': b'Ugly.',
    b'Explicit is better than?': b'Implicit.',
    b'Simple is better than?': b'Complex.',
}


def get_answer(aphorism):
    """Return the reply for *aphorism*, or an error reply if it is unknown.

    The error reply ends in b'.' like every other answer, so that a peer
    reading with ``recv_until(sock, b'.')`` sees a complete message
    instead of waiting forever for a terminator.
    """
    return aphorisms.get(aphorism, b'Error:unknown aphorism.')


def parse_command_line(description):
    """Parse ``host`` and optional ``-p port`` and return ``(host, port)``."""
    parser = argparse.ArgumentParser(description=description)
    parser.add_argument('host', help='IP or hostname')
    parser.add_argument('-p', metavar='port', type=int, default=1060,
                        help='TCP port (default 1060)')
    args = parser.parse_args()
    address = (args.host, args.p)
    return address


def create_srv_socket(address):
    """Create, bind and return a listening TCP socket on *address*."""
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # Allow quick restarts on the same port.
    listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    listener.bind(address)
    listener.listen(64)
    print('Listening at {}'.format(address))
    return listener


def accept_connections_forever(listener):
    """Accept clients one at a time and serve each until it disconnects."""
    while True:
        sock, address = listener.accept()
        print('Accepted connection from {}'.format(address))
        handle_conversation(sock, address)


def handle_conversation(sock, address):
    """Answer requests on *sock* until the peer closes or an error occurs.

    The client socket is always closed before returning.
    """
    try:
        while True:
            handle_request(sock)
    except EOFError:
        print('Client socket to {} has closed'.format(address))
    except Exception as e:
        # Per-connection boundary: report the failure and keep the
        # caller's accept loop alive.
        print('Client {} error :{}'.format(address, e))
    finally:
        sock.close()


def handle_request(sock):
    """Read one question from *sock* and send back its answer."""
    aphorism = recv_until(sock, b'?')
    answer = get_answer(aphorism)
    sock.sendall(answer)


def recv_until(sock, suffix):
    """Receive from *sock* until the accumulated bytes end with *suffix*.

    Raises:
        EOFError: the peer closed before sending anything.
        IOError: the peer closed in the middle of a message.
    """
    message = sock.recv(4096)
    if not message:
        raise EOFError('socket closed')
    while not message.endswith(suffix):
        data = sock.recv(4096)
        if not data:
            raise IOError('received {!r} then socket closed'.format(message))
        message += data
    return message
客户端模块:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Example client that asks the aphorism server three random questions."""

import argparse
import random
import socket

import zen_utils


def client(address, cause_error=False):
    """Connect to *address* and exchange aphorisms with the server.

    Args:
        address: ``(host, port)`` tuple of the server.
        cause_error: when true, send a single question with its final
            byte cut off and disconnect without waiting for a reply.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # The context manager closes the socket on every exit path: the
    # early return below, normal completion, and any exception.
    with sock:
        sock.connect(address)
        aphorisms = list(zen_utils.aphorisms)
        if cause_error:
            # Drop the last byte so the request is never completed.
            sock.sendall(aphorisms[0][:-1])
            return
        for aphorism in random.sample(aphorisms, 3):
            sock.sendall(aphorism)
            print(aphorism, zen_utils.recv_until(sock, b'.'))


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Example client')
    parser.add_argument('host', help='IP or Hostname')
    parser.add_argument('-p', metavar='port', type=int, default=1060,
                        help='TCP Port number')
    parser.add_argument('-e', action='store_true', help='cause an error')
    args = parser.parse_args()
    address = (args.host, args.p)
    client(address, args.e)
异步服务器:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Single-threaded asynchronous aphorism server driven by select.poll()."""

import select

import zen_utils


def all_events_forever(poll_object):
    """Yield ``(fd, event)`` pairs from ``poll_object.poll()`` without end."""
    while True:
        for fd, event in poll_object.poll():
            yield fd, event


def serve(listener):
    """Serve every client of *listener* from a single poll() loop.

    Per-client state lives in dictionaries keyed by socket object:
    ``bytes_received`` holds a request that has not yet reached its b'?'
    terminator, and ``bytes_to_send`` holds the part of a reply that has
    not been written yet. A client socket is registered for POLLIN while
    a request is awaited and for POLLOUT while a reply is pending.
    """
    sockets = {listener.fileno(): listener}
    addresses = {}
    bytes_received = {}
    bytes_to_send = {}

    poll_object = select.poll()
    poll_object.register(listener, select.POLLIN)

    def forget_client(fd, sock):
        """Report how the conversation ended and release all its state."""
        address = addresses.pop(sock)
        rb = bytes_received.pop(sock, b'')
        sb = bytes_to_send.pop(sock, b'')
        if rb:
            print('Client {} sent {} but then closed'.format(address, rb))
        elif sb:
            print('Client {} closed before we sent {}'.format(address, sb))
        else:
            print('Client {} closed socket normally'.format(address))
        poll_object.unregister(fd)
        del sockets[fd]
        sock.close()

    for fd, event in all_events_forever(poll_object):
        sock = sockets[fd]

        # Socket hung up or errored: throw away its state.
        if event & (select.POLLHUP | select.POLLERR | select.POLLNVAL):
            forget_client(fd, sock)

        # New connection: register it and wait for its first request.
        elif sock is listener:
            sock, address = sock.accept()
            print('Accepted connection from {}'.format(address))
            sock.setblocking(False)
            sockets[sock.fileno()] = sock
            addresses[sock] = address
            poll_object.register(sock, select.POLLIN)

        # Incoming data: accumulate until the request is complete.
        elif event & select.POLLIN:
            more_data = sock.recv(4096)
            if not more_data:
                # End of file. Clean up right away rather than closing
                # and waiting for a later POLLNVAL: the fd number may be
                # handed to a newly accepted client before the next
                # poll() call, which would mix up the two clients' state.
                forget_client(fd, sock)
                continue
            data = bytes_received.pop(sock, b'') + more_data
            if data.endswith(b'?'):
                bytes_to_send[sock] = zen_utils.get_answer(data)
                poll_object.modify(sock, select.POLLOUT)
            else:
                # Incomplete request: keep it until more bytes arrive.
                bytes_received[sock] = data

        # Socket writable: send as much of the reply as it will take.
        elif event & select.POLLOUT:
            data = bytes_to_send.pop(sock)
            n = sock.send(data)
            if n < len(data):
                bytes_to_send[sock] = data[n:]
            else:
                poll_object.modify(sock, select.POLLIN)


if __name__ == '__main__':
    address = zen_utils.parse_command_line('Low-level async server')
    listener = zen_utils.create_srv_socket(address)
    serve(listener)
这个异步服务器的核心是它的缓冲区:
在等待某个请求完成时,会将收到的数据存储在bytes_received字典中;在等待操作系统安排发送数据时,会将要发送的字节存储在bytes_to_send字典中。
这两个缓冲区与我们告知poll()要在每个套接字上等待的事件一起形成了一个完整的状态机,用于一步一步地处理客户端会话。
时间: 2024-12-11 00:01:56