无锁模式的Vector

这两天学习无锁的并发模式,发现相比于传统的 同步加锁相比,有两点好处
1.无锁 模式 相比于 传统的 同步加锁  提高了性能

2.无锁模式 是天然的死锁免疫

下来介绍无锁的Vector--- LockFreeVector

它的结构是:

private final AtomicReferenceArray<AtomicReferenceArray<E>> buckets;

从这里我们可以看到,它的内部是采用的是 无锁的引用数组, 数组嵌套数组

相当于一个二维数组,它的大小可以动态的进行扩展,

为了更有序的读写数组,定义了一个Descriptor的静态内部类。它的作用是使用CAS操作写入新数据。

它定义了

private static final int FIRST_BUCKET_SIZE = 8;

/** * number of buckets. 30 will allow 8*(2^30-1) elements */private static final int N_BUCKET = 30;
FIRST_BUCKET_SIZE:为第一个数组的长度

N_BUCKET 整个二维数组最大可扩转至30

每次的扩展是成倍的增加,即:第一个数组长度为8,第二个为8<<1,第三个为8<<2 ......第30个为 8<<29

贡献源码:

/*
 * Copyright (c) 2007 IBM Corporation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package main.java.org.amino.ds.lockfree;

import java.util.AbstractList;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceArray;

/**
 * It is a thread safe and lock-free vector.
 * This class implement algorithm from:<br>
 *
 * Lock-free Dynamically Resizable Arrays <br>
 *
 * Damian Dechev, Peter Pirkelbauer, and Bjarne Stroustrup<br>
 * Texas A&M University College Station, TX 77843-3112<br>
 * {dechev, peter.pirkelbauer}@tamu.edu, [email protected]
 *
 *
 * @author Zhi Gan
 *
 * @param <E> type of element in the vector
 *
 */
public class LockFreeVector<E> extends AbstractList<E> {
	private static final boolean debug = false;
	/**
	 * Size of the first bucket. sizeof(bucket[i+1])=2*sizeof(bucket[i])
	 */
	private static final int FIRST_BUCKET_SIZE = 8;

	/**
	 * number of buckets. 30 will allow 8*(2^30-1) elements
	 */
	private static final int N_BUCKET = 30;

	/**
	 * We will have at most N_BUCKET number of buckets. And we have
	 * sizeof(buckets.get(i))=FIRST_BUCKET_SIZE**(i+1)
	 */
	private final AtomicReferenceArray<AtomicReferenceArray<E>> buckets;

	/**
	 * @author ganzhi
	 *
	 * @param <E>
	 */
	static class WriteDescriptor<E> {
		public E oldV;
		public E newV;
		public AtomicReferenceArray<E> addr;
		public int addr_ind;

		/**
		 * Creating a new descriptor.
		 *
		 * @param addr Operation address
		 * @param addr_ind	Index of address
		 * @param oldV old operand
		 * @param newV new operand
		 */
		public WriteDescriptor(AtomicReferenceArray<E> addr, int addr_ind,
				E oldV, E newV) {
			this.addr = addr;
			this.addr_ind = addr_ind;
			this.oldV = oldV;
			this.newV = newV;
		}

		/**
		 * set newV.
		 */
		public void doIt() {
			addr.compareAndSet(addr_ind, oldV, newV);
		}
	}

	/**
	 * @author ganzhi
	 *
	 * @param <E>
	 */
	static class Descriptor<E> {
		public int size;
		volatile WriteDescriptor<E> writeop;

		/**
		 * Create a new descriptor.
		 *
		 * @param size Size of the vector
		 * @param writeop Executor write operation
		 */
		public Descriptor(int size, WriteDescriptor<E> writeop) {
			this.size = size;
			this.writeop = writeop;
		}

		/**
		 *
		 */
		public void completeWrite() {
			WriteDescriptor<E> tmpOp = writeop;
			if (tmpOp != null) {
				tmpOp.doIt();
				writeop = null; // this is safe since all write to writeop use
				// null as r_value.
			}
		}
	}

