socketserver源码解析和协程版socketserver

  来,贴上一段代码让你仰慕一下欧socketserver的魅力,看欧怎么完美实现多并发的魅力

client

import socket

ip_port = (‘127.0.0.1‘,8009)
sk = socket.socket()
sk.connect(ip_port)
sk.settimeout(5)

while True:
    data = sk.recv(1024)
    print(‘receive:‘,data.decode())
    inp = input(‘please input:‘)
    sk.sendall(inp.encode())
    if inp == ‘exit‘:
        break

sk.close()

server

‘‘‘
对于socketserver,你需要做的事:
    定义个一类,继续socketserver.BaseRequestHandler
    重写handle方法
    把写好的类和端口进行实例,启动程序
‘‘‘

import socketserver

class MyServer(socketserver.BaseRequestHandler):

    def handle(self):
        # print self.request,self.client_address,self.server
        conn = self.request
        conn.sendall(‘欢迎致电 10086,请输入1xxx,0转人工服务.‘.encode())
        Flag = True
        while Flag:
            data = conn.recv(1024).decode()
            if data == ‘exit‘:
                Flag = False
            elif data == ‘0‘:
                conn.sendall(‘通过可能会被录音.balabala一大推‘.encode())
            else:
                conn.sendall(‘请重新输入.‘.encode())

if __name__ == ‘__main__‘:
    server = socketserver.ThreadingTCPServer((‘127.0.0.1‘,8009),MyServer)
    server.serve_forever()

代码执行看源码

  我们看到最后两句代码,第一句--带有括号,我们第一想到的是要么是个函数,要是是个类,那么看一下什么吧

  是个类,继承了ThreadingMixIn和TCPServer两个类,好!实例对象肯定要找构造方法,当前这个类没有,就需要到父类中找了,从左往右

  我们发现ThreadingMixIn类里没有,那肯定在TCPServer类里啦,果真在,并且还执行了TCPServer的父类BaseServer里的构造方法

  在BaseServer里只是做了一些数据初始化的事,那我们回到TCPServer里构造方法里接着往下看吧

  执行父类构造方法后,实例一个socket对象,然后就是在if下执行self.server_bind()方法,要想知道这个方法,必须弄清楚self是指代谁?

  说到这里,我必须屡屡类的继承关系了...

  self是指实例对象,谁实例的,是ThreadingTCPServer类,所以找server_bind方法,要从ThreadingTCPServer往上找

  我们发现这个方法还是TCPServer里,

  似乎是做了绑定socket的事,这里过,在找看构造方法里,后又执行self.server_activate方法,我们按照那继承关系又在TCPServer找到了

  似乎是做了有关监听数的事,好这里构造方法执行完毕,第一代码也就这样看完了

  看到第二句,开始找serve_forever方法在哪了

  在BaseServer里,这方法大概说的是,一次处理一个请求直到连接关闭,如果处理其他要求另外开启一个线程

  我发现下有个self._handle_request_noblock(),我们看看这个是做啥的吧?

  还是在BaseServer,好像是做不阻塞的工作

  在这里还是执行了一个process_request方法,这个方法我们在ThreadingMixIn找到了

  主要是实例线程和启动线程的...

  没错,似乎我有了一点感悟--

  socketserver是基于多线程来做的,并做到多并发处理

内部调用流程为:

  • 启动服务端程序
  • 执行 TCPServer.__init__ 方法,创建服务端Socket对象并绑定 IP 和 端口
  • 执行 BaseServer.__init__ 方法,将自定义的继承自SocketServer.BaseRequestHandler 的类 MyRequestHandle赋值给 self.RequestHandlerClass
  • 执行 BaseServer.server_forever 方法,While 循环一直监听是否有客户端请求到达 ...
  • 当客户端连接到达服务器
  • 执行 ThreadingMixIn.process_request 方法,创建一个 “线程” 用来处理请求
  • 执行 ThreadingMixIn.process_request_thread 方法
  • 执行 BaseServer.finish_request 方法,执行 self.RequestHandlerClass()  即:执行 自定义 MyRequestHandler 的构造方法(自动调用基类BaseRequestHandler的构造方法,在该构造方法中又会调用 MyRequestHandler的handle方法)

