一起来看CORE源码(一) ConcurrentDictionary

先贴源码地址

https://github.com/dotnet/corefx/blob/master/src/System.Collections.Concurrent/src/System/Collections/Concurrent/ConcurrentDictionary.cs

.NET CORE很大一个好处就是代码的开源,你可以详细的查看你使用类的源代码,并学习微软的写法和实现思路。

  这里我对.net core中ConcurrentDictionary源码进行了分析,里面采用了Volatile.Read和write(volatile作用:确保本条指令不会因编译器的优化而省略,且要求每次直接从内存地址读值,而不走寄存器),然后也使用了lock这种混合锁,而且还定义了更细颗粒度的锁。所以多线程使用ConcurrentDictionary集合还是比较好的选择。

本来想把本篇放到我的《C#异步编程系列》,不过后来感觉那个系列写的已经算是收尾了,而且以后还会有写更多core源码分析的文字,所以就单独新增一个系列把。

ConcurrentDictionary内部私有类

先上源码,再仔细聊

/// <summary>
/// Tables that hold the internal state of the ConcurrentDictionary
///
/// Wrapping the three tables in a single object allows us to atomically
/// replace all tables at once.
/// </summary>
private sealed class Tables
{
    // A singly-linked list for each bucket.
    // 单链表数据结构的桶,里面的节点就是对应字典值
    internal readonly Node[] _buckets;
    // A set of locks, each guarding a section of the table.
    //锁的数组
    internal readonly object[] _locks;
    // The number of elements guarded by each lock.
    internal volatile int[] _countPerLock; 

    internal Tables(Node[] buckets, object[] locks, int[] countPerLock)
    {
        _buckets = buckets;
        _locks = locks;
        _countPerLock = countPerLock;
    }
}
/// <summary>
/// A node in a singly-linked list representing a particular hash table bucket.
/// 由Dictionary里的Entry改成Node,并且把next放到Node里
/// </summary>
private sealed class Node
{
    internal readonly TKey _key;
    internal TValue _value;
    internal volatile Node _next;
    internal readonly int _hashcode;

    internal Node(TKey key, TValue value, int hashcode, Node next)
    {
        _key = key;
        _value = value;
        _next = next;
        _hashcode = hashcode;
    }
}
private volatile Tables _tables; // Internal tables of the dictionary
private IEqualityComparer<TKey> _comparer; // Key equality comparer
// The maximum number of elements per lock before a resize operation is triggered
// 每个锁对应的元素最大个数,如果超过,要重新进行resize tables
private int _budget;    

  首先,内部类定义为私有且密封,这样就保证了无法从外部进行篡改,而且注意volatile关键字的使用,这确保了我们多线程操作的时候,最终都是去内存中读取对应地址的值和操作对应地址的值。

internal readonly object[] _locks;
internal volatile int[] _countPerLock;

以上两个类是为了高性能及并发锁所建立的对象,实际方法上锁时,使用如下语句

