CountDownLatch 闭锁、Semaphore信号量、Barrier栅栏

  同步工具类可以是任何一个对象。阻塞队列可以作为同步工具类,其他类型的同步工具类还包括信号量(Semaphore)、栅栏(Barrier)、以及闭锁(Latch)。

  所有的同步工具类都包含一些特定的结构化属性:它们封装了一些状态,这些状态将决定执行同步工具类的线程是继续执行还是等待,此外还提供了一些方法对状态进行操作,以及另一些方法用于高效地等待同步工具类进入到预期状态。

1.闭锁

  闭锁是一种同步工具类,可以延迟线程进度直到其到达终止状态。闭锁的作用相当于一扇门:在闭锁到达结束状态之前,这扇门一直是关闭的,并且没有任何线程能通过,当到达结束状态时允许所有的线程通过。当闭锁到达结束状态后,将不会再改变状态,因此这扇门将永远打开。闭锁可以用来确保某些活动直到其他活动都完成才继续执行。

  CountDownLatch是一种灵活的闭锁实现,它可以使一个或多个线程等待一组线程。闭锁状态包括一个计数器,该计数器被初始化为一个正数,表示需要等待的事件数量。countDown递减计数器,表示一个事件已经发生,而await方法等待计数器达到零,这表示所有需要等待的事件都已经发生。如果计数器的值非零,那么await会一直阻塞直到计数器为零,或者等待中的线程中断,或者等待超时。

查看源码发现:我们传进去的参数相当于内部Sync的状态,每次调用countDown的时候将状态值减一,状态值为0表示结束状态(await会解除阻塞)

    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }
    public void countDown() {
        sync.releaseShared(1);
    }
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

查看sync的源码:

    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setState(count);
        }
    ...
}

例如:实现一个统计多个线程并发执行任务的用时功能:

  当线程执行run中代码的时候会阻塞到startLatch.await(); 直到主线程调用startLatch.countDown(); 将计数器减一。这时所有线程开始执行任务。

  当线程执行完的时候endLatch.countDown();将结束必锁的计数器减一,此时主线程阻塞在endLatch.await();,直到5个线程都执行完主线程也解除阻塞。

package cn.qlq.thread.tone;

import java.util.concurrent.CountDownLatch;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 *
 * @author Administrator
 *
 */
public class Demo4 {
    private static final Logger LOGGER = LoggerFactory.getLogger(Demo4.class);

    public static void main(String[] args) throws InterruptedException {
        final CountDownLatch startLatch = new CountDownLatch(1);
        final CountDownLatch endLatch = new CountDownLatch(5);
        for (int i = 0; i < 5; i++) {
            Thread.sleep(1 * 1000);
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        startLatch.await();// 起始闭锁的计数器阻塞等到计数器减到零(标记第一个线程开始执行)
                        Thread.sleep(1 * 1000);
                        endLatch.countDown();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }

        // 实现计时
        long startTime = System.nanoTime();
        startLatch.countDown();// 将起始闭锁的计数器减一
        endLatch.await();// 结束闭锁阻塞直到计数器为零
        long endTime = System.nanoTime();
        LOGGER.error("结束,用时{}", endTime - startTime);
    }
}

2.Semaphore 信号量

  计数信号量(counting Semaphore)用来控制同时访问某个资源的数量,或者同时执行某个操作的数量。计数信号量还可以实现某种资源池,或者对容器实施边界。

  信号量是1个的Semaphore意味着只能被1个线程占用,可以用来设计同步(相当于互斥锁)。信号量大于1的Semaphore可以用来设计控制并发数,或者设计有界容器。

  Semaphore中管理者一组虚拟的许可(permit),许可的初始数量可由构造函数指定。在执行操作时首先获得许可(只要还有剩余的许可),并在使用后释放。如果没有许可,那么acquire将会一直阻塞直到有许可(或者直到中断或者操作超时)。release方法将返回一个许可给信号量。计算信号量的一种简化形式是二值信号量,即初始值为1的Semaphore。二值信号量可以用作互斥体(mutex),并具备不可重入的加锁语义:谁拥有了这个唯一的许可谁就拥有了互斥锁。

例如:例如信号量构造一个有界阻塞容器:

  信号量的计数值初始化为容器的最大值。add操作在向底层容器添加一个元素之前,首先要获取一个许可。如果add没有添加任何元素,那么会立刻释放信号量。同样,remove操作释放一个许可,使更多的元素能加到容器中。

