CountDownLatch/CyclicBarrier/Semaphore 使用过吗?

CountDownLatch/CyclicBarrier/Semaphore 使用过吗?下面详细介绍用法:

一,CountDownLatch

 背景;

  • countDownLatch(同步援助)是在java1.5被引入,跟它一起被引入的工具类还有CyclicBarrier(同步援助)、Semaphore(计数信号量)、concurrentHashMap和BlockingQueue(阻塞队列)。
  • 存在于java.util.cucurrent包下。

  概念理解:

让一些线程阻塞,直到另外一些线程完成一系列操作后才被唤醒。

主要方法介绍

     通过一个计数器来实现的,计数器的初始值是线程的数量。await方法会被阻塞,每当一个线程执行完毕后,countDown方法会让计数器的值-1,不会阻塞,当计数器的值为0时,表示其他线程都执行完毕后,调用await方法的线程会被唤醒,继续执行。

源码:

       (1),countDownLatch类中只提供了一个构造器:

CountDownLatch(int count)
构造一个 CountDownLatch初始化与给定的数。 

(2)类中有三个方法是最重要的:

//调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行
public void await() throws InterruptedException { };
//和await()类似,只不过等待一定的时间后count值还没变为0的话就会继续执行
public boolean await(long timeout, TimeUnit unit) throws InterruptedException { };
//将count值减1
public void countDown() { }; 

使用举例:

 public static void CountDownLatchUsed() throws InterruptedException {
        CountDownLatch downLatch = new CountDownLatch(6);
        for (int i = 0; i < 6; i++) {
          new Thread(()->{
      System.out.println(Thread.currentThread().getName()+"\t 下自习走人");
                downLatch.countDown();
            },String.valueOf(i)).start();
        }
        downLatch.await();
    System.out.println(Thread.currentThread().getName()+"自习室关门走人");
    }

运行结果:

小结:CountDownLatch和CyclicBarrier区别:

  1. countDownLatch是一个计数器,线程完成一个记录一个,计数器递减,只能使用一次。
  2. CyclicBarrier的计数器更像一个阀门,需要所有线程都到达,然后继续执行,计数器从0开始递增,同时提供reset()方法初始化计数值,可以多次使用。

了解更多:https://blog.csdn.net/chenssy/article/details/49794141#commentBox

二,CyclicBarrier

概念理解:

CyclicBarrier字面上就是可循环使用的屏障。当一组线程得到一个屏障(同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会打开,所有被屏障拦截的线程才会继续工作。进入屏障通过await方法。

主要方法介绍:

           

CyclicBarrier的内部是使用重入锁ReentrantLock和Condition。它有两个构造函数:    

  • CyclicBarrier(int parties):创建一个新的 CyclicBarrier,它将在给定数量的线程处于等待状态时启动,但它不会在启动 barrier 时执行预定义的操作。(直到到达屏障时才会去执行)
  • CyclicBarrier(int parties, Runnable barrierAction):创建一个新的 CyclicBarrier,它将在给定数量的线程处于等待状态时启动,并在启动 barrier 时执行给定的屏障操作,该操作由最后一个进入 barrier 的线程执行。

说明:

(1)parties:拦截线程的数量:

(2)barrierAction:为CyclicBarrier接收的Runnable命令,用于在线程到达屏障时,优先执行barrierAction ,用于处理更加复杂的业务场景。

源码:

  private void breakBarrier() {
        generation.broken = true;
        count = parties;
        trip.signalAll();
    }
  public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

在CyclicBarrier中最重要的方法莫过于await()方法,在所有参与者都已经在此 barrier 上调用 await 方法之前,将一直等待。如下:

 public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }

await()方法内部调用dowait(boolean timed, long nanos)方法:

其实await()的处理逻辑还是比较简单的:如果该线程不是到达的最后一个线程,则他会一直处于等待状态,除非发生以下情况:

  1. 最后一个线程到达,即index == 0
  2. 超出了指定时间(超时等待)
  3. 其他的某个线程中断当前线程
  4. 其他的某个线程中断另一个等待的线程
  5. 其他的某个线程在等待barrier超时
  6. 其他的某个线程在此barrier调用reset()方法。reset()方法用于将屏障重置为初始状态。

应用场景:

   CyclicBarrier试用与多线程结果合并的操作,用于多线程计算数据,最后合并计算结果的应用场景。比如我们需要统计多个Excel中的数据,然后等到一个总结果。我们可以通过多线程处理每一个Excel,执行完成后得到相应的结果,最后通过barrierAction来计算这些线程的计算结果,得到所有Excel的总和。

应用实例:

比如开会要等所有人到齐了,才会开会:

