不得不知道的golang之sync.Mutex互斥锁源码分析

针对Golang 1.9的sync.Mutex进行分析,与Golang 1.10基本一样除了将panic改为了throw之外其他的都一样。
源代码位置:sync\mutex.go
可以看到注释如下:

Mutex can be in 2 modes of operations: normal and starvation.
 In normal mode waiters are queued in FIFO order, but a woken up waiter does not own the mutex and competes with new arriving goroutines over the ownership. New arriving goroutines have an advantage -- they are already running on CPU and there can be lots of them, so a woken up waiter has good chances of losing. In such case it is queued at front of the wait queue. If a waiter fails to acquire the mutex for more than 1ms, it switches mutex to the starvation mode.

In starvation mode ownership of the mutex is directly handed off from the unlocking goroutine to the waiter at the front of the queue. New arriving goroutines don‘t try to acquire the mutex even if it appears  to be unlocked, and don‘t try to spin. Instead they queue themselves at  the tail of the wait queue.

If a waiter receives ownership of the mutex and sees that either (1) it is the last waiter in the queue, or (2) it waited for less than 1 ms, it switches mutex back to normal operation mode.

 Normal mode has considerably better performance as a goroutine can acquire a mutex several times in a row even if there are blocked waiters.
Starvation mode is important to prevent pathological cases of tail latency.

博主英文很烂,就粗略翻译一下,仅供参考:

互斥量可分为两种操作模式:正常和饥饿。
在正常模式下,等待的goroutines按照FIFO(先进先出)顺序排队,但是goroutine被唤醒之后并不能立即得到mutex锁,它需要与新到达的goroutine争夺mutex锁。
因为新到达的goroutine已经在CPU上运行了,所以被唤醒的goroutine很大概率是争夺mutex锁是失败的。出现这样的情况时候,被唤醒的goroutine需要排队在队列的前面。
如果被唤醒的goroutine有超过1ms没有获取到mutex锁,那么它就会变为饥饿模式。
在饥饿模式中,mutex锁直接从解锁的goroutine交给队列前面的goroutine。新达到的goroutine也不会去争夺mutex锁(即使没有锁,也不能去自旋),而是到等待队列尾部排队。
在饥饿模式下,有一个goroutine获取到mutex锁了,如果它满足下条件中的任意一个,mutex将会切换回去正常模式:
1. 是等待队列中的最后一个goroutine
2. 它的等待时间不超过1ms。
正常模式有更好的性能,因为goroutine可以连续多次获得mutex锁;
饥饿模式对于预防队列尾部goroutine一致无法获取mutex锁的问题。

看了这段解释,那么基本的业务逻辑也就了解了,可以整理一下衣装,准备看代码。

打开mutex.go看到如下代码:

type Mutex struct {
    state int32    // 将一个32位整数拆分为 当前阻塞的goroutine数(29位)|饥饿状态(1位)|唤醒状态(1位)|锁状态(1位) 的形式,来简化字段设计
    sema  uint32   // 信号量
}

const (
    mutexLocked = 1 << iota      // 1 0001 含义:用最后一位表示当前对象锁的状态,0-未锁住 1-已锁住
    mutexWoken                   // 2 0010 含义:用倒数第二位表示当前对象是否被唤醒 0-唤醒 1-未唤醒
    mutexStarving                // 4 0100 含义:用倒数第三位表示当前对象是否为饥饿模式,0为正常模式,1为饥饿模式。
    mutexWaiterShift = iota      // 3,从倒数第四位往前的bit位表示在排队等待的goroutine数
    starvationThresholdNs = 1e6  // 1ms
)

可以看到Mutex中含有:

  • 一个非负数信号量sema;
  • state表示Mutex的状态。

常量:

  • mutexLocked表示锁是否可用(0可用,1被别的goroutine占用)
  • mutexWoken=2表示mutex是否被唤醒
  • mutexWaiterShift=4表示统计阻塞在该mutex上的goroutine数目需要移位的数值。

将3个常量映射到state上就是