服务类:

  SocketServer提供了5个基本的服务类:

  BaseServer: 基础类,由于下面四个网络服务类的继承

  TCPServer:针对TCP套接字流

  UDPServer:针对UDP数据报套接字

  UnixStreamServer:处理流式套接字,与TCPServer配合

  UnixDatagramServer:处理数据报套接字,与UDPServer配合

异步处理类:

  这个四个服务类都是同步处理请求的。一个请求没处理完不能处理下一个请求。要想支持异步模型,可以利用多继承让server类继承ForkingMixIn 或 ThreadingMixIn。

  ForkingMixIn: 利用多进程(分叉)实现异步。(Mix-in class to handle each request in a new process)

  ThreadingMixIn: 利用多线程实现异步。(Mix-in class to handle each request in a new thread)

请求处理类:

  要实现一项服务,还必须派生一个handler请求处理类,并重写父类的handle()方法。handle方法就是用来专门是处理请求的。该模块是通过服务类和请求处理类组合来处理请求的。

  SocketServer模块提供的请求处理类有BaseRequestHandler,以及它的派生类StreamRequestHandlerDatagramRequestHandler。从名字看出可以一个处理流式套接字,一个处理数据报套接字。

#服务器端

from SocketServer import TCPServer,StreamRequestHandler,    ThreadingMixIn, ForkingMixIn

#定义请求处理类
class Handler(StreamRequestHandler):
    def handle(self):
        addr = self.request.getpeername()
        print ‘connection:‘, addr
        while 1:
            self.request.sendall(self.request.recv(1024))

#实例化服务类对象
server = TCPServer(
    server_address=(‘127.0.0.1‘, 8123),     # address
    RequestHandlerClass=Handler             # 请求类
)

#开启服务
server.serve_forever()
#客户端

import socket

def socketClient():
    so = socket.socket()
    so.connect((‘127.0.0.1‘, 8123))
    # so.close()
    while 1:
        so.sendall(raw_input(‘msg‘))
        print so.recv(1024)

if __name__ == ‘__main__‘:
    socketClient()

多线程服务端

from SocketServer import TCPServer,StreamRequestHandler,    ThreadingMixIn, ForkingMixIn

#定义基于多线程的服务类
class Server(ThreadingMixIn, TCPServer):
    pass

#定义请求处理类
class Handler(StreamRequestHandler):
    def handle(self):
        addr = self.request.getpeername()
        print ‘connection:‘, addr
        while 1:
            self.request.sendall(self.request.recv(1024))

#实例化服务类对象
server = Server(
    server_address=(‘127.0.0.1‘, 8123),     # address
    RequestHandlerClass=Handler             # 请求类
)

#开启服务
server.serve_forever()

源码分析:

"""普通的Socket服务类.

socket服务:

- address family:
        - AF_INET{,6}: IP (Internet Protocol) sockets (default)
        - AF_UNIX: Unix domain sockets
        - others, e.g. AF_DECNET are conceivable (see <socket.h>
- socket type:
        - SOCK_STREAM (reliable stream, e.g. TCP)
        - SOCK_DGRAM (datagrams, e.g. UDP)

请求服务类 (including socket-based):

- 客户端地址验证之前进一步查看请求
        (实际上是一个请求处理的钩子在请求之前,例如logging)
- 如何处理多个请求:
        - synchronous (同步:同一时间只能有一个请求)
        - forking (分叉:每个请求分配一个新的进程)
        - threading (线程:每个请求分配一个新的线程)

五个类的继承关系如下:

        +------------+
        | BaseServer |
        +------------+
              |
              v
        +-----------+        +------------------+
        | TCPServer |------->| UnixStreamServer |
        +-----------+        +------------------+
              |
              v
        +-----------+        +--------------------+
        | UDPServer |------->| UnixDatagramServer |
        +-----------+        +--------------------+

通过ForkingMixIn创建进程,通过ThreadingMixIn创建线程,如下实例:

        class ThreadingTCPServer(ThreadingMixIn, TCPServer): pass

"""

__version__ = "0.4"

import socket
import select
import sys
import os
import errno
try:
    import threading
except ImportError:
    import dummy_threading as threading

