Buffered Channels and Worker Pools

原文链接:https://golangbot.com/buffered-channels-worker-pools/

buffered channels

  • 带有缓冲区的channel 只有在缓冲区满之后 channel才会阻塞

WaitGroup

  • 如果有多个 goroutine在后台执行 那么需要在主线程中 多次等待 可以有一个简单的方法 就是 通过WaitGroup 可以控制 Goroutines 直到它们都执行完成

例子

import (
    "fmt"
    "sync"
    "time"
)

func process(i int, wg *sync.WaitGroup) {
    fmt.Println("started Goroutine ", i)
    time.Sleep(2 * time.Second)
    fmt.Printf("Goroutine %d ended\n", i)
    wg.Done()
}

func main() {
    no := 3
    var wg sync.WaitGroup
    for i := 0; i < no; i++ {
        wg.Add(1)
        go process(i, &wg)
    }
    wg.Wait()![](https://images2018.cnblogs.com/blog/736597/201803/736597-20180312115109926-1714494090.png)

    fmt.Println("All go routines finished executing")
}

Worker Pool Implementation

先贴一下个人理解的 程序执行的流程图

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

type Job struct {
    id       int
    randomno int
}
type Result struct {
    job         Job
    sumofdigits int
}

var jobs = make(chan Job, 10)
var results = make(chan Result, 10)

func digits(number int) int {
    sum := 0
    no := number
    for no != 0 {
        digit := no % 10
        sum += digit
        no /= 10
    }
    time.Sleep(5 * time.Second)
    return sum
}
func worker(wg *sync.WaitGroup) {
    for job := range jobs {
        output := Result{job, digits(job.randomno)}
        results <- output
    }
    wg.Done()
}
func createWorkerPool(noOfWorkers int) {
    var wg sync.WaitGroup
    for i := 0; i < noOfWorkers; i++ {
        wg.Add(1)
        go worker(&wg)
    }
    wg.Wait()
    close(results)
}
func allocate(noOfJobs int) {
    for i := 0; i < noOfJobs; i++ {
        randomno := rand.Intn(999)
        job := Job{i, randomno}
        jobs <- job
    }
    close(jobs)
}
func result(done chan bool) {
    for result := range results {
        fmt.Printf("Job id %d, input random no %d , sum of digits %d\n", result.job.id, result.job.randomno, result.sumofdigits)
    }
    done <- true
}

func main() {
    startTime := time.Now()

    noOfJobs := 100
    go allocate(noOfJobs)

    done := make(chan bool)
    go result(done)
    noOfWorkers := 10
    createWorkerPool(noOfWorkers)
    <-done

    endTime := time.Now()

    diff := endTime.Sub(startTime)
    fmt.Println("total time taken ", diff.Seconds(), "seconds")
}

原文地址:https://www.cnblogs.com/alin-qu/p/8548512.html

时间: 2024-10-17 12:57:34

Buffered Channels and Worker Pools的相关文章

A Tour of Go Buffered Channels

Channels can be buffered. Provide the buffer length as the second argument to make to initialize a buffered channel: ch := make(chan int, 100) Sends to a buffered channel block only when the buffer is full. Receives block when the buffer is empty. Mo

goroutine与channels

goroutine(协程) 大家都知道java中的线程Thread,golang没有提供Thread的功能,但是提供了更轻量级的goroutine(协程),协程比线程更轻,创办一个协程很简单,只需要go关键字加上要运行的函数,就可以实现了.看个简单的例子: package main import "fmt" func f(from string) { for i := 0; i < 3; i++ { fmt.Println(from, ":", i) } }

golang中channels的本质详解,经典!

原文:https://www.goinggo.net/2014/02/the-nature-of-channels-in-go.html The Nature Of Channels In Go 这篇文章关于channel讲解得非常好,深度形象.深入浅出.尤其是这两幅图,太厉害了.非常清楚,一目了然. --------------------------------------------------------------------------------------------------

Go by Example

Go is an open source programming language designed for building simple, fast, and reliable software. Go by Example is a hands-on introduction to Go using annotated example programs. Check out the first exampleor browse the full list below. Hello Worl

Flink -- Barrier

CheckpointBarrierHandler 这个接口用于react从input channel过来的checkpoint barrier,这里可以通过不同的实现来,决定是简单的track barriers,还是要去真正的block inputs /** * The CheckpointBarrierHandler reacts to checkpoint barrier arriving from the input channels. * Different implementation

初识Go(8)

并发 goroutinegoroutine 是 Go 并行设计的核心.goroutine 说到底其实就是线程,但是他比线程更小,十几个 goroutine 可能体现在底层就是五六个线程,Go 语言内部帮你实现了这些 goroutine之间的内存共享.执行 goroutine 只需极少的栈内存(大概是 4~5KB),当然会根据相应的数据伸缩.也正因为如此,可同时运行成千上万个并发任务. goroutine 比 thread 更易用.更高效.更轻便.goroutine 是通过 Go 的 runtim

channel _ buffering

By default channels are unbuffered, meaning that they will only accept sends(chan <-) if there is a corresponding receive (<- chan) ready to receive the sent value. Buffered channels accept a limited number of values without a corresponding receiver

netty12---简单源码

package com.cn; import java.io.IOException; import java.nio.channels.Selector; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; import c

图解 Go 并发

你很可能从某种途径听说过 Go 语言.它越来越受欢迎,并且有充分的理由可以证明. Go 快速.简单,有强大的社区支持.学习这门语言最令人兴奋的一点是它的并发模型. Go 的并发原语使创建多线程并发程序变得简单而有趣.我将通过插图介绍 Go 的并发原语,希望能点透相关概念以方便后续学习.本文是写给 Go 语言编程新手以及准备开始学习 Go 并发原语 (goroutines 和 channels) 的同学. 单线程程序 vs. 多线程程序 你可能已经写过一些单线程程序.一个常用的编程模式是组合多个函