go 利用chan的阻塞机制,实现协程的开始、阻塞、返回控制器

一、使用场景

大背景是从kafka 中读取oplog进行增量处理,但是当我想发一条命令将这个增量过程阻塞,然后开始进行一次全量同步之后,在开始继续增量。

所以需要对多个协程进行控制。

二、使用知识

1. 从一个未初始化的管道读会阻塞

2.从一个关闭的管道读不会阻塞

利用两个管道和select 进行控制

三、上代码

控制器代码

package util

import (
	"errors"
	"sync"
)

const (
	//STOP 停止
	STOP = iota
	//START 开始
	START
	//PAUSE 暂停
	PAUSE
)

//Control 控制器
type Control struct {
	ch1  chan struct{}
	ch2  chan struct{}
	stat int64
	lock sync.RWMutex
}

var (
	//ErrStat 错误状态
	ErrStat = errors.New("stat error")
)

//NewControl 获得一个新Control
func NewControl() *Control {
	return &Control{
		ch1:  make(chan struct{}),
		ch2:  nil,
		stat: START,
		lock: sync.RWMutex{},
	}
}

//Stop 停止
func (c *Control) Stop() error {
	c.lock.Lock()
	defer c.lock.Unlock()
	if c.stat == START {
		c.ch2 = nil
		close(c.ch1)
		c.stat = STOP
	} else if c.stat == PAUSE {
		ch2 := c.ch2
		c.ch2 = nil
		close(c.ch1)
		close(ch2)
		c.stat = STOP
	} else {
		return ErrStat
	}
	return nil
}

//Pause 暂停
func (c *Control) Pause() error {
	c.lock.Lock()
	defer c.lock.Unlock()
	if c.stat == START {
		c.ch2 = make(chan struct{})
		close(c.ch1)
		c.stat = PAUSE
	} else {
		return ErrStat
	}
	return nil
}

//Start 开始
func (c *Control) Start() error {
	c.lock.Lock()
	defer c.lock.Unlock()
	if c.stat == PAUSE {
		c.ch1 = make(chan struct{})
		close(c.ch2)
		c.stat = START
	} else {
		return ErrStat
	}
	return nil
}

//C 控制管道
func (c *Control) C() <-chan struct{} {
	c.lock.RLock()
	defer c.lock.RUnlock()
	return c.ch1
}

//Wait 等待
func (c *Control) Wait() bool {
	c.lock.RLock()
	ch2 := c.ch2
	c.lock.RUnlock()
	if ch2 == nil {  //通过赋值nil 发送停止推出命令
		return false
	}
	<-ch2  //会进行阻塞
	return true
}

使用代码

	for {
		select {
		case part, ok := <-c.Partitions():
			if !ok {
				conf.Logger.Error("get kafka Partitions not ok", regular.Name)
				return
			}
			go readFromPart(c, part, regular, respChan)
		case <-regular.C():   //regular 为Control 类
			if !regular.Wait() {
				conf.Logger.Debug("Stop! ")
				return
			}
			conf.Logger.Debug("Start! ")
		}
	}

这样就可以随时随地的控制工程中的协程

regular  := util.NewControl()
regular.Pause()
regular.Start()
regular.Stop()

  

原文地址:https://www.cnblogs.com/zhaosc-haha/p/11966215.html

时间: 2024-11-09 05:17:53

go 利用chan的阻塞机制,实现协程的开始、阻塞、返回控制器的相关文章

golang协程——通道channel阻塞

新的一年开始了,不管今天以前发生了什么,向前看,就够了. 说到channel,就一定要说一说线程了.任何实际项目,无论大小,并发是必然存在的.并发的存在,就涉及到线程通信.在当下的开发语言中,线程通讯主要有两种,共享内存与消息传递.共享内存一定都很熟悉,通过共同操作同一对象,实现线程间通讯.消息传递即通过类似聊天的方式.golang对并发的处理采用了协程的技术.golang的goroutine就是协程的实现.协程的概念很早就有,简单的理解为轻量级线程,goroutine就是为了解决并发任务间的通

异步调用与回调机制,协程

1.异步调用与回调机制 上一篇我们已经了解到了两组比较容易混淆的概念问题,1.同步与异步调用 2.阻塞与非阻塞状态.在说到异步调用的时候,说到提交任务后,就直接执行下一行代码,而不去拿结果,这样明显存在缺陷,结果是肯定要拿的,这辈子都肯定是要拿到这个结果的,没有这个结果后面的活又不会干,没办法,只能去拿结果的,那么问题是异步调用提交任务后,如何实现既要拿到结果又不需要原地等的理想状态呢?专门为异步调用配备了一个方法--回调机制 先来想想我们之前是怎么拿到一个函数的结果,就传给另外一个函数取执行,