__all__ = ["TCPServer","UDPServer","ForkingUDPServer","ForkingTCPServer",
           "ThreadingUDPServer","ThreadingTCPServer","BaseRequestHandler",
           "StreamRequestHandler","DatagramRequestHandler",
           "ThreadingMixIn", "ForkingMixIn"]

#family参数代表地址家族,比较常用的为AF_INET或AF_UNIX。
#AF_UNIX用于同一台机器上的进程间通信,AF_INET对于IPV4协议的TCP和UDP 。
if hasattr(socket, "AF_UNIX"):
    __all__.extend(["UnixStreamServer","UnixDatagramServer",
                    "ThreadingUnixStreamServer",
                    "ThreadingUnixDatagramServer"])

def _eintr_retry(func, *args):
    """重新启动系统调用EINTR中断"""
    while True:
        try:
            return func(*args)
        except (OSError, select.error) as e:
            if e.args[0] != errno.EINTR:
                raise

class BaseServer:

    """服务类的基类.

    调用方法:

    - __init__(server_address, RequestHandlerClass)
    - serve_forever(poll_interval=0.5)
    - shutdown()
    - handle_request()  # if you do not use serve_forever()
    - fileno() -> int   # for select()

    可以被重写的方法:

    - server_bind()
    - server_activate()
    - get_request() -> request, client_address
    - handle_timeout()
    - verify_request(request, client_address)
    - server_close()
    - process_request(request, client_address)
    - shutdown_request(request)
    - close_request(request)
    - handle_error()

    派生类(derived classes)方法:

    - finish_request(request, client_address)

    可以由派生类或重写类变量实例:

    - timeout
    - address_family
    - socket_type
    - allow_reuse_address

    实例变量:

    - RequestHandlerClass
    - socket

    """

    timeout = None

    def __init__(self, server_address, RequestHandlerClass):
        """初始化,能被扩展但不要重写."""
        self.server_address = server_address                # 地址元祖如(‘127.0.0.1‘, 8123)
        self.RequestHandlerClass = RequestHandlerClass      # 请求处理类
        self.__is_shut_down = threading.Event()             # 多线程通信机制
        self.__shutdown_request = False

    def server_activate(self):
        """通过构造函数激活服务器.可被重写."""
        pass

    def serve_forever(self, poll_interval=0.5):
        """在一个时间段内处理一个请求直到关闭.

        处理请求,直到一个明确的shutdown()请求。每poll_interval秒轮询一次shutdown。
        忽略self.timeout。如果你需要做周期性的任务,建议放置在其他线程。
        """

        self.__is_shut_down.clear()
        #Event是Python多线程通信的最简单的机制之一.一个线程标识一个事件,其他线程一直处于等待状态。
        #一个事件对象管理一个内部标示符,这个标示符可以通过set()方法设为True,通过clear()方法重新设为False,wait()方法则使线程一直处于阻塞状态,直到标示符变为True
        #也就是说我们可以通过 以上三种方法来多个控制线程的行为。
        try:
            while not self.__shutdown_request:
                #考虑使用其他文件描述符或者连接socket去唤醒它取代轮询
                #轮询减少在其他时间我们响应了关闭请求CPU。
                r, w, e = _eintr_retry(select.select, [self], [], [],
                                       poll_interval)
                if self in r:
                    self._handle_request_noblock()
        finally:
            self.__shutdown_request = False
            self.__is_shut_down.set()

    def shutdown(self):
        """终止serve_forever的循环.

        阻塞直到循环结束. 当serve_forever()方法正运行在另外的线程中必须调用它,否则会发生死锁.
        """
        self.__shutdown_request = True
        self.__is_shut_down.wait()

    # - handle_request() 是顶层调用.  它调用select,get_request(),verify_request()和process_request()
    # - get_request() 不同于流式和报文socket
    # - process_request() 产生进程的位置,或者产生线程去结束请求
    # - finish_request() 请求处理类的实例,此构造都将处理请求本身

    def handle_request(self):
        """处理一个请求, 可能阻塞.考虑self.timeout."""
        # Support people who used socket.settimeout() to escape
        # handle_request before self.timeout was available.
        timeout = self.socket.gettimeout()  # 返回当前超时期的值,如果没有设置超时期,则返回None
        if timeout is None:
            timeout = self.timeout
        elif self.timeout is not None:
            timeout = min(timeout, self.timeout)
        fd_sets = _eintr_retry(select.select, [self], [], [], timeout)
        if not fd_sets[0]:
            self.handle_timeout()
            return
        self._handle_request_noblock()

    def _handle_request_noblock(self):
        """处理一个请求, 非阻塞."""
        try:
            request, client_address = self.get_request()
        except socket.error:
            return
        if self.verify_request(request, client_address):
            try:
                self.process_request(request, client_address)
            except:
                self.handle_error(request, client_address)
                self.shutdown_request(request)

    def handle_timeout(self):
        """超时处理。默认对于forking服务器是收集退出的子进程状态,threading服务器则什么都不做"""
        pass

    def verify_request(self, request, client_address):
        """
        返回一个布尔值,如果该值为True ,则该请求将被处理,反之请求将被拒绝。
        此功能可以重写来实现对服务器的访问控制。
        默认的实现始终返回True。client_address可以限定客户端,比如只处理指定ip区间的请求。 常用。
        """
        return True

    def process_request(self, request, client_address):
        """调用finish_request.被 ForkingMixIn and ThreadingMixIn重写
        如果用户提供handle()方法抛出异常,将调用服务器的handle_error()方法。
        如果self.timeout内没有请求收到, 将调用handle_timeout()并返回handle_request()。

        """
        self.finish_request(request, client_address)
        self.shutdown_request(request)

    def server_close(self):
        """关闭并清理server."""
        pass

    def finish_request(self, request, client_address):
        """通过请求类的实例结束请求,实际处理RequestHandlerClass发起的请求并调用其handle()方法。 常用."""
        self.RequestHandlerClass(request, client_address, self)

    def shutdown_request(self, request):
        """关闭结束一个单独的请求."""
        self.close_request(request)

    def close_request(self, request):
        pass

    def handle_error(self, request, client_address):
        """优雅的操作错误,可重写,默认打印异常并继续
        """
        print ‘-‘*40
        print ‘Exception happened during processing of request from‘,
        print client_address
        import traceback
        traceback.print_exc() # XXX But this goes to stderr!
        print ‘-‘*40

