如何优雅的关闭golang的channel

How to Gracefully Close Channels,这篇博客讲了如何优雅的关闭channel的技巧,好好研读,收获良多。

众所周知,在golang中,关闭或者向已关闭的channel发送数据都会引发panic。

谨遵优雅关闭channel的原则

  • 不要在接受一端关闭channel
  • 不要在有多个并发的senders中关闭channel。反过来说,如果只有一个协程充当sender,那么我们可以在这个sender协程内关闭掉channel。

一个简单的方法

  • SafeClose
type MyChannel struct {
    C      chan T
    closed bool
    mutex  sync.Mutex
}

func NewMyChannel() *MyChannel {
    return &MyChannel{C: make(chan T)}
}

func (mc *MyChannel) SafeClose() {
    mc.mutex.Lock()
    defer mc.mutex.Unlock()
    if !mc.closed {
        close(mc.C)
        mc.closed = true
    }
}

func (mc *MyChannel) IsClosed() bool {
    mc.mutex.Lock()
    defer mc.mutex.Unlock()
    return mc.closed
}
  • SafeSend
func SafeSend(ch chan T, value T) (closed bool) {
    defer func() {
        if recover() != nil {
            closed = true
        }
    }()

    ch <- value  // panic if ch is closed
    return false // <=> closed = false; return
}
  • [x] 那边英文博客有一句话

    One drawback of the above SafeSend function is that its calls can‘t be used as send operations which follow the case keyword in select blocks.

这里指的是SafeSend方法不能用在select...case...的case接受操作中,即

select {
    case <- SafeSend(ch, 1)
}

因为case后面需要一个channel。

优雅关闭channel的设计

  • 多个receivers,一个sender的情况。
package main

import (
    "time"
    "math/rand"
    "sync"
    "log"
)

func main() {
    rand.Seed(time.Now().UnixNano())
    log.SetFlags(0)

    // ...
    const MaxRandomNumber = 100000
    const NumReceivers = 100

    wgReceivers := sync.WaitGroup{}
    wgReceivers.Add(NumReceivers)

    // ...
    dataCh := make(chan int, 100)

    // the sender
    go func() {
        for {
            if value := rand.Intn(MaxRandomNumber); value == 0 {
                // The only sender can close the channel safely.
                close(dataCh)
                return
            } else {
                dataCh <- value
            }
        }
    }()

    // receivers
    for i := 0; i < NumReceivers; i++ {
        go func() {
            defer wgReceivers.Done()

            // Receive values until dataCh is closed and
            // the value buffer queue of dataCh is empty.
            for value := range dataCh {
                log.Println(value)
            }
        }()
    }

    wgReceivers.Wait()
}
  • 一个receiver,多个senders的情况。
package main

import (
    "time"
    "math/rand"
    "sync"
    "log"
)

func main() {
    rand.Seed(time.Now().UnixNano())
    log.SetFlags(0)

    // ...
    const MaxRandomNumber = 100000
    const NumSenders = 1000

    wgReceivers := sync.WaitGroup{}
    wgReceivers.Add(1)

    // ...
    dataCh := make(chan int, 100)
    stopCh := make(chan struct{})
        // stopCh is an additional signal channel.
        // Its sender is the receiver of channel dataCh.
        // Its receivers are the senders of channel dataCh.

    // senders
    for i := 0; i < NumSenders; i++ {
        go func() {
            for {
                // The try-receive operation is to try to exit
                // the goroutine as early as possible. For this
                // specified example, it is not essential.
                select {
                case <- stopCh:
                    return
                default:
                }

                // Even if stopCh is closed, the first branch in the
                // second select may be still not selected for some
                // loops if the send to dataCh is also unblocked.
                // But this is acceptable for this example, so the
                // first select block above can be omitted.
                select {
                case <- stopCh:
                    return
                case dataCh <- rand.Intn(MaxRandomNumber):
                }
            }
        }()
    }

    // the receiver
    go func() {
        defer wgReceivers.Done()

        for value := range dataCh {
            if value == MaxRandomNumber-1 {
                // The receiver of the dataCh channel is
                // also the sender of the stopCh channel.
                // It is safe to close the stop channel here.
                close(stopCh)
                return
            }

            log.Println(value)
        }
    }()

    // ...
    wgReceivers.Wait()
}
  • 多个receivers和多个senders
