sync:与golang的并发息息相关的包

楔子

我们知道golang除了兼顾了开发速度和运行效率之外,最大的亮点就是在语言层面原生支持并发,也就是通过所谓的goroutine。不过既然是并发,那么就势必会面临很多问题。比如:资源竞争,多个goroutine同时访问一个资源会发生竞争从而产生意想不到的结果,那么这时候我们会通过加锁来解决;主goroutine不能先退出,这时候我们会等待子goroutine。还有单例模式,以及对象池等等。那么golang是如何实现的呢?就是通过下面我们要介绍的sync包。

sync.Mutex

sync.Mutex称为互斥锁,主要就是解决资源竞争的问题,这个时候我们通过对会发生资源竞争的部分进行加锁来解决。因为锁只有一把,只能一个goroutine执行完毕把锁释放,其他的goroutine才有机会执行。

我们先来看看不加锁的版本

package main

import (
    "fmt"
    "time"
)

var num = 0

func add() {
    i := 0
    for ; i < 100000; i++ {
        num++
    }
}

func sub() {
    i := 0
    for ; i < 100000; i++ {
        num--
    }
}

func main() {

    go add()
    go sub()
    time.Sleep(time.Second * 3)
    fmt.Println(num)  //1314
}
//如果发现结果正常(为0的话),那就多执行即便

此时由于没有加锁,那么两个goroutine都访问num,那么得到的结果是会有误差的,理论上应该是0才对。下面我们就来加锁:

package main

import (
    "fmt"
    "sync"
    "time"
)

var num = 0

func add(lock *sync.Mutex) {
    i := 0
    for ; i < 100000; i++ {
        //表示上锁,在该锁为释放的话,那么其他goroutine使用lock.Lock()的时候就会阻塞
        lock.Lock()
        num++
        //不要忘记把锁释放,否则其他goroutine在尝试获取锁的时候都会阻塞
        lock.Unlock()
    }
}

func sub(lock *sync.Mutex) {
    i := 0
    for ; i < 100000; i++ {
        lock.Lock()
        num--
        lock.Unlock()
    }
}

func main() {

    //这个sync.Mutex是一个结构体,传递的时候是只传递,因此我们需要传递指针
    var lock = new(sync.Mutex)
    go add(lock)
    go sub(lock)
    time.Sleep(time.Second * 3)
    fmt.Println(num)  //0
}

sync.RWMutex

Mutex表示互斥锁,RWMutex表示读写互斥锁(简称:"读写锁")。为什么会有读写互斥锁,因为我们不希望多个goroutine对同一个资源进行修改,但如果是读取的话还是可以的。而Mutex比较严格,只要我上锁了,那么其它人就无法访问了,无论你是读还是写,必须等我完事之后你才可以开始。于是就有了RWMutex,读写锁的话,就可以解决这一问题。读写锁,可以设置为读锁,也可以设置为写锁。

  • 写锁:设置为写锁,那么此时就等同于互斥锁,其他goroutine不可读也不可写
  • 读锁:设置为读锁,那么其它goroutine可以读,但是不可以写。
// Lock 将 rw 设置为写锁定状态,禁止其他goroutine读取或写入。
func (rw *RWMutex) Lock()

// Unlock 解除 rw 的写锁定状态,如果 rw 未被写锁定,则该操作会引发 panic。
func (rw *RWMutex) Unlock()

// RLock 将 rw 设置为读锁定状态,禁止其他goroutine写入,但可以读取。
func (rw *RWMutex) RLock()

// Runlock 解除 rw 的读锁定状态,如果 rw 未被读锁顶,则该操作会引发 panic。
func (rw *RWMutex) RUnlock()

先设置为写锁

package main

import (
    "fmt"
    "sync"
    "time"
)

var num = 0

func add(lock *sync.RWMutex) {
    defer lock.Unlock()
    lock.Lock()
    time.Sleep(1e9)
    fmt.Println("add")
}

func sub(lock *sync.RWMutex) {
    defer lock.Unlock()
    lock.Lock()
    fmt.Println("sub")
}

func main() {

    var lock = new(sync.RWMutex)
    go add(lock)
    //确保add先执行
    time.Sleep(1000)
    go sub(lock)
    time.Sleep(time.Second * 2)
    /*
    add
    sub
     */
}