基于协程实现socket多并发

  在这里,首先我们先了解一下协程一个概念:

  • 协程又称微线程,单线程实现异步并发

  • 线程寄存在cpu里,而协程有自己的寄存器,上下文和栈

  • 由于协程本质上是单线程,所以不存在上下文切换花销,以及锁和同步的概念

  • 低成本,高并发,唯一不足的就是不能利用cpu的核资源

  • 你就记住协程干了这么一件事:遇IO阻塞就去做别的事了(socket连接就是IO操作)

  从上面的源码解析,我们知道,socketserver实现多并发本质就是多线程或多进程,但这样还是有些低效,你想啊,如果有几万客户连接过来,就要创建几万个线程,如果用的人其实不是很多,CPU还要不断的去检测socket客户端有没有传输数据,花销很大,效率就低

server

import sys
import socket
import time
import gevent

from gevent import socket,monkey
monkey.patch_all()

def server(port):
    s = socket.socket()
    s.bind((‘0.0.0.0‘, port))
    s.listen(500)
    while True:
        cli, addr = s.accept()
        gevent.spawn(handle_request, cli)

def handle_request(conn):
    try:
        while True:
            data = conn.recv(1024)
            print("recv:", data)
            conn.send(data)
            if not data:
                conn.shutdown(socket.SHUT_WR)

    except Exception as  ex:
        print(ex)
    finally:
        conn.close()
if __name__ == ‘__main__‘:
    server(8001)

client

import socket

HOST = ‘localhost‘    # The remote host
PORT = 8001           # The same port as used by the server
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((HOST, PORT))
while True:
    msg = bytes(input(">>:"),encoding="utf8")
    s.sendall(msg)
    data = s.recv(1024)
    #print(data)

    print(‘Received‘, repr(data))
s.close()
时间: 2024-10-09 18:30:24

socketserver源码解析和协程版socketserver的相关文章

Redux系列x:源码解析

写在前面 redux的源码很简洁,除了applyMiddleware比较绕难以理解外,大部分还是 这里假设读者对redux有一定了解,就不科普redux的概念和API啥的啦,这部分建议直接看官方文档. 此外,源码解析的中文批注版已上传至github,可点击查看.本文相关示例代码,可点击查看. 源码解析概览 将redux下载下来,然后看下他的目录结构. npm install redux 这里我们需要关心的主要是src目录,源码解析需要关心的文件都在这里面了 index.js:redux主文件,主

