Semaphore 源码分析

Semaphore

Semaphore 是基于同步器实现的计数信号量。
Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。
公平的信号量可以保证不会出现线程饥饿,而非公平的信号量可以提供更高的吞吐量。

创建实例

    private final Sync sync;

    /**
     *  信号量的同步器实现
     */
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933L;

        Sync(int permits) {
            // 写入许可数
            setState(permits);
        }

        final int getPermits() {
            // 获取可用许可数
            return getState();
        }

        /**
         *  非公平地获取 acquires 个许可
         * created by ZXD at 15 Dec 2018 T 11:43:17
         * @param acquires
         * @return
         */
        final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                // 读取可用许可数
                final int available = getState();
                // 计算剩余许可数
                final int remaining = available - acquires;
                /**
                 * 1)剩余许可数 < 0,则直接返回,不更新可用许可数
                 * 2)更新可用许可书
                 */
                if (remaining < 0 ||
                        compareAndSetState(available, remaining)) {
                    return remaining;
                }
            }
        }

        /**
         *  尝试释放 releases 个许可
         * created by ZXD at 15 Dec 2018 T 11:44:56
         * @param releases
         * @return
         */
        @Override
        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                final int current = getState();
                // 计算可用许可数
                final int next = current + releases;
                if (next < current) {
                    throw new Error("Maximum permit count exceeded");
                }
                // 更新许可数
                if (compareAndSetState(current, next)) {
                    return true;
                }
            }
        }

        /**
         *  递减 reductions 个许可
         * created by ZXD at 15 Dec 2018 T 11:46:19
         * @param reductions
         */
        final void reducePermits(int reductions) {
            for (;;) {
                final int current = getState();
                final int next = current - reductions;
                if (next > current) {
                    throw new Error("Permit count underflow");
                }
                if (compareAndSetState(current, next)) {
                    return;
                }
            }
        }

        /**
         *  一次性获取全部许可
         * created by ZXD at 15 Dec 2018 T 11:46:41
         * @return
         */
        final int drainPermits() {
            for (;;) {
                // 读取当前许可数
                final int current = getState();
                // 如果不是 0,则将其置为 0
                if (current == 0 || compareAndSetState(current, 0)) {
                    // 返回读取到的许可数
                    return current;
                }
            }
        }
    }

    /**
     *  非公平版本
     */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;

        NonfairSync(int permits) {
            super(permits);
        }

        @Override
        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }

    /**
     *  公平版本
     */
    static final class FairSync extends Sync {
        private static final long serialVersionUID = 2014338818796000944L;

        FairSync(int permits) {
            super(permits);
        }

        @Override
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                // 如果已经有线程在阻塞等待获取许可,则不允许获取
                if (hasQueuedPredecessors()) {
                    return -1;
                }
                final int available = getState();
                final int remaining = available - acquires;
                if (remaining < 0 ||
                        compareAndSetState(available, remaining)) {
                    return remaining;
                }
            }
        }
    }

    /**
     *  创建一个持有 permits 个许可的非公平信号量
     */
    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }

    /**
     * 创建一个持有 permits 个许可的
     * true:公平信号量
     * false:公平信号量
     */
    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }

获取许可

  • 尝试获取一个许可,如果无许可可用,则阻塞等待,支持中断
    /**
     *  尝试获取一个许可,如果无许可可用,则阻塞等待
     *  1)获取到一个许可
     *  2)线程被中断
     */
    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
  • 尝试获取 permits 个许可,如果无许可可用,则阻塞等待,支持中断
    /**
     *  尝试获取 permits 个许可,如果无许可可用,则阻塞等待
     *  1)获取到一个许可
     *  2)线程被中断
     */
    public void acquire(int permits) throws InterruptedException {
        if (permits < 0) {
            throw new IllegalArgumentException();
        }
        sync.acquireSharedInterruptibly(permits);
    }
  • 尝试获取一个许可,如果无许可可用,则阻塞等待,不支持线程中断
    /**
     *  尝试获取一个许可,如果无许可可用,则阻塞等待,不支持线程中断
     *  1)获取到一个许可
     */
    public void acquireUninterruptibly() {
        sync.acquireShared(1);
    }
  • 尝试获取 permits 个许可,如果无许可可用,则阻塞等待,不支持中断
    /**
     *  尝试获取 permits 个许可,如果无许可可用,则阻塞等待,不支持中断
     *  1)获取到一个许可
     */
    public void acquireUninterruptibly(int permits) {
        if (permits < 0) {
            throw new IllegalArgumentException();
        }
        sync.acquireShared(permits);
    }
  • 如果有许可可用,则一次性获取所有的许可,并返回许可数,否则返回 0
    /**
     *  如果有许可可用,则一次性获取所有的许可,并返回许可数,否则返回 0
     */
    public int drainPermits() {
        return sync.drainPermits();
    }

释放许可

  • 将一个许可归还给信号量
    /**
     *  将一个许可归还给信号量
     */
    public void release() {
        sync.releaseShared(1);
    }
  • 将 permits 个许可归还给信号量
    /**
     * 将 permits 个许可归还给信号量
     */
    public void release(int permits) {
        if (permits < 0) {
            throw new IllegalArgumentException();
        }
        sync.releaseShared(permits);
    }

原文地址:https://www.cnblogs.com/zhuxudong/p/10122899.html

时间: 2024-10-05 06:15:53

Semaphore 源码分析的相关文章

Java - &quot;JUC&quot; Semaphore源码分析