public class BlockQueueDemo {
    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(4, new Runnable() {
            @Override
            public void run() {
                System.out.println("所有人都到齐了吧,咱们开会。。。");
            }
        });
        for (int i = 1; i <= 4; i++) {
            final int tempInt = i;
            new Thread(() -> {
                try {
                    System.out.println(Thread.currentThread().getName() + "\t" + tempInt + "已到位");
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }, String.valueOf(i)).start();
        }
    }
}

运行结果:

了解更多:https://blog.csdn.net/chenssy/article/details/70160595

三,Semaphore 信号量

概念理解:

信号量Semaphore是一个控制访问多个共享资源的计数器,它本质上是一个“共享锁”。

理论知识

(1) Java并发提供了两种加锁模式,共享锁和独占锁。对于独占锁,他每次只能有一个线程持有,而共享锁则不同,它允许多个线程并行持有锁,并发的访问共享资源。

(2)独占锁采用的是一种悲观的加锁策略,对于写而言为冲突是必须的,但对于读而言可以进行共享,因为它不印象数据的一致性,如果某个只读线程获取独占锁,则其他读线程都只能等待了,这种情况下就限制了不必要的并发性,降低了吞吐量。而共享锁则不同,它放宽了加锁的条件,采用了乐观锁机制,它是允许多个读线程同时访问同一个共享资源的。

Semaphore官方解释:

一个计数信号量。从概念上讲,信号量维护了一个许可集。如有必要,在许可可用前会阻塞每一个 acquire(),然后再获取该许可。每个 release() 添加一个许可,从而可能释放一个正在阻塞的获取者。但是,不使用实际的许可对象,Semaphore 只对可用许可的号码进行计数,并采取相应的行动。

(3)Semaphore 通常用于限制可以访问某些资源(物理或逻辑的)的线程数目。

CountDownLatch和Semephore优缺点对比

     CountDownLatch的问题是不能复用。比如count=3,那么加到3,就不能继续操作了。而Semaphore可以解决这个问题,比如6辆车3个停车位,对于CountDownLatch只能停3辆车,而Semaphore可以停6辆车,车位空出来后,其它车可以占有,这就涉及到了Semaphore.accquire()Semaphore.release()方法。

    生活实例版本解释:

我们假设只有三个理发师、一个接待人。一开始来了五个客人,接待人则安排三个客人进行理发,其余两个人必须在那里等着,此后每个来理发店的人都必须等待。一段时间后,一个理发师完成理发后,接待人则安排另一个人(公平还是非公平机制呢??)来理发。在这里理发师则相当于公共资源,接待人则相当于信号量(Semaphore),客户相当于线程。

深入理解:

进一步讲,我们确定信号量Semaphore是一个非负整数(>=1)。当一个线程想要访问某个共享资源时,它必须要先获取Semaphore,当Semaphore >0时,获取该资源并使Semaphore – 1。如果Semaphore值 = 0,则表示全部的共享资源已经被其他线程全部占用,线程必须要等待其他线程释放资源。当线程释放资源时,Semaphore则+1;

当信号量Semaphore = 1 时,它可以当作互斥锁使用。其中0、1就相当于它的状态,当=1时表示其他线程可以获取,当=0时,排他,即其他线程必须要等待。

源码分析

(1)Semaphore的结构如下:

      

从上面可以看出,Semaphore和ReentrantLock一样,都是包含公平锁(FairySync)和非公平锁(NonfairSync),两个锁都是继承Sync,而Sync也是继承自AQS。其构造函数如下:

/**
     * 创建具有给定的许可数和非公平的公平设置的 Semaphore。  (默认是非公平锁)
     */
    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }

    /**
     * 创建具有给定的许可数和给定的公平设置的 Semaphore。
     */
    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }

(2)信号量的获取:acquire()



在ReentrantLock中已经阐述过,公平锁和非公平锁获取锁机制的差别:对于公平锁而言,如果当前线程不在CLH队列(CLH锁是一个自旋锁。能确保无饥饿性。提供先来先服务的公平性)的头部,则需要排队等候,而非公平锁则不同,它无论当前线程处于CLH队列的何处都会直接获取锁。所以公平信号量和非公平信号量的区别也一样。

关于CLH 自旋队列锁 更多知识:https://blog.csdn.net/aesop_wubo/article/details/7533186

源码:

public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

对于公平信号量和非公平信号量,他们机制的差异就体现在traAcquireShared()方法中:

公平锁 tryAcquireShared() 尝试获取信号量

源码:

protected int tryAcquireShared(int acquires) {
        for (;;) {
            //判断该线程是否位于CLH队列的列头,如果是的话返回 -1,调用doAcquireSharedInterruptibly()
            if (hasQueuedPredecessors())
                return -1;
            //获取当前的信号量许可
            int available = getState();
            //设置“获得acquires个信号量许可之后,剩余的信号量许可数”
            int remaining = available - acquires;

            //如果剩余信号量 > 0 ,则设置“可获取的信号量”为remaining
            if (remaining < 0 || compareAndSetState(available, remaining))
                return remaining;
        }
    }

注意 : tryAcquireShared是尝试获取 信号量,remaining表示下次可获取的信号量。

(2)hasQueuedPredecessors() 是否有队列同步器



对于hasQueuedPredecessors、compareAndSetState在ReentrantLock中已经阐述了,hasQueuedPredecessors用于判断该线程是否位于CLH队列列头,compareAndSetState用于设置state的,它是进行原子操作的。代码如下:

public final boolean hasQueuedPredecessors() {
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }

    protected final boolean compareAndSetState(int expect, int update) {
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

(3) doAcquireSharedInterruptibly源代码如下:

private void doAcquireSharedInterruptibly(int arg)
            throws InterruptedException {
            /*
             * 创建CLH队列的node节点,Node.SHARED表示该节点为共享锁
             */
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                for (;;) {
                    //获取该节点的前继节点
                    final Node p = node.predecessor();
                    //当p为头节点时,基于公平锁机制,线程尝试获取锁
                    if (p == head) {
                        //尝试获取锁
                        int r = tryAcquireShared(arg);
                        if (r >= 0) {
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            failed = false;
                            return;
                        }
                    }
                    //判断当前线程是否需要阻塞,如果阻塞的话,则一直处于阻塞状态知道获取共享锁为止
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        throw new InterruptedException();
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }

doAcquireSharedInterruptibly主要是做两个工作;1、尝试获取共享锁,2、阻塞线程直到线程获取共享锁。

非公平锁

对于非公平锁就简单多了,她没有那些所谓的要判断是不是CLH队列的列头,如下:

final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

在非公平锁中,tryAcquireShared直接调用AQS的nonfairTryAcquireShared()。通过上面的代码我可看到非公平锁并没有通过if (hasQueuedPredecessors())这样的条件来判断该节点是否为CLH队列的头节点,而是直接判断信号量。

信号量的释放:release()

信号量Semaphore的释放和获取不同,它没有分公平锁和非公平锁。如下:

public void release() {
        sync.releaseShared(1);
    }
    public final boolean releaseShared(int arg) {
        //尝试释放共享锁
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

注意:release()释放线索所占有的共享锁,它首先通过tryReleaseShared尝试释放共享锁,如果成功直接返回,如果失败则调用doReleaseShared来释放共享锁。

tryReleaseShared:源码部分:

protected final boolean tryReleaseShared(int releases) {
        for (;;) {
            int current = getState();
            //信号量的许可数 = 当前信号许可数 + 待释放的信号许可数
            int next = current + releases;
            if (next < current) // overflow
                throw new Error("Maximum permit count exceeded");
            //设置可获取的信号许可数为next
            if (compareAndSetState(current, next))
                return true;
        }
    }

doReleaseShared:源码部分:

private void doReleaseShared() {
            for (;;) {
                //node 头节点
                Node h = head;
                //h != null,且h != 尾节点
                if (h != null && h != tail) {
                    //获取h节点对应线程的状态
                    int ws = h.waitStatus;
                    //若h节点状态为SIGNAL,表示h节点的下一个节点需要被唤醒
                    if (ws == Node.SIGNAL) {
                        //设置h节点状态
                        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                            continue;
                        //唤醒h节点对应的下一个节点
                        unparkSuccessor(h);
                    }
                    //若h节点对应的状态== 0 ,则设置“文件点对应的线程所拥有的共享锁”为其它线程获取锁的空状态
                    else if (ws == 0 &&
                             !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                        continue;
                }
                //h == head时,则退出循环,若h节点发生改变时则循环继续
                if (h == head)
                    break;
            }
        }

参考博客:

JAVA多线程–信号量(Semaphore)

Java多线程系列--“JUC锁”11之 Semaphore信号量的原理和示例

【Java并发编程实战】—–“J.U.C”:Semaphore

原文地址:https://www.cnblogs.com/2019wxw/p/11663067.html

时间: 2024-10-28 14:44:19

CountDownLatch/CyclicBarrier/Semaphore 使用过吗?的相关文章

并发包下常见的同步工具类详解(CountDownLatch,CyclicBarrier,Semaphore)

目录 1. 前言 2. 闭锁CountDownLatch 2.1 CountDownLatch功能简介 2.2 使用CountDownLatch 2.3 CountDownLatch原理浅析 3.循环屏障CyclicBarrier 3.1 CyclicBarrier功能简介 3.2 使用CyclicBarrier 3.3 CyclicBarrier原理浅析 4. 信号量Semaphore 4.1 Semaphore功能简介 4.2 使用Semaphore进行最大并发数的控制 4.3 Semaph

Join,CountDownLatch,CyclicBarrier,Semaphore和Exchanger

CountDownLatch允许一个或者多个线程等待其他线程完成操作,之后再对结果做统一处理: 适用场景,分布式系统中对多个微服务的调用,并发执行并且必须等待全部执行完成才能继续执行后续操作: 其实在java中默认的实现是join()方法,join()方法主要的作用是当前线程必须等待直到join线程执行完成之后才能继续执行后续的操作, 其本质就是轮询判断join线程是否存活,如果存活则主线程继续等待,否则,通过调用this.notifyAll()方法来继续执行主线程. 实例代码如下: publi

CountDownLatch CyclicBarrier Semaphore 比较

document CountDownLatch A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes. CyclicBarrier A synchronization aid that allows a set of threads to all wait for each other to

java并发之CountDownLatch、Semaphore和CyclicBarrier

JAVA并发包中有三个类用于同步一批线程的行为,分别是CountDownLatch.Semaphore和CyclicBarrier. CountDownLatch CountDownLatch是一个计数器闭锁,主要的功能就是通过await()方法来阻塞住当前线程,然后等待计数器减少到0了,再唤起这些线程继续执行. 这个类里主要有两个方法,一个是向下减计数器的方法:countdown(),其实现的核心代码如下: public boolean tryReleaseShared(int release

转发---[沧海拾遗]java并发之CountDownLatch、Semaphore和CyclicBarrier

JAVA并发包中有三个类用于同步一批线程的行为,分别是CountDownLatch.Semaphore和CyclicBarrier. CountDownLatch CountDownLatch是一个计数器闭锁,主要的功能就是通过await()方法来阻塞住当前线程,然后等待计数器减少到0了,再唤起这些线程继续执行. 这个类里主要有两个方法,一个是向下减计数器的方法:countdown(),其实现的核心代码如下: public boolean tryReleaseShared(int release

CountDownLatch &amp; CyclicBarrier源码Android版实现解析

CountDownLatch CountDownLatch允许一条或者多条线程等待直至其它线程完成以系列的操作的辅助同步器. 用一个指定的count值对CountDownLatch进行初始化.await方法会阻塞,直至因为调用countDown方法把当前的count降为0,在这以后,所有的等待线程会被释放,并且在这以后的await调用将会立即返回.这是一个一次性行为--count不能被重置.如果你需要一个可以重置count的版本,考虑使用CyclicBarrier. 其实本类实现非常简单,和Re

Java多线程20:多线程下的其他组件之CountDownLatch、Semaphore、Exchanger

前言 在多线程环境下,JDK给开发者提供了许多的组件供用户使用(主要在java.util.concurrent下),使得用户不需要再去关心在具体场景下要如何写出同时兼顾线程安全性与高效率的代码.之前讲过的线程池.BlockingQueue都是在java.util.concurrent下的组件,Timer虽然不在java.util.concurrent下,但也算是.后两篇文章将以例子的形式简单讲解一些多线程下其他组件的使用,不需要多深刻的理解,知道每个组件大致什么作用就行. 本文主要讲解的是Cou

Java学习笔记--并发工具Semaphore,CountDownLatch,CyclicBarrier,Exchanger

Semaphore 实现典型的信号量 CountDownLatch 在指定数量的事件发生前一直等待 CyclicBarrier 使一组线程在一个预定义的执行点等待 Exchanger 交换两个线程的数据 1. Semaphore 信号量(Semaphore),是在多线程环境下使用的一种设施, 它负责协调各个线程, 以保证它们能够正确.合理的使用公共资源 在java中,还可以设置该信号量是否采用公平模式,如果以公平方式执行,则线程将会按到达的顺序(FIFO)执行,如果是非公平,则可以后请求的有可能

CountDownLatch,CyclicBarrier,Semaphore的使用

什么时候使用CountDownLatch CountDownLatch原理和示例 Semaphore信号量的原理和示例 CyclicBarrier的用法 CyclicBarrier 和 CountDownLatch 在用法上的不同: 1.CountDownLatch 适用于一组线程和另一个主线程之间的工作协作.一个主线程等待一组工作线程的任务完毕才继续它的执行是使用 CountDownLatch 的主要场景:CyclicBarrier 用于一组或几组线程,比如一组线程需要在一个时间点上达成一致,