	private AtomicReference<Descriptor<E>> descriptor;
	private static final int zeroNumFirst = Integer
			.numberOfLeadingZeros(FIRST_BUCKET_SIZE);;

	/**
	 * Constructor.
	 */
	public LockFreeVector() {
		buckets = new AtomicReferenceArray<AtomicReferenceArray<E>>(N_BUCKET);
		buckets.set(0, new AtomicReferenceArray<E>(FIRST_BUCKET_SIZE));
		descriptor = new AtomicReference<Descriptor<E>>(new Descriptor<E>(0,
				null));
	}

	/**
	 * add e at the end of vector.
	 *
	 * @param e
	 *            element added
	 */
	public void push_back(E e) {
		Descriptor<E> desc;
		Descriptor<E> newd;
		do {
			desc = descriptor.get();
			desc.completeWrite();
			//desc.size   Vector 本身的大小
			//FIRST_BUCKET_SIZE  第一个一位数组的大小
			int pos = desc.size + FIRST_BUCKET_SIZE;
			int zeroNumPos = Integer.numberOfLeadingZeros(pos);  // 取出pos 的前导领
			//zeroNumFirst  为FIRST_BUCKET_SIZE 的前导领
			int bucketInd = zeroNumFirst - zeroNumPos;  //哪个一位数组
			//判断这个一维数组是否已经启用
			if (buckets.get(bucketInd) == null) {
				//newLen  一维数组的长度
				int newLen = 2 * buckets.get(bucketInd - 1).length();
				if (debug)
					System.out.println("New Length is:" + newLen);
				buckets.compareAndSet(bucketInd, null,
						new AtomicReferenceArray<E>(newLen));
			}

			int idx = (0x80000000>>>zeroNumPos) ^ pos;   //在这个一位数组中,我在哪个位置
			newd = new Descriptor<E>(desc.size + 1, new WriteDescriptor<E>(
					buckets.get(bucketInd), idx, null, e));
		} while (!descriptor.compareAndSet(desc, newd));
		descriptor.get().completeWrite();
	}

	/**
	 * Remove the last element in the vector.
	 *
	 * @return element removed
	 */
	public E pop_back() {
		Descriptor<E> desc;
		Descriptor<E> newd;
		E elem;
		do {
			desc = descriptor.get();
			desc.completeWrite();

			int pos = desc.size + FIRST_BUCKET_SIZE - 1;
			int bucketInd = Integer.numberOfLeadingZeros(FIRST_BUCKET_SIZE)
					- Integer.numberOfLeadingZeros(pos);
			int idx = Integer.highestOneBit(pos) ^ pos;
			elem = buckets.get(bucketInd).get(idx);
			newd = new Descriptor<E>(desc.size - 1, null);
		} while (!descriptor.compareAndSet(desc, newd));

		return elem;
	}

	/**
	 * Get element with the index.
	 *
	 * @param index
	 *            index
	 * @return element with the index
	 */
	@Override
	public E get(int index) {
		int pos = index + FIRST_BUCKET_SIZE;
		int zeroNumPos = Integer.numberOfLeadingZeros(pos);
		int bucketInd = zeroNumFirst - zeroNumPos;
		int idx = (0x80000000>>>zeroNumPos) ^ pos;
		return buckets.get(bucketInd).get(idx);
	}

	/**
	 * Set the element with index to e.
	 *
	 * @param index
	 *            index of element to be reset
	 * @param e
	 *            element to set
	 */
	/**
	  * {@inheritDoc}
	  */
	public E set(int index, E e) {
		int pos = index + FIRST_BUCKET_SIZE;
		int bucketInd = Integer.numberOfLeadingZeros(FIRST_BUCKET_SIZE)
				- Integer.numberOfLeadingZeros(pos);
		int idx = Integer.highestOneBit(pos) ^ pos;
		AtomicReferenceArray<E> bucket = buckets.get(bucketInd);
		while (true) {
			E oldV = bucket.get(idx);
			if (bucket.compareAndSet(idx, oldV, e))
				return oldV;
		}
	}