关于C10K、异步回调、协程、同步阻塞

最近到处在争论这些话题,发现很多人对一些基础的常识并不了解,在此发表一文做一下解释.此文未必能解答所有问题,各位能有一个大致的了解就好. C10K的由来 大家都知道互联网的基础就是网络通信,早期的互联网可以说是一个小群体的集合.互联网还不够普及,用户也不多.一台服务器同时在线100个用户估计 在当时已经算是大型应用了.所以并不存在什么C10K的难题.互联网的爆发期应该是在www网站,浏览器,雅虎出现后.最早的互联网称之为Web1.0, 互联网大部分的使用场景是下载一个Html页面,用户在浏览器中

Python 中的协程 (5) 无阻塞

1 异步程序依然会假死 freezing 1)一般程序的调用方 freezing import asyncio import time import threading #定义一个异步操作 async def hello1(a,b): print(f"异步函数开始执行") await asyncio.sleep(3) print("异步函数执行结束") return a+b #在一个异步操作里面调用另一个异步操作 async def main(): c=await

协程异步非阻塞

1.gevent. 在遇到io操作时会发生切换,切换gevent.joinall()中的gevent.spawn(a)去执行. 使用非gevent封装的sleep()时会发生阻塞 import gevent import time def a(): print("begin a",time.time()) time.sleep(1) print("end a ",time.time()) def b(): print("begin b",time

给协程加上同步互斥机制

前面一篇文章介绍了Linux内的同步互斥的概念.内核态和用户态Linux提供的同步/互斥接口.这里本文介绍下如何给协程加上同步.互斥机制. 简单说下协程coroutine: 参考文章 操作系统的课本中对进程.线程的定义:进程是最小的资源分配单位,线程是最小的调度单位. 随着互联网的飞速发展,互联网后台Server服务通常要面临高请求.高并发的挑战,一些业务Server通常要面临很高的网络IO请求.这也就是C10K问题. 现在对C10K问题的解决方案已经很成熟了,主要是 非阻塞IO+IO复用(ep

PHP 协程:Go + Chan + Defer

Swoole4为PHP语言提供了强大的CSP协程编程模式.底层提供了3个关键词,可以方便地实现各类功能. Swoole4提供的PHP协程语法借鉴自Golang,在此向GO开发组致敬 PHP+Swoole协程可以与Golang很好地互补.Golang:静态语言,严谨强大性能好,PHP+Swoole:动态语言,灵活简单易用 本文基于Swoole-4.2.9和PHP-7.2.9版本 关键词 go :创建一个协程 chan :创建一个通道 defer :延迟任务,在协程退出时执行,先进后出 这3个功能底

Python中monkey.patch_all()解决协程阻塞问题

直接参考以下实例,采用协程访问三个网站 由于IO操作非常耗时,程序经常会处于等待状态 比如请求多个网页有时候需要等待,gevent可以自动切换协程 遇到阻塞自动切换协程,程序启动时执行monkey.patch_all()解决 # 由于IO操作非常耗时,程序经常会处于等待状态 # 比如请求多个网页有时候需要等待,gevent可以自动切换协程 # 遇到阻塞自动切换协程,程序启动时执行monkey.patch_all()解决 # 首行添加下面的语句即可 from gevent import monke

基于ASIO的协程库orchid简介

什么是orchid? orchid是一个构建于boost库基础上的C++库,类似于python下的gevent/eventlet,为用户提供基于协程的并发模型. 什么是协程: 协程,即协作式程序,其思想是,一系列互相依赖的协程间依次使用CPU,每次只有一个协程工作,而其他协程处于休眠状态.协程在控制离开时暂停执行,当控制再次进入时只能从离开的位置继续执行. 协程已经被证明是一种非常有用的程序组件,不仅被python.lua.ruby等脚本语言广泛采用,而且被新一代面向多核的编程语言如golang

python并发编程之---协程

1.什么是协程 协程:是单线程下的并发,又称微线程,纤程. 协程是一种用户态的轻量级线程,协程是由用户程序自己控制调度的. 2.需要注意的点: 需要强调的是: #1. python的线程属于内核级别的,即由操作系统控制调度(如单线程遇到io或执行时间过长就会被迫交出cpu执行权限,切换其他线程运行) #2. 单线程内开启协程,一旦遇到io,就会从应用程序级别(而非操作系统)控制切换,以此来提升效率(!!!非io操作的切换与效率无关) 对比操作系统控制线程的切换,用户在单线程内控制协程的切换 优点