.Net中的并行编程-2.ConcurrentStack的实现与分析

在上篇文章《.net中的并行编程-1.基础知识》中列出了在.net进行多核或并行编程中需要的基础知识,今天就来分析在基础知识树中一个比较简单常用的并发数据结构--.net类库中无锁栈的实现。

首先解释一下什么这里“无锁”的相关概念。

所谓无锁其实就是在普通栈的实现方式上使用了原子操作,原子操作的原理就是CPU在系统总线上设置一个信号,当其他线程对同一块内存进行访问时CPU监测到该信号存在会,然后当前线程会等待信号释放后才能对内存进行访问。原子操作都是由操作系统API实现底层由硬件支持,常用的操作有:原子递增,原子递减,比较交换,ConcurrentStack中的实现就是使用了原子操作中的比较交换操作。

使用原子操作的好处:

第一、由于没有使用锁,可以避免死锁。

第二、原子操作不会阻塞线程,例如执行某个指令时当前线程挂起了(或执行了一次上下文切换),其他线程还能继续操作,如果使用lock锁,当前线程挂起后由于没有释放锁,其他线程进行操作时会被阻塞。

第三、由于原子操作直接由硬件指令的支持,所以原子操作性能比普通锁的高。

使用原子操作的坏处:

第一,使用原子操作一般失败时会使用回退技术对当前操作进行重试,所以容易产生活锁和线程饥饿问题,但可以通过随机退让等技术进行缓解,但不能消除。

第二,程序员开发使用难度较大,测试难度较大。

下面开始进入正题:

由于.net 中的 ConcurrentStack的代码较多所以本文就不贴出所有代码,本人也只分析笔者认为重要的几个部分,全部源码可以再去以下微软官方网址查看

 http://referencesource.microsoft.com/#mscorlib/system/Collections/Concurrent/ConcurrentStack.cs

传统的栈结构都一般都使用单链表实现(.net中的Stack使用的是数组), 入栈操作就是把头节点替换为新节点,出栈操作就是把头结点指向下一个节点。所以当大量线程并发访问时线程的竞争条件都在头结点也就是说如果我们能报保证对于头结点操作时是安全的那么整个栈就是安全的。

入栈操作 public void Push(T item)

        public void Push(T item)
        {
            Node newNode = new Node(item);
            newNode.m_next = m_head;
            if (Interlocked.CompareExchange(ref m_head, newNode, newNode.m_next) == newNode.m_next)
            {
                return;
            }

            // If we failed, go to the slow path and loop around until we succeed.
            PushCore(newNode, newNode);
        }

        private void PushCore(Node head, Node tail)
        {
            SpinWait spin = new SpinWait();

            // Keep trying to CAS the exising head with the new node until we succeed.
            do
            {
                spin.SpinOnce();
                // Reread the head and link our new node.
                tail.m_next = m_head;
            }
            while (Interlocked.CompareExchange(ref m_head, head, tail.m_next) != tail.m_next);

        }

(在原版的注释中我们就能看到入栈使用了原子操作中的比较交换(CAS)操作)

入栈分为了三步:

a.当数据入栈时会分配一个新结点,然后将此刻当前内存中的头结点作为新结点的下一个结点, newNode.m_next = m_head中保持的是当前头结点的快照也就是说另一个线程此时有可能更改了m_head指向的结点,注意头结点(m_head)的字段声明中前面使用了volatile关键字,我们知道volatile关键字有两个作用:第一个是禁止编译器和CPU更改字段的位置,第二个是强制刷新CPU的高速缓存,当读取该声明该关键字的字段时每次都去内存里重新加载数据然后读到CPU的高速缓存而不使用CPU缓存中较老的数据,这个地方m_head使用volatile是因为当运行在其他核心的CPU线程更改了m_head值,而我们当前核心的CPU高速缓存中没有及时更新的问题,还有就是出栈时防止对m_head的操作语句移动到其他语句之后造成逻辑代码没有按照预先的逻辑走,例如newNode.m_next = m_head操作放到了if语句之后造成逻辑错误或者CPU的指令乱序执行时产生的逻辑错误。

b.比较当前的头结点是否与我们保存的newNode.m_next 的快照头结点相同,如果相同则将新结点替换为头结点否则比较失败进行c步骤。

Interlocked.CompareExchange(ref m_head, newNode, newNode.m_next)其实等价于以下代码,只不过该代码的执行是以原子的方式执行。

if (m_head == newNode.m_next)
{
    m_head = newNode.m_next;
}
else
{
    return m_head;
}

