【Java并发工具类】Semaphore

前言

1965年,荷兰计算机科学家Dijkstra提出的信号量机制成为一种高效的进程同步机制。这之后的15年,信号量一直都是并发编程领域的终结者。1980年,管程被提出,成为继信号量之后的在并发编程领域的第二个选择。目前几乎所有的语言都支持信号量机制,Java也不例外。Java中提供了Semaphore并发工具类来支持信号量机制。下面我们就来了解Java实现的信号量机制。
首先介绍信号量模型,然后介绍如何使用,最后使用信号量来实现一个限流器。

信号量模型

信号量模型图(图来自参考[1]):

信号量模型总结为:一个计数器、一个等待队列和三个对外调用的方法。
计数器和等待队列时对外透明的,所有我们只能通过三个对外方法来访问计数器和等待队列。
init():设置计数器的初始值。
down():计数器的值减一。如果此时计数器的值小于0,则当前线程插入等待队列并阻塞,否则当前线程可以继续执行。
up():计数器的值加一。如果此时计数器的值小于或者等于0,则唤醒等待队列中的一个线程,并将其从等待队列中移除。

这三个方法都是原子性的,由实现信号量模型的方法保证。在Java SDK中,信号量模型是由java.util.concurrent.Semaphore实现。

信号量模型代码化大致类似如下:

class Semaphore{
    int count; // 计数器
    Queue queue; // 等待队列

    // 初始化操作
    Semaphore(int c){
        this.count=c;
    }

    void down(){
        this.count--; // 计数器值减一
        if(this.count < 0){
            // 将当前线程插入等待队列
            // 阻塞当前线程
        }
    }

    void up(){
        this.count++; // 计数器值加一
        if(this.count <= 0) {
            // 移除等待队列中的某个线程T
            // 唤醒线程T
        }
    }
}

在信号量模型中,down()up()这两个操作也被成为P操作(荷兰语proberen,测试)和V操作(荷荷兰语verhogen,增加)。在我学的操作系统教材中(C语言实现),P操作对应wait(),V操作对应singal()。虽然叫法不同,但是语义都是相同的。在Java SDK并发包中,down()up()分别对应于Semaphore中的acquire()release()

如何使用信号量

信号量有时也被称为红绿灯,我们想想红绿灯时怎么控制交通的,就知道该如何使用信号量。车辆路过十字路时,需要先检查是否为绿灯,如果是则通行,否则就等待。想想和加锁机制有点相似,都是一样的操作,先检查是否符合条件(“尝试获取”),符合(“获取到”)则线程继续运行,否则阻塞线程。

下面使用累加器的例子来说明如何使用信号量。

count+=1操作是个临界区,只允许一个线程执行,即要保证互斥。于是我们在进入临界区之前,使用down()即Java中的acquire(),在退出之后使用up()即Java中的release()。

static int count;
//初始化信号量
static final Semaphore s = new Semaphore(1); // 构造函数参数为1,表示只允许一个线程进行临界区。可实现一个互斥锁的功能。
//用信号量保证互斥
static void addOne() {
    s.acquire(); // 获取一个许可(可看作加锁机制中加锁)
    try {
        count+=1;
    } finally {
        s.release(); // 归还许可(可看做加锁机制中解锁)
    }
}

完整代码如下:

package com.sakura.concrrent;
import java.util.concurrent.Semaphore;
public class SemaphoreTest {
    static int count;
    static final Semaphore s = new Semaphore(1);
    static void addOne() throws InterruptedException {
        //只会有一个线程将信号量中的计数器减为1,而另外一个线程只能将信号量中计数器减为-1,导致被阻塞
        s.acquire();??
        try {
            count +=1;
            System.out.println("Now thread is " + Thread.currentThread() + "???and count is " + count);
        }finally {
            //进入临界区的线程在执行完临界区代码后将信号量中计数器的值加1然后,此时信号量中计数器的值为0,则从阻塞队列中唤醒被阻塞的进程
            s.release();???
        }
    }

