ReentrantReadWriteLock(读写锁)源码分析
ReentrantReadWriteLock 分为读锁和写锁两个实例:
读锁是共享锁,可被多个线程同时使用,写锁是独占锁。
持有写锁的线程可以继续获取读锁(锁降级),反之不行(持有读锁必须先释放才能再次获取写锁)。
AQS 的精髓在于内部的属性 state:
独占模式,通常就是 0 代表可获取锁,>=1 代表锁被别人获取了。
共享模式下,每个线程都可以对 state 进行加减操作。
独占模式和共享模式对于 state 的操作完全不一样,
读写锁 ReentrantReadWriteLock 中是将 state 这个 32 位的 int 值分为高 16 位和低 16位,
分别用于共享模式和独占模式。
/**
* 读锁
*/
public class ReentrantReadWriteLock
implements ReadWriteLock, java.io.Serializable {
private final ReentrantReadWriteLock.ReadLock readerLock;
private final ReentrantReadWriteLock.WriteLock writerLock;
final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
//分享锁(高16位)
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
//独占锁(低16位)
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
}
public static class ReadLock implements Lock, java.io.Serializable {
private final Sync sync;
public void lock() {
sync.acquireShared(1);
}
}
}
AbstractQueuedSynchronizer:
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0) //尝试获取锁
doAcquireShared(arg); //获取锁失败,进入阻塞队列
}
ReentrantReadWriteLock:
abstract static class Sync extends AbstractQueuedSynchronizer {
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current) //非当前线程持有写锁,获取读锁失败(如果是当前线程,可以锁降级)
return -1;
int r = sharedCount(c); //当前持有读锁的次数
//获取读锁
//如果公平模式:判断阻塞队列是否有等待
//如果非公平模式:判断阻塞队列中 head 的第一个后继节点是否是来获取写锁的,如果是的话,让这个写锁先来,避免写锁饥饿。
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null ||
rh.tid != LockSupport.getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
//处理可重入锁( readerShouldBlock()中阻塞队列有等待而使其获取重入锁失败,则在此方法中获取重入锁 )
return fullTryAcquireShared(current);
}
}
/**
* 写锁
*/
写锁是独占锁:
如果有读锁被占用,写锁获取是要进入到阻塞队列中等待的。
BlockingQueue(不接受 null 值的插入)
public interface BlockingQueue<E> extends Queue<E> {}
ArrayBlockingQueue(数组。有界)
ArrayBlockingQueue 实现并发同步的原理是:根据 Condition 实现。
读操作和写操作都需要获取到 AQS 独占锁才能进行操作。
如果队列为空,这个时候读操作的线程进入到读线程队列排队,等待写线程写入新的元素,然后唤醒读线程队列的第一个等待线程。
如果队列已满,这个时候写操作的线程进入到写线程队列排队,等待读线程将队列元素移除腾出空间,然后唤醒写线程队列的第一个等待线程。
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
final Object[] items;
int takeIndex; //下一次读取操作的位置
int putIndex; //下一次写入操作的位置
int count;
final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;
transient Itrs itrs;
//写元素
public void put(E e) throws InterruptedException {
Objects.requireNonNull(e);
final ReentrantLock lock = this.lock; //获取锁
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await(); //队列满了,写线程阻塞,等待被唤醒
enqueue(e); //队列不满,写线程被唤醒,写元素,并唤醒读元素线程
} finally {
lock.unlock();
}
}
private void enqueue(E e) {
final Object[] items = this.items;
items[putIndex] = e;
if (++putIndex == items.length) putIndex = 0;
count++;
notEmpty.signal(); //唤醒读线程
}
}
LinkedBlockingQueue(链表。可以无界,也可以有界)
底层基于单向链表实现的阻塞队列,可以当做无界队列也可以当做有界队列来使用。
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
private final int capacity;
private final AtomicInteger count = new AtomicInteger();
transient Node<E> head; //对头
private transient Node<E> last; //队尾
private final ReentrantLock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();
}
takeLock 和 notEmpty 搭配:
如果要获取(take)一个元素,需要获取 takeLock 锁,
但是获取了锁还不够,如果队列此时为空,还需要队列不为空(notEmpty)这个条件(Condition)。
putLock 需要和 notFull 搭配:
如果要插入(put)一个元素,需要获取 putLock 锁,
但是获取了锁还不够,如果队列此时已满,还需要队列不是满的(notFull)这个条件(Condition)。
//写元素
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
final int c;
final Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly(); //获取写锁
try {
while (count.get() == capacity) {
notFull.await(); //队列满了,写线程阻塞
}
enqueue(node); //被别人唤醒,队列未满,写元素
c = count.getAndIncrement();
if (c + 1 < capacity) //写进一个元素后,如果还有空位,唤醒别的写线程
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
//如果 c == 0,那么代表队列在这个元素入队前是空的(不包括head空节点),
//那么所有的读线程都在等待 notEmpty 这个条件,等待唤醒,这里做一次唤醒操作
signalNotEmpty();
}
private void enqueue(Node<E> node) {
last = last.next = node; //写元素(将元素放在链表队尾)
}
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
SynchronousQueue(读写匹配,不提供储存元素的空间。可以选择公平与非公平)
Synchronous 指的就是读线程和写线程需要同步,一个读线程匹配一个写线程。
不提供任何空间(一个都没有)来存储元素。数据必须从某个写线程交给某个读线程,而不是写到某个队列中等待被消费。
transfer 的设计思路:
当调用这个方法时,如果队列是空的,或者队列中的节点和当前的线程操作类型一致
(如当前操作是 put 操作,而队列中的元素也都是写线程)。
这种情况下,将当前线程加入到等待队列即可。
如果队列中有等待节点,而且与当前操作可以匹配(如队列中都是读操作线程,当前线程是写操作线程,反之亦然)。
这种情况下,匹配等待队列的队头,出队,返回相应数据。
public class SynchronousQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
if (transferer.transfer(e, false, 0) == null) {
Thread.interrupted();
throw new InterruptedException();
}
}
public E take() throws InterruptedException {
E e = transferer.transfer(null, false, 0);
if (e != null)
return e;
Thread.interrupted();
throw new InterruptedException();
}
static final class TransferQueue<E> extends Transferer<E> { //公平模式
static final class QNode { //单向链表等待队列
volatile QNode next;
volatile Object item;
volatile Thread waiter;
final boolean isData;
}
transient volatile QNode head;
transient volatile QNode tail;
transient volatile QNode cleanMe;
E transfer(E e, boolean timed, long nanos) {}
}
}
PriorityBlockingQueue(数组。无界。二叉堆)
插入队列的对象必须是可比较大小的(comparable),否则报 ClassCastException 异常。
它的插入操作 put 方法不会 block,因为它是无界队列(take 方法在队列为空的时候会阻塞)。
迭代并遍历时,不能保证有序性:
如果你想要实现有序遍历,建议采用 Arrays.sort(queue.toArray()) 进行处理。
public class PriorityBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
private transient Object[] queue;
private transient int size;
private transient Comparator<? super E> comparator;
private final ReentrantLock lock = new ReentrantLock(); //并发控制采用的是 ReentrantLock
private final Condition notEmpty = lock.newCondition();
private transient volatile int allocationSpinLock; //数组扩容的时候,需要先获取到这个锁,才能进行扩容操作
private PriorityQueue<E> q; //用于序列化和反序列化的时候用
}
原文地址:https://www.cnblogs.com/loveer/p/11829537.html
时间: 2024-11-02 14:28:56