package main

import (
    "time"
    "math/rand"
    "sync"
    "log"
    "strconv"
)

func main() {
    rand.Seed(time.Now().UnixNano())
    log.SetFlags(0)

    // ...
    const MaxRandomNumber = 100000
    const NumReceivers = 10
    const NumSenders = 1000

    wgReceivers := sync.WaitGroup{}
    wgReceivers.Add(NumReceivers)

    // ...
    dataCh := make(chan int, 100)
    stopCh := make(chan struct{})
        // stopCh is an additional signal channel.
        // Its sender is the moderator goroutine shown below.
        // Its receivers are all senders and receivers of dataCh.
    toStop := make(chan string, 1)
        // The channel toStop is used to notify the moderator
        // to close the additional signal channel (stopCh).
        // Its senders are any senders and receivers of dataCh.
        // Its receiver is the moderator goroutine shown below.
        // It must be a buffered channel.

    var stoppedBy string

    // moderator
    go func() {
        stoppedBy = <-toStop
        close(stopCh)
    }()

    // senders
    for i := 0; i < NumSenders; i++ {
        go func(id string) {
            for {
                value := rand.Intn(MaxRandomNumber)
                if value == 0 {
                    // Here, the try-send operation is to notify the
                    // moderator to close the additional signal channel.
                    select {
                    case toStop <- "sender#" + id:
                    default:
                    }
                    return
                }

                // The try-receive operation here is to try to exit the
                // sender goroutine as early as possible. Try-receive
                // try-send select blocks are specially optimized by the
                // standard Go compiler, so they are very efficient.
                select {
                case <- stopCh:
                    return
                default:
                }

                // Even if stopCh is closed, the first branch in this
                // select block may be still not selected for some
                // loops (and for ever in theory) if the send to dataCh
                // is also non-blocking. If this is not acceptable,
                // then the above try-receive operation is essential.
                select {
                case <- stopCh:
                    return
                case dataCh <- value:
                }
            }
        }(strconv.Itoa(i))
    }

    // receivers
    for i := 0; i < NumReceivers; i++ {
        go func(id string) {
            defer wgReceivers.Done()

            for {
                // Same as the sender goroutine, the try-receive
                // operation here is to try to exit the receiver
                // goroutine as early as possible.
                select {
                case <- stopCh:
                    return
                default:
                }

                // Even if stopCh is closed, the first branch in this
                // select block may be still not selected for some
                // loops (and for ever in theory) if the receive from
                // dataCh is also non-blocking. If this is not acceptable,
                // then the above try-receive operation is essential.
                select {
                case <- stopCh:
                    return
                case value := <-dataCh:
                    if value == MaxRandomNumber-1 {
                        // The same trick is used to notify
                        // the moderator to close the
                        // additional signal channel.
                        select {
                        case toStop <- "receiver#" + id:
                        default:
                        }
                        return
                    }

                    log.Println(value)
                }
            }
        }(strconv.Itoa(i))
    }

    // ...
    wgReceivers.Wait()
    log.Println("stopped by", stoppedBy)
}

原文地址:https://www.cnblogs.com/linyihai/p/10612409.html

时间: 2025-01-11 13:46:48

如何优雅的关闭golang的channel的相关文章

golang控制channel的出入口