Java多线程系列--"JUC锁"11之 Semaphore信号量的原理和示例 Semaphore简介 Semaphore是一个计数信号量,它的本质是一个"共享锁". 信号量维护了一个信号量许可集.线程可以通过调用acquire()来获取信号量的许可:当信号量中有可用的许可时,线程能获取该许可:否则线程必须等待,直到有可用的许可为止. 线程可以通过release()来释放它所持有的信号量许可. Java并发提供了两种加锁模式:共享锁和独占锁.前面LZ介绍的Reent

并发编程(七)——AbstractQueuedSynchronizer 之 CountDownLatch、CyclicBarrier、Semaphore 源码分析

这篇,我们的关注点是 AQS 最后的部分,共享模式的使用.本文先用 CountDownLatch 将共享模式说清楚,然后顺着把其他 AQS 相关的类 CyclicBarrier.Semaphore 的源码一起过一下. CountDownLatch CountDownLatch 这个类是比较典型的 AQS 的共享模式的使用,这是一个高频使用的类.使用方法在前面一篇文章中有介绍 并发编程(二)—— CountDownLatch.CyclicBarrier和Semaphore 使用例子 我们看下 Do

Semaphore源码分析

源码解析 Semaphore有两种模式,公平模式和非公平模式.公平模式就是调用acquire的顺序就是获取许可证的顺序,遵循FIFO:而非公平模式是抢占式的,也就是有可能一个新的获取线程恰好在一个许可证释放时得到了这个许可证,而前面还有等待的线程. 构造方法 Semaphore有两个构造方法,如下:        public Semaphore(int permits) {         sync = new NonfairSync(permits);     }    public Sem

【JUC】JDK1.8源码分析之Semaphore(六)

一.前言 分析了CountDownLatch源码后,下面接着分析Semaphore的源码.Semaphore称为计数信号量,它允许n个任务同时访问某个资源,可以将信号量看做是在向外分发使用资源的许可证,只有成功获取许可证,才能使用资源.下面开始分析Semaphore的源码. 二.Semaphore的数据结构 分析源码可以知道,Semaphore底层是基于AbstractQueuedSynchronizer来实现的,所以,Semaphore的数据结构也依托于AQS的数据结构,在前面对AQS的分析中

Java并发包中Semaphore的工作原理、源码分析及使用示例

1. 信号量Semaphore的介绍 我们以一个停车场运作为例来说明信号量的作用.假设停车场只有三个车位,一开始三个车位都是空的.这时如果同时来了三辆车,看门人允许其中它们进入进入,然后放下车拦.以后来的车必须在入口等待,直到停车场中有车辆离开.这时,如果有一辆车离开停车场,看门人得知后,打开车拦,放入一辆,如果又离开一辆,则又可以放入一辆,如此往复. 在这个停车场系统中,车位是公共资源,每辆车好比一个线程,看门人起的就是信号量的作用.信号量是一个非负整数,表示了当前公共资源的可用数目(在上面的

【JDK】JDK源码分析-Semaphore

概述 Semaphore 是并发包中的一个工具类,可理解为信号量.通常可以作为限流器使用,即限制访问某个资源的线程个数,比如用于限制连接池的连接数. 打个通俗的比方,可以把 Semaphore 理解为一辆公交车:车上的座位数(初始的“许可” permits 数量)是固定的,行驶期间如果有人上车(获取许可),座位数(许可数量)就会减少,当人满的时候不能再继续上车了(获取许可失败):而有人下车(释放许可)后就空出了一些座位,其他人就可以继续上车了. 下面具体分析其代码实现. 代码分析 Semapho

Java显式锁学习总结之五:ReentrantReadWriteLock源码分析

概述 我们在介绍AbstractQueuedSynchronizer的时候介绍过,AQS支持独占式同步状态获取/释放.共享式同步状态获取/释放两种模式,对应的典型应用分别是ReentrantLock和Semaphore,AQS还可以混合两种模式使用,读写锁ReentrantReadWriteLock就是如此. 设想以下情景:我们在系统中有一个多线程访问的缓存,多个线程都可以对缓存进行读或写操作,但是读操作远远多于写操作,要求写操作要线程安全,且写操作执行完成要求对当前的所有读操作马上可见. 分析

JDK源码分析之concurrent包(一) -- Executor架构

Java5新出的concurrent包中的API,是一些并发编程中实用的的工具类.在高并发场景下的使用非常广泛.笔者在这做了一个针对concurrent包中部分常用类的源码分析系列.本系列针对的读者是已经对并发包中的Executor框架和工具类有所了解并懂得如何使用的人群,如果对并发包还不了解的朋友,请先做些了解.网上对这方面的讲述有丰富的资源. 本篇博文是第一期,首先对Executor架构做一个概述.这里只简单介绍接口和类的继承.使用关系. 盗用一张类图来描述结构: 解析: Executor是

《深入理解SPARK:核心思想与源码分析》——SparkContext的初始化(中)

<深入理解Spark:核心思想与源码分析>一书前言的内容请看链接<深入理解SPARK:核心思想与源码分析>一书正式出版上市 <深入理解Spark:核心思想与源码分析>一书第一章的内容请看链接<第1章 环境准备> <深入理解Spark:核心思想与源码分析>一书第二章的内容请看链接<第2章 SPARK设计理念与基本架构> 由于本书的第3章内容较多,所以打算分别开辟三篇随笔分别展现. <深入理解Spark:核心思想与源码分析>一