设置为写锁,那么由于是add函数先执行,尽管里面出现了sleep,但是add函数先将锁获取了,所以必须要等add先执行完,才可以执行sub。

设置为读锁

package main

import (
    "fmt"
    "sync"
    "time"
)

var num = 0

func add(lock *sync.RWMutex) {
    defer lock.RUnlock()
    lock.RLock()
    time.Sleep(1e9)
    fmt.Println("add")
}

func sub(lock *sync.RWMutex) {
    defer lock.RUnlock()
    lock.RLock()
    fmt.Println("sub")
}

func main() {

    var lock = new(sync.RWMutex)
    go add(lock)
    //确保add先执行
    time.Sleep(1000)
    go sub(lock)
    time.Sleep(time.Second * 2)
    /*
    sub
    add
     */
}

我们看到设置为读锁,那么sub就不会等待add了,因为大家都是读锁。

sync.Cond

sync.Cond用在goroutine之间,用于协程的挂起和唤醒。这个Cond是需要通过锁才能实现,也就是底层还是使用了锁。调用cond.L.Lock()会进行上锁,但是其它的goroutine同时也是可以获取锁的,因此锁不是唯一的,而一旦调用cond.Wait(),那么程序会阻塞在这里(将当前goroutine加入到等待队列里),比如使用另一个goroutine将其唤醒。唤醒的方式有两种:cond.Signal,唤醒等待队列里面的一个goroutine;cond.Broadcase,唤醒等待队列里面的所有goroutine。

package main

import (
    "fmt"
    "sync"
    "time"
)

func add1(cond *sync.Cond) {
    // 获取锁
    cond.L.Lock()
    defer cond.L.Unlock()
    fmt.Println("add1已成功获取锁")
    //此时程序会卡在这个地方,直到另一个goroutine唤醒
    cond.Wait()
    fmt.Println("add1醒了")
}

func add2(cond *sync.Cond) {
    cond.L.Lock()
    defer cond.L.Unlock()
    fmt.Println("add2已成功获取锁")
    cond.Wait()
    fmt.Println("add2醒了")
}

func main() {

    //我们同样需要传递指针
    var cond = new(sync.Cond)
    //Cond是需要搭配锁来执行的
    cond.L = new(sync.Mutex)
    //或者我们在创建cond的时候直接通过 var cond = sync.NewCond(new(sync.Mutex))
    go add1(cond)
    go add2(cond)

    time.Sleep(time.Second)
    //唤醒一个goroutine
    cond.Signal()
    time.Sleep(time.Second)
    /*
    add1已成功获取锁
    add2已成功获取锁
    add1醒了
    */
}
package main

import (
    "fmt"
    "sync"
    "time"
)

func add1(cond *sync.Cond) {
    // 获取锁
    cond.L.Lock()
    defer cond.L.Unlock()
    fmt.Println("add1已成功获取锁")
    cond.Wait()
    fmt.Println("add1醒了")
}

func add2(cond *sync.Cond) {
    cond.L.Lock()
    defer cond.L.Unlock()
    fmt.Println("add2已成功获取锁")
    cond.Wait()
    fmt.Println("add2醒了")
}

func main() {

    var cond = new(sync.Cond)
    cond.L = new(sync.Mutex)
    go add1(cond)
    go add2(cond)

    time.Sleep(time.Second)
    cond.Broadcast()
    time.Sleep(time.Second)
    /*
    add1已成功获取锁
    add2已成功获取锁
    add1醒了
    add2醒了
     */
}

sync.WaitGroup

我们看一下上面写的代码,是不是很low呢?因为我们希望主线程等待子协程执行完毕之后再退出,使用的方式是time.Sleep,这是很低级的,当然在介绍语法的时候很方便。但是在项目开发中肯一般不会这么写,而且你也不知道子协程什么时候执行完毕。于是就有了组的概念,sync.WaitGroup是一个结构体,有以下三个方法。

// 计数器增加 delta,delta 可以是负数。
func (wg *WaitGroup) Add(delta int)

// 计数器减少 1
func (wg *WaitGroup) Done()

// 等待直到计数器归零。如果计数器小于 0,则该操作会引发 panic。
func (wg *WaitGroup) Wait()

//所以只要计数器不为0,那么Wait会阻塞
//Add会使计数器增加指定的数值
//Done会使计数器减一

//那么你想到了什么?
//对,假设我们要开20个协程,那么就Add(20)
//每执行一个协程Done()一下
//Wait()不就会等待所有的子协程全部执行完毕吗
package main

import (
    "fmt"
    "sync"
)

func add(wg *sync.WaitGroup, value int) {
    fmt.Printf("satori %d号\n", value)
    wg.Done()
}

func main() {

    //我们同样需要传递指针
    var wg = new(sync.WaitGroup)
    wg.Add(10)
    for i:=0;i<10;i++{
        go add(wg, i)
    }
    wg.Wait()
    /*
    satori 4号
    satori 0号
    satori 1号
    satori 3号
    satori 9号
    satori 6号
    satori 5号
    satori 7号
    satori 8号
    satori 2号
     */
}

sync.Once

sync.Once可以保证一些对象只被实例化一次,或者某个函数只被执行一次。经常用于单例模式、系统初始化等等。再比如channel的Close,对一个通道进行多次Close会引发panic,那么我们通过sync.Once就可以保证channel只会被Close一次。

package main

import (
    "fmt"
    "sync"
)

func foo(){
    fmt.Println(123)
}

func mmp(once *sync.Once){
    once.Do(foo)
}

func main() {
    var once = new(sync.Once)
    for i:=0;i<10;i++{
        once.Do(foo)
        /*
        123
         */
    }
    //once.Do里面函数只会被执行一次。
    //这里的也不会被执行,因为我们传递的是指针,如果传值的话会进行拷贝,那么还是会执行的,因为不是一个sync.Once对象了
    mmp(once)
}

另外我们发现once.Do里面的函数,是一个函数名,而且参数类型也指明了是一个无参无返回值的函数。那如果需要参数呢?很简单,使用闭包即可。

package main

import (
    "fmt"
    "sync"
)

func girl(name string) func() {
    return func() {
        fmt.Println("i'm a girl named", name)
    }
}

func main() {
    var once = new(sync.Once)
    once.Do(girl("mashiro1"))
    once.Do(girl("mashiro2"))
    once.Do(girl("mashiro3"))
    /*
    i'm a girl named mashiro1
     */
}

sync.Pool

从名字也能看出来,sync.Pool指的是临时对象池,为了减少GC的负担,我们对于那些可能会后续使用、但是暂时不用的对象放到池子里,当使用的时候,再从池子里面拿出来。

package main

import (
    "bytes"
    "fmt"
    "sync"
)

func main() {
    //Pool是一个结构体,里面有一个New字段,接收一个无参、返回值为interface{}类型的函数
    var pool = sync.Pool{
        New: func() interface{} {
            return &bytes.Buffer{}
        },
    }

    //如果我们创建Pool的时候,指定了函数,那么池子里面就有东西了,就是函数的返回值
    //可以直接使用Get函数获取,但它是一个interface{},所以我们要转换成相应的类型
    //如果初始胡没有指定,那么获取的结果就是nil
    buf := pool.Get().(*bytes.Buffer)
    buf.WriteString("哈哈")
    pool.Put(buf) //调用put函数,可以将对象放回去
    //然后我们再取出来
    buf = pool.Get().(*bytes.Buffer)
    // 成功打印我们写入的内容
    fmt.Println(buf.String())  //哈哈

    //这个pool不一定非要放相同的对象
    var num  = 123
    pool.Put(num)
    //我方进去一个int也是可以的
    num = pool.Get().(int)
    fmt.Println(num) // 123
}

如果初始化的时候,不指定函数。

package main

import (
    "fmt"
    "sync"
)

func main() {
    //Pool是一个结构体,里面有一个New字段,接收一个无参、返回值为interface{}类型的函数
    var pool = sync.Pool{}
    //不指定的话,是一个nil
    fmt.Println(pool.Get()) // <nil>

    //这个时候可以直接put
    //但是我们也可以指定函数
    pool.New = func() interface{} {
        return 123
    }
    fmt.Println(pool.Get().(int)) // 123
}

sync.Map

golang在1.9的时候,引用了sync.Map,它是原生支持并发安全的map。对于普通的map,我们一般够用了,尽管它并不是线程安全的。但是有时我们需要涉及到线程安全的时候,我们可以使用sync.Map。sync.Map和原生的map语法差别较大,但是很好理解。

package main

import (
    "fmt"
    "sync"
)

func main() {
    var m sync.Map
    //设置key  value
    //参数类型都是interface{}
    m.Store("name", "satori")
    m.Store("age", 17)
    m.Store("gender", "f")

    //Load:查找一个key,如果存在那么返回 值和true,否则返回 nil和false
    if value, ok := m.Load("name"); ok {
        fmt.Println(value) // satori
    }

    //LoadOrStore:查找一个key的同时指定一个value
    //如果查找的key存在,那么返回 对应的值和true; 不存在就将该key和指定的value设置进去,返回 设置的值和false
    fmt.Println(m.Load("where"))  // <nil> false
    //设置成功
    fmt.Println(m.LoadOrStore("where", "japan"))  //japan false

    //再次获取,设置成功
    fmt.Println(m.Load("where"))  // japan true
    fmt.Println(m.LoadOrStore("where", "America"))  //japan true

    //遍历,接收一个函数,参数是两个interface{},返回一个bool
    m.Range(func(key, value interface{}) bool {
        fmt.Println(key, value)
        return true
        /*
        name satori
        age 17
        gender f
        where japan
         */
    })

    //删除一个元素
    fmt.Println(m.Load("gender"))  // f true
    m.Delete("gender")
    fmt.Println(m.Load("gender"))  // <nil> false
}

下面我们来测试一下同时写和删除有什么表现,一个goroutine往map里面写,一个从map里面删除。

package main

import (
    "time"
)

func readMap(m map[int]int){
    for i:=0;i<5;i++{
        time.Sleep(1000)
        delete(m, i)
    }
}

func writeMap(m map[int]int){
    for i:=0;i<6;i++{
        time.Sleep(1000)
        m[i] = 1
    }
}

func main() {
    var m = make(map[int]int)
    go readMap(m)
    go writeMap(m)
    time.Sleep(time.Second)
    /*
    fatal error: concurrent map writes

    goroutine 18 [running]:
    runtime.throw(0x47d496, 0x15)
    ...
    ...
     */
}

我们看到对于普通map直接报错了,我们再来试试sync.Map

package main

import (
    "fmt"
    "sync"
    "time"
)

func readMap(m *sync.Map){
    for i:=0;i<5;i++{
        time.Sleep(1000)
        m.Delete(i)
    }
}

func writeMap(m *sync.Map){
    for i:=0;i<6;i++{
        time.Sleep(1000)
        m.Store(i, 0)
    }
}

func main() {
    var m = new(sync.Map)
    go readMap(m)
    go writeMap(m)
    time.Sleep(time.Second)

    //此时则没有任何问题,因此sync.Map是线程安全的。
    m.Range(func(key, value interface{}) bool {
        fmt.Println(key, value)
        return true
    })
    /*
    2 0
    4 0
    5 0
     */
    //但是打印的结果貌似不正常,因为我们写了6个,但是删了5个,应该剩下一个啊
    //其实goroutine的执行顺序不确定,有可能删的时候还没有这个key呢,但是等到有的时候就进入下一层循环了。
    //所以上面的结果是正常的。
}

原文地址:https://www.cnblogs.com/traditional/p/12221809.html

时间: 2024-08-27 03:55:31

sync:与golang的并发息息相关的包的相关文章

Go语言 sync.Map(在并发中使用)

Go语言中的 map 在并发情况下,只读是线程安全的,同时读写是线程不安全的. 需要并发读写时,一般的做法是加锁,但这样性能并不高,Go语言在 1.9 版本中提供了一种效率较高的并发安全的 sync.Map,sync.Map 和 map 不同,不是以语言原生形态提供,而是在 sync 包下的特殊结构. sync.Map 有以下特性: 无须初始化,直接声明即可. sync.Map 不能使用 map 的方式进行取值和设置等操作,而是使用 sync.Map 的方法进行调用,Store 表示存储,Loa

