fasthttp中的协程池实现

fasthttp中的协程池实现

协程池可以控制并行度,复用协程。fasthttp 比 net/http 效率高很多倍的重要原因,就是利用了协程池。实现并不复杂,我们可以参考他的设计,写出高性能的应用。

入口

// server.go

func (s *Server) Serve(ln net.Listener) error {
    var lastOverflowErrorTime time.Time
    var lastPerIPErrorTime time.Time
    var c net.Conn
    var err error

    maxWorkersCount := s.getConcurrency()
    s.concurrencyCh = make(chan struct{}, maxWorkersCount)
    wp := &workerPool{
        WorkerFunc:      s.serveConn,
        MaxWorkersCount: maxWorkersCount,
        LogAllErrors:    s.LogAllErrors,
        Logger:          s.logger(),
    }
    // break-00
    wp.Start()

    for {
        // break-02
        if c, err = acceptConn(s, ln, &lastPerIPErrorTime); err != nil {
            wp.Stop()
            if err == io.EOF {
                return nil
            }
            return err
        }

        // break-03
        if !wp.Serve(c) {
            s.writeFastError(c, StatusServiceUnavailable,
                "The connection cannot be served because Server.Concurrency limit exceeded")
            c.Close()
            if time.Since(lastOverflowErrorTime) > time.Minute {
                s.logger().Printf("The incoming connection cannot be served, because %d concurrent connections are served. "+
                    "Try increasing Server.Concurrency", maxWorkersCount)
                lastOverflowErrorTime = CoarseTimeNow()
            }

            // The current server reached concurrency limit,
            // so give other concurrently running servers a chance
            // accepting incoming connections on the same address.
            //
            // There is a hope other servers didn‘t reach their
            // concurrency limits yet :)
            time.Sleep(100 * time.Millisecond)
        }
        c = nil
    }
}

// 有必要了解一下 workerPool 的结构
type workerPool struct {
    // Function for serving server connections.
    // It must leave c unclosed.
    WorkerFunc func(c net.Conn) error

    MaxWorkersCount int

    LogAllErrors bool

    MaxIdleWorkerDuration time.Duration

    Logger Logger

    lock         sync.Mutex
    workersCount int
    mustStop     bool

    ready []*workerChan

    stopCh chan struct{}

    workerChanPool sync.Pool
}

goroutine status:

  1. main0: wp.Start()

break-00

// workerpool.go

// 启动一个 goroutine, 每隔一段时间,清理一下 []*workerChan;
// wp.clean() 的操作是 查看最近使用的workerChan, 如果他的最近使用间隔大于某个值,那么把这个workerChan清理了。
func (wp *workerPool) Start() {
    if wp.stopCh != nil {
        panic("BUG: workerPool already started")
    }
    wp.stopCh = make(chan struct{})
    stopCh := wp.stopCh
    go func() {
        var scratch []*workerChan
        for {
            // break-01
            wp.clean(&scratch)
            select {
            case <-stopCh:
                return
            default:
                time.Sleep(wp.getMaxIdleWorkerDuration())
            }
        }
    }()
}

goroutine status:

  1. main0: wp.Start()
  2. g1: for loop to clean idle workerChan

break-01

func (wp *workerPool) clean(scratch *[]*workerChan) {
    maxIdleWorkerDuration := wp.getMaxIdleWorkerDuration()

    // Clean least recently used workers if they didn‘t serve connections
    // for more than maxIdleWorkerDuration.
    currentTime := time.Now()

    wp.lock.Lock()
    ready := wp.ready
    n := len(ready)
    i := 0
    // 这里从队列头部取出超过 最大空闲时间 的workerChan。
    // 可以看出,最后使用的workerChan 一定是放回队列尾部的。
    for i < n && currentTime.Sub(ready[i].lastUseTime) > maxIdleWorkerDuration {
        i++
    }
    // 把空闲的放入 scratch, 剩余的放回 ready
    *scratch = append((*scratch)[:0], ready[:i]...)
    if i > 0 {
        m := copy(ready, ready[i:])
        for i = m; i < n; i++ {
            ready[i] = nil
        }
        wp.ready = ready[:m]
    }
    wp.lock.Unlock()

    // Notify obsolete workers to stop.
    // This notification must be outside the wp.lock, since ch.ch
    // may be blocking and may consume a lot of time if many workers
    // are located on non-local CPUs.
    tmp := *scratch
    // 销毁的操作就是向 chan net.Conn 中塞入一个 nil, 后面会看到解释
    for i, ch := range tmp {
        ch.ch <- nil
        tmp[i] = nil
    }
}

break-02

acceptConn(s, ln, &lastPerIPErrorTime) 主要处理 ln.Accept(),判断err是否是 Temporary 的,最终返回一个 net.Conn

break-03

// workerpool.go

