sync.Map源码分析

sync.Map源码分析

背景

众所周知,go普通的map是不支持并发的,换而言之,不是线程(goroutine)安全的。博主是从golang 1.4开始使用的,那时候map的并发读是没有支持,但是并发写会出现脏数据。golang 1.6之后,并发地读写会直接panic:

fatal error: concurrent map read and map write
package main
func main() {
    m := make(map[int]int)
    go func() {
        for {
            _ = m[1]
        }
    }()
    go func() {
        for {
            m[2] = 2
        }
    }()
    select {}
}

所以需要支持对map的并发读写时候,博主使用两种方法:

  1. 第三方类库 concurrent-map
  2. map加上sync.RWMutex来保障线程(goroutine)安全的。

golang 1.9之后,go 在sync包下引入了并发安全的map,也为博主提供了第三种方法。本文重点也在此,为了时效性,本文基于golang 1.10源码进行分析。

sync.Map

结构体

Map

type Map struct {
    mu Mutex    //互斥锁,用于锁定dirty map

    read atomic.Value //优先读map,支持原子操作,注释中有readOnly不是说read是只读,而是它的结构体。read实际上有写的操作

    dirty map[interface{}]*entry // dirty是一个当前最新的map,允许读写

    misses int // 主要记录read读取不到数据加锁读取read map以及dirty map的次数,当misses等于dirty的长度时,会将dirty复制到read
}

readOnly

readOnly 主要用于存储,通过原子操作存储在Map.read中元素。

type readOnly struct {
    m       map[interface{}]*entry
    amended bool // 如果数据在dirty中但没有在read中,该值为true,作为修改标识
}

entry

type entry struct {
    // nil: 表示为被删除,调用Delete()可以将read map中的元素置为nil
    // expunged: 也是表示被删除,但是该键只在read而没有在dirty中,这种情况出现在将read复制到dirty中,即复制的过程会先将nil标记为expunged,然后不将其复制到dirty
    //  其他: 表示存着真正的数据
    p unsafe.Pointer // *interface{}
}

原理

如果你接触过大Java,那你一定对CocurrentHashMap利用锁分段技术增加了锁的数目,从而使争夺同一把锁的线程的数目得到控制的原理记忆深刻。
那么Golang的sync.Map是否也是使用了相同的原理呢?sync.Map的原理很简单,使用了空间换时间策略,通过冗余的两个数据结构(read、dirty),实现加锁对性能的影响。
通过引入两个map将读写分离到不同的map,其中read map提供并发读和已存元素原子写,而dirty map则负责读写。 这样read map就可以在不加锁的情况下进行并发读取,当read map中没有读取到值时,再加锁进行后续读取,并累加未命中数,当未命中数大于等于dirty map长度,将dirty map上升为read map。从之前的结构体的定义可以发现,虽然引入了两个map,但是底层数据存储的是指针,指向的是同一份值。

开始时sync.Map写入数据

X=1
Y=2
Z=3

dirty map主要接受写请求,read map没有数据,此时read map与dirty map数据如下图。

读取数据的时候从read map中读取,此时read map并没有数据,miss记录从read map读取失败的次数,当misses>=len(dirty map)时,将dirty map直接升级为read map,这里直接对dirty map进行地址拷贝并且dirty map被清空,misses置为0。此时read map与dirty map数据如下图。

现在有需求对Z元素进行修改Z=4,sync.Map会直接修改read map的元素。

新加元素K=5,新加的元素就需要操作dirty map了,如果misses达到阀值后dirty map直接升级为read map并且dirty map为空map(read的amended==false),则dirty map需要从read map复制数据。

升级后的效果如下。

如果需要删除Z,需要分几种情况:
一种read map存在该元素且read的amended==false:直接将read中的元素置为nil。

另一种为元素刚刚写入dirty map且未升级为read map:直接调用golang内置函数delete删除dirty map的元素;

还有一种是read map和dirty map同时存在该元素:将read map中的元素置为nil,因为read map和dirty map 使用的均为元素地址,所以均被置为nil。

优化点

  1. 空间换时间。通过冗余的两个数据结构(read、dirty),实现加锁对性能的影响。
  2. 使用只读数据(read),避免读写冲突。
  3. 动态调整,miss次数多了之后,将dirty数据提升为read。
  4. double-checking(双重检测)。
  5. 延迟删除。 删除一个键值只是打标记,只有在提升dirty的时候才清理删除的数据。
  6. 优先从read读取、更新、删除,因为对read的读取不需要锁。

方法源码分析

Load

Load返回存储在映射中的键值,如果没有值,则返回nil。ok结果指示是否在映射中找到值。