state:   |32|31|...| |3|2|1|
         \__________/ | | |
              |       | | |
              |       | |  mutex的占用状态(1被占用,0可用)
              |       | |
              |       |  mutex的当前goroutine是否被唤醒
              |       |
              |       饥饿位,0正常,1饥饿
              |
               等待唤醒以尝试锁定的goroutine的计数,0表示没有等待者

如果同学们熟悉Java的锁,就会发现与AQS的设计是类似,只是没有AQS设计的那么精致,不得不感叹,JAVA的牛逼。
有同学是否会有疑问为什么使用的是int32而不是int64呢,因为32位原子性操作更好,当然也满足的需求。

Mutex在1.9版本中就两个函数Lock()Unlock()
下面我们先来分析最难的Lock()函数:

func (m *Mutex) Lock() {
    // 如果m.state=0,说明当前的对象还没有被锁住,进行原子性赋值操作设置为mutexLocked状态,CompareAnSwapInt32返回true
    // 否则说明对象已被其他goroutine锁住,不会进行原子赋值操作设置,CopareAndSwapInt32返回false
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked)
        if race.Enabled {
            race.Acquire(unsafe.Pointer(m))
        }
        return
    }

    // 开始等待时间戳
    var waitStartTime int64
    // 饥饿模式标识
    starving := false
    // 唤醒标识
    awoke := false
    // 自旋次数
    iter := 0
    // 保存当前对象锁状态
    old := m.state
    // 看到这个for {}说明使用了cas算法
    for {
        // 相当于xxxx...x0xx & 0101 = 01,当前对象锁被使用
        if old&(mutexLocked|mutexStarving) == mutexLocked &&
            // 判断当前goroutine是否可以进入自旋锁
            runtime_canSpin(iter) {

            // 主动旋转是有意义的。试着设置mutexwake标志,告知解锁,不要唤醒其他阻塞的goroutines。
            if !awoke &&
            // 再次确定是否被唤醒: xxxx...xx0x & 0010 = 0
            old&mutexWoken == 0 &&
            // 查看是否有goroution在排队
            old>>mutexWaiterShift != 0 &&
                // 将对象锁改为唤醒状态:xxxx...xx0x | 0010 = xxxx...xx1x
                atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                awoke = true
            }//END_IF_Lock

            // 进入自旋锁后当前goroutine并不挂起,仍然在占用cpu资源,所以重试一定次数后,不会再进入自旋锁逻辑
            runtime_doSpin()
            // 自加,表示自旋次数
            iter++
            // 保存mutex对象即将被设置成的状态
            old = m.state
            continue
        }// END_IF_spin

        // 以下代码是不使用**自旋**的情况
        new := old

        // 不要试图获得饥饿的互斥,新来的goroutines必须排队。
        // 对象锁饥饿位被改变,说明处于饥饿模式
        // xxxx...x0xx & 0100 = 0xxxx...x0xx
        if old&mutexStarving == 0 {
            // xxxx...x0xx | 0001 = xxxx...x0x1,标识对象锁被锁住
            new |= mutexLocked
        }
        // xxxx...x1x1 & (0001 | 0100) => xxxx...x1x1 & 0101 != 0;当前mutex处于饥饿模式并且锁已被占用,新加入进来的goroutine放到队列后面
        if old&(mutexLocked|mutexStarving) != 0 {
            // 更新阻塞goroutine的数量,表示mutex的等待goroutine数目加1
            new += 1 << mutexWaiterShift
        }

        // 当前的goroutine将互斥锁转换为饥饿模式。但是,如果互斥锁当前没有解锁,就不要打开开关,设置mutex状态为饥饿模式。Unlock预期有饥饿的goroutine
        if starving &&
            // xxxx...xxx1 & 0001 != 0;锁已经被占用
            old&mutexLocked != 0 {
            // xxxx...xxx | 0101 =>   xxxx...x1x1,标识对象锁被锁住
            new |= mutexStarving
        }

        // goroutine已经被唤醒,因此需要在两种情况下重设标志
        if awoke {
            // xxxx...xx1x & 0010 = 0,如果唤醒标志为与awoke不相协调就panic
            if new&mutexWoken == 0 {
                panic("sync: inconsistent mutex state")
            }
            // new & (^mutexWoken) => xxxx...xxxx & (^0010) => xxxx...xxxx & 1101 = xxxx...xx0x  :设置唤醒状态位0,被唤醒
            new &^= mutexWoken
        }
        // 获取锁成功
        if atomic.CompareAndSwapInt32(&m.state, old, new) {
            // xxxx...x0x0 & 0101 = 0,已经获取对象锁
            if old&(mutexLocked|mutexStarving) == 0 {
                // 结束cas
                break
            }
            // 以下的操作都是为了判断是否从饥饿模式中恢复为正常模式
            // 判断处于FIFO还是LIFO模式
            queueLifo := waitStartTime != 0
            if waitStartTime == 0 {
                waitStartTime = runtime_nanotime()
            }
            runtime_SemacquireMutex(&m.sema, queueLifo)
            starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
            old = m.state
            // xxxx...x1xx & 0100 != 0
            if old&mutexStarving != 0 {
                // xxxx...xx11 & 0011 != 0
                if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
                    panic("sync: inconsistent mutex state")
                }
                delta := int32(mutexLocked - 1<<mutexWaiterShift)
                if !starving || old>>mutexWaiterShift == 1 {
                    delta -= mutexStarving
                }
                atomic.AddInt32(&m.state, delta)
                break
            }
            awoke = true
            iter = 0
        } else {
            // 保存mutex对象状态
            old = m.state
        }
    }// cas结束

    if race.Enabled {
        race.Acquire(unsafe.Pointer(m))
    }
}