	/**
	 * reserve more space.
	 *
	 * @param newSize
	 *            new size be reserved
	 */
	public void reserve(int newSize) {
		int size = descriptor.get().size;
		int pos = size + FIRST_BUCKET_SIZE - 1;
		int i = Integer.numberOfLeadingZeros(FIRST_BUCKET_SIZE)
				- Integer.numberOfLeadingZeros(pos);
		if (i < 1)
			i = 1;

		int initialSize = buckets.get(i - 1).length();
		while (i < Integer.numberOfLeadingZeros(FIRST_BUCKET_SIZE)
				- Integer.numberOfLeadingZeros(newSize + FIRST_BUCKET_SIZE - 1)) {
			i++;
			initialSize *= FIRST_BUCKET_SIZE;
			buckets.compareAndSet(i, null, new AtomicReferenceArray<E>(
					initialSize));
		}
	}

	/**
	 * size of vector.
	 *
	 * @return size of vector
	 */
	public int size() {
		return descriptor.get().size;
	}

	/**
	  * {@inheritDoc}
	  */
	@Override
	public boolean add(E object) {
		push_back(object);
		return true;
	}
}

参考:http://www.shaoqun.com/a/197387.aspx

源码部分,我只着重写了push_back这个方法的注释。

时间: 2024-10-12 10:05:21

无锁模式的Vector的相关文章

生产者消费者模式下的并发无锁环形缓冲区

上一篇记录了几种环形缓冲区的设计方法和环形缓冲区在生产者消费者模式下的使用(并发有锁),这一篇主要看看怎么实现并发无锁. 0.简单的说明 首先对环形缓冲区做下说明: 环形缓冲区使用改进的数组版本,缓冲区容量为2的幂 缓冲区满阻塞生产者,消费者进行消费后,缓冲区又有可用资源,由消费者唤醒生产者 缓冲区空阻塞消费者,生产者进程生产后,缓冲区又有可用资源,由生产者唤醒消费者 然后对涉及到的几个技术做下说明: ⑴CAS,Compare & Set,X86下对应的是CMPXCHG 汇编指令,原子操作,基本

一个无锁消息队列引发的血案:怎样做一个真正的程序员?(二)——月:自旋锁

前续 一个无锁消息队列引发的血案:怎样做一个真正的程序员?(一)——地:起因 一个无锁消息队列引发的血案:怎样做一个真正的程序员?(二)——月:自旋锁 平行时空 在复制好上面那一行我就先停下来了,算是先占了个位置,虽然我知道大概要怎么写,不过感觉还是很乱. 我突然想到,既然那么纠结,那么混乱,那么不知所措,我们不如换个视角.记得高中时看过的为数不多的长篇小说<穆斯林的葬礼>,作者是:霍达(女),故事描写了两个发生在不同时代.有着不同的内容却又交错扭结的爱情悲剧,一个是“玉”的故事,一个是“月”

一个无锁消息队列引发的血案(六)——RingQueue(中) 休眠的艺术 [续]

目录 (一)起因 (二)混合自旋锁 (三)q3.h 与 RingBuffer (四)RingQueue(上) 自旋锁 (五)RingQueue(中) 休眠的艺术 (六)RingQueue(中) 休眠的艺术 [续] 开篇 这是第五篇的后续,这部分的内容同时会更新和添加在 第五篇:RingQueue(中) 休眠的艺术 一文的末尾. 归纳 紧接上一篇的末尾,我们把 Windows 和 Linux 下的休眠策略归纳总结一下,如下图: 我们可以看到,Linux 下的 sched_yield() 虽然包括了

谈谈存储软件的无锁设计