func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
    // 第一次检测元素是否存在
    read, _ := m.read.Load().(readOnly)
    e, ok := read.m[key]
    if !ok && read.amended {
        // 为dirty map 加锁
        m.mu.Lock()
        // 第二次检测元素是否存在,主要防止在加锁的过程中,dirty map转换成read map,从而导致读取不到数据
        read, _ = m.read.Load().(readOnly)
        e, ok = read.m[key]
        if !ok && read.amended {
            // 从dirty map中获取是为了应对read map中不存在的新元素
            e, ok = m.dirty[key]
            // 不论元素是否存在,均需要记录miss数,以便dirty map升级为read map
            m.missLocked()
        }
        // 解锁
        m.mu.Unlock()
    }
    // 元素不存在直接返回
    if !ok {
        return nil, false
    }
    return e.load()
}

dirty map升级为read map

func (m *Map) missLocked() {
    // misses自增1
    m.misses++
    // 判断dirty map是否可以升级为read map
    if m.misses < len(m.dirty) {
        return
    }
    // dirty map升级为read map
    m.read.Store(readOnly{m: m.dirty})
    // dirty map 清空
    m.dirty = nil
    // misses重置为0
    m.misses = 0
}

元素取值

func (e *entry) load() (value interface{}, ok bool) {
    p := atomic.LoadPointer(&e.p)
    // 元素不存在或者被删除,则直接返回
    if p == nil || p == expunged {
        return nil, false
    }
    return *(*interface{})(p), true
}

read map主要用于读取,每次Load都先从read读取,当read中不存在且amended为true,就从dirty读取数据 。无论dirty map中是否存在该元素,都会执行missLocked函数,该函数将misses+1,当m.misses &lt; len(m.dirty)时,便会将dirty复制到read,此时再将dirty置为nil,misses=0。

storage

设置Key=>Value。

func (m *Map) Store(key, value interface{}) {
    // 如果read存在这个键,并且这个entry没有被标记删除,尝试直接写入,写入成功,则结束
    // 第一次检测
    read, _ := m.read.Load().(readOnly)
    if e, ok := read.m[key]; ok && e.tryStore(&value) {
        return
    }
    // dirty map锁
    m.mu.Lock()
    // 第二次检测
    read, _ = m.read.Load().(readOnly)
    if e, ok := read.m[key]; ok {
        // unexpungelocc确保元素没有被标记为删除
        // 判断元素被标识为删除
        if e.unexpungeLocked() {
            // 这个元素之前被删除了,这意味着有一个非nil的dirty,这个元素不在里面.
            m.dirty[key] = e
        }
        // 更新read map 元素值
        e.storeLocked(&value)
    } else if e, ok := m.dirty[key]; ok {
        // 此时read map没有该元素,但是dirty map有该元素,并需修改dirty map元素值为最新值
        e.storeLocked(&value)
    } else {
        // read.amended==false,说明dirty map为空,需要将read map 复制一份到dirty map
        if !read.amended {
            m.dirtyLocked()
            // 设置read.amended==true,说明dirty map有数据
            m.read.Store(readOnly{m: read.m, amended: true})
        }
        // 设置元素进入dirty map,此时dirty map拥有read map和最新设置的元素
        m.dirty[key] = newEntry(value)
    }
    // 解锁,有人认为锁的范围有点大,假设read map数据很大,那么执行m.dirtyLocked()会耗费花时间较多,完全可以在操作dirty map时才加锁,这样的想法是不对的,因为m.dirtyLocked()中有写入操作
    m.mu.Unlock()
}

尝试存储元素。

func (e *entry) tryStore(i *interface{}) bool {
    // 获取对应Key的元素,判断是否标识为删除
    p := atomic.LoadPointer(&e.p)
    if p == expunged {
        return false
    }
    for {
        // cas尝试写入新元素值
        if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) {
            return true
        }
        // 判断是否标识为删除
        p = atomic.LoadPointer(&e.p)
        if p == expunged {
            return false
        }
    }
}

unexpungelocc确保元素没有被标记为删除。如果这个元素之前被删除了,它必须在未解锁前被添加到dirty map上。

func (e *entry) unexpungeLocked() (wasExpunged bool) {
    return atomic.CompareAndSwapPointer(&e.p, expunged, nil)
}

从read map复制到dirty map。

func (m *Map) dirtyLocked() {
    if m.dirty != nil {
        return
    }

    read, _ := m.read.Load().(readOnly)
    m.dirty = make(map[interface{}]*entry, len(read.m))
    for k, e := range read.m {
        // 如果标记为nil或者expunged,则不复制到dirty map
        if !e.tryExpungeLocked() {
            m.dirty[k] = e
        }
    }
}

LoadOrStore

如果对应的元素存在,则返回该元素的值,如果不存在,则将元素写入到sync.Map。如果已加载值,则加载结果为true;如果已存储,则为false。