看了Lock()函数之后是不是觉得一片懵逼状态,告诉大家一个方法,看Lock()函数时候需要想着如何Unlock。下面就开始看看Unlock()函数。

func (m *Mutex) Unlock() {
    if race.Enabled {
        _ = m.state
        race.Release(unsafe.Pointer(m))
    }

    // state-1标识解锁
    new := atomic.AddInt32(&m.state, -mutexLocked)
    // 验证锁状态是否符合
    if (new+mutexLocked)&mutexLocked == 0 {
        panic("sync: unlock of unlocked mutex")
    }
    // xxxx...x0xx & 0100 = 0 ;判断是否处于正常模式
    if new&mutexStarving == 0 {
        old := new
        for {
            // 如果没有等待的goroutine或goroutine已经解锁完成
            if old>>mutexWaiterShift == 0 ||
            // xxxx...x0xx & (0001 | 0010 | 0100) => xxxx...x0xx & 0111 != 0
            old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
                return
            }
            // Grab the right to wake someone.
            new = (old - 1<<mutexWaiterShift) | mutexWoken
            if atomic.CompareAndSwapInt32(&m.state, old, new) {
                runtime_Semrelease(&m.sema, false)
                return
            }
            old = m.state
        }
    } else {
        // 饥饿模式:将mutex所有权移交给下一个等待的goroutine
        // 注意:mutexlock没有设置,goroutine会在唤醒后设置。
        // 但是互斥锁仍然被认为是锁定的,如果互斥对象被设置,所以新来的goroutines不会得到它
        runtime_Semrelease(&m.sema, true)
    }
}

在网上还会有一些基于go1.6的分析,但是与go 1.9的差距有点大。
上面的分析,因个人水平有限,难免存在错误,请各位老师同学多多指点,不喜勿喷。

附录

https://github.com/golang/go/blob/dev.boringcrypto.go1.9/src/sync/mutex.go
https://segmentfault.com/a/1190000000506960

原文地址:http://blog.51cto.com/qiangmzsx/2134786

时间: 2024-08-25 08:39:05

不得不知道的golang之sync.Mutex互斥锁源码分析的相关文章

Mutex互斥锁的使用方法