面向磁盘设计的存储软件不需要考虑竞争锁带来的性能影响.磁盘存储软件的性能瓶颈点在于磁盘,磁盘抖动会引入极大的性能损耗.因此,传统存储软件的设计不会特别在意处理器的使用效率.曾经对一个存储虚拟化软件进行性能调优,在锁竞争方面做了大量优化,最后也没有达到性能提升的效果,原因就在于存储虚拟化的性能瓶颈点在于磁盘,而不在于处理器的使用效率.正因为如此,在面向磁盘设计的软件中,很多都采用单线程.单队列处理的方式,一定程度上还可以避免由于并发所引入的磁盘抖动问题. 在面向NVMe SSD设计的存储软件中,这

4.锁--无锁编程以及CAS

无锁编程以及CAS 无锁编程 / lock-free / 非堵塞同步 无锁编程,即不使用锁的情况下实现多线程之间的变量同步,也就是在没有线程被堵塞的情况下实现变量的同步,所以也叫非堵塞同步(Non-blocking Synchronization). 实现非堵塞同步的方案称为"无锁编程算法"( Non-blocking algorithm). lock-free是眼下最常见的无锁编程的实现级别(一共三种级别). 为什么要 Non-blocking sync ? 使用lock实现线程同步

DIOCP开源项目-高效稳定的服务端解决方案(DIOCP + 无锁队列 + ZeroMQ + QWorkers) 出炉了

[概述] 自从上次发布了[DIOCP开源项目-利用队列+0MQ+多进程逻辑处理,搭建稳定,高效,分布式的服务端]文章后,得到了很多朋友的支持和肯定.这加大了我的开发动力,经过几个晚上的熬夜,终于在昨天晚上,DEMO基本成型,今天再加入了QWorkers来做逻辑处理进程,进一步使得逻辑处理进程更加方便和高效.今天特意写篇blog来记录我的心得与大家分享. [功能实现说明] 沿用上次的草图 目前DEMO图上的功能都已经实现.下面谈谈各部分的实现. 通信服务, 由DIOCP实现,担当与客户端的通信工作

Nah Lock: 一个无锁的内存分配器

概述 我实现了两个完全无锁的内存分配器:_nalloc 和 nalloc.  我用benchmark工具对它们进行了一组综合性测试,并比较了它们的指标值. 与libc(glibc malloc)相比,第一个分配器测试结果很差,但是我从中学到了很多东西,然后我实现了第二个无锁分配器,随着核数增加至30,测试结果线性提高.核数增加至60,测试结果次线性提高,但是仅比tcmalloc好一点. 想要安装,输入命令: git clone ~apodolsk/repo/nalloc,阅读 README文档.

无锁编程与有锁编程的性能对比与分析

最近维护的一个网络服务器遇到性能问题,于是就对原有的程序进行了较大的框架改动.改动最多的是线程工作模式与数据传递方式,最终的结果是改变锁的使用模式.经过一番改进,基本上可以做到 GMb 网卡全速工作处理.在 性能达标之后,一度在想有没有什么办法使用更加轻量级锁,或者去掉锁的使用,为此搜索一些相关的研究成果,并做了一些实验来验证这些成果,因而就有这篇文章.希望有做类似工作的同行可以有所借鉴.如果有人也有相关的经验,欢迎和我交流. 1 无锁编程概述 本节主要对文献 [1] 进行概括,做一些基础知识的

无锁编程以及CAS

无锁编程以及CAS 无锁编程 / lock-free / 非阻塞同步 无锁编程,即不使用锁的情况下实现多线程之间的变量同步,也就是在没有线程被阻塞的情况下实现变量的同步,所以也叫非阻塞同步(Non-blocking Synchronization). 实现非阻塞同步的方案称为"无锁编程算法"( Non-blocking algorithm). lock-free是目前最常见的无锁编程的实现级别(一共三种级别). 为什么要 Non-blocking sync ? 使用lock实现线程同步