class BoundedHashSet<T> {
    private Set<T> set;
    private Semaphore semaphore;

    public BoundedHashSet(int bound) {
        set = Collections.synchronizedSet(new HashSet());
        semaphore = new Semaphore(bound);
    }

    public boolean add(T o) throws InterruptedException {
        semaphore.acquire();// 尝试获取信号量
        boolean wasAdded = false;
        try {
            wasAdded = set.add(o);
            return wasAdded;
        } finally {
            if (!wasAdded) {// 如果添加失败就释放信号量,添加成功就占用一个信号量
                semaphore.release();
            }
        }
    }

    public boolean remove(T o) throws InterruptedException {
        boolean remove = set.remove(o);
        if (remove)// 如果删除成功之后就释放一个信号量
            semaphore.release();
        return remove;
    }
}

测试代码:

        BoundedHashSet<String> boundedHashSet = new BoundedHashSet<String>(3);
        System.out.println(boundedHashSet.add("1"));
        System.out.println(boundedHashSet.add("2"));
        System.out.println(boundedHashSet.add("2"));
        System.out.println(boundedHashSet.add("3"));
        System.out.println(boundedHashSet.add("4"));// 将会一直阻塞到这里
        System.out.println("=========");

结果:(JVM不会关闭)

注意:

1.Semaphore可以指定公平锁还是非公平锁,默认是非公平锁

    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }
    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }

2.acquire方法和release方法是可以有参数的,表示获取/返还的信号量个数

    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

3. Barrier栅栏

  栅栏(Barrier)类似于闭锁(一种同步工具,可以延迟线程直到其达到其终止状态),它能阻塞一组线程直到某个事件发生。栅栏与闭锁的区别在于所有线程必须同时到达栅栏位置,才能继续执行。闭锁等于等待事件,而栅栏用于等待其他线程。栅栏可以用于实现一些协议,例如几个家庭成员决定在某个地方集合:"所有人6:00到达目的地,然后讨论下一步的事情"。

3.1  CyclicBarrier栅栏(循环屏障)

  CyclicBarrier可以使一定数量的参与方反复地在栅栏位置汇集,它在并行迭代算法中非常有用:这种算法通常将一个问题划分成一系列相互独立的子问题。当线程到达栅栏位置时将调用await方法,这个方法将阻塞到所有线程到达栅栏位置。如果所有线程都到达栅栏,那么栅栏将打开所有线程被释放,而栅栏将被重置以便下次使用。如果对await的调用超时,或者await阻塞的线程被中断,那么栅栏就被认为是打破了,所有阻塞的await调用都将终止并抛出BrokenBarrierException。如果成功的通过栅栏,那么await将为每个线程返回一个唯一的到达索引号,我们可以用这些索引号"选举"产生一个领导线程,并在下一次迭代中由该领导线程执行一些特殊的工作。CyclicBarrier还可以使你将一个栅栏操作传递给构造函数,这是一个Runnable,当成功的通过栅栏时会(在一个子线程)执行它,但在阻塞过程被释放之前是不能执行的。

  CyclicBarrier的构造方法可以传入参与的数量(也就是被栅栏拦截的线程的数量),也可以传入一个Runnable对象。

    public CyclicBarrier(int parties) {
        this(parties, null);
    }
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

例如:

package cn.qlq.thread.tone;

import java.util.concurrent.CyclicBarrier;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 *
 * @author Administrator
 *
 */
public class Demo2 {
    private static final Logger LOGGER = LoggerFactory.getLogger(Demo2.class);

    public static void main(String[] args) throws InterruptedException {
        final CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
        for (int i = 0; i < 4; i++) {
            Thread.sleep(2 * 1000);
            new Thread(new Runnable() {
                @Override
                public void run() {
                    LOGGER.info("threadName -> {}", Thread.currentThread().getName());
                    try {
                        cyclicBarrier.await();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    LOGGER.info("threadName -> {}", Thread.currentThread().getName());
                }
            }).start();
        }
    }
}

结果:

18:08:00 [cn.qlq.thread.tone.Demo2]-[INFO] threadName -> Thread-0
18:08:02 [cn.qlq.thread.tone.Demo2]-[INFO] threadName -> Thread-1
18:08:02 [cn.qlq.thread.tone.Demo2]-[INFO] threadName -> Thread-1
18:08:02 [cn.qlq.thread.tone.Demo2]-[INFO] threadName -> Thread-0
18:08:04 [cn.qlq.thread.tone.Demo2]-[INFO] threadName -> Thread-2
18:08:06 [cn.qlq.thread.tone.Demo2]-[INFO] threadName -> Thread-3
18:08:06 [cn.qlq.thread.tone.Demo2]-[INFO] threadName -> Thread-3
18:08:06 [cn.qlq.thread.tone.Demo2]-[INFO] threadName -> Thread-2