golang控制channel的出入口 我们常常使用channel来在多个goroutine之间做数据通讯,但是chan作为函数的入参我们应该怎么写呢?也许有人觉得这个问题比较傻,不过这个还真的是我今天才知道的. 首先我们看看下面的代码: func main() { c := make(chan int) go in(c) go out(c) time.Sleep(time.Second) } func in(c chan int) { for i := 0; i < 10; i++ { c <

如何关闭Golang中的HTTP连接 How to Close Golang&#39;s HTTP connection

我们的一个服务是用Go写的,在测试的时候发现几个小时之后它就会core掉,而且core的时候没有打出任何堆栈信息,简单分析后发现该服务中的几个HTTP服务的连接数不断增长,而我们的开发机的fd limit只有1024,当该服务所属进程的连接数增长到系统的fd limit的时候,它被操作系统杀掉了... 这个服务中,我们会定期向一个HTTP服务器发起POST请求,因为请求非常不频繁,所以想采用短连接的方式去做.请求代码大概长这样: func dialTimeout(network, addr st

Linux系统下如何优雅地关闭Java进程?

资料出处: http://www.sohu.com/a/329564560_700886 https://www.cnblogs.com/nuccch/p/10903162.html 前言 Linux系统下如何kill掉一个后台Java进程,相信童鞋们都知道如何操作.首先使用ps命令查找该Java进程的进程ID,然后使用kill命令进行杀掉.命令如下: (1)ps查进程ID [[email protected] ~]$ ps -ef | grep Test user 2095020809 0 2

如果优雅地关闭ExecutorService提供的java线程池

ExecutorService让我们可以优雅地在程序中使用线程池来创建和管理线程,而且性能佳.开销小,还可以有效地控制最大并发线程数,是我们在java并发编程中会经常使用到的. 每一个线程都会占用系统资源,因此线程池的关闭与清理同样重要,本文介绍我们如何优雅地关闭线程池. 一. ExecutorService中关闭线程池的方法 1. shutdown() 停止接收新任务,原来的任务继续执行 停止接收新的submit的任务: 已经提交的任务(包括正在跑的和队列中等待的),会继续执行完成: 等到第2

如何优雅的关闭Java线程池

面试中经常会问到,创建一个线程池需要哪些参数啊,线程池的工作原理啊,却很少会问到线程池如何安全关闭的. 也正是因为大家不是很关注这块,即便是工作三四年的人,也会有因为线程池关闭不合理,导致应用无法正常stop的情况,还有出现一些报错的问题. 本篇就以ThreadPoolExecutor为例,来介绍下如何优雅的关闭线程池. 01 线程中断 在介绍线程池关闭之前,先介绍下Thread的interrupt. 在程序中,我们是不能随便中断一个线程的,因为这是极其不安全的操作,我们无法知道这个线程正运行在

深入学习golang(2)—channel

Channel 1. 概述 “网络,并发”是Go语言的两大feature.Go语言号称“互联网的C语言”,与使用传统的C语言相比,写一个Server所使用的代码更少,也更简单.写一个Server除了网络,另外就是并发,相对python等其它语言,Go对并发支持使得它有更好的性能. Goroutine和channel是Go在“并发”方面两个核心feature. Channel是goroutine之间进行通信的一种方式,它与Unix中的管道类似. Channel声明: ChannelType = (

Golang的channel使用以及并发同步技巧

在学习<The Go Programming Language>第八章并发单元的时候还是遭遇了不少问题,和值得总结思考和记录的地方. 做一个类似于unix du命令的工具.但是阉割了一些功能,这里应该只实现-c(统计total大小) 和-h(以human比较容易辨识的显示出来)的功能. 首先我们需要构造一个 能够返回FileInfo信息数组的函数,我们把它取名为dirEntries: func dirEntries(dir string) []os.FileInfo { entries, er

golang中channel的超时处理

并发中超时处理是必不可少的,golang没有提供直接的超时处理机制,但可以利用select机制来解决超时问题. func timeoutFunc() { //首先,实现并执行一个匿名的超时等待函数 timeout := make(chan bool, 1) go func() { time.Sleep(1e9) //等待1秒钟 timeout <- true }() //然后,我们把timeout这个channel利用起来 select { case <- ch: //从ch中读到数据 cas

golang 之 channel

channel的机制是先进先出 无缓冲的channel: 如果你给channel赋值了,那么必须要读取它的值,不然就会造成阻塞. chreadandwrite :=make(chan int) chonlyread := make(<-chan int) //创建只读channel chonlywrite := make(chan<- int) //创建只写channel 有缓冲的channel: 发送方会一直阻塞直到数据被拷贝到缓冲区: 如果缓冲区已满,则发送方只能在接收方取走数据后才能从阻