介绍ReentrantLock之前,先介绍下背景知识,也就是要用到的知识点。这些知识点包括:比较并交换CAS(Compare And Swap )、ReentrantLock的类结构(其父类,内部类等)。
声明:
我主要是通过一种通俗的语言进行内容的总结,帮助大家更好的理解,记忆,更容易去理解书上的讲解。对于一些专业的陈述,大家还是需要去看书。
1、CAS
举例说明。对于如下类
public class Blog { private int count; public int getCount(){ return count; } public synchronized int increCount(){ return ++count; } }
这里有10个线程都要对变量count进行加1的操作。这种情况下我们会对其进行并发控制。也就是加synchronized关键字或者使用ReentrantLock对象的lock方法进行加锁,如increCount()方法。而CAS并不需要使用这些,就可以保证变量a最后结果的正确性。
其实CAS指的是sun.misc.Unsafe这个类中的一些方法的统称。例如,Unsafe这个类中有compareAndSwapInt、compareAndSwapLong等方法。
CAS的过程是:它包含了3个参数CAS(O,V,E,N)。O表示要更新的对象。V表示指明更新的对象中的哪个变量,E是进行比较的值,如果V==E,则将N赋值给V。
这里以JDK中的AtomicInteger类来进行分析。
public class AtomicInteger extends Number implements java.io.Serializable { private static final long serialVersionUID = 6214790243416807050L; private static final Unsafe unsafe = Unsafe.getUnsafe(); //这里是初始化一个Unsafe对象。因为CAS是这个类中的方法。 private static final long valueOffset; static { try { /*一个java对象可以看成是一段内存,各个字段都得按照一定的顺序放在这段内存里, 同时考虑到对齐要求,可能这些字段不是连续放置的,用这个方法能准确地告诉你某个 字段(也就是下面的value字段)相对于对象的起始内存地址的字节偏移量,因为是相对 偏移量,所以它其实跟某个具体对象又没什么太大关系,跟class的定义和虚拟机的内 存模型的实现细节更相关。通俗一点就是在CAS(O,V,E,N)中,O是你要更新那个对象, V就是我要通过这个偏移量找到这个对象中的value对象,来对他进行操作。*/ valueOffset = unsafe.objectFieldOffset (AtomicInteger.class.getDeclaredField("value")); } catch (Exception ex) { throw new Error(ex); } } //volatile,保证变量的可见性。 private volatile int value; //有参构造 public AtomicInteger(int initialValue) { value = initialValue; } public AtomicInteger() { } public final int get() { return value; } public final int getAndIncrement() { //为什么会无限循环,后面的方法中会说明 for (;;) { int current = get(); //得到当前的值 int next = current + 1; if (compareAndSet(current, next)) return current; } } public final boolean compareAndSet(int expect, int update) { /*this指的是该对象,valueOffset就是前面讲的偏移量,找到偏移量对应的值,也就是value变量 对应的值,如果except等于这个value,则将value更改为update。可能这大家会有个问题是,如果我两个 线程同时读,expect的值一样,都等于value,这样两个线程进行判断的时候value都等于except,都将update赋值给value ,结果会造成最后的结果不正确。其实这个方法是native方法,同时也是同步的,一个线程进行操作,另一个线程是需要 等待的,虽然两个线程的expect的值一致,但是当第二个线程再调用这个方法时,value的值已经发生了变化。 此时会返回false,所以在上述的getAndIncrement()方法中会使用for进行无线循环。 */ return unsafe.compareAndSwapInt(this, valueOffset, expect, update); } }
2、ReentrantLock的类结构
ReentrantLock实现了Lock接口,而Lock接口主要提供了以下方法:
void lock(); void lockInterruptibly() throws InterruptedException; boolean tryLock(); boolean tryLock(long time, TimeUnit unit) throws InterruptedException; void unlock(); Condition newCondition();
ReentrantLock类有一个静态内部抽象类Sync,这个Sync继承AbstractQueuedSynchronizer类,AbstractQueuedSynchronizer类继承自AbstractOwnableSynchronizer。
而ReentrantLock类中还有两个静态内部类,分别为NonfairSync(非公平锁)和FairSync(公平锁)。在AbstractQueuedSynchronizer抽象类中有一个静态内部类Node,这个Node就是存放等待线程的节点。
3、分析lock()方法
ReentrantLock类有一个成员变量为private final Sync sync; ReentrantLock的lock()、unLock()等方法,调用的都是sync的一些方法。如下:
public class ReentrantLock implements Lock, java.io.Serializable { private static final long serialVersionUID = 7373984872572414699L; /** Synchronizer providing all implementation mechanics */ private final Sync sync; public ReentrantLock() { sync = new NonfairSync(); } public ReentrantLock(boolean fair) { sync = (fair)? new FairSync() : new NonfairSync(); //构造时指定为公平锁还是非公平锁 } public void lock() { sync.lock(); } public void unlock() { sync.release(1); } }
公平锁与非公平锁的区别在于:如果有10个线程,第1个线程获取了锁,紧接着有8个线程已经处在了等待释放锁。如果是非公平锁的话,第10个线程可以尝试直接获取锁,有可能是第1个线程释放锁了以后第10个直接就获取了。而公平锁就是,第10个线程必须等待前面的线程都获取锁,并释放,才可以获取锁,先进先出的意思。
这里以非公平锁来进行分析。
当调用lock()方法是,也就是执行了sync.lock()方法,如下是代码:
final void lock() { if (compareAndSetState(0, 1)) //首先尝试获取锁。该方法在下面有说明 setExclusiveOwnerThread(Thread.currentThread());//如果获取锁,则将当前线程赋值给exclusiveOwnerThread变量。 else acquire(1);//如果失败,则调用此方法。继续往下看这个方法。 }
// stateOffset = unsafe.objectFieldOffset //(AbstractQueuedSynchronizer.class.getDeclaredField("state")); protected final boolean compareAndSetState(int expect, int update) { //这个stateOffset对应的是Sync的父类中的state字段,这个字段用来标识是否有线程已经获取锁 //如果有的话,state>0,否则state==0;所以如果expect==state==0,那么就获取锁,将 //state置为1 return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
public final void acquire(int arg) { //先尝试获取锁,如果获取锁失败了,也就是!tryAcquire(arg)为true则执行 //后面的方法。注意&&,前面为true后面才执行。获取锁失败后,会将该线程加入等待队列 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires);//我们是以非公平锁来讲解的。 }
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { //状态为0,说明没有线程获取锁。尝试获取锁 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } //如果当前线程为exclusiveOwnerThread变量中存储的线程,说明当前线程已经 //获取了锁,这里是当前线程再次要获得锁,所以state要继续+1。 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } //获取锁失败,返回false。 return false; }
讲完了tryAcquire(arg)方法,紧接着讲 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)这两个方法。
private Node addWaiter(Node mode) { //当前线程节点,可以看看Node类中的一些变量和方法,比较简单。 Node node = new Node(Thread.currentThread(), mode); Node pred = tail; //判断有没有尾节点(也就是前面是否有等待线程)。如果有尾节点,则将当前线程的节点插入到队列的尾部 //也就是将当前线程变成尾节点。 if (pred != null) { node.prev = pred; //CAS的操作。 if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } //如果没有尾节点,说明前面还未有等待线程。调用下面的方法,创建等待队列 enq(node); return node; }
private Node enq(final Node node) { //enq方法主要是创建头,尾节点,也就是等待队列,下面都会用的CAS同步操作,加上 //for循环,防止两个线程同时创建头尾节点。代码比较容易读懂 for (;;) { Node t = tail; if (t == null) { // Must initialize Node h = new Node(); // Dummy header h.next = node; node.prev = h; if (compareAndSetHead(h)) { tail = node; return h; } } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
addWaiter方法分析完了,紧接着就是acquireQueued(addWaiter(Node.EXCLUSIVE), arg))中的acquireQueued方法。
//这个方法是不断地获取锁,直到成功的获取锁 final boolean acquireQueued(final Node node, int arg) { try { boolean interrupted = false; for (;;) { //得到当前节点的上一个节点 final Node p = node.predecessor(); //p==head意思是如果上一个节点为头节点,才尝试获取锁,为什么呢? //因为公平锁也会调用这个方法,这里要保证公平,只有当前线程节点在队列的第一个 //才可以获取锁。 if (p == head && tryAcquire(arg)) { //如果成果获得锁,则将当前节点变成头结点。因为获得锁,所以当前节点要移出等待 //队列,变成了头结点。 setHead(node); p.next = null; // help GC return interrupted; } //第一个方法是如果获取失败,是否要对当前线程进行阻塞(后面有分析)。防止线程一直进行for循环。 //&&后面的方法是对当前线程进行阻塞并且判断是否中断。这里注意的是,如果一个线程在 //等待锁期间这个线程被中断了,这里会将interrupted赋为true,但是并不return。这个 //还一直进行for循环,知道这个线程获得了锁,所以lock()方法不能立即响应中断,必须等线程 //获得了锁才可以响应中断。对应的可以立即响应中断的方法为lockInterruptibly()方法 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } catch (RuntimeException ex) { cancelAcquire(node); throw ex; } }
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { //这个pre是当前节点(node)的上一个节点。waitStatus有4个状态,这个方法后有说明 int s = pred.waitStatus; //小于0,也就是SIGNAL状态,或者是CONDITION状态 if (s < 0) return true; //大于0,也就是CANCLE状态。 if (s > 0) { do { node.prev = pred = pred.prev;/ } while (pred.waitStatus > 0); pred.next = node; } else //这里为什么要将当前节点的父节点的waitStatus设置成-1呢,这是因为 //在unLock()方法中会根据当前节点的waitStatus来判断后续是否还有节点。 //如果当前的waitStatus==-1,说明有后续节点对其进行了设置。后续节点 //处在阻塞状态。后面会对unLock()方法进行分析。 compareAndSetWaitStatus(pred, 0, Node.SIGNAL); return false; }
static final int CANCELLED = 1;//这个状态说明该节点已经被取消。 static final int SIGNAL = -1;//这个状态说明该节点后续有阻塞的节点 static final int CONDITION = -2;//这个状态跟Condition有关,也就是线程之间通过Condition方法进行通信。我会在接下的博客中写有关Condition的文章
private final boolean parkAndCheckInterrupt() { //阻塞当前线程 LockSupport.park(this); return Thread.interrupted(); }
4、unLock()方法
public void unlock() { sync.release(1); }
public final boolean release(int arg) { //tryRelease(arg)是将state标志位置为0,用来让其他线程获取锁。 if (tryRelease(arg)) { Node h = head; //如果h.waitStatus不等于0,说明后面有线程再等待,前面shouldParkAfterFailedAcquire() //方法有说明。 if (h != null && h.waitStatus != 0) //这个方法后面有分析。主要是将头结点的waitStatus置为0,同时通知下一个节点。 unparkSuccessor(h); return true; } return false; }
private void unparkSuccessor(Node node) { /** * 将头节点的waitStatus状态置为0 */ compareAndSetWaitStatus(node, Node.SIGNAL, 0); /** * 获取下一个节点,如果下一个节点被取消或者为空,则从尾部还是查找,找到第一个可用的线程 * 。 */ Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) //通知该节点。 LockSupport.unpark(s.thread); }
5、Lock VS Sychronized(引用别人博文上的)
AbstractQueuedSynchronizer通过构造一个基于阻塞的CLH队列容纳所有的阻塞线程,而对该队列的操作均通过Lock-Free(CAS)操作,但对已经获得锁的线程而言,ReentrantLock实现了偏向锁的功能。
synchronized 的底层也是一个基于CAS操作的等待队列,但JVM实现的更精细,把等待队列分为ContentionList和EntryList,目的是为了降低线程的出列速度;当然也实现了偏向锁,从数据结构来说二者设计没有本质区别。但synchronized还实现了自旋锁,并针对不同的系统和硬件体系进行了优 化,而Lock则完全依靠系统阻塞挂起等待线程。
当然Lock比synchronized更适合在应用层扩展,可以继承 AbstractQueuedSynchronizer定义各种实现,比如实现读写锁(ReadWriteLock),公平或不公平锁;同时,Lock对 应的Condition也比wait/notify要方便的多、灵活的多。
6、总结
以上主要分析了从加锁到解锁的过程,公平锁跟这个类似,读懂这个,其他的方法比如lockInterruptibly(),也很容易明白,如果有不明白可以留言,或者有什么建议,我写的不对的地方,都可以告诉我,我们共同成长。