来,贴上一段代码让你仰慕一下欧socketserver的魅力,看欧怎么完美实现多并发的魅力
client
import socket ip_port = (‘127.0.0.1‘,8009) sk = socket.socket() sk.connect(ip_port) sk.settimeout(5) while True: data = sk.recv(1024) print(‘receive:‘,data.decode()) inp = input(‘please input:‘) sk.sendall(inp.encode()) if inp == ‘exit‘: break sk.close()
server
‘‘‘ 对于socketserver,你需要做的事: 定义个一类,继续socketserver.BaseRequestHandler 重写handle方法 把写好的类和端口进行实例,启动程序 ‘‘‘ import socketserver class MyServer(socketserver.BaseRequestHandler): def handle(self): # print self.request,self.client_address,self.server conn = self.request conn.sendall(‘欢迎致电 10086,请输入1xxx,0转人工服务.‘.encode()) Flag = True while Flag: data = conn.recv(1024).decode() if data == ‘exit‘: Flag = False elif data == ‘0‘: conn.sendall(‘通过可能会被录音.balabala一大推‘.encode()) else: conn.sendall(‘请重新输入.‘.encode()) if __name__ == ‘__main__‘: server = socketserver.ThreadingTCPServer((‘127.0.0.1‘,8009),MyServer) server.serve_forever()
代码执行看源码
我们看到最后两句代码,第一句--带有括号,我们第一想到的是要么是个函数,要是是个类,那么看一下什么吧
是个类,继承了ThreadingMixIn和TCPServer两个类,好!实例对象肯定要找构造方法,当前这个类没有,就需要到父类中找了,从左往右
我们发现ThreadingMixIn类里没有,那肯定在TCPServer类里啦,果真在,并且还执行了TCPServer的父类BaseServer里的构造方法
在BaseServer里只是做了一些数据初始化的事,那我们回到TCPServer里构造方法里接着往下看吧
执行父类构造方法后,实例一个socket对象,然后就是在if下执行self.server_bind()方法,要想知道这个方法,必须弄清楚self是指代谁?
说到这里,我必须屡屡类的继承关系了...
self是指实例对象,谁实例的,是ThreadingTCPServer类,所以找server_bind方法,要从ThreadingTCPServer往上找
我们发现这个方法还是TCPServer里,
似乎是做了绑定socket的事,这里过,在找看构造方法里,后又执行self.server_activate方法,我们按照那继承关系又在TCPServer找到了
似乎是做了有关监听数的事,好这里构造方法执行完毕,第一代码也就这样看完了
看到第二句,开始找serve_forever方法在哪了
在BaseServer里,这方法大概说的是,一次处理一个请求直到连接关闭,如果处理其他要求另外开启一个线程
我发现下有个self._handle_request_noblock(),我们看看这个是做啥的吧?
还是在BaseServer,好像是做不阻塞的工作
在这里还是执行了一个process_request方法,这个方法我们在ThreadingMixIn找到了
主要是实例线程和启动线程的...
没错,似乎我有了一点感悟--
socketserver是基于多线程来做的,并做到多并发处理
内部调用流程为:
- 启动服务端程序
- 执行 TCPServer.__init__ 方法,创建服务端Socket对象并绑定 IP 和 端口
- 执行 BaseServer.__init__ 方法,将自定义的继承自SocketServer.BaseRequestHandler 的类 MyRequestHandle赋值给 self.RequestHandlerClass
- 执行 BaseServer.server_forever 方法,While 循环一直监听是否有客户端请求到达 ...
- 当客户端连接到达服务器
- 执行 ThreadingMixIn.process_request 方法,创建一个 “线程” 用来处理请求
- 执行 ThreadingMixIn.process_request_thread 方法
- 执行 BaseServer.finish_request 方法,执行 self.RequestHandlerClass() 即:执行 自定义 MyRequestHandler 的构造方法(自动调用基类BaseRequestHandler的构造方法,在该构造方法中又会调用 MyRequestHandler的handle方法)
服务类:
SocketServer提供了5个基本的服务类:
BaseServer
: 基础类,由于下面四个网络服务类的继承
TCPServer
:针对TCP套接字流
UDPServer
:针对UDP数据报套接字
UnixStreamServer
:处理流式套接字,与TCPServer配合
UnixDatagramServer
:处理数据报套接字,与UDPServer配合
异步处理类:
这个四个服务类都是同步处理请求的。一个请求没处理完不能处理下一个请求。要想支持异步模型,可以利用多继承让server类继承ForkingMixIn 或 ThreadingMixIn。
ForkingMixIn
: 利用多进程(分叉)实现异步。(Mix-in class to handle each request in a new process)
ThreadingMixIn
: 利用多线程实现异步。(Mix-in class to handle each request in a new thread)
请求处理类:
要实现一项服务,还必须派生一个handler请求处理类,并重写父类的handle()方法。handle方法就是用来专门是处理请求的。该模块是通过服务类和请求处理类组合来处理请求的。
SocketServer模块提供的请求处理类有BaseRequestHandler
,以及它的派生类StreamRequestHandler
和DatagramRequestHandler
。从名字看出可以一个处理流式套接字,一个处理数据报套接字。
#服务器端 from SocketServer import TCPServer,StreamRequestHandler, ThreadingMixIn, ForkingMixIn #定义请求处理类 class Handler(StreamRequestHandler): def handle(self): addr = self.request.getpeername() print ‘connection:‘, addr while 1: self.request.sendall(self.request.recv(1024)) #实例化服务类对象 server = TCPServer( server_address=(‘127.0.0.1‘, 8123), # address RequestHandlerClass=Handler # 请求类 ) #开启服务 server.serve_forever()
#客户端 import socket def socketClient(): so = socket.socket() so.connect((‘127.0.0.1‘, 8123)) # so.close() while 1: so.sendall(raw_input(‘msg‘)) print so.recv(1024) if __name__ == ‘__main__‘: socketClient()
多线程服务端
from SocketServer import TCPServer,StreamRequestHandler, ThreadingMixIn, ForkingMixIn #定义基于多线程的服务类 class Server(ThreadingMixIn, TCPServer): pass #定义请求处理类 class Handler(StreamRequestHandler): def handle(self): addr = self.request.getpeername() print ‘connection:‘, addr while 1: self.request.sendall(self.request.recv(1024)) #实例化服务类对象 server = Server( server_address=(‘127.0.0.1‘, 8123), # address RequestHandlerClass=Handler # 请求类 ) #开启服务 server.serve_forever()
源码分析:
"""普通的Socket服务类. socket服务: - address family: - AF_INET{,6}: IP (Internet Protocol) sockets (default) - AF_UNIX: Unix domain sockets - others, e.g. AF_DECNET are conceivable (see <socket.h> - socket type: - SOCK_STREAM (reliable stream, e.g. TCP) - SOCK_DGRAM (datagrams, e.g. UDP) 请求服务类 (including socket-based): - 客户端地址验证之前进一步查看请求 (实际上是一个请求处理的钩子在请求之前,例如logging) - 如何处理多个请求: - synchronous (同步:同一时间只能有一个请求) - forking (分叉:每个请求分配一个新的进程) - threading (线程:每个请求分配一个新的线程) 五个类的继承关系如下: +------------+ | BaseServer | +------------+ | v +-----------+ +------------------+ | TCPServer |------->| UnixStreamServer | +-----------+ +------------------+ | v +-----------+ +--------------------+ | UDPServer |------->| UnixDatagramServer | +-----------+ +--------------------+ 通过ForkingMixIn创建进程,通过ThreadingMixIn创建线程,如下实例: class ThreadingTCPServer(ThreadingMixIn, TCPServer): pass """ __version__ = "0.4" import socket import select import sys import os import errno try: import threading except ImportError: import dummy_threading as threading __all__ = ["TCPServer","UDPServer","ForkingUDPServer","ForkingTCPServer", "ThreadingUDPServer","ThreadingTCPServer","BaseRequestHandler", "StreamRequestHandler","DatagramRequestHandler", "ThreadingMixIn", "ForkingMixIn"] #family参数代表地址家族,比较常用的为AF_INET或AF_UNIX。 #AF_UNIX用于同一台机器上的进程间通信,AF_INET对于IPV4协议的TCP和UDP 。 if hasattr(socket, "AF_UNIX"): __all__.extend(["UnixStreamServer","UnixDatagramServer", "ThreadingUnixStreamServer", "ThreadingUnixDatagramServer"]) def _eintr_retry(func, *args): """重新启动系统调用EINTR中断""" while True: try: return func(*args) except (OSError, select.error) as e: if e.args[0] != errno.EINTR: raise class BaseServer: """服务类的基类. 调用方法: - __init__(server_address, RequestHandlerClass) - serve_forever(poll_interval=0.5) - shutdown() - handle_request() # if you do not use serve_forever() - fileno() -> int # for select() 可以被重写的方法: - server_bind() - server_activate() - get_request() -> request, client_address - handle_timeout() - verify_request(request, client_address) - server_close() - process_request(request, client_address) - shutdown_request(request) - close_request(request) - handle_error() 派生类(derived classes)方法: - finish_request(request, client_address) 可以由派生类或重写类变量实例: - timeout - address_family - socket_type - allow_reuse_address 实例变量: - RequestHandlerClass - socket """ timeout = None def __init__(self, server_address, RequestHandlerClass): """初始化,能被扩展但不要重写.""" self.server_address = server_address # 地址元祖如(‘127.0.0.1‘, 8123) self.RequestHandlerClass = RequestHandlerClass # 请求处理类 self.__is_shut_down = threading.Event() # 多线程通信机制 self.__shutdown_request = False def server_activate(self): """通过构造函数激活服务器.可被重写.""" pass def serve_forever(self, poll_interval=0.5): """在一个时间段内处理一个请求直到关闭. 处理请求,直到一个明确的shutdown()请求。每poll_interval秒轮询一次shutdown。 忽略self.timeout。如果你需要做周期性的任务,建议放置在其他线程。 """ self.__is_shut_down.clear() #Event是Python多线程通信的最简单的机制之一.一个线程标识一个事件,其他线程一直处于等待状态。 #一个事件对象管理一个内部标示符,这个标示符可以通过set()方法设为True,通过clear()方法重新设为False,wait()方法则使线程一直处于阻塞状态,直到标示符变为True #也就是说我们可以通过 以上三种方法来多个控制线程的行为。 try: while not self.__shutdown_request: #考虑使用其他文件描述符或者连接socket去唤醒它取代轮询 #轮询减少在其他时间我们响应了关闭请求CPU。 r, w, e = _eintr_retry(select.select, [self], [], [], poll_interval) if self in r: self._handle_request_noblock() finally: self.__shutdown_request = False self.__is_shut_down.set() def shutdown(self): """终止serve_forever的循环. 阻塞直到循环结束. 当serve_forever()方法正运行在另外的线程中必须调用它,否则会发生死锁. """ self.__shutdown_request = True self.__is_shut_down.wait() # - handle_request() 是顶层调用. 它调用select,get_request(),verify_request()和process_request() # - get_request() 不同于流式和报文socket # - process_request() 产生进程的位置,或者产生线程去结束请求 # - finish_request() 请求处理类的实例,此构造都将处理请求本身 def handle_request(self): """处理一个请求, 可能阻塞.考虑self.timeout.""" # Support people who used socket.settimeout() to escape # handle_request before self.timeout was available. timeout = self.socket.gettimeout() # 返回当前超时期的值,如果没有设置超时期,则返回None if timeout is None: timeout = self.timeout elif self.timeout is not None: timeout = min(timeout, self.timeout) fd_sets = _eintr_retry(select.select, [self], [], [], timeout) if not fd_sets[0]: self.handle_timeout() return self._handle_request_noblock() def _handle_request_noblock(self): """处理一个请求, 非阻塞.""" try: request, client_address = self.get_request() except socket.error: return if self.verify_request(request, client_address): try: self.process_request(request, client_address) except: self.handle_error(request, client_address) self.shutdown_request(request) def handle_timeout(self): """超时处理。默认对于forking服务器是收集退出的子进程状态,threading服务器则什么都不做""" pass def verify_request(self, request, client_address): """ 返回一个布尔值,如果该值为True ,则该请求将被处理,反之请求将被拒绝。 此功能可以重写来实现对服务器的访问控制。 默认的实现始终返回True。client_address可以限定客户端,比如只处理指定ip区间的请求。 常用。 """ return True def process_request(self, request, client_address): """调用finish_request.被 ForkingMixIn and ThreadingMixIn重写 如果用户提供handle()方法抛出异常,将调用服务器的handle_error()方法。 如果self.timeout内没有请求收到, 将调用handle_timeout()并返回handle_request()。 """ self.finish_request(request, client_address) self.shutdown_request(request) def server_close(self): """关闭并清理server.""" pass def finish_request(self, request, client_address): """通过请求类的实例结束请求,实际处理RequestHandlerClass发起的请求并调用其handle()方法。 常用.""" self.RequestHandlerClass(request, client_address, self) def shutdown_request(self, request): """关闭结束一个单独的请求.""" self.close_request(request) def close_request(self, request): pass def handle_error(self, request, client_address): """优雅的操作错误,可重写,默认打印异常并继续 """ print ‘-‘*40 print ‘Exception happened during processing of request from‘, print client_address import traceback traceback.print_exc() # XXX But this goes to stderr! print ‘-‘*40
基于协程实现socket多并发
在这里,首先我们先了解一下协程一个概念:
-
协程又称微线程,单线程实现异步并发
-
线程寄存在cpu里,而协程有自己的寄存器,上下文和栈
-
由于协程本质上是单线程,所以不存在上下文切换花销,以及锁和同步的概念
-
低成本,高并发,唯一不足的就是不能利用cpu的核资源
-
你就记住协程干了这么一件事:遇IO阻塞就去做别的事了(socket连接就是IO操作)
从上面的源码解析,我们知道,socketserver实现多并发本质就是多线程或多进程,但这样还是有些低效,你想啊,如果有几万客户连接过来,就要创建几万个线程,如果用的人其实不是很多,CPU还要不断的去检测socket客户端有没有传输数据,花销很大,效率就低
server
import sys import socket import time import gevent from gevent import socket,monkey monkey.patch_all() def server(port): s = socket.socket() s.bind((‘0.0.0.0‘, port)) s.listen(500) while True: cli, addr = s.accept() gevent.spawn(handle_request, cli) def handle_request(conn): try: while True: data = conn.recv(1024) print("recv:", data) conn.send(data) if not data: conn.shutdown(socket.SHUT_WR) except Exception as ex: print(ex) finally: conn.close() if __name__ == ‘__main__‘: server(8001)
client
import socket HOST = ‘localhost‘ # The remote host PORT = 8001 # The same port as used by the server s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.connect((HOST, PORT)) while True: msg = bytes(input(">>:"),encoding="utf8") s.sendall(msg) data = s.recv(1024) #print(data) print(‘Received‘, repr(data)) s.close()