func (wp *workerPool) Serve(c net.Conn) bool {
    // break-04
    ch := wp.getCh()
    if ch == nil {
        return false
    }
    ch.ch <- c
    return true
}

type workerChan struct {
    lastUseTime time.Time
    ch          chan net.Conn
}

wp.getCh() 返回一个 *workerChan, 可以看到, workerChan 有一个 ch 属性,参数传入的 net.Conn 直接往里面塞。

break-04

// workerpool.go

func (wp *workerPool) getCh() *workerChan {
    var ch *workerChan
    createWorker := false

    wp.lock.Lock()
    ready := wp.ready
    n := len(ready) - 1
    if n < 0 {
        // ready 为空,并且总数小于 MaxWorkersCount,那么需要创建新的 workerChan
        if wp.workersCount < wp.MaxWorkersCount {
            createWorker = true
            wp.workersCount++
        }
    } else {
        // 从队列尾部取出一个 workerChan
        ch = ready[n]
        ready[n] = nil
        wp.ready = ready[:n]
    }
    wp.lock.Unlock()

    if ch == nil {
        if !createWorker {
            return nil
        }
        // 走入创建流程,从 Pool中取出 workerChan
        vch := wp.workerChanPool.Get()
        if vch == nil {
            vch = &workerChan{
                ch: make(chan net.Conn, workerChanCap),
            }
        }
        ch = vch.(*workerChan)
        // 创建goroutine处理请求,接收一个 chan *workerChan 作为参数
        go func() {
            // break-05
            wp.workerFunc(ch)
            wp.workerChanPool.Put(vch)
        }()
    }
    return ch
}

这里我们只看创建的流程。如果ready为空,说明ready被耗尽,并且小于 MaxWorkersCount,那么需要创建新的 workerChan。
创建时,先从 Pool 中取出复用,如果为nil,再创建新的。
可以预测到,这里 wp.workerFunc(ch) 必定包含一个 for 循环,处理 workerChan 中的 net.Conn。

goroutine status:

  1. main0: wp.Start()
  2. g1: for loop to clean idle workerChan
  3. g2: wp.workerFunc(ch) blocks for handling connection

break-05

// workerpool.go

func (wp *workerPool) workerFunc(ch *workerChan) {
    var c net.Conn

    var err error
    for c = range ch.ch {
        if c == nil {
            break
        }

        // 正真的处理请求的函数
        if err = wp.WorkerFunc(c); err != nil && err != errHijacked {
            errStr := err.Error()
            if wp.LogAllErrors || !(strings.Contains(errStr, "broken pipe") ||
                strings.Contains(errStr, "reset by peer") ||
                strings.Contains(errStr, "i/o timeout")) {
                wp.Logger.Printf("error when serving connection %q<->%q: %s", c.LocalAddr(), c.RemoteAddr(), err)
            }
        }
        if err != errHijacked {
            c.Close()
        }
        c = nil

        // 释放 workerChan
        // break-06
        if !wp.release(ch) {
            break
        }
    }

    // 跳出 for range 循环, 意味着 从chan中取得一个 nil,或者 wp.mustStop 被设为了true,这是主动停止的方法。
    wp.lock.Lock()
    wp.workersCount--
    wp.lock.Unlock()
}

for range 不断从 chan net.Conn 中获取连接。大家是否还记得 在 func (wp *workerPool) Serve(c net.Conn) bool 函数中,一个重要操作就是把 accept 到的connection,放入 channel.
最后,需要把当前的 workerChan 释放回 workerPool 的 ready 中。

break-06

func (wp *workerPool) release(ch *workerChan) bool {
    ch.lastUseTime = CoarseTimeNow()
    wp.lock.Lock()
    if wp.mustStop {
        wp.lock.Unlock()
        return false
    }
    wp.ready = append(wp.ready, ch)
    wp.lock.Unlock()
    return true
}

释放操作中,注意到 修改了 ch.lastUseTime , 还记得 clean 操作吗?在 g1 协程中运行着呢。所以最后的运行状态是:

goroutine status:

  1. main0: wp.Start()
  2. g1: for loop to clean idle workerChan
  3. g2: wp.workerFunc(ch) blocks for handling connection
  4. g3: ....
  5. g4: ....

按需增长 goroutine 数量,但是也有一个最大值, 所以并行度是可控的。当请求密集时,一个 worker goroutine 可能会串行处理多个 connection。wokerChan 在 Pool 中被复用,对GC的压力会减小很多。

而对比原生的 net/http 包,并行度不可控(可能不确定,runtime 会有控制? ),goroutine 不可被复用,体现在一个请求一个goroutine, 用完就销毁了,对机器压力更大。

来源: https://segmentfault.com/a/1190000009133154

时间: 2024-11-08 21:36:51

fasthttp中的协程池实现的相关文章