c.如果b步骤失败则进入PushCore(Node head, Node tail);PushCore的步骤其实就是重复执行步骤B中的Interlocked.CompareExchange(ref m_head, newNode, newNode.m_next)操作,直到新节点写入为止,在循环期间使用了spin.SpinOnce() ,使用这个API其实就是为了防止活锁,交换失败的线程会进行退让,就好比两个人迎面走来,可能会发生这种情况:你向左走他也向左走,你向右他也向右,所以为了避免俩人碰到一起,那么一个人可以先原地停止,然后继续再走,当继续走时发现方向还是相同时可以改变一下停止的时间,比如说先停止5秒,然后如果还是方向相同则停止10秒,然后还是方向相同可以时可以去喝杯茶慢慢再走,其实这种思想就是随机退让。当然计算机中的SpinOnce实现没有想象的那么简单,在当分析到同步原语SpinWati的实现时我会详细介绍SpinOnce方法的实现方式,这个地方就理解为让线程休息一会(当分析SpinOnce时源码时发现了一个小细节 Thread,Sleep(0)和Thread.Sleep(1)的使用,其实我发现很多人都不清楚这里0和1的区别,这里要说明一下:Thread,Sleep(0)表示交出当前线程的时间片,让具有相同或者更高优先级的线程运行否则继续运行当前线程,也就是说如果没有相同级别或更高级别的线程等待运行那么还是运行当前线程,这个地方会产生线程饥饿问题。Thread,Sleep(1)表示线程睡眠1毫秒和其他线程优先级没有关系,其实这里设置为1时也不是睡眠1毫秒而是13毫秒或者更长,具体的和系统的时钟周期有关)。

出栈操作 public bool TryPop(out T result)

出栈操作也是使用的CAS操作,不断的将头结点指向下一个结点然后返回头结点,如果交换失败则循环进行,循环期间也是使用随机退让技术来较少活锁的概率只不过退让的时间会随着退让的次数而增大。栈操作的API设计为TryPop()返回值为bool,,是因为在多个线程同时出栈的过程中有可能一个线程出栈以后栈就空了,所以在出栈有可能失败。

批量入栈操作 public void PushRange(T[] items, int startIndex, int count)

批量入栈是在单个入栈基础上实现的,将多个项目入栈时先将压入的多个项目组成一个栈,然后再将头结点使用CAS操作指向该生成的栈,所以说批量调用一次入栈的效率要比单个调用多次入栈的效率高。 题外话:这里有个编写代码的小细节,在批量入栈的方法中会首先调用ValidatePushPopRangeInput(items, startIndex, count)方法来校验传入的参数正确性,其实该编码是编写代码时比较重要的原则叫“手术室原则”,其解释为医生进入手术室时,对手套,身体等已经进行了消毒,这些准备工作已经完成了,剩下了工作就是医生专心完成手术。这种编码原则不仅使代码的整洁性提高,而且减少CPU分支预判的次数以提升代码运行速度,所以在日常编码中我们可以使用该方法,将大量的参数判断抽象到单独的方法中。

判断是否为空IsEmpty属性

该操作的实现比较简单,只要判断头结点为空即可。在微软的文档中我们发现当我们判断栈中元素是否为空时应该使用该属性而不是使用Count == 0 这种判断方式,因为Count统计栈内元素的个数时,每使用一次会遍历整个栈,时间复杂度为O(N)而IsEmpty为O(1),所以使用Count==0 效率比较底下尤其是在数据量大的情况下。

IEnumerable<T>接口成员的实现GetEnumerator()

该方法实现是拿到头结点然后依次遍历整个栈,注意该方法拿到了只是当前时刻整个栈的快照,在遍历过程中栈内元素的增加或减少对于GetEnumerator()返回值的数量不会改变。

其他问题

1.在.net编写无锁代码时不用考虑ABA问题,因为这是.net的垃圾回收来保证的,除非使用了对象池技术,例如将内部分配结点的操作由对象池来负责。

2.在.net源码中普通的Stack<T> 内部使用的是数组实现,而ConcurretStack内部使用的是链表,主要原因还是在于Stack 在使用数组扩容时会有拷贝数据的开销,尤其是在数据量大的情况下这种性能损失还是比较大的,还有个原因是内部使用链表可以避免ABA问题(前提是分配内部结点时没有使用对象池),不过链表的实现也不是没有缺点,例如入栈时我们会分配一个新结点,而该结点出栈完以后会由GC回收掉,这种结点这时候就成为了垃圾结点,不过在实现ConcurretQueue的时候因为队列的先进后出的特性使用了另一种解决方案--链表+数组的方式,这种方式既解决了垃圾结点的问题又解决了数组扩容复制数据产生的性能开销问题。

最后,在我们阅读.net源码的过程中其实可以发现很多非常经典的编码技巧和编码风格,让我们看代码时可以由上到下如行云流水般一气呵成可,这也是我比较推崇的代码风格--要像写诗一样写自己代码,让别人像读诗一样读你的代码。

时间不早了就到这了,下片文章中我会继续分析.net中另外一个比较经典的并发数据结构ConcurrentQueue的实现。

由于笔者能力有限,有分析错误的地方难免发生,欢迎大家指正。

时间: 2024-10-10 17:39:33

.Net中的并行编程-2.ConcurrentStack的实现与分析的相关文章

.Net中的并行编程-1.路线图(转)

大神,大神,膜拜膜拜,原文地址:http://www.cnblogs.com/zw369/p/3834559.html 目录 .Net中的并行编程-1.路线图 分析.Net里线程同步机制 .Net中的并行编程-2.ConcurrentStack的实现与分析 .Net中的并行编程-3.ConcurrentQueue实现与分析 .Net中的并行编程-4.实现高性能异步队列 .Net中的并行编程-5.流水线模型实战 .Net中的并行编程-6.常用优化策略 .Net中的并行编程-7.基于Blocking

