聊聊高并发(七)实现几种自旋锁(二)

聊聊高并发(六)实现几种自旋锁(一) 这篇中实现了两种基本的自旋锁:TASLock和TTASLock,它们的问题是会进行频繁的CAS操作,引发大量的缓存一致性流量,导致锁的性能不好。

对TTASLock的一种改进是BackoffLock,它会在锁高争用的情况下对线程进行回退,减少竞争,减少缓存一致性流量。但是BackoffLock有三个主要的问题:

1. 还是有大量的缓存一致性流量,因为所有线程在同一个共享变量上旋转,每一次成功的获取锁都会产生缓存一致性流量

2. 因为回退的存在,不能及时获取锁释放的信息,存在一个时间差,导致获取锁的时间变长

3. 不能保证无饥饿,有的线程可能一直无法获取锁

这篇会实现2种基于队列的锁,来解决上面提到的三个问题。主要的思路是将线程组织成一个队列,有4个优点:

1. 每个线程只需要检查它的前驱线程的状态,将自旋的变量从一个分散到多个,减少缓存一致性流量

2. 可以即使获取锁释放的通知

3. 队列提供了先来先服务的公平性

4. 无饥饿,队列中的每个线程都能保证被执行到

队列锁分为两类,一类是基于有界队列,一类是基于无界队列。

先看一下基于有界队列的队列锁。 ArrayLock有3个特点:

1. 基于一个volatile数组来组织线程

2. 通过一个原子变量tail来表示对尾线程

3. 通过一个ThreadLocal变量给每个线程一个索引号,表示它位于队列的哪个位置。

package com.test.lock;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * 有界队列锁,使用一个volatile数组来组织线程
 * 缺点是得预先知道线程的规模n,所有线程获取同一个锁的次数不能超过n
 * 假设L把锁,那么锁的空间复杂度为O(Ln)
 * **/
public class ArrayLock implements Lock{
	// 使用volatile数组来存放锁标志, flags[i] = true表示可以获得锁
	private volatile boolean[] flags;

	// 指向新加入的节点的后一个位置
	private AtomicInteger tail;

	// 总容量
	private final int capacity;

	private ThreadLocal<Integer> mySlotIndex = new ThreadLocal<Integer>(){
		 protected Integer initialValue() {
		        return 0;
		 }
	};

	public ArrayLock(int capacity){
		this.capacity = capacity;
		flags = new boolean[capacity];
		tail = new AtomicInteger(0);
		// 默认第一个位置可获得锁
		flags[0] = true;
	}

	@Override
	public void lock() {
		int slot = tail.getAndIncrement() % capacity;
		mySlotIndex.set(slot);
		// flags[slot] == true 表示获得了锁, volatile变量保证锁释放及时通知
		 while(!flags[slot]){

		 }
	 }

	 @Override
	 public void unlock() {
	       int slot = mySlotIndex.get();
	       flags[slot] = false;
	       flags[(slot + 1) % capacity] = true;
	}
<pre name="code" class="java">
        public String toString(){
           return "ArrayLock";
        }

}


我们可以看到有界队列锁的缺点是:

1. 它必须知道线程的规模数,对于同一把锁如果线程获取的次数超过了n会出现线程状态被覆盖的问题

2. 空间复杂度是O(Ln)

3. 对于共享的volatile数组来保存线程获取锁的状态,仍然可能存在缓存一致性。我们知道CPU读取一次内存时,会读满数据总线的位长,比如64位总线,一次读取64位长度的数据。那么对于boolean类型的数组,boolean长度是1个字节,那么一次读取能读到8个boolean变量,而高速缓存的一个缓存块的长度也是64位,也就是说一个缓存块上可以保存8个boolean变量,所以如果一次CAS操作修改了一个变量导致一个缓存块无效,它实际上可能导致8个变量失效。

解决办法是把变量以8个长度为单位分散,比如flag[0] = thread1  flag[8] = thread2。这样的问题是消耗的空间更大。

无界队列锁可以克服有界队列锁的几个问题。

1. 它使用链表来代替数组,实现无界队列

2. 使用两个ThreadLocal变量表示指针,一个指向自己的节点,一个指向前一个节点

3. 使用一个原子引用变量指向队尾

