JDK 源码解析 —— Semaphore

零. 简介

这是一个用来对并发计数的信号量,并发量超过一定数值则只能等待。从概念上来说,semaphore 维持着一组许可证。获取锁的时候,需要先获得 semaphore 的许可才行。

一. 从 Demo 解析源码

package com.wenniuwuren.concurrent;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

/**
 * Created by zhuyb on 16/5/1.
 */
public class SemaphoreTest {

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();

        Semaphore semaphore = new Semaphore(5);

        for (int i = 0; i < 10; i++) {
            int count = i;
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    try {
                        semaphore.acquire(); // 获取许可
                        System.out.println("当前循环:" + count);
                        System.out.println("当前还剩多少许可数量:" + semaphore.availablePermits());
                        Thread.sleep(10000);
                        semaphore.release(); // 释放占用的许可
                    } catch (Exception e) {
                        e.printStackTrace();
                    }

                }
            };
            executorService.execute(runnable);
        }
        executorService.shutdown();
    }
}

(1) 先看 Semaphore semaphore = new Semaphore(5) 的构造函数

默认使用非公平锁,调用的是继承自 AQS 的内部类 NonfairSync

public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}

看下 NonfairSync 的具体实现:构造函数调用父类来初始化,其实就是 AQS 的 Sync 构造函数

static final class NonfairSync extends Sync {
    private static final long serialVersionUID = -2694183684443567898L;

    NonfairSync(int permits) {
        super(permits);
    }

    protected int tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires);
    }
}

AQS 的 Sync 构造函数:设置 AQS 的同步状态 stat

abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 1192457210091910933L;

    Sync(int permits) {
        setState(permits);
    }

// 省略无用代码
}

AQS 的状态值由具体调用的类来定义 stat 的含义,对于 Semaphore 来说 stat 的数量含义就是可以有多少个线程并发使用某个资源

protected final void setState(int newState) {
    state = newState;
}
(2)semaphore.acquire(); // 获取许可
public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

acquireSharedInterruptibly 从代码看出来是响应中断的,再看下 tryAcquireShared(arg):AQS 中 tryAcquireShared 没有具体实现,交给具体的继承类去实现

protected int tryAcquireShared(int arg) {
    throw new UnsupportedOperationException();
}

下面是 Semaphore 的 tryAcquireShared 具体实现:可以看到用当前的 stat 值减去传入的 1。举个具体的例子,如果是第一次调用 semaphore.acquire(), 则 available 就等于初始化  Semaphore 时候的值,然后减去 acquires=1,如果小于零就要调用 doAcquireSharedInterruptibly(arg) 这个方法是在
AQS 的 FIFO 队列中排队;如果大于零则 CAS 更新 AQS 的 stat 值,说明线程获得了 Semaphore 的许可,可以成功执行

protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);
}

final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

(3)semaphore.release(); // 释放占用的许可

public void release() {
    sync.releaseShared(1);
}

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

tryReleaseShared(arg) 和 tryAcquireShared() 一样都是需要具体类自己实现的,这样才能由该类定义 stat 的具体含义:参数 releases =1 表示释放一个 Semaphore, CAS 设置新的 stat 值

protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        int current = getState();
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        if (compareAndSetState(current, next))
            return true;
    }
}
接下来看 tryReleaseShared 返回 true 后进入的 doReleaseShared():与互斥锁不同的是,共享锁在释放锁的时候会将共享锁释放信息向 AQS 的队列中传播这个共享锁已经释放的 SIGNAL,这样等待这个共享锁的线程就能较快地脱离 AQS 的等待队列。从 ws == Node.SIGNAL 分支执行的就是共享锁的向后传递 SIGNAL 方法 unparkSuccessor(h)。最外面的 for(;;)就是为了保证释放锁的正常进行,异常就是循环重试
private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

二. 总结

Semaphore 是借助 AQS(AbstractQueuedSynchronizer) 这个同步控制器来实现共享锁的获取和释放,AQS 中的同步变量 stat 在 Semaphore 的意思就是并发的数量,线程并发数量超过这个 stat 总数,之后的线程只能进入 AQS 的等待队列直到其他线程释放这个 stat,然后公平地排队获取锁或者非公平地抢占锁。

时间: 2024-10-07 19:24:50

JDK 源码解析 —— Semaphore的相关文章

设计模式-简单工厂Coding+jdk源码解析

