Java 高并发四:无锁的实际应用

无 锁 算法 详 解
无 锁 的Vector 实现:
参照着JDK中的 Vector 源码
1、Vector中的 add 方法的实现,它是一个同步方法,所以保证了每一次只能又一个值对数组 elementData 进行操作。
protected Object[] elementData; 通过数据来实现存储
protected int elementCount; 记录对这个Vector的操作数

public synchronized boolean add(E e) {
modCount++;
ensureCapacityHelper(elementCount + 1);//这边是做越界判断
elementData[elementCount++] = e;
return true;
}

private void ensureCapacityHelper(int minCapacity) {
// overflow-conscious code
if (minCapacity - elementData.length > 0)
grow(minCapacity);//如果没有越界
}

private void grow(int minCapacity) {
// overflow-conscious code
int oldCapacity = elementData.length;
int newCapacity = oldCapacity + ((capacityIncrement > 0) ?
capacityIncrement : oldCapacity);
//如果初始化的时候不指定增量capacityIncrement,那么就是将oldCapacity+oldCapacity赋值给新的长度,如果指定增量那么就是oldCapacity+capacityIncrement
if (newCapacity - minCapacity < 0)
newCapacity = minCapacity;
if (newCapacity - MAX_ARRAY_SIZE > 0)
newCapacity = hugeCapacity(minCapacity);
elementData = Arrays.copyOf(elementData, newCapacity);
//最后将老的元素和新的一起加入到Vector中
}
public static <T> T[] copyOf(T[] original, int newLength) {
return (T[]) copyOf(original, newLength, original.getClass());
}
值得注意的一点就是如果指定增量,那么可以减少空间的浪费。

JDK阅读只是为未无锁的Vector做铺垫

无锁的Vector的实现