4. 空间复杂度降低,如果有L把锁,n个线程,每个线程只获取一把锁,那么空间复杂度为O(L + n)

5. 对同一个锁,一个线程可以多次获取而不增加空间复杂度

6. 当线程结束后,GC会自动回收内存

package com.test.lock;

import java.util.concurrent.atomic.AtomicReference;

/**
 * 无界队列锁,使用一个链表来组织线程
 * 假设L把锁,n个线程,那么锁的空间复杂度为O(L+n)
 * **/
public class CLHLock implements Lock{
	// 原子变量指向队尾
	private AtomicReference<QNode> tail;
	// 两个指针,一个指向自己的Node,一个指向前一个Node
	ThreadLocal<QNode> myNode;
	ThreadLocal<QNode> myPreNode;

	public CLHLock(){
		tail = new AtomicReference<QNode>(new QNode());
		myNode = new ThreadLocal<QNode>(){
			protected QNode initialValue(){
				return new QNode();
			}
		};
		myPreNode = new ThreadLocal<QNode>(){
			protected QNode initialValue(){
				return null;
			}
		};
	}

	@Override
	public void lock() {
		QNode node = myNode.get();
		node.lock = true;
		// CAS原子操作,保证原子性
		QNode preNode = tail.getAndSet(node);
		myPreNode.set(preNode);
		// volatile变量,能保证锁释放及时通知
		// 只对前一个节点的状态自旋,减少缓存一致性流量
		while(preNode.lock){

		}
	}

	@Override
	public void unlock() {
		QNode node = myNode.get();
		node.lock = false;
		// 把myNode指向preNode,目的是保证同一个线程下次还能使用这个锁,因为myNode原来指向的节点有它的后一个节点的preNode引用
		// 防止这个线程下次lock时myNode.get获得原来的节点
		myNode.set(myPreNode.get());
	}

	public static class QNode {
		volatile boolean lock;
	}

        public String toString(){
           return "CLHLock";
        }
 }

下面我们从正确性和平均获取锁的时间上来测试这两种锁。

我们设计一个测试用例来验证正确性: 使用50个线程对一个volatile变量++操作,由于volatile变量++操作不是原子的,在不加锁的情况下,可能同时有多个线程同时对voaltile变量++, 最终的结果是无法预测的。然后使用这两种锁,先获取锁再volatile变量++,由于volatile变量会防止重排序,并能保证可见性,我们可以确定如果锁是正确获取的,也就是说同一时刻只有一个线程对volatile变量++,那么结果肯定是顺序的1到50。

先看不加锁的情况

package com.test.lock;

public class Main {
	//private static Lock lock = new ArrayLock(150);

	private static Lock lock = new CLHLock();

	//private static TimeCost timeCost = new TimeCost(new TTASLock());

	private static volatile int value = 0;
	public static void method(){
		//lock.lock();
		System.out.println("Value: " + ++value);
		//lock.unlock();
	}

	public static void main(String[] args) {
		for(int i = 0; i < 50; i ++){
		   Thread t = new Thread(new Runnable(){

		   @Override
	              public void run() {
			 method();
		   }

	           });
		  t.start();
	        }
	}

}

运行结果: 我们可以看到确实是发生的线程同时对volatile变量++的操作,结果是无法预料的

Value: 1
Value: 1
Value: 2
Value: 3
Value: 4
Value: 5
Value: 6
Value: 7
Value: 8
Value: 9
Value: 10
Value: 11
Value: 13
Value: 12
Value: 14
Value: 15
Value: 16
Value: 17
Value: 18
Value: 19
Value: 20
Value: 21
Value: 22
Value: 23
Value: 24
Value: 25
Value: 26
Value: 27
Value: 28
Value: 29
Value: 30
Value: 31
Value: 32
Value: 33
Value: 34
Value: 35
Value: 36
Value: 37
Value: 38
Value: 37
Value: 39
Value: 40
Value: 41
Value: 42
Value: 43
Value: 44
Value: 45
Value: 46
Value: 47
Value: 48
Value: 50

使用有界队列锁:

package com.test.lock;

public class Main {
	private static Lock lock = new ArrayLock(100);

	//private static Lock lock = new CLHLock();