用Golang自己构造ICMP数据包

ICMP是用来对网络状况进行反馈的协议,可以用来侦测网络状态或检测网路错误. 限于当前Golang在网络编程方面的代码稀缺,资料甚少,所以分享一个用Golang来构造ICMP数据包并发送ping程序的echo消息的实例. RFC792定义的echo数据包结构: 0 1 2 3 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+

Microsoft Sync Framework 2.1 可再发行程序包 Microsoft Sync Framework 1.0 SP1 - 中文(简体)

Sync Framework 2.1 引入了新功能,这些功能支持您计算机上的 SQL Server 或 SQL Server Compact 数据库与 SQL Azure 数据库进行同步.此发行版还引入了基于参数的筛选.从数据库删除同步作用域和模板的功能,并且增强了性能可加快和简化同步过程. SQL Azure 同步 使用 Sync Framework 2.1,您可以通过综合利用 Windows Azure 平台和 SQL Azure 数据库将您的数据范围扩展到 Web.通过将您企业内部部署的

golang的并发

Golang的并发涉及二个概念: goroutine channel goroutine由关键字go创建. channel由关键字chan定义 channel的理解稍难点, 最简单地, 你把它当成Unix中的双向通道Pipe. 1. channel的定义 2. select阻塞 3. 缓存机制: 使用make()创建. 4. 超时机制: 使用time.After()函数. func main() {var abc chan intselect {case <-abc:fmt.Println(&quo

golang总结-并发

目录 2.7 并发编程 go协程 go管道 2.7 并发编程 go协程 golang 通过一个go关键字就可以开启一个协程. func main() { //两个交错输出 go sayHello() go sayHello2() time.Sleep(time.Second * 3) //阻塞主线程 } func sayHello() { for i := 0; i < 30; i++ { fmt.Println("hello world") } } func sayHello2

golang高并发的理解

前言 GO语言在WEB开发领域中的使用越来越广泛,Hired 发布的<2019 软件工程师状态>报告中指出,具有 Go 经验的候选人是迄今为止最具吸引力的.平均每位求职者会收到9 份面试邀请. 想学习go,最基础的就要理解go是怎么做到高并发的. 那么什么是高并发? 高并发(High Concurrency)是互联网分布式系统架构设计中必须考虑的因素之一,它通常是指,通过设计保证系统能够同时并行处理很多请求. 严格意义上说,单核的CPU是没法做到并行的,只有多核的CPU才能做到严格意义上的并行

深入理解java:2.3.1. 并发编程concurrent包 之Atomic原子操作

java中,可能有一些场景,操作非常简单,但是容易存在并发问题,比如i++, 此时,如果依赖锁机制,可能带来性能损耗等问题, 于是,如何更加简单的实现原子性操作,就成为java中需要面对的一个问题. 在backport-util-concurrent没有被引入java1.5并成为JUC之前, 这些原子类和原子操作方法,都是使用synchronized实现的. 不过JUC出现之后,这些原子操作 基于JNI提供了新的实现, 比如AtomicInteger,AtomicLong,AtomicBoole

golang中tcp socket粘包问题和处理

转自:http://www.01happy.com/golang-tcp-socket-adhere/ 在用golang开发人工客服系统的时候碰到了粘包问题,那么什么是粘包呢?例如我们和客户端约定数据交互格式是一个json格式的字符串: {"Id":1,"Name":"golang","Message":"message"} 当客户端发送数据给服务端的时候,如果服务端没有及时接收,客户端又发送了一条数据上来

Java的并发神器concurrent包详解(一)

在JDK 1.5之前,提到并发,java程序员们一般想到的是wait().notify().Synchronized关键字等,但是并发除了要考虑竞态资源.死锁.资源公平性等问题,往往还需要考虑性能问题,在一些业务场景往往还会比较复杂,这些都给java coder们造成不小的难题.JDK 1.5的concurrent包帮我们解决了不少问题. Concurrent包中包含了几个比较常用的并发模块,这个系列,LZ就和大家一起来学习各个模块,Let's Go! 一.线程池的基本用法 一般并发包里有三个常