golang 无限制同步队列(unlimited buffer channel)

问题

如何支持一个无容量限制的channel

  • 取出元素会阻塞到元素存在并且返回
  • 放入元素永远不会阻塞,都会立即返回

方法一:用两个chan加一个list模拟

在单独的goroutine处理入队和出队,这样不用给list加锁。

完整代码:https://github.com/luweimy/goutil/blob/master/syncq/syncq.go

q := &SyncQueue{
    ctx:    ctx,
    cancel: cancel,
    l:      list.New(),
    max:    max,
    in:     make(chan interface{}),
    out:    make(chan interface{}),
}
func (q *SyncQueue) dispatch() {
    for {
        if q.l.Len() == 0 {
            // the queue is empty, only enqueue is allowed.
            select {
            case v := <-q.in:
                q.l.PushBack(v)
            case <-q.ctx.Done():
                return
            }
        }
        e := q.l.Front()
        if q.max > 0 && q.l.Len() >= q.max {
            // the queue is full, only dequeue is allowed.
            select {
            case q.out <- e.Value:
                q.l.Remove(e)
            case <-q.ctx.Done():
                return
            }
        } else {
            // enqueue and dequeue are allowed.
            select {
            case value := <-q.in:
                q.l.PushBack(value)
            case q.out <- e.Value:
                q.l.Remove(e)
            case <-q.ctx.Done():
                return
            }
        }
    }
}

但是这种方法速度很慢,跑benchmark只有1234 ns/op

方法二:用sync.Cond通知

这个方法比较简单,就是利用sync.Cond的通知机制。

出队时,检测队列内有无元素,有就直接返回,没有则阻塞等待条件变量。

入队时,触发条件变量通知一个阻塞的端点恢复运行。

完整代码:https://github.com/luweimy/goutil/blob/master/syncq2/syncq2.go

func (q *SyncQueue) Enqueue(value interface{}) {
    call.WithLock(q.cond.L, func() {
        q.l.PushBack(value)
        q.cond.Signal()
    })
}

func (q *SyncQueue) Dequeue() interface{} {
    var v interface{}
    call.WithLock(q.cond.L, func() {
        // if queue is empty, wait enqueue
        for q.l.Len() <= 0 {
            q.cond.Wait()
        }
        v = q.l.Remove(q.l.Front())
    })
    return v
}

这种方法速度比上面的快,跑benchmark241 ns/op

原文地址:https://www.cnblogs.com/luweimy/p/8448626.html

时间: 2024-10-11 11:36:33

golang 无限制同步队列(unlimited buffer channel)的相关文章

golang:高性能消息队列moonmq的简单使用

在上一篇moonmq的介绍中(这里),我只简短的罗列了一些moonmq的设计想法,可是对于怎样使用并没有具体说明,公司同事无法非常好的使用. 对于moonmq的使用,事实上非常easy,例子代码在这里,我们仅仅须要处理好broker,consumer以及publisher的关系就能够了. 首先,我们须要启动一个broker,由于moonmq如今仅仅支持tcp的自己定义协议,所以broker启动的时候须要指定一个listen address. #启动broker ./simple_broker -

Python 多线程同步队列模型

Python 多线程同步队列模型 我面临的问题是有个非常慢的处理逻辑(比如分词.句法),有大量的语料,想用多线程来处理. 这一个过程可以抽象成一个叫"同步队列"的模型. 具体来讲,有一个生产者(Dispatcher)一方面从语料中读入句子,并且存入队列中,一方面看有没有空闲的消费者(Segmentor),如果有,就把句子从队列中弹出并交给这个空闲的消费者处理. 然后消费者把处理完成的结果交给生产者输出,生产者要保证输出与输入顺序一致. 消费者是典型的threading,它需要看见生成者

【死磕Java并发】-----J.U.C之AQS:CLH同步队列

此篇博客所有源码均来自JDK 1.8 在上篇博客[死磕Java并发]-–J.U.C之AQS:AQS简介中提到了AQS内部维护着一个FIFO队列,该队列就是CLH同步队列. CLH同步队列是一个FIFO双向队列,AQS依赖它来完成同步状态的管理,当前线程如果获取同步状态失败时,AQS则会将当前线程已经等待状态等信息构造成一个节点(Node)并将其加入到CLH同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点唤醒(公平锁),使其再次尝试获取同步状态. 在CLH同步队列中,一个节点表示一个线程

python3.4多线程实现同步的四种方式(锁机制、条件变量、信号量和同步队列)

临界资源即那些一次只能被一个线程访问的资源,典型例子就是打印机,它一次只能被一个程序用来执行打印功能,因为不能多个线程同时操作,而访问这部分资源的代码通常称之为临界区. threading的Lock类,用该类的acquire函数进行加锁,用realease函数进行解锁 import threading import time class Num: def __init__(self): self.num = 0 self.lock = threading.Lock() def add(self)

AQS同步队列器之一:介绍以及简单使用

一.简介 JDK1.5之前都是通过synchronized关键字实现并发同步,而JDK1.5以后Doug Lea大师开发了current包下的类,通过JAVA代码实现了synchronized关键的语义.然而在current包下的这些类的实现大部分都不离不开一个基础组件----AQS(AbstractQueuedSynchronizer)也就是同步队列器. AQS定义了一套多线程访问共享资源的同步框架,比如ReentrantLock.CountDownLatch等都是依赖这个基础组件实现的.深入

(原创)C++ 同步队列

(原创)C++ 同步队列 同步队列作为一个线程安全的数据共享区,经常用于线程之间数据读取,比如半同步半异步线程池的同步队列. 其实做起来比较简单,要用到list.锁和条件变量,条件变量的作用是在队列满了或者空了的时候等待通知.先看一个简单的同步队列: #include <thread> #include <condition_variable> #include <mutex> #include <list> #include <iostream>

go语言之行--golang核武器goroutine调度原理、channel详解

一.goroutine简介 goroutine是go语言中最为NB的设计,也是其魅力所在,goroutine的本质是协程,是实现并行计算的核心.goroutine使用方式非常的简单,只需使用go关键字即可启动一个协程,并且它是处于异步方式运行,你不需要等它运行完成以后在执行以后的代码. go func()//通过go关键字启动一个协程来运行函数 二.goroutine内部原理 概念介绍 在进行实现原理之前,了解下一些关键性术语的概念. 并发 一个cpu上能同时执行多项任务,在很短时间内,cpu来

【Java并发编程实战】—– AQS(四):CLH同步队列

在[Java并发编程实战]-–"J.U.C":CLH队列锁提过,AQS里面的CLH队列是CLH同步锁的一种变形. 其主要从双方面进行了改造:节点的结构与节点等待机制.在结构上引入了头结点和尾节点,他们分别指向队列的头和尾,尝试获取锁.入队列.释放锁等实现都与头尾节点相关.而且每一个节点都引入前驱节点和后兴许节点的引用:在等待机制上由原来的自旋改成堵塞唤醒. 其结构例如以下: 知道其结构了,我们再看看他的实现.在线程获取锁时会调用AQS的acquire()方法.该方法第一次尝试获取锁假设

关于golang中IO相关的Buffer类浅析

io重要的接口 在介绍buffer之前,先来认识两个重要的接口,如下边所示: type Reader interface { Read(p []byte) (n int, err error) } type Writer interface { Write(p []byte) (n int, err error) } 上边两个接口在golang sdk安装目录src/io/io.go中定义.后边凡是涉及到io相关操作的,基本上都实现了这两个接口,如: 1. package bufio 中的Rea