同一时刻,只能有一个线程持有该锁! 使用Mutex互斥锁来同步两个单独的程序(可以两次运行该程序,查看运行结果) static void Main(string[] args) { const string MutexName = "CSharpThreadingCookbook"; using (var m = new Mutex(false, MutexName)) { //WaitOne构造表示指定的时间内(这里是5秒)是否可以获得互斥锁定 if (!m.WaitOne(Time

sync.Map源码分析

sync.Map源码分析 背景 众所周知,go普通的map是不支持并发的,换而言之,不是线程(goroutine)安全的.博主是从golang 1.4开始使用的,那时候map的并发读是没有支持,但是并发写会出现脏数据.golang 1.6之后,并发地读写会直接panic: fatal error: concurrent map read and map write package main func main() { m := make(map[int]int) go func() { for

区块链入门教程以太坊源码分析fast sync算法一

区块链入门教程以太坊源码分析fast sync算法一,2018年下半年,区块链行业正逐渐褪去发展之初的浮躁.回归理性,表面上看相关人才需求与身价似乎正在回落.但事实上,正是初期泡沫的渐退,让人们更多的关注点放在了区块链真正的技术之上.this PR aggregates a lot of small modifications to core, trie, eth and other packages to collectively implement the eth/63 fast synch

{区块链教程}以太坊源码分析fast sync算法二

{区块链教程}以太坊源码分析fast sync算法二:上面的表格应该这样解释:如果我们每隔K个区块头验证一次区块头,在N个区块头之后,伪造的概率小于***者产生SHA3冲突的概率.这也意味着,如果确实发现了伪造,那么最后的N个头部应该被丢弃,因为不够安全.可以从上表中选择任何{N,K}对,为了选择一个看起来好看点的数字,我们选择N = 2048,K = 100.后续可能会根据网络带宽/延迟影响以及可能在一些CPU性能比较受限的设备上运行的情况来进行调整. Using this caveat ho

golang 中 sync.Mutex 和 sync.RWMutex

介绍 golang 中的 sync 包实现了两种锁: Mutex:互斥锁 RWMutex:读写锁,RWMutex 基于 Mutex 实现 Mutex(互斥锁) Mutex 为互斥锁,Lock() 加锁,Unlock() 解锁 在一个 goroutine 获得 Mutex 后,其他 goroutine 只能等到这个 goroutine 释放该 Mutex 使用 Lock() 加锁后,不能再继续对其加锁,直到利用 Unlock() 解锁后才能再加锁 在 Lock() 之前使用 Unlock() 会导

golang 原子计数,互斥锁,耗时

import "sync" import "sync/atomic" import "time" import "runtime" 1.runtime.Gosched()表示让CPU把时间片让给别人,下次某个时候继续恢复执行该goroutine,自己一般是阻塞了,这是一个很高级的sleep,我们经常会遇到要sleep多久的问题,这里不用考虑了,别人完成后,自然会通知你. 2.var mutex sync.Mutex 定义一个互

Golang Cond源码分析

cond的主要作用就是获取锁之后,wait()方法会等待一个通知,来进行下一步锁释放等操作,以此控制锁合适释放,释放频率,适用于在并发环境下goroutine的等待和通知. 针对Golang 1.9的sync.Cond,与Golang 1.10一样. 源代码位置:sync\cond.go. 结构体 type Cond struct { noCopy noCopy // noCopy可以嵌入到结构中,在第一次使用后不可复制,使用go vet作为检测使用 // 根据需求初始化不同的锁,如*Mutex

linux 2.6 互斥锁的实现-源码分析

http://blog.csdn.net/tq02h2a/article/details/4317211 看了看linux 2.6 kernel的源码,下面结合代码来分析一下在X86体系结构下,互斥锁的实现原理. 代码分析 1. 首先介绍一下互斥锁所使用的数据结构:struct mutex { 引用计数器 1: 所可以利用.  小于等于0:该锁已被获取,需要等待 atomic_t  count;  自旋锁类型,保证多cpu下,对等待队列访问是安全的. spinlock_t  wait_lock;

golang中container/heap包源码分析

学习golang难免需要分析源码包中一些实现,下面就来说说container/heap包的源码 heap的实现使用到了小根堆,下面先对堆做个简单说明 1. 堆概念 堆是一种经过排序的完全二叉树,其中任一非终端节点的数据值均不大于(或不小于)其左孩子和右孩子节点的值. 最大堆和最小堆是二叉堆的两种形式. 最大堆:根结点的键值是所有堆结点键值中最大者. 最小堆:根结点的键值是所有堆结点键值中最小者. 2. heap 树的最小元素在根部,为index 0. heap包对任意实现了heap接口的类型提供