在聊聊高并发(六)实现几种自旋锁(一) 这篇中实现了两种基本的自旋锁:TASLock和TTASLock,它们的问题是会进行频繁的CAS操作,引发大量的缓存一致性流量,导致锁的性能不好。
对TTASLock的一种改进是BackoffLock,它会在锁高争用的情况下对线程进行回退,减少竞争,减少缓存一致性流量。但是BackoffLock有三个主要的问题:
1. 还是有大量的缓存一致性流量,因为所有线程在同一个共享变量上旋转,每一次成功的获取锁都会产生缓存一致性流量
2. 因为回退的存在,不能及时获取锁释放的信息,存在一个时间差,导致获取锁的时间变长
3. 不能保证无饥饿,有的线程可能一直无法获取锁
这篇会实现2种基于队列的锁,来解决上面提到的三个问题。主要的思路是将线程组织成一个队列,有4个优点:
1. 每个线程只需要检查它的前驱线程的状态,将自旋的变量从一个分散到多个,减少缓存一致性流量
2. 可以即使获取锁释放的通知
3. 队列提供了先来先服务的公平性
4. 无饥饿,队列中的每个线程都能保证被执行到
队列锁分为两类,一类是基于有界队列,一类是基于无界队列。
先看一下基于有界队列的队列锁。 ArrayLock有3个特点:
1. 基于一个volatile数组来组织线程
2. 通过一个原子变量tail来表示对尾线程
3. 通过一个ThreadLocal变量给每个线程一个索引号,表示它位于队列的哪个位置。
package com.test.lock; import java.util.concurrent.atomic.AtomicInteger; /** * 有界队列锁,使用一个volatile数组来组织线程 * 缺点是得预先知道线程的规模n,所有线程获取同一个锁的次数不能超过n * 假设L把锁,那么锁的空间复杂度为O(Ln) * **/ public class ArrayLock implements Lock{ // 使用volatile数组来存放锁标志, flags[i] = true表示可以获得锁 private volatile boolean[] flags; // 指向新加入的节点的后一个位置 private AtomicInteger tail; // 总容量 private final int capacity; private ThreadLocal<Integer> mySlotIndex = new ThreadLocal<Integer>(){ protected Integer initialValue() { return 0; } }; public ArrayLock(int capacity){ this.capacity = capacity; flags = new boolean[capacity]; tail = new AtomicInteger(0); // 默认第一个位置可获得锁 flags[0] = true; } @Override public void lock() { int slot = tail.getAndIncrement() % capacity; mySlotIndex.set(slot); // flags[slot] == true 表示获得了锁, volatile变量保证锁释放及时通知 while(!flags[slot]){ } } @Override public void unlock() { int slot = mySlotIndex.get(); flags[slot] = false; flags[(slot + 1) % capacity] = true; } <pre name="code" class="java"> public String toString(){ return "ArrayLock"; }
}
我们可以看到有界队列锁的缺点是:
1. 它必须知道线程的规模数,对于同一把锁如果线程获取的次数超过了n会出现线程状态被覆盖的问题
2. 空间复杂度是O(Ln)
3. 对于共享的volatile数组来保存线程获取锁的状态,仍然可能存在缓存一致性。我们知道CPU读取一次内存时,会读满数据总线的位长,比如64位总线,一次读取64位长度的数据。那么对于boolean类型的数组,boolean长度是1个字节,那么一次读取能读到8个boolean变量,而高速缓存的一个缓存块的长度也是64位,也就是说一个缓存块上可以保存8个boolean变量,所以如果一次CAS操作修改了一个变量导致一个缓存块无效,它实际上可能导致8个变量失效。
解决办法是把变量以8个长度为单位分散,比如flag[0] = thread1 flag[8] = thread2。这样的问题是消耗的空间更大。
无界队列锁可以克服有界队列锁的几个问题。
1. 它使用链表来代替数组,实现无界队列
2. 使用两个ThreadLocal变量表示指针,一个指向自己的节点,一个指向前一个节点
3. 使用一个原子引用变量指向队尾
4. 空间复杂度降低,如果有L把锁,n个线程,每个线程只获取一把锁,那么空间复杂度为O(L + n)
5. 对同一个锁,一个线程可以多次获取而不增加空间复杂度
6. 当线程结束后,GC会自动回收内存
package com.test.lock; import java.util.concurrent.atomic.AtomicReference; /** * 无界队列锁,使用一个链表来组织线程 * 假设L把锁,n个线程,那么锁的空间复杂度为O(L+n) * **/ public class CLHLock implements Lock{ // 原子变量指向队尾 private AtomicReference<QNode> tail; // 两个指针,一个指向自己的Node,一个指向前一个Node ThreadLocal<QNode> myNode; ThreadLocal<QNode> myPreNode; public CLHLock(){ tail = new AtomicReference<QNode>(new QNode()); myNode = new ThreadLocal<QNode>(){ protected QNode initialValue(){ return new QNode(); } }; myPreNode = new ThreadLocal<QNode>(){ protected QNode initialValue(){ return null; } }; } @Override public void lock() { QNode node = myNode.get(); node.lock = true; // CAS原子操作,保证原子性 QNode preNode = tail.getAndSet(node); myPreNode.set(preNode); // volatile变量,能保证锁释放及时通知 // 只对前一个节点的状态自旋,减少缓存一致性流量 while(preNode.lock){ } } @Override public void unlock() { QNode node = myNode.get(); node.lock = false; // 把myNode指向preNode,目的是保证同一个线程下次还能使用这个锁,因为myNode原来指向的节点有它的后一个节点的preNode引用 // 防止这个线程下次lock时myNode.get获得原来的节点 myNode.set(myPreNode.get()); } public static class QNode { volatile boolean lock; } public String toString(){ return "CLHLock"; } }
下面我们从正确性和平均获取锁的时间上来测试这两种锁。
我们设计一个测试用例来验证正确性: 使用50个线程对一个volatile变量++操作,由于volatile变量++操作不是原子的,在不加锁的情况下,可能同时有多个线程同时对voaltile变量++, 最终的结果是无法预测的。然后使用这两种锁,先获取锁再volatile变量++,由于volatile变量会防止重排序,并能保证可见性,我们可以确定如果锁是正确获取的,也就是说同一时刻只有一个线程对volatile变量++,那么结果肯定是顺序的1到50。
先看不加锁的情况
package com.test.lock; public class Main { //private static Lock lock = new ArrayLock(150); private static Lock lock = new CLHLock(); //private static TimeCost timeCost = new TimeCost(new TTASLock()); private static volatile int value = 0; public static void method(){ //lock.lock(); System.out.println("Value: " + ++value); //lock.unlock(); } public static void main(String[] args) { for(int i = 0; i < 50; i ++){ Thread t = new Thread(new Runnable(){ @Override public void run() { method(); } }); t.start(); } } }
运行结果: 我们可以看到确实是发生的线程同时对volatile变量++的操作,结果是无法预料的
Value: 1 Value: 1 Value: 2 Value: 3 Value: 4 Value: 5 Value: 6 Value: 7 Value: 8 Value: 9 Value: 10 Value: 11 Value: 13 Value: 12 Value: 14 Value: 15 Value: 16 Value: 17 Value: 18 Value: 19 Value: 20 Value: 21 Value: 22 Value: 23 Value: 24 Value: 25 Value: 26 Value: 27 Value: 28 Value: 29 Value: 30 Value: 31 Value: 32 Value: 33 Value: 34 Value: 35 Value: 36 Value: 37 Value: 38 Value: 37 Value: 39 Value: 40 Value: 41 Value: 42 Value: 43 Value: 44 Value: 45 Value: 46 Value: 47 Value: 48 Value: 50
使用有界队列锁:
package com.test.lock; public class Main { private static Lock lock = new ArrayLock(100); //private static Lock lock = new CLHLock(); //private static TimeCost timeCost = new TimeCost(new TTASLock()); private static volatile int value = 0; public static void method(){ lock.lock(); System.out.println("Value: " + ++value); lock.unlock(); } public static void main(String[] args) { for(int i = 0; i < 50; i ++){ Thread t = new Thread(new Runnable(){ @Override public void run() { method(); } }); t.start(); } } }
运行结果是1到50的顺序自增,说明锁保证了同一时刻只有一个线程在对volatile变量++,是正确的
Value: 1 Value: 2 Value: 3 Value: 4 Value: 5 Value: 6 Value: 7 Value: 8 Value: 9 Value: 10 Value: 11 Value: 12 Value: 13 Value: 14 Value: 15 Value: 16 Value: 17 Value: 18 Value: 19 Value: 20 Value: 21 Value: 22 Value: 23 Value: 24 Value: 25 Value: 26 Value: 27 Value: 28 Value: 29 Value: 30 Value: 31 Value: 32 Value: 33 Value: 34 Value: 35 Value: 36 Value: 37 Value: 38 Value: 39 Value: 40 Value: 41 Value: 42 Value: 43 Value: 44 Value: 45 Value: 46 Value: 47 Value: 48 Value: 49 Value: 50
使用无界队列锁的情况也是正确的,由于篇幅原因这里就不帖代码了。
再看平均获取锁的时间。
package com.test.lock; public class Main { private static Lock lock = new TimeCost(new CLHLock()); //private static Lock lock = new CLHLock(); //private static TimeCost timeCost = new TimeCost(new TTASLock()); private static volatile int value = 0; public static void method(){ lock.lock(); //System.out.println("Value: " + ++value); lock.unlock(); } public static void main(String[] args) { for(int i = 0; i < 100; i ++){ Thread t = new Thread(new Runnable(){ @Override public void run() { method(); } }); t.start(); } } }
在100个线程并发的情况下,
ArrayLock获取锁的平均时间是: 719550 ns
CLHLock获取锁的平均时间是: 488577 ns
可以看到,队列锁在使用多个共享变量自旋的情况下,减少了一致性流量,比TASLock和TTASLock 提高了程序的性能。而CLHLock比ArrayLock有更好的扩展性和性能,是一种很好的自旋锁实现。