  00的时候0线程到达栅栏进入阻塞,02的时候1线程到达栅栏,由于栅栏的参与者是2所以此时相当于所有线程到达栅栏,栅栏放开,然后栅栏被重置。

  04的时候2线程到达栅栏进入阻塞,06的时候3线程到达栅栏,由于栅栏的参与者是2所以此时相当于所有参与者线程到达栅栏,然后栅栏放开。

我们将栅栏的参与者改为5查看结果:

final CyclicBarrier cyclicBarrier = new CyclicBarrier(5);

结果:4个线程会阻塞到await方法处,而且JVM不会关闭,因为栅栏的参与者不够5个所以被一直阻塞。

3.2  Exchanger

  Exchanger相当于一个两方(Two-party)栅栏,各方在栅栏位置上交换数据。当两方执行不对称的操作时,Exchanger非常有用。例如:一个线程向缓冲区写东西,另一个线程从缓冲区读数据Exchanger相当于参与者只有两个的CyclicBarrier。

  两个线程会阻塞在exchanger.exchange方法上,泛型可以指定其交换的数据类型。

例如:两个线程交换自己的线程名称

package cn.qlq.thread.tone;

import java.util.concurrent.Exchanger;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 *
 * @author Administrator
 *
 */
public class Demo3 {
    private static final Logger LOGGER = LoggerFactory.getLogger(Demo3.class);