lock (tables._locks[lockNo])
Monitor.Enter(tables._locks[lockNo], ref lockTaken);

  以上两种调用方式是等价的,都会阻塞执行,直到获取到锁(对于Monitor我很多时候会尽可能使用TryEnter,毕竟不阻塞,不过这个类的实现一定要使用阻塞式的,这样程序逻辑才能继续往下走。更多关于Monitor我在 《C#异步编程(四)混合模式线程同步》里面有详细介绍)

这样,实现了颗粒化到每个单独的键值的锁,最大限度的保证了并发。

这里lockNo参数是通过GetBucketAndLockNo方法获取的,方法通过out变量返回值。

/// <summary>
/// Computes the bucket and lock number for a particular key.
///这里获取桶的索引和锁的索引,注意,锁的索引和桶未必是同一个值。
/// </summary>
private static void GetBucketAndLockNo(int hashcode, out int bucketNo, out int lockNo, int bucketCount, int lockCount)
{
    bucketNo = (hashcode & 0x7fffffff) % bucketCount;
    lockNo = bucketNo % lockCount;
}

上面方法中

hashcode 是通过private IEqualityComparer<TKey> _comparer对象的GetHashCode方法通过key获取到的。

bucketCount是整个table的长度。

lockCount是现有的锁的数组

TryAdd方法

  我们从最简单的TryAdd方法开始介绍,这里ConcurrentDictionary类的封装非常合理,暴露出来的方法,很多是通过统一的内部方法进行执行,比如更新删除等操作等,都有类内部唯一的私有方法进行执行,然后通过向外暴漏各种参数不同的方法,来实现不同行为。

public bool TryAdd(TKey key, TValue value)
{
    if (key == null) ThrowKeyNullException();
    TValue dummy;
    return TryAddInternal(key, _comparer.GetHashCode(key), value, false, true, out dummy);
}

上面TryAddInternal的参数对应如下

/// <summary>
/// Shared internal implementation for inserts and updates.
/// If key exists, we always return false; and if updateIfExists == true we force update with value;
/// If key doesn‘t exist, we always add value and return true;
/// </summary>
private bool TryAddInternal(TKey key, int hashcode, TValue value, bool updateIfExists, bool acquireLock, out TValue resultingValue)

也就说说,updateIfExists为false,存在值的情况下,TryAdd不会更新原有值,而是直接返回false。我的多线程并发写库就是利用了这个特性,这个案例我会在本文最后介绍。现在我们来看TryAddInternal内部,废话不多说,上源码(大部分都注释过了,所以直接阅读即可)

//while包在外面,为了continue,如果发生了_tables私有变量在操作过程被其他线程修改的情况
while (true)
{
    int bucketNo, lockNo;
    //变量复制到方法本地变量  判断tables是否在操作过程中被其他线程修改。
    Tables tables = _tables;
    //提到过的获取桶的索引和锁的索引
    GetBucketAndLockNo(hashcode, out bucketNo, out lockNo, tables._buckets.Length, tables._locks.Length);
    //是否要扩大tables
    bool resizeDesired = false;
    //是否成功获取锁,成功的话会在final块中进行退出
    bool lockTaken = false;
    try
    {
        if (acquireLock)
            Monitor.Enter(tables._locks[lockNo], ref lockTaken);

        // If the table just got resized, we may not be holding the right lock, and must retry.
        // This should be a rare occurrence.
        if (tables != _tables)
        {
            continue;
        }

        // Try to find this key in the bucket
        Node prev = null;
        //这里如果找到对应地址为空,会直接跳出循环,说明对应的key没有添加锅
        //不为空的时候,会进行返回false(具体是否更新根据updateIfExists)(当然也存在会有相同_hashcode值的情况,所以还要对key进行判定,key不同,继续往后找,直到找到相同key)
        for (Node node = tables._buckets[bucketNo]; node != null; node = node._next)
        {
            Debug.Assert((prev == null && node == tables._buckets[bucketNo]) || prev._next == node);
            //对hashcode和key进行判定,确保找到的就是要更新的
            if (hashcode == node._hashcode && _comparer.Equals(node._key, key))
            {
                // The key was found in the dictionary. If updates are allowed, update the value for that key.
                // We need to create a new node for the update, in order to support TValue types that cannot
                // be written atomically, since lock-free reads may be happening concurrently.
                if (updateIfExists)
                {
                    if (s_isValueWriteAtomic)
                    {
                        node._value = value;
                    }
                    else
                    {
                        Node newNode = new Node(node._key, value, hashcode, node._next);
                        if (prev == null)
                        {
                            Volatile.Write(ref tables._buckets[bucketNo], newNode);
                        }
                        else
                        {
                            prev._next = newNode;
                        }
                    }
                    resultingValue = value;
                }
                else
                {
                    resultingValue = node._value;
                }
                return false;
            }
            prev = node;
        }

        // The key was not found in the bucket. Insert the key-value pair.
        Volatile.Write<Node>(ref tables._buckets[bucketNo], new Node(key, value, hashcode, tables._buckets[bucketNo]));
        checked
        {
            tables._countPerLock[lockNo]++;
        }

        //
        // If the number of elements guarded by this lock has exceeded the budget, resize the bucket table.
        // It is also possible that GrowTable will increase the budget but won‘t resize the bucket table.
        // That happens if the bucket table is found to be poorly utilized due to a bad hash function.
        //
        if (tables._countPerLock[lockNo] > _budget)
        {
            resizeDesired = true;
        }
    }
    finally
    {
        if (lockTaken)
            Monitor.Exit(tables._locks[lockNo]);
    }

    //
    // The fact that we got here means that we just performed an insertion. If necessary, we will grow the table.
    //
    // Concurrency notes:
    // - Notice that we are not holding any locks at when calling GrowTable. This is necessary to prevent deadlocks.
    // - As a result, it is possible that GrowTable will be called unnecessarily. But, GrowTable will obtain lock 0
    //   and then verify that the table we passed to it as the argument is still the current table.
    //
    if (resizeDesired)
    {
        GrowTable(tables);
    }

    resultingValue = value;
    return true;
}

ContainsKey和TryGetValue

ContainsKey和TryGetValue其实内部最后调用的都是私有TryGetValueInternal,这里ContainsKey调用TryGetValue。

ContainsKey方法

/// <summary>
/// Determines whether the ConcurrentDictionary{TKey, TValue} contains the specified key.
/// </summary>
/// <param name="key">The key to locate in the</param>
/// <returns>true if the ConcurrentDictionary{TKey, TValue} contains an element withthe specified key; otherwise, false.</returns>
public bool ContainsKey(TKey key)
{
    if (key == null) ThrowKeyNullException();
    TValue throwAwayValue;
    return TryGetValue(key, out throwAwayValue);
}

TryGetValue方法

/// <summary>
/// Attempts to get the value associated with the specified key from the ConcurrentDictionary{TKey,TValue}.
/// </summary>
/// <param name="key">The key of the value to get.</param>
/// <param name="value">When this method returns, <paramref name="value"/> contains the object from
/// the ConcurrentDictionary{TKey,TValue} with the specified key or the default value of
/// <returns>true if the key was found in the <see cref="ConcurrentDictionary{TKey,TValue}"/>;
/// otherwise, false.</returns>
public bool TryGetValue(TKey key, out TValue value)
{
    if (key == null) ThrowKeyNullException();
    return TryGetValueInternal(key, _comparer.GetHashCode(key), out value);
}

TryGetValueInternal方法

private bool TryGetValueInternal(TKey key, int hashcode, out TValue value)
{
    //用本地变量保存这个table的快照。
    // We must capture the _buckets field in a local variable. It is set to a new table on each table resize.
Tables tables = _tables;
//获取key对应的桶位置
    int bucketNo = GetBucket(hashcode, tables._buckets.Length);
    // We can get away w/out a lock here.
    // The Volatile.Read ensures that we have a copy of the reference to tables._buckets[bucketNo].
    // This protects us from reading fields (‘_hashcode‘, ‘_key‘, ‘_value‘ and ‘_next‘) of different instances.
Node n = Volatile.Read<Node>(ref tables._buckets[bucketNo]);
//如果key相符 ,赋值,不然继续寻找下一个。
    while (n != null)
    {
        if (hashcode == n._hashcode && _comparer.Equals(n._key, key))
        {
            value = n._value;
            return true;
        }
        n = n._next;
    }
    value = default(TValue);//没找到就赋默认值
    return false;
}

TryRemove

TryRemove方法

public bool TryRemove(TKey key, out TValue value)
{
    if (key == null) ThrowKeyNullException();
    return TryRemoveInternal(key, out value, false, default(TValue));
}

这个方法会调用内部私用的TryRemoveInternal

/// <summary>
/// Removes the specified key from the dictionary if it exists and returns its associated value.
/// If matchValue flag is set, the key will be removed only if is associated with a particular
/// value.
/// </summary>
/// <param name="key">The key to search for and remove if it exists.</param>
/// <param name="value">The variable into which the removed value, if found, is stored.</param>
/// <param name="matchValue">Whether removal of the key is conditional on its value.</param>
/// <param name="oldValue">The conditional value to compare against if <paramref name="matchValue"/> is true</param>
/// <returns></returns>
private bool TryRemoveInternal(TKey key, out TValue value, bool matchValue, TValue oldValue)
{
    int hashcode = _comparer.GetHashCode(key);
    while (true)
    {
        Tables tables = _tables;
        int bucketNo, lockNo;
        //这里获取桶的索引和锁的索引,注意,锁的索引和桶未必是同一个值,具体算法看源码。
        GetBucketAndLockNo(hashcode, out bucketNo, out lockNo, tables._buckets.Length, tables._locks.Length);
        //这里锁住的只是对应这个index指向的锁,而不是所有锁。
        lock (tables._locks[lockNo])
        {
            //这里table可能被重新分配,所以这里再次获取,看得到的是不是同一个table
            // If the table just got resized, we may not be holding the right lock, and must retry.
            // This should be a rare occurrence.
            if (tables != _tables)
            {
                continue;
            }

            Node prev = null;
            //这里同一个桶,可能因为连地址,有很多值,所以要对比key
            for (Node curr = tables._buckets[bucketNo]; curr != null; curr = curr._next)
            {
                Debug.Assert((prev == null && curr == tables._buckets[bucketNo]) || prev._next == curr);
                //对比是不是要删除的的那个元素
                if (hashcode == curr._hashcode && _comparer.Equals(curr._key, key))
                {
                    if (matchValue)
                    {
                        bool valuesMatch = EqualityComparer<TValue>.Default.Equals(oldValue, curr._value);
                        if (!valuesMatch)
                        {
                            value = default(TValue);
                            return false;
                        }
                    }
                    //执行删除,判断有没有上一个节点。然后修改节点指针或地址。
                    if (prev == null)
                    {
                        Volatile.Write<Node>(ref tables._buckets[bucketNo], curr._next);
                    }
                    else
                    {
                        prev._next = curr._next;
                    }

                    value = curr._value;
                    tables._countPerLock[lockNo]--;
                    return true;
                }
                prev = curr;
            }
        }
        value = default(TValue);
        return false;
    }
}

我的使用实例

之前做项目时候,有个奇怪的场景,就是打电话的时候回调接口保存通话记录,这里通过CallId来唯一识别每次通话,但是前端程序是通过websocket跟通话服务建立连接(通话服务是另外一个公司做的)。客户是呼叫中心,一般在网页端都是多个页面操作,所以会有多个websocket连接,这时候每次通话,每个页面都会回调接口端,保存相同的通话记录,并发是同一时间的。

我们最早考虑使用消息队列来过滤重复的请求,但是我仔细考虑了下,发现使用ConcurrentDictionary方式的实现更简单,具体实现如下(我精简了下代码):

private  static ConcurrentDictionary<string,string> _strDic=new ConcurrentDictionary<string, string>();
public async Task<BaseResponse> AddUserByAccount(string callId)
{
    if ( _strDic.ContainsKey(callId))
    {
        return BaseResponse.GetBaseResponse(BusinessStatusType.Failed,"键值已存在");
    }
    //成功写入
    if (_strDic.TryAdd(callId,callId))
    {
        var  recordExist =await _userRepository.FirstOrDefaultAsync(c => c.CallId == callId);
        if (recordExist ==null)
        {
            Record record=new Record
            {
                CallId = callId,
                …………
                …………
                IsVerify=1
            };
            _userRepository.Insert(record);
            _userRepository.SaveChanges();
        }
        return BaseResponse.GetBaseResponse(BusinessStatusType.OK);
    }
    //尝试竞争线程,写入失败
    return BaseResponse.GetBaseResponse(BusinessStatusType.Failed,"写入失败");
}

  这里如果进行同时的并发请求,最后请求都可以通过if ( _strDic.ContainsKey(callId))的判定,因为所有线程同时读取,都是未写入状态。但是多个线程会在TryAdd时有竞争,而且ConcurrentDictionary的实现保证了只有一个线程可以成功更新,其他的都返回失败。

原文地址:https://www.cnblogs.com/qixinbo/p/10351876.html

时间: 2024-10-03 23:37:55

一起来看CORE源码(一) ConcurrentDictionary的相关文章

DOTNET CORE源码分析之IOC容器结果获取内容补充

补充一下ServiceProvider的内容 可能上一篇文章DOTNET CORE源码分析之IServiceProvider.ServiceProvider.IServiceProviderEngine.ServiceProviderEngine和ServiceProviderEngineScope 中还没有关联上ServiceProvider和ServiceCollection就直接通过GetService获取了值,这样不科学啊.其实是有关联的,请看一下上篇文章同样存在的一个代码段: inte

spark core源码分析6 Spark job的提交

本节主要讲解SparkContext的逻辑 首先看一个spark自带的最简单的例子: object SparkPi { def main(args: Array[String]) { val conf = new SparkConf().setAppName("Spark Pi") val spark = new SparkContext(conf) val slices = if (args.length > 0) args(0).toInt else 2 val n = ma

asp.net core源码飘香:Logging组件(转)

简介: 作为基础组件,日志组件被其他组件和中间件所使用,它提供了一个统一的编程模型,即不需要知道日志最终记录到哪里去,只需要调用它即可. 使用方法很简单,通过依赖注入ILogFactory(CreateLogger方法)或ILogger<T>对象,获取一个ILogger对象,然后通过ILogger的各种扩展方法(都是调用Log方法)记录不同级别的日志. 源码剖析: 总结: 日志组件其实就是工厂模式的应用,但进行了改进,LoggerFactory每次都返回一个Logger对象,而Logger对象

asp.net core源码飘香:Configuration组件(转)

简介: 这是一个基础组件,是一个统一的配置模型,配置可以来源于配置文件(json文件,xml文件,ini文件),内存对象,命令行参数,系统的环境变量又或者是你自己扩展的配置源,该组件将各个配置源的数据按统一的格式(IDictionary<string, string> Data)进行加载,进而对外提供调用接口. 不仅如此,有些配置源(如文件配置源)还可以在配置源的数据发生变化时进行重新加载(IDictionary<string, string> Data),而程序员随时可以判断是否

.net源码分析 - ConcurrentDictionary&lt;TKey, TValue&gt;

List源码分析 Dictionary源码分析 ConcurrentDictionary源码分析 分析: http://www.cnblogs.com/brookshi/p/5583892.html 继上篇Dictionary源码分析,上篇讲过的在这里不会再重复 ConcurrentDictionary源码地址:https://github.com/dotnet/corefx/blob/master/src/System.Collections.Concurrent/src/System/Col

asp.net core源码地址

https://github.com/dotnet/corefx 这个是.net core的 开源项目地址 https://github.com/aspnet 这个下面是asp.net core 框架的地址,里面有很多仓库. https://github.com/aspnet/EntityFrameworkCore  EF Core源码 https://github.com/aspnet/Configuration 配置模块源码 https://github.com/aspnet/Routing

ASP.NET Core 源码阅读笔记(5) ---Microsoft.AspNetCore.Routing路由

这篇随笔讲讲路由功能,主要内容在项目Microsoft.AspNetCore.Routing中,可以在GitHub上找到,Routing项目地址. 路由功能是大家都很熟悉的功能,使用起来也十分简单,从使用的角度来说可讲的东西不多.不过阅读源码的过程的是个学习的过程,看看顶尖Coder怎么组织代码也是在提升自己. 我们知道现在ASP.NET Core中所有用到的功能都是服务,那么Routing服务是什么时候被添加到依赖注入容器的呢?答案是在StartUp类的ConfigureServices方法中

Core源码(三) Lazy&lt;T&gt;

Lazy<T>解决什么问题? 1.大对象加载 考虑下面的需求,有个对象很大,创建耗时,并且要在托管堆上分配一大块空间.我们当然希望,用到它的时候再去创建.也就是延迟加载,等到真正需要它的时候,才去加载. 显然,这里需要加一个中间层,将大对象封装起来,暴露接口,开始并不创建大对象,等到用户真正访问对象的时候,再去创建.另外,这个中间层应该可以封装不同类型的大对象,因此需要类模版.Lazy<T>就是为了解决这个问题. 典型的使用 public Lazy<AccountServic

asp.net core源码飘香:Configuration组件

简介: 这是一个基础组件,是一个统一的配置模型,配置可以来源于配置文件(json文件,xml文件,ini文件),内存对象,命令行参数,系统的环境变量又或者是你自己扩展的配置源,该组件将各个配置源的数据按统一的格式(IDictionary<string, string> Data)进行加载,进而对外提供调用接口. 不仅如此,有些配置源(如文件配置源)还可以在配置源的数据发生变化时进行重新加载(IDictionary<string, string> Data),而程序员随时可以判断是否