深入tornado中的协程

tornado使用了单进程(当然也可以多进程) + 协程 + I/O多路复用的机制,解决了C10K中因为过多的线程(进程)的上下文切换 而导致的cpu资源的浪费. tornado中的I/O多路复用前面已经讲过了.本文不做详细解释. 来看一下tornado中的协程模块:tornado.gen: tornado.gen是根据生成器(generator)实现的,用来更加简单的实现异步. 先来说一下tornado.gen.coroutine的实现思路: 我们知道generator中的yield语句可以使

在PHP中使用协程实现多任务调度

PHP5.5一个比较好的新功能是加入了对迭代生成器和协程的支持.对于生成器,PHP的文档和各种其他的博客文章已经有了非常详细的讲解.协程相对受到的关注就少了,因为协程虽然有很强大的功能但相对比较复杂, 也比较难被理解,解释起来也比较困难. 这篇文章将尝试通过介绍如何使用协程来实施任务调度, 来解释在PHP中的协程. 我将在前三节做一个简单的背景介绍.如果你已经有了比较好的基础,可以直接跳到“协同多任务处理”一节. 迭代生成器 生成器也是一个函数,不同的是这个函数的返回值是依次输出,而不是只返回一

Unity中的协程(一)

这篇文章很不错的问题,推荐阅读英文原版: Introduction to Coroutines Scripting with Coroutines   这篇文章转自:http://blog.csdn.net/huang9012/article/details/38492937 协程介绍 在Unity中,协程(Coroutines)的形式是我最喜欢的功能之一,几乎在所有的项目中,我都会使用它来控制运动,序列,以及对象的行为.在这个教程中,我将会说明协程是如何工作的,并且会附上一些例子来介绍它的用法

Python 中的协程 (5) 无阻塞

1 异步程序依然会假死 freezing 1)一般程序的调用方 freezing import asyncio import time import threading #定义一个异步操作 async def hello1(a,b): print(f"异步函数开始执行") await asyncio.sleep(3) print("异步函数执行结束") return a+b #在一个异步操作里面调用另一个异步操作 async def main(): c=await

python 并发编程 基于gevent模块 协程池 实现并发的套接字通信

基于协程池 实现并发的套接字通信 客户端: from socket import * client = socket(AF_INET, SOCK_STREAM) client.connect(('127.0.0.1', 8080)) while True: msg = input(">>>:").strip() if not msg:break client.send(msg.encode("utf-8")) data = client.recv(

Unity中使用协程进行服务端数据验证手段

近期在做项目中的个人中心的一些事情,用户头像上传,下载,本地缓存,二级缓存,压缩,这些都要做,麻雀虽小五脏俱全啊,也是写的浑浑噩噩的, 当我们在上传用户头像的时候,向服务端发送上传头像请求之前,一般都会做一次验证,向服务端获取token验证信息,来确保非法上传,如果不做这个那么会有非法用户上传非法图像,使你的服务器 带来未知的灾难. 而验证的逻辑很好写,并没有什么难度,比如: Server.SendMessage("获取token"); Client.Receive(string to

【Unity3D基础教程】(五):详解Unity3D中的协程(Coroutine)

[狗刨学习网] 为什么需要协程 在游戏中有许多过程(Process)需要花费多个逻辑帧去计算. 你会遇到"密集"的流程,比如说寻路,寻路计算量非常大,所以我们通常会把它分割到不同的逻辑帧去进行计算,以免影响游戏的帧率. 你会遇到"稀疏"的流程,比如说游戏中的触发器,这种触发器大多数时候什么也不做,但是一旦被调用会做非常重要的事情(比图说游戏中自动开启的门就是在门前放了一个Empty Object作为trigger,人到门前就会触发事件). 不管什么时候,如果你想创建

【Unity优化】如何实现Unity编辑器中的协程

Unity编辑器中何时需要协程 当我们定制Unity编辑器的时候,往往需要启动额外的协程或者线程进行处理.比如当执行一些界面更新的时候,需要大量计算,如果用户在不断修正一个参数,比如从1变化到2,这种变化过程要经历无数中间步骤,调用N多次Update,如果直接在Update中不断刷新,界面很容易直接卡死.所以在一个协程中进行一些优化,只保留用户最后一次参数修正,省去中间步骤,就会好很多.这属于Unity编辑器的内容,也属于优化的内容,还是放在优化中吧. 解决问题思路 Unity官网的questi

Unity中使用协程实现倒计时功能

unity中协程的功能很强大,能够充分发挥unity协程功能的地方就是游戏的倒计时,今天我们就来实现一个简易版本的倒计时. 新建一个场景,给camera添加一个脚本,脚本内容如下: using UnityEngine; using System.Collections; public class ShowNumber : MonoBehaviour { private int tmp = 10; // Use this for initialization void Start () { //开