func (m *Map) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool) {
    // 不加锁的情况下读取read map
    // 第一次检测
    read, _ := m.read.Load().(readOnly)
    if e, ok := read.m[key]; ok {
        // 如果元素存在(是否标识为删除由tryLoadOrStore执行处理),尝试获取该元素已存在的值或者将元素写入
        actual, loaded, ok := e.tryLoadOrStore(value)
        if ok {
            return actual, loaded
        }
    }

    m.mu.Lock()
    // 第二次检测
    // 以下逻辑参看Store
    read, _ = m.read.Load().(readOnly)
    if e, ok := read.m[key]; ok {
        if e.unexpungeLocked() {
            m.dirty[key] = e
        }
        actual, loaded, _ = e.tryLoadOrStore(value)
    } else if e, ok := m.dirty[key]; ok {
        actual, loaded, _ = e.tryLoadOrStore(value)
        m.missLocked()
    } else {
        if !read.amended {
            m.dirtyLocked()
            m.read.Store(readOnly{m: read.m, amended: true})
        }
        m.dirty[key] = newEntry(value)
        actual, loaded = value, false
    }
    m.mu.Unlock()

    return actual, loaded
}

如果没有删除元素,tryLoadOrStore将自动加载或存储一个值。如果删除元素,tryLoadOrStore保持条目不变并返回ok= false。

func (e *entry) tryLoadOrStore(i interface{}) (actual interface{}, loaded, ok bool) {
    p := atomic.LoadPointer(&e.p)
    // 元素标识删除,直接返回
    if p == expunged {
        return nil, false, false
    }
    // 存在该元素真实值,则直接返回原来的元素值
    if p != nil {
        return *(*interface{})(p), true, true
    }

    // 如果p为nil(此处的nil,并是不是指元素的值为nil,而是atomic.LoadPointer(&e.p)为nil,元素的nil在unsafe.Pointer是有值的),则更新该元素值
    ic := i
    for {
        if atomic.CompareAndSwapPointer(&e.p, nil, unsafe.Pointer(&ic)) {
            return i, false, true
        }
        p = atomic.LoadPointer(&e.p)
        if p == expunged {
            return nil, false, false
        }
        if p != nil {
            return *(*interface{})(p), true, true
        }
    }
}

Delete

删除元素,采用延迟删除,当read map存在元素时,将元素置为nil,只有在提升dirty的时候才清理删除的数,延迟删除可以避免后续获取删除的元素时候需要加锁。当read map不存在元素时,直接删除dirty map中的元素

func (m *Map) Delete(key interface{}) {
    // 第一次检测
    read, _ := m.read.Load().(readOnly)
    e, ok := read.m[key]
    if !ok && read.amended {
        m.mu.Lock()
        // 第二次检测
        read, _ = m.read.Load().(readOnly)
        e, ok = read.m[key]
        if !ok && read.amended {
            // 不论dirty map是否存在该元素,都会执行删除
            delete(m.dirty, key)
        }
        m.mu.Unlock()
    }
    if ok {
        // 如果在read中,则将其标记为删除(nil)
        e.delete()
    }
}

元素值置为nil

func (e *entry) delete() (hadValue bool) {
    for {
        p := atomic.LoadPointer(&e.p)
        if p == nil || p == expunged {
            return false
        }
        if atomic.CompareAndSwapPointer(&e.p, p, nil) {
            return true
        }
    }
}

Range

遍历获取sync.Map中所有的元素,使用的为快照方式,所以不一定是准确的。

func (m *Map) Range(f func(key, value interface{}) bool) {
    // 第一检测
    read, _ := m.read.Load().(readOnly)
    // read.amended=true,说明dirty map包含所有有效的元素(含新加,不含被删除的),使用dirty map
    if read.amended {
        // 第二检测
        m.mu.Lock()
        read, _ = m.read.Load().(readOnly)
        if read.amended {
            // 使用dirty map并且升级为read map
            read = readOnly{m: m.dirty}
            m.read.Store(read)
            m.dirty = nil
            m.misses = 0
        }
        m.mu.Unlock()
    }
    // 一贯原则,使用read map作为读
    for k, e := range read.m {
        v, ok := e.load()
        // 被删除的不计入
        if !ok {
            continue
        }
        // 函数返回false,终止
        if !f(k, v) {
            break
        }
    }
}

总结

经过了上面的分析可以得到,sync.Map并不适合同时存在大量读写的场景,大量的写会导致read map读取不到数据从而加锁进行进一步读取,同时dirty map不断升级为read map。 从而导致整体性能较低,特别是针对cache场景.针对append-only以及大量读,少量写场景使用sync.Map则相对比较合适。

sync.Map没有提供获取元素个数的Len()方法,不过可以通过Range()实现。