	//private static TimeCost timeCost = new TimeCost(new TTASLock());

	private static volatile int value = 0;
	public static void method(){
		lock.lock();
		System.out.println("Value: " + ++value);
		lock.unlock();
	}

	public static void main(String[] args) {
		for(int i = 0; i < 50; i ++){
			Thread t = new Thread(new Runnable(){

				@Override
				public void run() {
					method();
				}

			});
			t.start();
		}
	}

}

运行结果是1到50的顺序自增,说明锁保证了同一时刻只有一个线程在对volatile变量++,是正确的

Value: 1
Value: 2
Value: 3
Value: 4
Value: 5
Value: 6
Value: 7
Value: 8
Value: 9
Value: 10
Value: 11
Value: 12
Value: 13
Value: 14
Value: 15
Value: 16
Value: 17
Value: 18
Value: 19
Value: 20
Value: 21
Value: 22
Value: 23
Value: 24
Value: 25
Value: 26
Value: 27
Value: 28
Value: 29
Value: 30
Value: 31
Value: 32
Value: 33
Value: 34
Value: 35
Value: 36
Value: 37
Value: 38
Value: 39
Value: 40
Value: 41
Value: 42
Value: 43
Value: 44
Value: 45
Value: 46
Value: 47
Value: 48
Value: 49
Value: 50

使用无界队列锁的情况也是正确的,由于篇幅原因这里就不帖代码了。

再看平均获取锁的时间。

package com.test.lock;

public class Main {
	private static Lock lock = new TimeCost(new CLHLock());

	//private static Lock lock = new CLHLock();

	//private static TimeCost timeCost = new TimeCost(new TTASLock());

	private static volatile int value = 0;
	public static void method(){
		lock.lock();
		//System.out.println("Value: " + ++value);
		lock.unlock();
	}

	public static void main(String[] args) {
		for(int i = 0; i < 100; i ++){
			Thread t = new Thread(new Runnable(){

				@Override
				public void run() {
					method();
				}

			});
			t.start();
		}
	}

}

在100个线程并发的情况下,

ArrayLock获取锁的平均时间是: 719550 ns

CLHLock获取锁的平均时间是:  488577 ns

可以看到,队列锁在使用多个共享变量自旋的情况下,减少了一致性流量,比TASLock和TTASLock 提高了程序的性能。而CLHLock比ArrayLock有更好的扩展性和性能,是一种很好的自旋锁实现。

时间: 2024-09-30 19:17:41

聊聊高并发(七)实现几种自旋锁(二)的相关文章

聊聊高并发(十三)实现几种自旋锁(六)

聊聊高并发(十一)实现几种自旋锁(五) 给出了限时有界队列锁的lock和unlock实现.这篇给出tryLock的实现 tryLock比lock略微复杂一点.要处理超时的情况.超时有几种情况: 1. 第一步在等待队列还没有获得节点的时候超时,直接返回false就可以 2. 第二步在等待队列已经获得节点可是还没有增加工作队列时超时,把节点状态能够直接改成FREE给兴许线程使用,然后返回false就可以 3. 第三步在前一个节点的状态上自旋时超时,将节点的preNode设置成前一个节点,然后将节点状

聊聊高并发(六)实现几种自旋锁(一)

在聊聊高并发(五)理解缓存一致性协议以及对并发编程的影响 我们了解了处理器缓存一致性协议的原理,并且提到了它对并发编程的影响,"多个线程对同一个变量一直使用CAS操作,那么会有大量修改操作,从而产生大量的缓存一致性流量,因为每一次CAS操作都会发出广播通知其他处理器,从而影响程序的性能." 这一篇我们通过两种实现自旋锁的方式来看一下不同的编程方式带来的程序性能的变化. 先理解一下什么是自旋,所谓自旋就是线程在不满足某种条件的情况下,一直循环做某个动作.所以对于自旋锁来锁,当线程在没有获

聊聊高并发(十一)实现几种自旋锁(五)