.Net中的并行编程-3.ConcurrentQueue实现与分析

在上文<.Net中的并行编程-2.ConcurrentQueue的实现与分析> 中解释了无锁的相关概念,无独有偶BCL提供的ConcurrentQueue也是基于原子操作实现, 由于ConcurrentQueue的代码较多所以本文主要分析几个常用操作: 入队(EnQueue) .出队(TryDequeue) .是否为空(IsEmpty).获取队列内元素数量(Count). 一.ConcurrentQueue内部结构: 1.实现原理 众所周知,在普通的非线程安全队列有两种实现方式: 1.使用数组

.Net中的并行编程-4.实现高性能异步队列

上文<.Net中的并行编程-3.ConcurrentQueue实现与分析>分析了ConcurrentQueue的实现,本章就基于ConcurrentQueue实现一个高性能的异步队列,该队列主要用于实时数据流的处理并简化多线程编程模型.设计该队列时考虑以下几点需求(需求来自公司的一个实际项目): 1. 支持多线程入队出队,尽量简化多线程编程的复杂度. 2. 支持事件触发机制,数据入队时才进行处理而不是使用定时处理机制, 而且内部能阻塞消费者线程. 3. 出队时数据处理的顺序要保证和入队时是一致

.Net中的并行编程-6.常用优化策略

            本文是.Net中的并行编程第六篇,今天就介绍一些我在实际项目中的一些常用优化策略.      一.避免线程之间共享数据 避免线程之间共享数据主要是因为锁的问题,无论什么粒度的锁,最好的线程之间同步方式就是不加锁,这个地方主要措施就是找出数据之间的哪个地方需要共享数据和不需要共享数据的地方,再设计上避免多线程之间共享数据. 在以前做过的某项目,开始时设计的方案: 开始设计时所有的数据都放入到了公共队列,然后队列通知多个线程去处理数据,队列采用互斥锁保证线程同步,造成的结果就

.Net中的并行编程-5.流水线模型实战

自己在Excel整理了很多想写的话题,但苦于最近比较忙(其实这是借口).... 上篇文章<.Net中的并行编程-4.实现高性能异步队列>介绍了异步队列的实现,本篇文章介绍我实际工作者遇到了处理多线程问题及基于异步队列底层数据结构的解决方案. 需求如下:1.提供数据服写入务供上层应用调用,数据写入服务处理的吞吐量要达到60w/s每秒,也就是用户每秒发送60w的数据然后通过数据写入服务写到数据库中(数据库为公司自主研发的实时数据库). 2.尽量简化上层应用调用服务的复杂度. 一.分析性能瓶颈: 1

.net中的并行编程系列-1.基础知识

最近半年一直研究用.net进行并行程序的开发与设计,再研究的过程中颇有收获,所以画了一个图总结了一下并行编程的基础知识点,这些知识点是并行编程的基础,有助于我们编程高性能的程序,里面的某些结构实现机制也蕴含着丰富的软件设计思想,在后续的文章中我会对图里面提到某些数据结构或同步机制的源码进行分析. 注:虽然使用的平台是.net ,但大部分知识点和平台以及语言无关,相关数据结构其他相关平台都有实现,包括优化手段都非常相似. .net中的并行编程系列-1.基础知识,布布扣,bubuko.com

NET中的并行编程(TPL)——多线程、异步、任务和并行计算

https://masuit.com/1201 谈一谈.NET中的并行编程(TPL)——多线程.异步.任务和并行计算 懒得勤快 发表于2018-04-26 19:41:00 | 最后修改于2018-06-27 23:44:40 .NET 多线程 异步 高并发 分类:.NET开发技术 | 评论总数:0条 | 热度:2243℃ 我要编辑 写在前面: 在做了几个月的高并发项目的过程中,其实发现自己真的提升了不少,所以也想把这段时间的收获分享给大家,然后写这篇文章发现,写下来是一发不可收拾,所以这篇文章

.Net中的并行编程-7.基于BlockingCollection实现高性能异步队列

三年前写过基于ConcurrentQueue的异步队列,今天在整理代码的时候发现当时另外一种实现方式-使用BlockingCollection实现,这种方式目前依然在实际项目中使用.关于BlockingCollection的基本使用请查阅MSDN.源码实现 下面直接上代码:(代码已经放到了我的github上) using System; using System.Collections.Concurrent; using System.Collections.Generic; using Sys

[翻译]在 .NET Core 中的并发编程

原文地址:http://www.dotnetcurry.com/dotnet/1360/concurrent-programming-dotnet-core 今天我们购买的每台电脑都有一个多核心的 CPU,允许它并行执行多个指令.操作系统通过将进程调度到不同的内核来发挥这个结构的优点.然而,还可以通过异步 I/O 操作和并行处理来帮助我们提高单个应用程序的性能.在.NET Core中,任务 (tasks) 是并发编程的主要抽象表述,但还有其他支撑类可以使我们的工作更容易. 并发编程 - 异步 v