kube-proxy源码解析

kubernetes离线安装包,仅需三步 kube-proxy源码解析 ipvs相对于iptables模式具备较高的性能与稳定性, 本文讲以此模式的源码解析为主,如果想去了解iptables模式的原理,可以去参考其实现,架构上无差别. kube-proxy主要功能是监听service和endpoint的事件,然后下放代理策略到机器上. 底层调用docker/libnetwork, 而libnetwork最终调用了netlink 与netns来实现ipvs的创建等动作 初始化配置 代码入口:cmd

Android EventBus3.0使用及源码解析

叨了个叨 最近因为换工作的一些琐事搞的我一个头两个大,也没怎么去学新东西,实在是有些愧疚.新项目用到了EventBus3.0,原来只是听说EventBus的鼎鼎大名,一直没仔细研究过.趁着周末有些时间,研究下代码,也算没有虚度光阴. EventBus GitHub : https://github.com/greenrobot/EventBus EventBus3.0简介 EventBus是greenrobot出品的一个用于Android中事件发布/订阅的库.以前传递对象可能通过接口.广播.文件

Android MIFARE NFCA源码解析

Android MIFARE NFCA源码解析TagTechnology定义了所有标签的共有接口类BasicTagTechnology 实现了TagTechnology的一些接口 再有具体的标签协议继承BasicTagTechnologyNFC-A 遵循ISO 14443-3A协议. 关键字ATQA Answer To Request acc. to ISO/IEC 14443-4ATS Answer To Select acc. to ISO/IEC 14443-4DIF Dual Inter

Android 热修复Nuwa的原理及Gradle插件源码解析

现在,热修复的具体实现方案开源的也有很多,原理也大同小异,本篇文章以Nuwa为例,深入剖析. Nuwa的github地址 https://github.com/jasonross/Nuwa 以及用于hotpatch生成的gradle插件地址 https://github.com/jasonross/NuwaGradle 而Nuwa的具体实现是根据QQ空间的热修复方案来实现的.安卓App热补丁动态修复技术介绍.在阅读本篇文章之前,请先阅读该文章. 从QQ空间终端开发团队的文章中可以总结出要进行热更

EventBus3.0源码解析

本文主要介绍EventBus3.0的源码 EventBus是一个Android事件发布/订阅框架,通过解耦发布者和订阅者简化 Android 事件传递. EventBus使用简单,并将事件发布和订阅充分解耦,从而使代码更简洁. 本文主要从以下几个模块来介绍 1.EventBus使用 2.EventBus注册源码解析 3.EventBus事件分发解析 4.EventBus取消注册解析 一.EventBus使用 1.首先是注册 EventBus.getDefault().register(this)

Android AsyncTask 源码解析

1. 官方介绍 public abstract class AsyncTask extends Object  java.lang.Object    ? android.os.AsyncTask<Params, Progress, Result> AsyncTask enables proper and easy use of the UI thread. This class allows to perform background operations and publish resul

IPerf——网络测试工具介绍与源码解析(2)

对于IPerf源码解析,我是基于2.0.5版本在Windows下执行的情况进行分析的,提倡开始先通过对源码的简单修改使其能够在本地编译器运行起来,这样可以打印输出一些中间信息,对于理解源码的逻辑,程序实现的过程能够起到事半功倍的效果. IPerf主要分为如下几个模块: 选项参数处理: 线程封装和角色扮演: 四种线程模式(或者说角色): 客户端线程: 服务端线程: 报告者线程: 监听者线程. 套接字选项设置与提取: 链表和数组的封装和维护: 处理多并发Condition条件变量的封装: 时间戳封装

第二章 Google guava cache源码解析1--构建缓存器

1.guava cache 当下最常用最简单的本地缓存 线程安全的本地缓存 类似于ConcurrentHashMap(或者说成就是一个ConcurrentHashMap,只是在其上多添加了一些功能) 2.使用实例 具体在实际中使用的例子,去查看<第七章 企业项目开发--本地缓存guava cache>,下面只列出测试实例: import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit;