【JDK】JDK源码分析-Semaphore

概述

Semaphore 是并发包中的一个工具类,可理解为信号量。通常可以作为限流器使用,即限制访问某个资源的线程个数,比如用于限制连接池的连接数。

打个通俗的比方,可以把 Semaphore 理解为一辆公交车:车上的座位数(初始的“许可” permits 数量)是固定的,行驶期间如果有人上车(获取许可),座位数(许可数量)就会减少,当人满的时候不能再继续上车了(获取许可失败);而有人下车(释放许可)后就空出了一些座位,其他人就可以继续上车了。

下面具体分析其代码实现。

代码分析

Semaphore 的方法如下:

其中主要方法是 acquire() 和 release() 相关的一系列方法,它们的作用类似。我们先从构造器开始分析。

构造器

private final Sync sync;

// 初始化 Semaphore,传入指定的许可数量,非公平
public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}

// 初始化 Semaphore,传入指定的许可数量,指定是否公平
public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

构造器初始化了 Sync 变量,根据传入的 fair 值指定为 FairSync 或 NonFairSync,下面分析这三个类。

内部嵌套类 Sync:

abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 1192457210091910933L;

    // 构造器,将父类 AQS 的 state 变量初始化为给定的 permits
    Sync(int permits) {
        setState(permits);
    }

    // 非公平方式尝试获取许可(减少 state 的值)
    final int nonfairTryAcquireShared(int acquires) {
        // 自旋操作
        for (;;) {
            // 获取许可值(state),并尝试 CAS 修改为减去后的结果
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }

    // 释放许可(增加 state 的值)
    protected final boolean tryReleaseShared(int releases) {
        for (;;) {
            // 操作与获取类似,不同的在于此处是增加 state 值
            int current = getState();
            int next = current + releases;
            if (next < current) // overflow
                throw new Error("Maximum permit count exceeded");
            if (compareAndSetState(current, next))
                return true;
        }
    }

    // 一些方法未给出...
}

可以看到 Sync 类继承自 AQS,并重写了 AQS 的 tryReleaseShared 方法,其中获取和释放许可分别对应的是对 AQS 中 state 值的减法和加法操作。具体可参考前文对 AQS 共享模式的分析「JDK源码分析-AbstractQueuedSynchronizer(3)」。

NonFairSync (非公平版本实现):

static final class NonfairSync extends Sync {
    private static final long serialVersionUID = -2694183684443567898L;

    // 调用父类 Sync 的构造器来实现
    NonfairSync(int permits) {
        super(permits);
    }
    // 重写 AQS 的 tryAcquireShared 方法,代码实现在父类 Sync 中
    protected int tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires);
    }
}

FairSync (公平版本实现):

static final class FairSync extends Sync {
    private static final long serialVersionUID = 2014338818796000944L;

    // 构造器调用父类 Sync 的构造器来实现
    FairSync(int permits) {
        super(permits);
    }

    // 重写 AQS 的 tryAcquireShared 方法,尝试获取许可(permit)
    protected int tryAcquireShared(int acquires) {
        for (;;) {
            // 若队列中有其他线程等待,则获取失败(这就是体现“公平”的地方)
            if (hasQueuedPredecessors())
                return -1;
            // 获取当前的许可值
            int available = getState();
            // 计算剩余值
            int remaining = available - acquires;
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
}

PS: 体现“公平”的地方在于 tryAcquireShared 方法中,公平的版本会先判断队列中是否有其它线程在等待(hasQueuedPredecessors 方法)。

主要方法的代码实现:

// 获取一个许可(可中断)
public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

// 获取一个许可(不响应中断)
public void acquireUninterruptibly() {
    sync.acquireShared(1);
}

// 尝试获取一个许可
public boolean tryAcquire() {
    return sync.nonfairTryAcquireShared(1) >= 0;
}

// 尝试获取一个许可(有超时等待)
public boolean tryAcquire(long timeout, TimeUnit unit)
    throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

// 释放一个许可
public void release() {
    sync.releaseShared(1);
}

还有一系列类似的操作,只不过获取/释放许可的数量可以指定:

// 获取指定数量的许可(可中断)
public void acquire(int permits) throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireSharedInterruptibly(permits);
}

// 获取指定数量的许可(不可中断)
public void acquireUninterruptibly(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireShared(permits);
}

// 尝试获取指定数量的许可
public boolean tryAcquire(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    return sync.nonfairTryAcquireShared(permits) >= 0;
}

// 尝试获取指定数量的许可(有超时等待)
public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
    throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
}

// 释放指定数量的许可
public void release(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.releaseShared(permits);
}

可以看到,Semaphore 的主要方法都是在嵌套类 FairSync 和 NonFairSync 及其父类 Sync 中实现的,内部嵌套类也是 AQS 的典型用法。

场景举例

为了便于理解 Semaphore 的用法,下面简单举例分析(仅供参考):

public class SemaphoreTest {
  public static void main(String[] args) {
    // 初始化 Semaphore
    // 这里的许可数为 2,即同时最多有 2 个线程可以获取到
    Semaphore semaphore = new Semaphore(2);
    for (int i = 0; i < 50; i++) {
      new Thread(() -> {
        try {
          // 获取许可
          semaphore.acquire();
          System.out.println(Thread.currentThread().getName() + " 正在执行..");
          // 模拟操作
          TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
          e.printStackTrace();
        } finally {
          // 释放许可
          semaphore.release();
        }
      }).start();
    }
  }
}
/*  执行结果(仅供参考):
    Thread-0 正在执行..
    Thread-1 正在执行..
    Thread-2 正在执行..
    Thread-3 正在执行..
    ...
*/

这里把 Semaphore 的初始许可值设为 2,表示最多有两个线程可同时获取到许可(运行程序可发现线程是两两一起执行的)。设置为其他值也是类似的。

比较特殊的是,如果把 Semaphore 的初始许可值设为 1,可以当做“互斥锁”来使用。

小结

Semaphore 是并发包中的一个工具类,其内部是基于 AQS 共享模式实现的。通常可以作为限流器使用,比如限定连接池等的大小。

相关阅读:

JDK源码分析-AbstractQueuedSynchronizer(3)

Stay hungry, stay foolish.

PS: 本文首发于微信公众号【WriteOnRead】。

原文地址:https://www.cnblogs.com/jaxer/p/11331043.html

时间: 2024-10-20 15:23:32

【JDK】JDK源码分析-Semaphore的相关文章

jdk源码分析总览

今天看到了一个源码分析按照重要性排序的例子, 这里拿过来用了,之后按照这个顺序不断的完善源码的内容. 引用的出处忘记了(对作者说声抱歉) 很多java开发的小伙伴都会阅读jdk源码,然而确不知道应该从哪读起.以下为小编整理的通常所需阅读的源码范围. 标题为包名,后面序号为优先级1-4,优先级递减 1.java.lang 1) Object 12) String 13) AbstractStringBuilder 14) StringBuffer 15) StringBuilder 16) Boo

JDK源码分析之concurrent包(三) -- Future方式的实现

上一篇我们基于JDK的源码对线程池ThreadPoolExecutor的实现做了分析,本篇来对Executor框架中另一种典型用法Future方式做源码解读.我们知道Future方式实现了带有返回值的程序的异步调用,关于异步调用的场景大家可以自行脑补Ajax的应用(获取返回结果的方式不同,Future是主动询问获取,Ajax是回调函数),这里不做过多说明. 在进入源码前,首先来看下Future方式相关的API: 接口Callable:有返回结果并且可能抛出异常的任务: 接口Future:表示异步

JDK源码分析—— ArrayBlockingQueue 和 LinkedBlockingQueue

目的:本文通过分析JDK源码来对比ArrayBlockingQueue 和LinkedBlockingQueue,以便日后灵活使用. 1. 在Java的Concurrent包中,添加了阻塞队列BlockingQueue,用于多线程编程.BlockingQueue的核心方法有: boolean add(E e) ,把 e 添加到BlockingQueue里.如果BlockingQueue可以容纳,则返回true,否则抛出异常. boolean offer(E e),表示如果可能的话,将 e 加到B

JDK源码分析之String篇

------------------------------String在内存中的存储情况(一下内容摘自参考资料1)----------------------------------- 前提:先了解下什么是声明,什么时候才算是产生了对象实例 其中x并未看到内存分配,变量在使用前必须先声明,再赋值,然后才可以使用.java基础数据类型会用对应的默认值进行初始化 一.首先看看Java虚拟机JVM的内存块及其变量.对象内存空间是怎么存储分配的 1.栈:存放基本数据类型及对象变量的引用,对象本身不存放

JDK中String类的源码分析(二)

1.startsWith(String prefix, int toffset)方法 包括startsWith(*),endsWith(*)方法,都是调用上述一个方法 1 public boolean startsWith(String prefix, int toffset) { 2 char ta[] = value; 3 int to = toffset; 4 char pa[] = prefix.value; 5 int po = 0; 6 int pc = prefix.value.l

【JDK源码分析】通过源码分析CyclicBarrier

前言 CyclicBarrier它是什么?一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点.类似于朋友之间联系要在中午聚个会,几个朋友全部到齐后才开始喝酒吃菜. 源码 CyclicBarrier属性和构造器 public class CyclicBarrier { // 互斥锁 private final ReentrantLock lock = new ReentrantLock(); // 条件等待 private final Condition trip = lock.new

【源码阅读系列】JDK 8 ConcurrentHashMap 源码分析之 由transfer引发的bug

不阅读源码就不会发现这个事儿 前段时间在阅读ConcurrentHashMap源码,版本JDK 8,目前源码研究已经告一段落.感谢鲁道的ConcurrentHashMap源码分析文章,读到文章,感觉和作者发生了一些交流,解答了很多疑惑,也验证了一些想法.鲁道在简书的addCount分析文章点这里 (文章底部的评论中就有这篇文章发酵的原由).鲁道还有其他ConcurrentHashMap源码分析的系列文章,在简书.掘金都有分布,感兴趣的同学可以进一步追踪. 推完文章,回到本篇的主题"阅读源码&qu

JDK源码阅读(一):Object源码分析

最近经过某大佬的建议准备阅读一下JDK的源码来提升一下自己 所以开始写JDK源码分析的文章 阅读JDK版本为1.8 目录 Object结构图 构造器 equals 方法 getClass 方法 hashCode 方法 toString 方法 finalize 方法 registerNatives 方法 1. Object结构图 2. 类构造器 ??类构造器是创建Java对象的方法之一.一般我们都使用new关键字来进行实例,还可以在构造器中进行相应的初始化操作. ??在一个Java类中必须存在一个

【JDK】JDK源码分析-CountDownLatch

概述 CountDownLatch 是并发包中的一个工具类,它的典型应用场景为:一个线程等待几个线程执行,待这几个线程结束后,该线程再继续执行. 简单起见,可以把它理解为一个倒数的计数器:初始值为线程数,每个线程结束时执行减 1 操作,当计数器减到 0 时等待的线程再继续执行. 代码分析 CountDownLatch 的类签名和主要方法如下: public class CountDownLatch {} 常用方法为:await().await(long, TimeUnit) 和 countDow