深入讨论channel timeout

深入讨论channel timeout

Go 语言的 channel 本身是不支持 timeout 的,所以一般实现 channel 的读写超时都采用 select,如下:

select {
case <-c:
case <-time.After(time.Second):
}

这两天在写码的过程中突然对这样实现 channel 超时产生了怀疑,这种方式真的好吗?于是我写了这样一个测试程序:

package main

import (
    "os"
    "time"
)

func main() {
    c := make(chan int, 100)

    go func() {
        for i := 0; i < 10; i++ {
            c <- 1
            time.Sleep(time.Second)
        }

        os.Exit(0)
    }()

    for {
        select {
        case n := <-c:
            println(n)
        case <-timeAfter(time.Second * 2):
        }
    }
}

func timeAfter(d time.Duration) chan int {
    q := make(chan int, 1)

    time.AfterFunc(d, func() {
        q <- 1
        println("run") 		// 重点在这里
    })

    return q
}

这个程序很简单,你会发现运行结果将会输出 10 次 “run”,也就是每一遍执行 select 注册的 timer 最终都执行了,虽然这里读 channel 都没有超时。原因其实很简单,每次执行 select 语句,都会将 case 条件语句给执行一遍,于是 timeAfter 的执行结果就是会创建一个定时器,并注册到 runtime 中,select 语句执行完成后,这个定时器本身并没有撤销,还继续保留在 runtime 的小顶堆中,所以这些 timer 一超时就会执行挂载的函数。

当然,用 time.After() 函数来做 channel 的读写超时,在应用层根本感受不到底层的定时器还保留着、继续执行;问题是,如果这里的 select 语句在循环中执行得非常快,也就是 channel 中的消息来得非常频繁,会出现的问题就是 runtime 中会有大量的定时器存在,timeout 的时间设置得越长,底层维护的定时器就会越多。原因就是每次 select 都会注册一个新的 timer,并且 timer 只有在它超时后才会被删除。

想想,自己的 channel 每秒钟将传输成千上万的消息,将会有多少 timer 对象存在底层 runtime 中。大量的临时对象会不会影响内存?大量的 timer 会不会影响其他定时器的准确度?

最后,我觉得正确的 channel timeout 也许应该这么做:

to := time.NewTimer(time.Second)
for {
    to.Reset(time.Second)
    select {
    case <-c:
    case <-to.C:
    }
}

这样做就是为了维护一个全局单一的定时器,每次操作前调整一下定时器的超时时间,从而避免每次循环都生成新的定时器对象。

简单测试了一下两种 channel 超时实现方式,在全力收发数据的情况的内存对象和 gc 情况。 
 
* 蓝线是采用 time.After(),并设置4s 超时的堆内存对象分配的数量 * 绿线是采用 time.After(),并设置2s 超时的堆内存对象分配的数量 * 黄线是采用全局 timer,并设置4s 超时的堆内存对象分配的数量

这个现象其实是预料之中的,重点可以注意设置的超时时间越长,time.After() 的表现将越糟糕。

这三条线和上图的三条线描述的对象是一样的,图中的 gc 时间是平均每次 gc 的时间。

针对这个 channel timeout,我没有去测试是否会影响其他定时器的准确性,但我认为这是必然的,随着定时器的增多。

最后,我始终觉得 channel 本身应该支持超时机制,而不是利用 select 来实现。

另外参见:

如何正确使用 Timer 来完成上面提到的定时任务?

func demo(input chan interface{}) {
    t1 := time.NewTimer(time.Second * 5)
    t2 := time.NewTimer(time.Second * 10)

    for {
        select {
        case msg <- input:
            println(msg)

        case <-t1.C:
            println("5s timer")
            t1.Reset(time.Second * 5)

        case <-t2.C:
            println("10s timer")
            t2.Reset(time.Second * 10)
        }
    }
}

改正后的程序,原理上是自定义两个全局的 Timer,每次执行 select 都重复使用这两个 Timer,而不是每次都生成全新的。这样才可以真正做到在接收消息的同时,还能够定时的执行相应的任务。



探索任何一个现象背后的真正原因,才是最有趣的事情。

来源: http://skoo.me/go/2014/07/09/channel-timeout