前面的软件设计七大原则,目前只有理论这块,因为最近参与项目重构,暂时没有时间把Coding的代码按照设计思路一点点写出来.本周周末会花时间整理出来,然后现在想的是白天更新一点并发编程,晚上回家学习设计模式.非科班出身,脑子也比较笨.博文都是自己根据学习的时候所想的思路,如果能有帮到各位的地方,那十分荣幸.如果有欠缺之处,希望能在评论中指出一起进步.好啦,开始正文了. 本套设计模式的博文,包含各种设计模式的定义.类型.适用场景及优缺点分析.并通过Coding去实际加深理论理解. 简单工厂: 该模式

JDK 源码解析 —— ConcurrentHashMap

零. 概述 ConcurrentHashMap 是将锁的范围细化来实现高效并发的. 基本策略是将数据结构分为一个一个 Segment(每一个都是一个并发可读的 hash table, 即分段锁)作为一个并发单元. 为了减少开销, 除了一处 Segment 是在构造器初始化的, 其他都延迟初始化(详见 ensureSegment). 并使用 volatile 关键字来保证 Segment 延迟初始化的可见性问题. HashMap 不是线程安全的, 故多线程情况下会出现 infinit loop.

【JDK】JDK源码分析-Semaphore

概述 Semaphore 是并发包中的一个工具类,可理解为信号量.通常可以作为限流器使用,即限制访问某个资源的线程个数,比如用于限制连接池的连接数. 打个通俗的比方,可以把 Semaphore 理解为一辆公交车:车上的座位数(初始的“许可” permits 数量)是固定的,行驶期间如果有人上车(获取许可),座位数(许可数量)就会减少,当人满的时候不能再继续上车了(获取许可失败):而有人下车(释放许可)后就空出了一些座位,其他人就可以继续上车了. 下面具体分析其代码实现. 代码分析 Semapho

JDK 源码解析 —— HashSet

零. 简介 这个类实现了 Set 接口,内部是由 HashMap 实例辅助实现的.它不保证元素的顺序,数据允许为 null. 假如 hash 方法将数据分散地比较合理,比如一个 bucket 一个数据,那么 add.remove.contains.size 性能开销是常数时间. 这个类非线程安全的,如果多线程并发访问,并且至少一个线程在做修改操作,那么必须在外部做好同步处理.例如使用:Set s = Collections.synchronizedSet(new HashSet(...)); 一

JDK 源码解析 —— Executors ExecutorService ThreadPoolExecutor 线程池

零. 简介 Executors 是 Executor.ExecutorService.ThreadFactory.Callable 类的工厂和工具方法. 一. 源码解析 创建一个固定大小的线程池:通过重用共享无界队列里的线程来减少线程创建的开销.当所有的线程都在执行任务,新增的任务将会在队列中等待,直到一个线程空闲.由于在执行前失败导致的线程中断,如果需要继续执行接下去的任务,新的线程会取代它执行.线程池中的线程会一直存在,除非明确地 shutdown 掉. public static Exec

JDK 源码解析 —— CyclicBarrier

一. 简介 CyclicBarrier 是一个让一系列线程集合互相等待直到一个公共屏障点(barrier point)的同步辅助工具.这个屏障被称为循环屏障,是因为它可以在等待线程释放后被重用. CyclicBarrier 支持一个可选的 Runnable 命令,在最后一个线程到达后执行一次 Runnable 命令. 二. 简单使用示例 CyclicBarrier(3) 等到 3 个线程都到了,这个对象还可以重用,而 CountDownLatch 则不能重用,从 Cyclic 名字就可以看出这个

JDK 源码解析 —— Integer

零. 简介 对于 Integer 这个 Java 程序员几乎天天使用的类, 使用上却可以看出普通程序员和优秀程序员区别. 一. 深入代码 在创建数字 1 的对象时, 大多数人会使用 new Integer(1), 而使用 Integer.valueOf(1) 可以使用系统缓存,既减少可能的内存占用,也省去了频繁创建对象的开销. 系统默认只缓存 -128-127 之间的整数.下面我们看一下 Integer.valueOf(int) 方法的代码: public static Integer valu

Integer.parseInt不同jdk源码解析

执行以下代码: System.out.println(Integer.parseInt("-123")); System.out.println(Integer.parseInt("+123")); 以下仅提供1.6和1.7两个版本的比较  1.6版本执行结果为:    1.7版本执行结果为: 从两方面去查证结果的原因,分别是:查看API文档 和 查看对应的源代码 [查看API文档]  1.6版本对应的API文档:    1.7版本对应的API文档: 可以看出,对第

【jdk源码分析】ArrayList的size()==0和isEmpty()

先看结果 分析源码 [jdk源码解析]jdk8的ArrayList初始化长度为0 java的基本数据类型默认值 无参构造 size()方法 isEmpty()方法 原文地址:https://www.cnblogs.com/xiaostudy/p/10781148.html