这两天学习无锁的并发模式,发现相比于传统的 同步加锁相比,有两点好处
1.无锁 模式 相比于 传统的 同步加锁 提高了性能
2.无锁模式 是天然的死锁免疫
下来介绍无锁的Vector--- LockFreeVector
它的结构是:
private final AtomicReferenceArray<AtomicReferenceArray<E>> buckets;
从这里我们可以看到,它的内部是采用的是 无锁的引用数组, 数组嵌套数组
相当于一个二维数组,它的大小可以动态的进行扩展,
为了更有序的读写数组,定义了一个Descriptor的静态内部类。它的作用是使用CAS操作写入新数据。
它定义了
private static final int FIRST_BUCKET_SIZE = 8; /** * number of buckets. 30 will allow 8*(2^30-1) elements */private static final int N_BUCKET = 30;
FIRST_BUCKET_SIZE:为第一个数组的长度 N_BUCKET 整个二维数组最大可扩转至30 每次的扩展是成倍的增加,即:第一个数组长度为8,第二个为8<<1,第三个为8<<2 ......第30个为 8<<29
贡献源码:
/* * Copyright (c) 2007 IBM Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package main.java.org.amino.ds.lockfree; import java.util.AbstractList; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReferenceArray; /** * It is a thread safe and lock-free vector. * This class implement algorithm from:<br> * * Lock-free Dynamically Resizable Arrays <br> * * Damian Dechev, Peter Pirkelbauer, and Bjarne Stroustrup<br> * Texas A&M University College Station, TX 77843-3112<br> * {dechev, peter.pirkelbauer}@tamu.edu, [email protected] * * * @author Zhi Gan * * @param <E> type of element in the vector * */ public class LockFreeVector<E> extends AbstractList<E> { private static final boolean debug = false; /** * Size of the first bucket. sizeof(bucket[i+1])=2*sizeof(bucket[i]) */ private static final int FIRST_BUCKET_SIZE = 8; /** * number of buckets. 30 will allow 8*(2^30-1) elements */ private static final int N_BUCKET = 30; /** * We will have at most N_BUCKET number of buckets. And we have * sizeof(buckets.get(i))=FIRST_BUCKET_SIZE**(i+1) */ private final AtomicReferenceArray<AtomicReferenceArray<E>> buckets; /** * @author ganzhi * * @param <E> */ static class WriteDescriptor<E> { public E oldV; public E newV; public AtomicReferenceArray<E> addr; public int addr_ind; /** * Creating a new descriptor. * * @param addr Operation address * @param addr_ind Index of address * @param oldV old operand * @param newV new operand */ public WriteDescriptor(AtomicReferenceArray<E> addr, int addr_ind, E oldV, E newV) { this.addr = addr; this.addr_ind = addr_ind; this.oldV = oldV; this.newV = newV; } /** * set newV. */ public void doIt() { addr.compareAndSet(addr_ind, oldV, newV); } } /** * @author ganzhi * * @param <E> */ static class Descriptor<E> { public int size; volatile WriteDescriptor<E> writeop; /** * Create a new descriptor. * * @param size Size of the vector * @param writeop Executor write operation */ public Descriptor(int size, WriteDescriptor<E> writeop) { this.size = size; this.writeop = writeop; } /** * */ public void completeWrite() { WriteDescriptor<E> tmpOp = writeop; if (tmpOp != null) { tmpOp.doIt(); writeop = null; // this is safe since all write to writeop use // null as r_value. } } } private AtomicReference<Descriptor<E>> descriptor; private static final int zeroNumFirst = Integer .numberOfLeadingZeros(FIRST_BUCKET_SIZE);; /** * Constructor. */ public LockFreeVector() { buckets = new AtomicReferenceArray<AtomicReferenceArray<E>>(N_BUCKET); buckets.set(0, new AtomicReferenceArray<E>(FIRST_BUCKET_SIZE)); descriptor = new AtomicReference<Descriptor<E>>(new Descriptor<E>(0, null)); } /** * add e at the end of vector. * * @param e * element added */ public void push_back(E e) { Descriptor<E> desc; Descriptor<E> newd; do { desc = descriptor.get(); desc.completeWrite(); //desc.size Vector 本身的大小 //FIRST_BUCKET_SIZE 第一个一位数组的大小 int pos = desc.size + FIRST_BUCKET_SIZE; int zeroNumPos = Integer.numberOfLeadingZeros(pos); // 取出pos 的前导领 //zeroNumFirst 为FIRST_BUCKET_SIZE 的前导领 int bucketInd = zeroNumFirst - zeroNumPos; //哪个一位数组 //判断这个一维数组是否已经启用 if (buckets.get(bucketInd) == null) { //newLen 一维数组的长度 int newLen = 2 * buckets.get(bucketInd - 1).length(); if (debug) System.out.println("New Length is:" + newLen); buckets.compareAndSet(bucketInd, null, new AtomicReferenceArray<E>(newLen)); } int idx = (0x80000000>>>zeroNumPos) ^ pos; //在这个一位数组中,我在哪个位置 newd = new Descriptor<E>(desc.size + 1, new WriteDescriptor<E>( buckets.get(bucketInd), idx, null, e)); } while (!descriptor.compareAndSet(desc, newd)); descriptor.get().completeWrite(); } /** * Remove the last element in the vector. * * @return element removed */ public E pop_back() { Descriptor<E> desc; Descriptor<E> newd; E elem; do { desc = descriptor.get(); desc.completeWrite(); int pos = desc.size + FIRST_BUCKET_SIZE - 1; int bucketInd = Integer.numberOfLeadingZeros(FIRST_BUCKET_SIZE) - Integer.numberOfLeadingZeros(pos); int idx = Integer.highestOneBit(pos) ^ pos; elem = buckets.get(bucketInd).get(idx); newd = new Descriptor<E>(desc.size - 1, null); } while (!descriptor.compareAndSet(desc, newd)); return elem; } /** * Get element with the index. * * @param index * index * @return element with the index */ @Override public E get(int index) { int pos = index + FIRST_BUCKET_SIZE; int zeroNumPos = Integer.numberOfLeadingZeros(pos); int bucketInd = zeroNumFirst - zeroNumPos; int idx = (0x80000000>>>zeroNumPos) ^ pos; return buckets.get(bucketInd).get(idx); } /** * Set the element with index to e. * * @param index * index of element to be reset * @param e * element to set */ /** * {@inheritDoc} */ public E set(int index, E e) { int pos = index + FIRST_BUCKET_SIZE; int bucketInd = Integer.numberOfLeadingZeros(FIRST_BUCKET_SIZE) - Integer.numberOfLeadingZeros(pos); int idx = Integer.highestOneBit(pos) ^ pos; AtomicReferenceArray<E> bucket = buckets.get(bucketInd); while (true) { E oldV = bucket.get(idx); if (bucket.compareAndSet(idx, oldV, e)) return oldV; } } /** * reserve more space. * * @param newSize * new size be reserved */ public void reserve(int newSize) { int size = descriptor.get().size; int pos = size + FIRST_BUCKET_SIZE - 1; int i = Integer.numberOfLeadingZeros(FIRST_BUCKET_SIZE) - Integer.numberOfLeadingZeros(pos); if (i < 1) i = 1; int initialSize = buckets.get(i - 1).length(); while (i < Integer.numberOfLeadingZeros(FIRST_BUCKET_SIZE) - Integer.numberOfLeadingZeros(newSize + FIRST_BUCKET_SIZE - 1)) { i++; initialSize *= FIRST_BUCKET_SIZE; buckets.compareAndSet(i, null, new AtomicReferenceArray<E>( initialSize)); } } /** * size of vector. * * @return size of vector */ public int size() { return descriptor.get().size; } /** * {@inheritDoc} */ @Override public boolean add(E object) { push_back(object); return true; } }
参考:http://www.shaoqun.com/a/197387.aspx
源码部分,我只着重写了push_back这个方法的注释。
时间: 2024-10-12 10:05:21