时间: 2024-10-11 15:57:50

深入讨论channel timeout的相关文章

IO复用

IO复用简单介绍 IO复用使得程序能同一时候监听多个文件描写叙述符.这对提高程序的性能至关重要.通常.网络程序在下列情况下须要使用IO复用技术: client程序要同一时候处理多个socket. client程序要同一时候处理用户输入和网络连接. TCPserver同一时候处理监听socket和连接socket. server要同一时候处理TCP请求和UDP请求. 须要指出的是.IO复用尽管能同一时候监听多个文件描写叙述符,但它本身是堵塞的.而且当多个文件描写叙述符同一时候就绪时,假设不採取额外

yate.conf

仅仅是配置文件,粘贴如下,不做解释!去掉了很多功能!仅保留sip电话! [general] ; General settings for the operation of Yate ; modload: boolean: Should a module be loaded by default if there is no ; reference to it in the [modules] section modload=disable ; modpath: string: Overrides

Go语言编程(旧读书笔记)

Go语言编程 目录 [隐藏] 1 前言 2 初识Go语言 3 顺序编程 4 OOP 5 并发编程 6 网络编程 7 安全编程 8 工程管理 9 开发工具 10 进阶话题 11 附录A [编辑]前言 协程? go run('test') Go强制了{ }的编写风格: if expression { 错误处理: defer?相当于finally?注意这里的作用域,defer之前似乎没有用{ }把整个(try)块括起来? defer相当于说推迟语句的执行,其内部实现会自动管理嵌套作用域的问题? Go允

16 Go Concurrency Patterns: Timing out, moving on

Go Concurrency Patterns: Timing out, moving on 23 September 2010 Concurrent programming has its own idioms. A good example is timeouts. Although Go's channels do not support them directly, they are easy to implement. Say we want to receive from the c

golang中select实现非阻塞及超时控制

// select.go package main import ( "fmt" "time" //"time" ) func main() { //声明一个channel ch := make(chan int) //声明一个匿名函数,传入一个参数整型channel类型ch go func(ch chan int) { ch <- 1 //往channel写入一个数据,此时阻塞 }(ch) //由于goroutine执行太快,先让它sle

480000 millis timeout while waiting for channel to be ready for write异常处理

2014-08-25 15:35:05,691 ERROR org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(10.130.136.136:50010, storageID=DS-1533727399-10.130.136.136-50010-1388038551296, infoPort=50075, ipcPort=50020):DataXceiver java.net.SocketTimeoutExc

nsq源码阅读笔记之nsqd(四)——Channel

与Channel相关的代码主要位于nsqd/channel.go, nsqd/nsqd.go中. Channel与Topic的关系 Channel是消费者订阅特定Topic的一种抽象.对于发往Topic的消息,nsqd向该Topic下的所有Channel投递消息,而同一个Channel只投递一次,Channel下如果存在多个消费者,则随机选择一个消费者做投递.这种投递方式可以被用作消费者负载均衡. Channel从属于特定Topic,可以认为是Topic的下一级.在同一个Topic之下可以有零个

Java NIO:Buffer、Channel 和 Selector

Buffer 一个 Buffer 本质上是内存中的一块,我们可以将数据写入这块内存,之后从这块内存获取数据. java.nio 定义了以下几个 Buffer 的实现,这个图读者应该也在不少地方见过了吧. 其实核心是最后的 ByteBuffer,前面的一大串类只是包装了一下它而已,我们使用最多的通常也是 ByteBuffer. 我们应该将 Buffer 理解为一个数组,IntBuffer.CharBuffer.DoubleBuffer 等分别对应 int[].char[].double[] 等.

Java网络编程和NIO详解4:浅析NIO包中的Buffer、Channel 和 Selector

Java网络编程与NIO详解4:浅析NIO包中的Buffer.Channel 和 Selector 转自https://www.javadoop.com/post/nio-and-aio 本系列文章首发于我的个人博客:https://h2pl.github.io/ 欢迎阅览我的CSDN专栏:Java网络编程和NIO https://blog.csdn.net/column/details/21963.html 部分代码会放在我的的Github:https://github.com/h2pl/ J