/**

  • @author Allen李
  • @date
    /
    public class LockFreeVector<E> extends AbstractList<E>{
    private static final boolean debug = false;
    private static final int FIRST_BUCKET_SIZE = 8;
    //虽然这边篮子的个数是固定的,但是绝对是够用的因为总共的篮子数就是8
    2^30-1
    private static final int N_BUCKET = 30;
    private final AtomicReferenceArray<AtomicReferenceArray<E>> buckets;
    //利用二维数组来嵌套是为了使一维的AtomicReferenceArray尽量避免修改,在一维数组填充满了时是不会去扩充的,
    // 而是往二维数组里面填充,这样就避免了修改一维数组,而且高并发就是避免不必要的写来影响性能
    public LockFreeVector(AtomicReferenceArray<AtomicReferenceArray<E>> buckets) {
    this.buckets = buckets;
    }
    static class WriteDescriptor<E>{
    public E oldV;
    public E newV;
    public AtomicReferenceArray<E> addr;
    public int addr_ind;
    public WriteDescriptor( AtomicReferenceArray<E> addr, int addr_ind,E oldV, E newV) {
        this.oldV = oldV;
        this.newV = newV;
        this.addr = addr;
        this.addr_ind = addr_ind;
    }
    
    public void doInt(){
        addr.compareAndSet(addr_ind,oldV,newV);
    }

    }
    static class Descriptor<E>{
    public int size;
    volatile WriteDescriptor<E> writeOp;

    public Descriptor(int size, WriteDescriptor<E> writeOp) {
        this.size = size;
        this.writeOp = writeOp;
    }
    
    public void completeWrite(){
        WriteDescriptor<E> tmpOp = writeOp;
        if(tmpOp != null){
            tmpOp.doInt();
            writeOp=null;
        }
    }

    }

    private AtomicReference<Descriptor<E>> descriptor;
    private static final int zeroNumFirst = Integer.numberOfLeadingZeros(FIRST_BUCKET_SIZE);

    public LockFreeVector(){
    buckets = new AtomicReferenceArray<AtomicReferenceArray<E>>(N_BUCKET);
    buckets.set(0,new AtomicReferenceArray<E>(FIRST_BUCKET_SIZE));
    descriptor = new AtomicReference<Descriptor<E>>(new Descriptor<E>(0,null));
    }

    public void push_back(E e){
    Descriptor<E> desc;
    Descriptor<E> newD;

    do {
        desc=descriptor.get();
        desc.completeWrite();
        int pos = desc.size+FIRST_BUCKET_SIZE;
        int zeroNumPos = Integer.numberOfLeadingZeros(pos);
        int bucketInd = zeroNumFirst - zeroNumPos;
        if(buckets.get(bucketInd)==null){
            int newLen = 2 * buckets.get(bucketInd - 1).length();
            if (debug)
                System.out.println("New Length is:"+newLen);
            buckets.compareAndSet(bucketInd,null,new AtomicReferenceArray<E>(newLen));
        }
        //0x80000000就是1000000...  总共32位
        int idx = (0x80000000>>>zeroNumPos) ^ pos;
        newD = new Descriptor<>(desc.size + 1,new WriteDescriptor<E>(buckets.get(bucketInd),idx,null, e));
    }while (!descriptor.compareAndSet(desc,newD));
    descriptor.get().completeWrite();

    }

    public E pop_back(){
    Descriptor<E> desc;
    Descriptor<E> newD;
    E elem;
    do {
    desc = descriptor.get();
    desc.completeWrite();
    int pos = desc.size + FIRST_BUCKET_SIZE - 1;
    int bucketInd = Integer.numberOfLeadingZeros(FIRST_BUCKET_SIZE) - Integer.numberOfLeadingZeros(pos);
    int idx = Integer.highestOneBit(pos) ^ pos;
    elem = buckets.get(bucketInd).get(idx);
    newD = new Descriptor<E>(desc.size - 1,null);
    }while (!descriptor.compareAndSet(desc,newD));

    return elem;

    }

    @Override
    public E get(int index){
    int pos = index + FIRST_BUCKET_SIZE;
    int zeroNumPos = Integer.numberOfLeadingZeros(pos);
    int bucketInd = zeroNumFirst - zeroNumPos;
    int idx = (0x80000000>>>zeroNumPos) ^ pos;
    return buckets.get(bucketInd).get(idx);
    }

    @Override
    public E set(int index,E e){
    int pos = index + FIRST_BUCKET_SIZE;
    int bucketInd = Integer.numberOfLeadingZeros(FIRST_BUCKET_SIZE) - Integer.numberOfLeadingZeros(pos);
    int idx = Integer.highestOneBit(pos) ^ pos;
    AtomicReferenceArray<E> bucket = buckets.get(bucketInd);
    while (true){
    E oldV = bucket.get(idx);
    if (bucket.compareAndSet(idx,oldV,e))
    return oldV;
    }
    }

    public void reserve(int newSize){
    int size = descriptor.get().size;
    int pos = size + FIRST_BUCKET_SIZE - 1;
    int i = Integer.numberOfLeadingZeros(FIRST_BUCKET_SIZE) - Integer.numberOfLeadingZeros(pos);

    if (i<1)
        i = 1;
    
    int initialSize = buckets.get(i - 1).length();
    while (i < Integer.numberOfLeadingZeros(FIRST_BUCKET_SIZE) - Integer.numberOfLeadingZeros(newSize + FIRST_BUCKET_SIZE - 1)){
        i++;
        initialSize *= FIRST_BUCKET_SIZE;
        buckets.compareAndSet(i, null , new AtomicReferenceArray<E>(initialSize));
    }

    }

    public int size(){
    return descriptor.get().size;
    }

    @Override
    public boolean add(E obj){
    push_back(obj);
    return true;
    }
    }
    它的结构是:private final AtomicReferenceArray<AtomicReferenceArray<E>> buckets;
    从这里我们可以看到,它的内部是采用的是 无锁的引用数组, 数组嵌套数组
    变量buckets存放所有的内部元素。从定义上看,它是一个保存着数组的数组,也就是通常的二维数组。特别之处在于这些数组都是使用CAS的原子数组。为什么使用二维数组去实现一个一维的Vector呢?这是为了将来Vector进行动态扩展时可以更加方便。我们知道,AtomicReferenceArray内部使用Object[]来进行实际数据的存储,这使得动态空间增加特别的麻烦,因此使用二维数组的好处就是为将来增加新的元素。

相当于一个二维数组,它的大小可以动态的进行扩展,

为了更有序的读写数组,定义了一个Descriptor的静态内部类。它的作用是使用CAS操作写入新数据。

它定义了

private static final int FIRST_BUCKET_SIZE = 8;

/**

  • number of buckets. 30 will allow 8(2^30-1) elements
    /
    private static final int N_BUCKET = 30;

FIRST_BUCKET_SIZE:为第一个数组的长度

N_BUCKET 整个二维数组最大可扩转至30

每次的扩展是成倍的增加,即:第一个数组长度为8,第二个为8<<1,第三个为8<<2 ......第30个为 8<<29

3. push_back

在第23行,使用descriptor将数据真正地写入数组中。这个descriptor写入的数据由20~21行构造的WriteDescriptor决定。

在循环最开始(第5行),使用descriptor先将数据写入数组,是为了防止上一个线程设置完descriptor后(22行),还没来得及执行第23行的写入,因此,做一次预防性的操作。

第8~10行通过当前Vector的大小(desc.size),计算新的元素应该落入哪个数组。这里使用了位运算进行计算。

LockFreeVector每次都会扩容。它的第一个数组长度为8,第2个就是16,第3个就是32,依次类推。它们的二进制表示如下:

它们之和就是整个LockFreeVector的总大小,因此,如果每一个数组都恰好填满,那么总大小应该类似如下的值(以4个数组为例)00000000 00000000 00000000 01111000:4个数组都恰好填满时的大小。

导致这个数字进位的最小条件,就是加上二进制的1000。而这个数字整好是8(FIRST_BUCKET_SIZE就是8)这就是第8行代码的意义。

它可以使得数组大小发生一次二进制进位(如果不进位说明还在第一个数组中),进位后前导零的数量就会发生变化。而元素所在的数组,和pos(第8行定义的比变量)的前导零直接相关。每进行一次数组扩容,它的前导零就会减1。如果从来没有扩容过,它的前导零就是28个。以后,逐级减1。这就是第9行获得pos前导零的原因。第10行,通过pos的前导零可以立即定位使用哪个数组(也就是得到了bucketInd的值)。

第11行,判断这个数组是否存在。如果不存在,则创建这个数组,大小为前一个数组的两倍,并把它设置到buckets中。

接着再看一下元素没有恰好填满的情况:

那么总大小如下:

总个数加上二进制1000后,得到:

显然,通过前导零可以定位到第4个数组。而剩余位,显然就表示元素在当前数组内偏移量(也就是数组下标)。根据这个理论,就可以通过pos计算这个元素应该放在给定数组的哪个位置。通过第19行代码,获得pos的除了第一位数字1以外的其他位的数值。因此,pos的前导零可以表示元素所在的数组,而pos的后面几位,则表示元素所在这个数组中的位置。由此,第19行代码就取得了元素所在位置idx。

代码理解可以参考:
https://blog.csdn.net/netcobol/article/details/79785651

http://www.shaoqun.com/a/197387.html

原文地址:https://blog.51cto.com/11585002/2455173

时间: 2024-11-13 07:39:46

Java 高并发四:无锁的实际应用的相关文章

Java高并发之无锁与Atomic源码分析

目录 CAS原理 AtomicInteger Unsafe AtomicReference AtomicStampedReference AtomicIntegerArray AtomicIntegerFieldUpdater 无锁的Vector 无锁即无障碍的运行, 所有线程都可以到达临界区, 接近于无等待. 无锁采用CAS(compare and swap)算法来处理线程冲突, 其原理如下 CAS原理 CAS包含3个参数CAS(V,E,N).V表示要更新的变量, E表示预期值, N表示新值.

一文看透Java高并发:Synchronized锁的性质、原理及其缺陷

前置知识 了解Java基本语法 了解多线程基本知识 知识介绍 Synchronized简介:作用.地位.不控制并发的后果 两种用法:对象锁和类锁 多线程访问同步方法的7种情况:是否是static.Synchronized方法等 Synchronized的性质:可重入.不可中断 原理:加解锁原理.可重入原理.可见性原理 Synchronized的缺陷:效率低.不够灵活.无法预判是否成功获取锁 常见问题: 如何选择Lock或Synchronized等 如何提高性能.JVM如何解决那个线程获取锁等 S

【实战Java高并发程序设计 4】数组也能无锁:AtomicIntegerArray

除了提供基本数据类型外,JDK还为我们准备了数组等复合结构.当前可用的原子数组有:AtomicIntegerArray.AtomicLongArray和AtomicReferenceArray,分别表示整数数组.long型数组和普通的对象数组. 这里以AtomicIntegerArray为例,展示原子数组的使用方式. AtomicIntegerArray本质上是对int[]类型的封装.使用Unsafe类通过CAS的方式控制int[]在多线程下的安全性.它提供了以下几个核心API: //获得数组第

构建高性能服务(二)java高并发锁的3种实现

构建高性能服务(二)java高并发锁的3种实现 来源:http://www.xymyeah.com/?p=46 提高系统并发吞吐能力是构建高性能服务的重点和难点.通常review代码时看到synchronized是我都会想一想,这个地方可不可以优化.使用synchronized使得并发的线程变成顺序执行,对系统并发吞吐能力有极大影响,我的博文 http://maoyidao.iteye.com/blog/1149015 介绍了可以从理论上估算系统并发处理能力的方法. 那么对于必须使用synchr

java高并发锁的3种实现

初级技巧 - 乐观锁 乐观锁适合这样的场景:读不会冲突,写会冲突.同时读的频率远大于写. 以下面的代码为例,悲观锁的实现: Java代码   public Object get(Object key) { synchronized(map) { if(map.get(key) == null) { // set some values } return map.get(key); } } 乐观锁的实现: Java代码   public Object get(Object key) { Objec

java高并发的实现 - 并发锁

参考: 线程基础:http://www.cnblogs.com/Wenxu/p/7942757.html java高并发的3种实现:https://blog.csdn.net/hl_java/article/details/70148453 高并发--并发锁:https://blog.csdn.net/chen213wb/article/details/80331616 原文地址:https://www.cnblogs.com/greatai/p/10025789.html

我的《实战java高并发程序设计》纸质书上市了

在过去单核CPU时代,单任务在一个时间点只能执行单一程序,随着多核CPU的发展,并行程序开发就显得尤为重要. <实战Java高并发程序设计>主要介绍基于Java的并行程序设计基础.思路.方法和实战.首先,立足于并发程序基础,详细介绍Java中进行并行程序设计的基本方法.第二,进一步详细介绍JDK中对并行程序的强大支持,帮助读者快速.稳健地进行并行程序开发.第三,详细讨论有关"锁"的优化和提高并行程序性能级别的方法和思路.第四,介绍并行的基本设计模式及Java8对并行程序的支

【实战Java高并发程序设计 3】带有时间戳的对象引用:AtomicStampedReference

[实战Java高并发程序设计 1]Java中的指针:Unsafe类 [实战Java高并发程序设计 2]无锁的对象引用:AtomicReference AtomicReference无法解决上述问题的根本是因为对象在修改过程中,丢失了状态信息.对象值本身与状态被画上了等号.因此,我们只要能够记录对象在修改过程中的状态值,就可以很好的解决对象被反复修改导致线程无法正确判断对象状态的问题. AtomicStampedReference正是这么做的.它内部不仅维护了对象值,还维护了一个时间戳(我这里把它

【实战Java高并发程序设计 5】让普通变量也享受原子操作

[实战Java高并发程序设计 1]Java中的指针:Unsafe类 [实战Java高并发程序设计 2]无锁的对象引用:AtomicReference [实战Java高并发程序设计 3]带有时间戳的对象引用:AtomicStampedReference [实战Java高并发程序设计 4]数组也能无锁:AtomicIntegerArray 有时候,由于初期考虑不周,或者后期的需求变化,一些普通变量可能也会有线程安全的需求.如果改动不大,我们可以简单地修改程序中每一个使用或者读取这个变量的地方.但显然