在聊聊高并发(九)实现几种自旋锁(四)中实现的限时队列锁是一个基于链表的限时无界队列锁,它的tryLock方法支持限时操作和中断操作,无饥饿,保证了先来先服务的公平性,在多个共享状态上自旋,是低争用的.但是它的一个缺点是牺牲了空间,为了让线程可以多次使用锁,每次Lock的时候都要new QNode,并设置给线程,而不能重复使用原来的节点. 这篇说说限时有界队列锁,它采用了有界队列,并且和ArrayLock不同,它不限制线程的个数.它的特点主要有 1. 采用有界队列,减小了空间复杂度,L把锁的空间

聊聊高并发(二十五)解析java.util.concurrent各个组件(七) 理解Semaphore

前几篇分析了一下AQS的原理和实现.这篇拿Semaphore信号量做样例看看AQS实际是怎样使用的. Semaphore表示了一种能够同一时候有多个线程进入临界区的同步器,它维护了一个状态表示可用的票据,仅仅有拿到了票据的线程尽能够进入临界区,否则就等待.直到获得释放出的票据. Semaphore经常使用在资源池中来管理资源.当状态仅仅有1个0两个值时,它退化成了一个相互排斥的同步器.类似锁. 以下来看看Semaphore的代码. 它维护了一个内部类Sync来继承AQS,定制tryXXX方法来使

聊聊高并发(二十)解析java.util.concurrent各个组件(二) 12个原子变量相关类

这篇说说java.util.concurrent.atomic包里的类,总共12个.网上有非常多文章解析这几个类.这里挑些重点说说. 这12个类能够分为三组: 1. 普通类型的原子变量 2. 数组类型的原子变量 3. 域更新器 普通类型的原子变量的6个, 1. 当中AtomicBoolean, AtomicInteger, AtomicLong, AtomicReference分别相应boolean, int,  long, object完毕主要的原子操作 2. AtomicMarkableRe

聊聊高并发(二十九)解析java.util.concurrent各个组件(十一) 再看看ReentrantReadWriteLock可重入读-写锁

上一篇聊聊高并发(二十八)解析java.util.concurrent各个组件(十) 理解ReentrantReadWriteLock可重入读-写锁 讲了可重入读写锁的基本情况和主要的方法,显示了如何实现的锁降级.但是下面几个问题没说清楚,这篇补充一下 1. 释放锁时的优先级问题,是让写锁先获得还是先让读锁先获得 2. 是否允许读线程插队 3. 是否允许写线程插队,因为读写锁一般用在大量读,少量写的情况,如果写线程没有优先级,那么可能造成写线程的饥饿 关于释放锁后是让写锁先获得还是让读锁先获得,

聊聊高并发(十二)分析java.util.concurrent.atomic.AtomicStampedReference源码来看如何解决CAS的ABA问题

在聊聊高并发(十一)实现几种自旋锁(五)中使用了java.util.concurrent.atomic.AtomicStampedReference原子变量指向工作队列的队尾,为何使用AtomicStampedReference原子变量而不是使用AtomicReference是因为这个实现中等待队列的同一个节点具备不同的状态,而同一个节点会多次进出工作队列,这就有可能出现出现ABA问题. 熟悉并发编程的同学应该知道CAS操作存在ABA问题.我们先看下CAS操作. CAS(Compare and

【转载】聊聊高并发系统之降级特技

原文:聊聊高并发系统之降级特技 在开发高并发系统时有三把利器用来保护系统:缓存.降级和限流.之前已经有一些文章介绍过缓存和限流了.本文将详细聊聊降级.当访问量剧增.服务出现问题(如响应时间慢或不响应)或非核心服务影响到核心流程的性能时,仍然需要保证服务还是可用的,即使是有损服务.系统可以根据一些关键数据进行自动降级,也可以配置开关实现人工降级.本文将介绍一些笔者在实际工作中遇到的或见到过的一些降级方案供大家参考. 降级的最终目的是保证核心服务可用,即使是有损的.而且有些服务是无法降级的(如加入购

聊聊高并发(十八)理解AtomicXXX.lazySet方法

看过java.util.concurrent.atomic包里面各个AtomicXXX类实现的同学应该见过lazySet方法,比如AtomicBoolean类的lazySet方法 public final void lazySet(boolean newValue) { int v = newValue ? 1 : 0; unsafe.putOrderedInt(this, valueOffset, v); } 它的底层实现调用了Unsafe的putOrderedInt方法,来看看putOrde