聊聊高并发(十一)实现几种自旋锁(五)

聊聊高并发(九)实现几种自旋锁(四)中实现的限时队列锁是一个基于链表的限时无界队列锁,它的tryLock方法支持限时操作和中断操作,无饥饿,保证了先来先服务的公平性,在多个共享状态上自旋,是低争用的。但是它的一个缺点是牺牲了空间,为了让线程可以多次使用锁,每次Lock的时候都要new
QNode,并设置给线程,而不能重复使用原来的节点。

这篇说说限时有界队列锁,它采用了有界队列,并且和ArrayLock不同,它不限制线程的个数。它的特点主要有

1. 采用有界队列,减小了空间复杂度,L把锁的空间复杂度在最坏的情况下(有界队列长度为1)是O(L)

2. 非公平,不保证先来先服务,这也是一个很常见的需求

3. 因为是有界队列,所以在高并发下存在高争用,需要结合回退锁来降低争用

它的实现思路是:

1. 采用了一个有界的等待队列,等待队列的每个节点都有多种状态,每个节点是可复用的

2. 采用了一个工作队列,Tail指针指向工作队列的队尾节点。获取和是否锁的操作是在工作队列中的节点之间进行

3. 由于是限时队列,并支持中断,所以队列中的节点都是可以退出队列的

4. 算法分为三步,第一步是线程从有界的等待队列中获得一个节点,并设置为WAITING,如果没有获得,就自旋

第二步是把这个节点加入工作队列,并获得前一个节点的指针

第三步是在前一个节点的状态上自旋,直到获得锁,并把前一个节点RELEASED状态改为FREE

节点有4种状态:

1. FREE:  表示节点可以被获得。当前一个节点释放锁,并设置状态为RELEASED的时候,后一个节点需要把前一个节点设置为FREE。当节点在没有进入工作队列时超时,也被设置为FREE.

2. RELEASED:节点释放锁时设置为RELEASED,需要后续节点把它设置为FREE。如果是工作队列的最后一个节点,那么RELEASED状态的节点在第一步时可被获得

3. WAITING:表示获得了锁或在工作队列中等待锁。是在第一步中被设置的,第一步的结果就是获得一个状态为WAITING的节点

4. ABORTED:工作队列中的节点超时或者中断的节点被设置为ABORTED。 队尾的ABORTED节点可以被第一步获得,队中的ABORTED节点不能被第一步获取,只能把它的preNode指针指向它的前一个节点,表示它自己不能被获取了

理解节点这4种状态的转变是理解这个设计的关键。这个设计比较复杂,从篇幅考虑,这篇只介绍Lock和UnLock操作,下一篇说tryLock限时操作

1. 创建枚举类型State来表示状态

2. 创建QNode表示节点,使用一个AtomicReference原子变量指向它的State,以便于支持CAS操作。节点维护一个PreNode引用,只有节点被Aborted的时候才设置这个引用的值,表示跳过这个节点

3. 一个有界的QNode队列,使用数组表示

4. MIN_BACKOFF和MAX_BACKOFF支持回退操作,单位是毫秒。这两个值依赖于硬件性能,需要通过不断测试来获取最优值

5. 一个Random随机数,来产生随即的数组下标,非公平性需要

6. 一个AtomicStampedReference类型的原子变量作为队尾指针tail。AtomicStampedReference采用了版本号来避免CAS操作的ABA问题。这很重要,因为有界等待队列的节点会多次进出工作队列,所以可能发生同一个节点被前一个线程准备CAS操作时,已经被后几个线程进出了工作队列,导致第一个线程拿到的QNode的状态不正确。

7. lock实现分为三步,上文已经说过了

8. unlock操作就是两步,第一修改状态通知其他线程获取锁。第二是设置自己的节点引用,以便下次可再次获得锁而不影响其他线程的状态。这里是把线程指向的节点状态设置为RELEASED,同时设置线程的节点引用为空,这样其他线程可以继续使用这个节点。

package com.zc.lock;

import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicStampedReference;

/**
 * 限时有界队列锁,并且直接不限数量的线程
 * 由于是有界的队列,所以争用激烈,可以复合回退锁的概念,减少高争用
 * 分为三步:
 * 第一步是取得一个State为FREE的节点,设置为WAITING
 * 第二步是把这个节点加入队列,获取前一个节点
 * 第三步是在前一个节点上自旋
 *
 * 优点是L个锁的空间复杂度是O(L),而限时无界队列锁的空间复杂度为O(Ln)
 * **/
public class CompositeLock implements TryLock{

	enum State {FREE, WAITING, RELEASED, ABORTED}

	class QNode{
		AtomicReference<State> state = new AtomicReference<CompositeLock.State>(State.FREE);
		volatile QNode preNode;
	}

	private final int SIZE = 10;

	private final int MIN_BACKOFF = 1;

	private final int MAX_BACKOFF = 10;

	private Random random = new Random();

	// 有界的QNode数组,表示队列总共可以使用的节点数
	private QNode[] waitings = new QNode[10];

	// 指向队尾节点,使用AtomicStampedReference带版本号的原子引用变量,可以防止ABA问题,因为这个算法实现需要对同一个Node多次进出队列
	private AtomicStampedReference<QNode> tail = new AtomicStampedReference<CompositeLock.QNode>(null, 0);

	// 每个线程维护一个QNode引用
	private ThreadLocal<QNode> myNode = new ThreadLocal<CompositeLock.QNode>(){
		public QNode initialValue(){
			return null;
		}
	};

	public CompositeLock(){
		for(int i = 0; i < SIZE; i ++){
			waitings[i] = new QNode();
		}
	}

	@Override
	public void lock() {
		Backoff backoff = new Backoff(MIN_BACKOFF, MAX_BACKOFF);
		QNode node = waitings[random.nextInt(SIZE)];

		// 第一步: 先获得数组里的一个Node,并把它的状态设置为WAITING,否则就自旋
		GETNODE:
		while(true){
			while(node.state.get() != State.FREE){
				// 因为释放锁时只是设置了State为RELEASED,由后继的线程来设置RELEASED为FREE
				// 如果该节点已经是队尾节点了并且是RELEASED,那么可以直接可以被使用
				// 获取当前原子引用变量的版本号
				int[] currentStamp = new int[1];
				QNode tailNode = tail.get(currentStamp);
				if(tailNode == node && tailNode.state.get() == State.RELEASED){
					if(tail.compareAndSet(tailNode, null, currentStamp[0], currentStamp[0] + 1)){
						node.state.set(State.WAITING);
						break GETNODE;
					}
				}
			}
			if(node.state.compareAndSet(State.FREE, State.WAITING)){
				break;
			}
			try {
				backoff.backoff();
			} catch (InterruptedException e) {
				throw new RuntimeException("Thread interrupted, stop to get the lock");
			}
		}
		// 第二步加入队列
		int[] currentStamp = new int[1];
		QNode preTailNode = null;
		do{
			preTailNode = tail.get(currentStamp);
		}
		// 如果没加入队列,就一直自旋
		while(!tail.compareAndSet(preTailNode, node, currentStamp[0], currentStamp[0] + 1));

		// 第三步在前一个节点自旋,如果前一个节点为null,证明是第一个加入队列的节点
		if(preTailNode != null){
			// 在前一个节点的状态自旋
			while(preTailNode.state.get() != State.RELEASED){}
			// 设置前一个节点的状态为FREE,可以被其他线程使用
			preTailNode.state.set(State.FREE);
		}

		// 将线程的myNode指向获得锁的node
		myNode.set(node);
		return;
	}

	@Override
	public void unlock() {
		QNode node = myNode.get();
		node.state.set(State.RELEASED);
		myNode.set(null);
	}

	@Override
	public boolean trylock(long time, TimeUnit unit)
			throws InterruptedException {
		// TODO Auto-generated method stub
		return false;
	}

}

采用我们之前的验证锁正确性的测试用例来测试lock, unlock操作。

package com.zc.lock;

public class Main {
	//private static Lock lock = new TimeCost(new ArrayLock(150));

	private static Lock lock = new CompositeLock();

	//private static TimeCost timeCost = new TimeCost(new TTASLock());

	private static volatile int value = 0;
	public static void method(){
		lock.lock();
		System.out.println("Value: " + ++value);
		lock.unlock();
	}

	public static void main(String[] args) {
		for(int i = 0; i < 50; i ++){
			Thread t = new Thread(new Runnable(){

				@Override
				public void run() {
					method();
				}

			});
			t.start();
		}
	}

}

结果是顺序打印的,证明锁是正确的,每次只有一个线程获得了锁

Value: 1
Value: 2
Value: 3
Value: 4
Value: 5
Value: 6
Value: 7
Value: 8
Value: 9
Value: 10
Value: 11
Value: 12
Value: 13
Value: 14
Value: 15
Value: 16
Value: 17
Value: 18
Value: 19
Value: 20
Value: 21
Value: 22
Value: 23
Value: 24
Value: 25
Value: 26
Value: 27
Value: 28
Value: 29
Value: 30
Value: 31
Value: 32
Value: 33
Value: 34
Value: 35
Value: 36
Value: 37
Value: 38
Value: 39
Value: 40
Value: 41
Value: 42
Value: 43
Value: 44
Value: 45
Value: 46
Value: 47
Value: 48
Value: 49
时间: 2024-09-29 08:54:19

聊聊高并发(十一)实现几种自旋锁(五)的相关文章

聊聊高并发(十三)实现几种自旋锁(六)

聊聊高并发(十一)实现几种自旋锁(五) 给出了限时有界队列锁的lock和unlock实现.这篇给出tryLock的实现 tryLock比lock略微复杂一点.要处理超时的情况.超时有几种情况: 1. 第一步在等待队列还没有获得节点的时候超时,直接返回false就可以 2. 第二步在等待队列已经获得节点可是还没有增加工作队列时超时,把节点状态能够直接改成FREE给兴许线程使用,然后返回false就可以 3. 第三步在前一个节点的状态上自旋时超时,将节点的preNode设置成前一个节点,然后将节点状

聊聊高并发(七)实现几种自旋锁(二)

在聊聊高并发(六)实现几种自旋锁(一) 这篇中实现了两种基本的自旋锁:TASLock和TTASLock,它们的问题是会进行频繁的CAS操作,引发大量的缓存一致性流量,导致锁的性能不好. 对TTASLock的一种改进是BackoffLock,它会在锁高争用的情况下对线程进行回退,减少竞争,减少缓存一致性流量.但是BackoffLock有三个主要的问题: 1. 还是有大量的缓存一致性流量,因为所有线程在同一个共享变量上旋转,每一次成功的获取锁都会产生缓存一致性流量 2. 因为回退的存在,不能及时获取

聊聊高并发(六)实现几种自旋锁(一)

在聊聊高并发(五)理解缓存一致性协议以及对并发编程的影响 我们了解了处理器缓存一致性协议的原理,并且提到了它对并发编程的影响,"多个线程对同一个变量一直使用CAS操作,那么会有大量修改操作,从而产生大量的缓存一致性流量,因为每一次CAS操作都会发出广播通知其他处理器,从而影响程序的性能." 这一篇我们通过两种实现自旋锁的方式来看一下不同的编程方式带来的程序性能的变化. 先理解一下什么是自旋,所谓自旋就是线程在不满足某种条件的情况下,一直循环做某个动作.所以对于自旋锁来锁,当线程在没有获

聊聊高并发(二十九)解析java.util.concurrent各个组件(十一) 再看看ReentrantReadWriteLock可重入读-写锁

上一篇聊聊高并发(二十八)解析java.util.concurrent各个组件(十) 理解ReentrantReadWriteLock可重入读-写锁 讲了可重入读写锁的基本情况和主要的方法,显示了如何实现的锁降级.但是下面几个问题没说清楚,这篇补充一下 1. 释放锁时的优先级问题,是让写锁先获得还是先让读锁先获得 2. 是否允许读线程插队 3. 是否允许写线程插队,因为读写锁一般用在大量读,少量写的情况,如果写线程没有优先级,那么可能造成写线程的饥饿 关于释放锁后是让写锁先获得还是让读锁先获得,

聊聊高并发(四十一)解析java.util.concurrent各个组件(十七) 任务的异步执行和状态控制

聊聊高并发(三十九)解析java.util.concurrent各个组件(十五) 理解ExecutorService接口的设计这篇说了ExecutorService接口扩展了Executor接口,在执行任务的基础上,提供了执行框架生命周期的管理,任务的异步执行,批量任务的执行的能力.AbstractExecutorService抽象类实现了ExecutorService接口,提供了任务异步执行和批量执行的默认实现.这篇说说任务的异步执行和状态控制 说明一点,使用Executor框架执行任务的方式

聊聊高并发(十二)分析java.util.concurrent.atomic.AtomicStampedReference源码来看如何解决CAS的ABA问题

在聊聊高并发(十一)实现几种自旋锁(五)中使用了java.util.concurrent.atomic.AtomicStampedReference原子变量指向工作队列的队尾,为何使用AtomicStampedReference原子变量而不是使用AtomicReference是因为这个实现中等待队列的同一个节点具备不同的状态,而同一个节点会多次进出工作队列,这就有可能出现出现ABA问题. 熟悉并发编程的同学应该知道CAS操作存在ABA问题.我们先看下CAS操作. CAS(Compare and

聊聊高并发(二十)解析java.util.concurrent各个组件(二) 12个原子变量相关类

这篇说说java.util.concurrent.atomic包里的类,总共12个.网上有非常多文章解析这几个类.这里挑些重点说说. 这12个类能够分为三组: 1. 普通类型的原子变量 2. 数组类型的原子变量 3. 域更新器 普通类型的原子变量的6个, 1. 当中AtomicBoolean, AtomicInteger, AtomicLong, AtomicReference分别相应boolean, int,  long, object完毕主要的原子操作 2. AtomicMarkableRe

【转载】聊聊高并发系统之降级特技

原文:聊聊高并发系统之降级特技 在开发高并发系统时有三把利器用来保护系统:缓存.降级和限流.之前已经有一些文章介绍过缓存和限流了.本文将详细聊聊降级.当访问量剧增.服务出现问题(如响应时间慢或不响应)或非核心服务影响到核心流程的性能时,仍然需要保证服务还是可用的,即使是有损服务.系统可以根据一些关键数据进行自动降级,也可以配置开关实现人工降级.本文将介绍一些笔者在实际工作中遇到的或见到过的一些降级方案供大家参考. 降级的最终目的是保证核心服务可用,即使是有损的.而且有些服务是无法降级的(如加入购

聊聊高并发(十八)理解AtomicXXX.lazySet方法

看过java.util.concurrent.atomic包里面各个AtomicXXX类实现的同学应该见过lazySet方法,比如AtomicBoolean类的lazySet方法 public final void lazySet(boolean newValue) { int v = newValue ? 1 : 0; unsafe.putOrderedInt(this, valueOffset, v); } 它的底层实现调用了Unsafe的putOrderedInt方法,来看看putOrde