    public static void main(String[] args) throws InterruptedException {
        final Exchanger<String> exchanger = new Exchanger<String>();// 泛型指定交换的数据
        for (int i = 0; i < 4; i++) {
            Thread.sleep(2 * 1000);
            new Thread(new Runnable() {
                @Override
                public void run() {
                    LOGGER.info("threadName -> {}", Thread.currentThread().getName());
                    try {
                        String exchange = exchanger.exchange(Thread.currentThread().getName());
                        LOGGER.error("threadName -> {},exchange->{}", Thread.currentThread().getName(), exchange);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    LOGGER.info("threadName -> {}", Thread.currentThread().getName());
                }
            }).start();
        }
    }
}

结果:

18:28:33 [cn.qlq.thread.tone.Demo3]-[INFO] threadName -> Thread-0
18:28:35 [cn.qlq.thread.tone.Demo3]-[INFO] threadName -> Thread-1
18:28:35 [cn.qlq.thread.tone.Demo3]-[ERROR] threadName -> Thread-1,exchange->Thread-0
18:28:35 [cn.qlq.thread.tone.Demo3]-[ERROR] threadName -> Thread-0,exchange->Thread-1
18:28:35 [cn.qlq.thread.tone.Demo3]-[INFO] threadName -> Thread-1
18:28:35 [cn.qlq.thread.tone.Demo3]-[INFO] threadName -> Thread-0
18:28:37 [cn.qlq.thread.tone.Demo3]-[INFO] threadName -> Thread-2
18:28:39 [cn.qlq.thread.tone.Demo3]-[INFO] threadName -> Thread-3
18:28:39 [cn.qlq.thread.tone.Demo3]-[ERROR] threadName -> Thread-3,exchange->Thread-2
18:28:39 [cn.qlq.thread.tone.Demo3]-[ERROR] threadName -> Thread-2,exchange->Thread-3
18:28:39 [cn.qlq.thread.tone.Demo3]-[INFO] threadName -> Thread-3
18:28:39 [cn.qlq.thread.tone.Demo3]-[INFO] threadName -> Thread-2

原文地址:https://www.cnblogs.com/qlqwjy/p/10251610.html

时间: 2024-08-02 08:10:17

CountDownLatch 闭锁、Semaphore信号量、Barrier栅栏的相关文章

java并发之CountDownLatch、Semaphore和CyclicBarrier

JAVA并发包中有三个类用于同步一批线程的行为,分别是CountDownLatch.Semaphore和CyclicBarrier. CountDownLatch CountDownLatch是一个计数器闭锁,主要的功能就是通过await()方法来阻塞住当前线程,然后等待计数器减少到0了,再唤起这些线程继续执行. 这个类里主要有两个方法,一个是向下减计数器的方法:countdown(),其实现的核心代码如下: public boolean tryReleaseShared(int release

转发---[沧海拾遗]java并发之CountDownLatch、Semaphore和CyclicBarrier

JAVA并发包中有三个类用于同步一批线程的行为,分别是CountDownLatch.Semaphore和CyclicBarrier. CountDownLatch CountDownLatch是一个计数器闭锁,主要的功能就是通过await()方法来阻塞住当前线程,然后等待计数器减少到0了,再唤起这些线程继续执行. 这个类里主要有两个方法,一个是向下减计数器的方法:countdown(),其实现的核心代码如下: public boolean tryReleaseShared(int release

并发包下常见的同步工具类详解(CountDownLatch,CyclicBarrier,Semaphore)

目录 1. 前言 2. 闭锁CountDownLatch 2.1 CountDownLatch功能简介 2.2 使用CountDownLatch 2.3 CountDownLatch原理浅析 3.循环屏障CyclicBarrier 3.1 CyclicBarrier功能简介 3.2 使用CyclicBarrier 3.3 CyclicBarrier原理浅析 4. 信号量Semaphore 4.1 Semaphore功能简介 4.2 使用Semaphore进行最大并发数的控制 4.3 Semaph

CountDownLatch/CyclicBarrier/Semaphore 使用过吗?

CountDownLatch/CyclicBarrier/Semaphore 使用过吗?下面详细介绍用法: 一,CountDownLatch  背景; countDownLatch(同步援助)是在java1.5被引入,跟它一起被引入的工具类还有CyclicBarrier(同步援助).Semaphore(计数信号量).concurrentHashMap和BlockingQueue(阻塞队列). 存在于java.util.cucurrent包下.   概念理解: 让一些线程阻塞,直到另外一些线程完成

Java多线程20:多线程下的其他组件之CountDownLatch、Semaphore、Exchanger

前言 在多线程环境下,JDK给开发者提供了许多的组件供用户使用(主要在java.util.concurrent下),使得用户不需要再去关心在具体场景下要如何写出同时兼顾线程安全性与高效率的代码.之前讲过的线程池.BlockingQueue都是在java.util.concurrent下的组件,Timer虽然不在java.util.concurrent下,但也算是.后两篇文章将以例子的形式简单讲解一些多线程下其他组件的使用,不需要多深刻的理解,知道每个组件大致什么作用就行. 本文主要讲解的是Cou

聊聊高并发(二十六)解析java.util.concurrent各个组件(八) 理解CountDownLatch闭锁

CountDownLatch闭锁也是基于AQS实现的一种同步器,它表示了"所有线程都等待,直到锁打开才继续执行"的含义.它和Semaphore的语意不同, Semaphore的获取和释放操作都会修改状态,都可能让自己或者其他线程立刻拿到锁.而闭锁的获取操作只判断状态是否为0,不修改状态本身,闭锁的释放操作会修改状态,每次递减1,直到状态为0. 所以正常情况下,闭锁的获取操作只是等待,不会立刻让自己获得锁,直到释放操作把状态变为0. 闭锁可以用来实现很多场景,比如: 1. 某个服务依赖于

009-多线程-锁-JUC锁-Semaphore 信号量

一.概述 Semaphore是一个计数信号量.从概念上将,Semaphore包含一组许可证.如果有需要的话,每个acquire()方法都会阻塞,直到获取一个可用的许可证.每个release()方法都会释放持有许可证的线程,并且归还Semaphore一个可用的许可证.然而,实际上并没有真实的许可证对象供线程使用,Semaphore只是对可用的数量进行管理维护. 信号量机制是一种有限数量的共享模式锁.控制临界资源超出范围的一种手段.可用于流量控制,限制最大的并发访问数. Semaphore是通过共享

windows系统调用 semaphore信号量

1 #include "iostream" 2 #include "windows.h" 3 #include "cstring" 4 using namespace std; 5 6 HANDLE g_hSemThreads=INVALID_HANDLE_VALUE; 7 8 static DWORD WINAPI ThreadProc(LPVOID lpParam){ 9 LONG nPauseMs=reinterpret_cast<L

Semaphore信号量

import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; /**  * 信号量  *  * @author Vincent Zhao  * @version 1.0.0  * @Time 2015/4/20 15:57  */ public class SemaphoreDemo {     public st