    public static void main(String[] args) {
        // 创建两个线程运行
        MyThread thread1 = new MyThread();
        MyThread thread2 = new MyThread();

        thread1.start();
        thread2.start();
        System.out.println("main thread");

    }
}
class MyThread extends Thread{
    @Override
    public void run() {
        super.run();
        for(int i=0; i<10; i++) {???????????????????
            try {
                SemaphoreTest.addOne();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

运行结果:

如果Semaphore的构造函数参数(许可数量,内置计数器的值)修改一下:

static final Semaphore s = new Semaphore(2);

计数器值的为2,那么就允许有两个线程进入临界区,我们的count值就会出现问题

快速实现一个限流器

当设置信号量的计数器为1时,可实现一个简单的互斥锁功能。但是,我们前面刚介绍过Java SDK中的Lock,Semaphore的用途显然不会与Lock一致,不然就重复造轮子了。Semaphore最重要的一个功能便是:可以允许多个线程访问一个临界区。(上述例子我们就设置了计数器的值为2,可发现thread1和thread2都可进入临界区。)

我们会在什么地方遇见这种需求呢?
各种池化资源,例如连接池、对象池、线程池等等。例如,数据库连接池,在同一时刻,一定是允许多个线程同时使用连接池,当然,每个连接在被释放之前,是不允许其他线程使用的。

我们设计如下可以允许N个线程使用的对象池,我们将信号量的计数器值设为N,就可以让N个线程同时进行临界区,多余的就会被阻塞。(代码来自参考[1])

class ObjPool<T, R> {
    final List<T> pool;? ? //使用List保存实例对象
    // 用信号量实现限流器
    final Semaphore sem;

    // 构造函数
    ObjPool(int size, T t){
        pool = new Vector<T>(){};
        for(int i=0; i<size; i++){
            pool.add(t);
        }
        sem = new Semaphore(size);
    }

    // 获取对象池的对象,调用 func
    R exec(Function<T,R> func) {
        T t = null;
        sem.acquire();? ? //允许N个进程同时进入临界区
        try {
            //我们需要注意,因为多个进行可以进入临界区,所以Vector的remove方法是线程安全的
            t = pool.remove(0);? ??
            return func.apply(t);? ? //获取对象池汇中的一个对象后,调用func函数
        } finally {
            pool.add(t);? ? //离开临界区之前,将之前获取的对象放回到池中
            sem.release();? ? //使得计数器加1,如果信号量中计数器小于等于0,那么说明有线程在等待,此时就会自动唤醒等待线程
        }
    }
}
// 创建对象池
ObjPool<Long, String> pool = new ObjPool<Long, String>(10, 2);

// 通过对象池获取 t,之后执行??
pool.exec(t -> {
    System.out.println(t);
    return t.toString();
});

小结

记得学习操作系统时,信号量类型分为了好几种整型信号量、记录型信号量、AND信号量以及“信号量集”(具体了解可戳参考[2])。我认为Java SDK中Semaphore应该是记录型信号量的实现。不由想起,编程语言是对OS层面操作的一种抽象描述。这句话需要品需要细细品。

参考:
[1] 极客时间专栏王宝令《Java并发编程实战》
[2] 静水深流.操作系统之信号量机制总结.https://www.cnblogs.com/IamJiangXiaoKun/p/9464336.html

原文地址:https://www.cnblogs.com/myworld7/p/12315393.html

时间: 2024-11-10 18:26:49

【Java并发工具类】Semaphore的相关文章

25.大白话说java并发工具类-CountDownLatch,CyclicBarrier,Semaphore,Exchanger

1. 倒计时器CountDownLatch 在多线程协作完成业务功能时,有时候需要等待其他多个线程完成任务之后,主线程才能继续往下执行业务功能,在这种的业务场景下,通常可以使用Thread类的join方法,让主线程等待被join的线程执行完之后,主线程才能继续往下执行.当然,使用线程间消息通信机制也可以完成.其实,java并发工具类中为我们提供了类似"倒计时"这样的工具类,可以十分方便的完成所说的这种业务场景. 为了能够理解CountDownLatch,举一个很通俗的例子,运动员进行跑

Java并发工具类 - CountDownLatch

Java并发工具类 - CountDownLatch 1.简介 CountDownLatch是Java1.5之后引入的Java并发工具类,放在java.util.concurrent包下面 http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/package-summary.html 官方API. CountDownLatch能够使一个或多个线程等待其他线程完成各自的工作后再执行:CountDownLatch是JDK 5+里面

Java并发工具类CyclicBarrier

CyclicBarrier同步屏障 java并发工具类中有一个叫做CyclicBarrier的类,与CountDownLatch类似,都可以实现线程间的同步,但是差别是CyclicBarrier是可重置的同步屏障. 想象一个场景,有N个人不同时间走到一扇门,因为门需要N个人合力才能推开,所以人不足N个时,只能阻塞在此,等到N个人都到了之后,可以推开门,继续进行之前的工作.CyclicBarrier就是这扇门. 看看下面的代码,定义了一个线程数为2的,CyclicBarrier,并在主线程和另外一

【Java并发工具类】Java并发容器

前言 Java并发包有很大一部分都是关于并发容器的.Java在5.0版本之前线程安全的容器称之为同步容器.同步容器实现线程安全的方式:是将每个公有方法都使用synchronized修饰,保证每次只有一个线程能访问容器的状态.但是这样的串行度太高,将严重降低并发性,当多个线程竞争容器的锁时,吞吐量将严重降低.因此,在Java 5.0版本时提供了性能更高的容器来改进之前的同步容器,我们称其为并发容器. 下面我们先来介绍Java 5.0之前的同步容器,然后再来介绍Java 5.0之后的并发容器. Ja

Java并发工具类(三)控制并发线程数的Semaphore

作用 Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源. 简介 Semaphore也是一个线程同步的辅助类,可以维护当前访问自身的线程个数,并提供了同步机制.使用Semaphore可以控制同时访问资源的线程个数,例如,实现一个文件允许的并发访问数. 主要方法摘要: void acquire():从此信号量获取一个许可,在提供一个许可前翼子将线程阻塞,否则线程被中断. void release():释放一个许可,将其返回给信号量. in

Java并发工具类(四):线程间交换数据的Exchanger

简介 Exchanger(交换者)是一个用于线程间协作的工具类.Exchanger用于进行线程间的数据交换.它提供一个同步点,在这个同步点两个线程可以交换彼此的数据.这两个线程通过exchange方法交换数据, 如果第一个线程先执行exchange方法,它会一直等待第二个线程也执行exchange,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产出来的数据传递给对方. Exchanger的应用场景 1.Exchanger可以用于遗传算法,遗传算法里需要选出两个人作为交配对象,这时

Java并发工具类(一)等待多线程完成的CountDownLatch

作用 CountDownLatch是一个同步工具类,它允许一个或多个线程一直等待,直到其他线程的操作执行完后再执行 简介 CountDownLatch是在java1.5被引入的,存在于java.util.concurrent包下,它允许1个或者多个线程一直等待,直到一组操作执行完成.它初始一个整数值,此值是线程将要等待的操作数.当某个线程为了想要执行这些操作而等待时, 它要使用 await()方法.此方法让线程进入休眠直到操作完成. 当某个操作结束,它使用countDown() 方法来减少Cou

Java并发工具类(四)线程间交换数据的Exchanger

简介 Exchanger(交换者)是一个用于线程间协作的工具类.Exchanger用于进行线程间的数据交换.它提供一个同步点,在这个同步点两个线程可以交换彼此的数据.这两个线程通过exchange方法交换数据, 如果第一个线程先执行exchange方法,它会一直等待第二个线程也执行exchange,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产出来的数据传递给对方. Exchanger的应用场景 1,Exchanger可以用于遗传算法,遗传算法里需要选出两个人作为交配对象,这时

【多线程与并发】Java并发工具类

主要有两类 ①并发流程控制相关:CountDownLatch.CyclicBarrier.Semaphore ②线程间交换数据相关:Exchanger: CountDownLatch 作用:允许一个或多个线程等待其他线程完成操作 使用步骤: ①定义一个CountDownLatch(称为计数器),并指定等待次数: ②在合适的时机将计数器减1: ③在需要等待所有任务结束的位置,调用await()方法: 根据JDK中的说明文档整理的两个例子: 例子1: public class CountDownLa