func Len(sm sync.Map) int {
    lengh := 0
    f := func(key, value interface{}) bool {
        lengh++
        return true
    }
    one:=lengh
    lengh=0
    sm.Range(f)
    if one != lengh {
        one = lengh
        lengh=0
        sm.Range(f)
        if one <lengh {
            return lengh
        }

    }
    return one
}


参考

原文地址:http://blog.51cto.com/qiangmzsx/2126577

时间: 2024-10-13 23:51:44

sync.Map源码分析的相关文章

java.util.Map源码分析

/** * An object that maps keys to values. A map cannot contain duplicate keys; * each key can map to at most one value. * * <p>This interface takes the place of the <tt>Dictionary</tt> class, which * was a totally abstract class rather t

Hadoop源码分析之Map输入

对于MapReduce的输入输出Hadoop的官网如下所示 Input and Output types of a MapReduce job: (input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (output) 这里将从源码分析 input <k1,v1>->map 的过程, Mapper 基

Hadoop-2.4.1学习之Map任务源码分析(下)

在Map任务源码分析(上)中,对MAP阶段的代码进行了学习,这篇文章文章将学习Map任务的SORT阶段.如果Reducer的数量不为0,则还需要进行SORT阶段,但从上面的学习中并未发现与MAP阶段执行完毕调用mapPhase.complete()类似的在SORT阶段执行完毕调用sortPhase.complete()的源码,那SORT阶段是在什么时候启动的?对于Map任务来说,有输入就有输出,输入由RecordReader负责,输出则由RecordWriter负责,当Reducer的数量不为0

区块链入门教程以太坊源码分析fast sync算法一

区块链入门教程以太坊源码分析fast sync算法一,2018年下半年,区块链行业正逐渐褪去发展之初的浮躁.回归理性,表面上看相关人才需求与身价似乎正在回落.但事实上,正是初期泡沫的渐退,让人们更多的关注点放在了区块链真正的技术之上.this PR aggregates a lot of small modifications to core, trie, eth and other packages to collectively implement the eth/63 fast synch

{区块链教程}以太坊源码分析fast sync算法二

{区块链教程}以太坊源码分析fast sync算法二:上面的表格应该这样解释:如果我们每隔K个区块头验证一次区块头,在N个区块头之后,伪造的概率小于***者产生SHA3冲突的概率.这也意味着,如果确实发现了伪造,那么最后的N个头部应该被丢弃,因为不够安全.可以从上表中选择任何{N,K}对,为了选择一个看起来好看点的数字,我们选择N = 2048,K = 100.后续可能会根据网络带宽/延迟影响以及可能在一些CPU性能比较受限的设备上运行的情况来进行调整. Using this caveat ho

Solr4.8.0源码分析(22)之 SolrCloud的Recovery策略(三)

Solr4.8.0源码分析(22)之 SolrCloud的Recovery策略(三) 本文是SolrCloud的Recovery策略系列的第三篇文章,前面两篇主要介绍了Recovery的总体流程,以及PeerSync策略.本文以及后续的文章将重点介绍Replication策略.Replication策略不但可以在SolrCloud中起到leader到replica的数据同步,也可以在用多个单独的Solr来实现主从同步.本文先介绍在SolrCloud的leader到replica的数据同步,下一篇

转:Mongodb源码分析之Replication模式

原文出处:http://www.cnblogs.com/daizhj/archive/2011/06/13/mongodb_sourcecode_rep mongodb中提供了复制(Replication)机制,通过该机制可以帮助我们很容易实现读写分离方案,并支持灾难恢复(服务器断电)等意外情况下的数据安全. 在老版本(1.6)中,Mongo提供了两种方式的复制:master-slave及replica pair模式(注:mongodb最新支持的replset复制集方式可看成是pair的升级版,

【Zookeeper】源码分析之持久化--FileTxnSnapLog

一.前言 前面分析了FileSnap,接着继续分析FileTxnSnapLog源码,其封装了TxnLog和SnapShot,其在持久化过程中是一个帮助类. 二.FileTxnSnapLog源码分析 2.1 类的属性 public class FileTxnSnapLog { //the direcotry containing the //the transaction logs // 日志文件目录 private final File dataDir; //the directory cont

Solr4.8.0源码分析(17)之SolrCloud索引深入(4)

Solr4.8.0源码分析(17)之SolrCloud索引深入(4) 前面几节以add为例已经介绍了solrcloud索引链建索引的三步过程,delete以及deletebyquery跟add过程大同小异,这里暂时就不介绍了.由于commit流程较为特殊,那么本节主要简要介绍下commit的流程. 1. SolrCloud的commit流程 SolrCloud的commit流程同样分为三步,本节主要简单介绍下三步过程. 1.1 LogUpdateProcessor LogUpdateProces