java多线程并发系列之闭锁(Latch)和栅栏(CyclicBarrier)

-闭锁(Latch)

闭锁(Latch):一种同步方法,可以延迟线程的进度直到线程到达某个终点状态。通俗的讲就是,一个闭锁相当于一扇大门,在大门打开之前所有线程都被阻断,一旦大门打开所有线程都将通过,但是一旦大门打开,所有线程都通过了,那么这个闭锁的状态就失效了,门的状态也就不能变了,只能是打开状态。也就是说闭锁的状态是一次性的,它确保在闭锁打开之前所有特定的活动都需要在闭锁打开之后才能完成。

应用场景:

  • 确保某个计算在其需要的所有资源都被初始化之后才继续执行。二元闭锁(包括两个状态)可以用来表示“资源R已经被初始化”,而所有需要R的操作都必须先在这个闭锁上等待。
  • 确保某个服务在其依赖的所有其他服务都已经启动之后才启动。
  • 等待直到某个操作的所有参与者都就绪在继续执行。(例如:多人游戏中需要所有玩家准备才能开始)

CountDownLatch是JDK 5+里面闭锁的一个实现,允许一个或者多个线程等待某个事件的发生。CountDownLatch有一个正数计数器,countDown方法对计数器做减操作,await方法等待计数器达到0。所有await的线程都会阻塞直到计数器为0或者等待线程中断或者超时。


-栅栏(CyclicBarrier)

栅栏类似于闭锁,它能阻塞一组线程直到某个事件发生。 栅栏与闭锁的关键区别在于,所有的线程必须同时到达栅栏位置,才能继续执行。闭锁用于等待事件,而栅栏用于等待其他线程。

场景: 应用一些协议,比如几个家庭成员决定在某个地方集合,所有人在6:00在某地集合,到了以后要等待其他人,之后才能讨论去哪里吃饭。 并行迭代,将一个问题分成很多子问题,当一系列的子问题都解决之后(所有子问题线程都已经await()),此时将栅栏打开,所有子问题线程被释放,而栅栏位置可以留着下次使用。


-例子:两个分别关于CountDownlatch和CyclicBarrier的例子

1、CountDownLatch

有三个工人在为老板干活,这个老板有一个习惯,就是当三个工人把一天的活都干完了的时候,他就来检查所有工人所干的活。记住这个条件:三个工人先全部干完活,老板才检查。所以在这里用Java代码设计两个类,Worker代表工人,Boss代表老板,具体的代码实现如下:

工人:

package LatchAndCyclicBarrier;

import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class Work implements Runnable{

        private CountDownLatch downLatch;
        private String name;

        public Work(CountDownLatch downLatch, String name){
            this.downLatch = downLatch;
            this.name = name;
        }

        public void run() {
            this.doWork();
            try{
                TimeUnit.SECONDS.sleep(new Random().nextInt(10));
            }catch(InterruptedException ie){
            }
            System.out.println(this.name + "活干完了!");
            this.downLatch.countDown();

        }

        private void doWork(){
            System.out.println(this.name + "正在干活!");
        }

    }

老板:


package LatchAndCyclicBarrier;

import java.util.concurrent.CountDownLatch;

public class Boss implements Runnable{

        private CountDownLatch downLatch;

        public Boss(CountDownLatch downLatch){
            this.downLatch = downLatch;
        }

        public void run() {
            System.out.println("老板正在等所有的工人干完活......");
            try {
                this.downLatch.await();
            } catch (InterruptedException e) {
            }
            System.out.println("工人活都干完了,老板开始检查了!");
        }

    }

测试代码:

package LatchAndCyclicBarrier;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TestLatch {

    public static void main(String[] args) {
            ExecutorService executor = Executors.newCachedThreadPool();

            CountDownLatch latch = new CountDownLatch(3);

            Work w1 = new Work(latch,"张三");
            Work w2 = new Work(latch,"李四");
            Work w3 = new Work(latch,"王二");

            Boss boss = new Boss(latch);

            executor.execute(w3);
            executor.execute(w2);
            executor.execute(w1);
            executor.execute(boss);

            executor.shutdown();
        }

    }

执行结果:

李四正在干活!
老板正在等所有的工人干完活......
王二正在干活!
张三正在干活!
李四活干完了!
王二活干完了!
张三活干完了!
工人活都干完了,老板开始检查了!

2、CyclicBarrier

接着上面的例子,还是这三个工人,不过这一次,这三个工人自由了,老板不用检查他们任务了,他们三个合作建桥,有三个桩,每人打一个,同时打完之后才能一起搭桥(搭桥需要三人一起合作)。也就是说三个人都打完桩之后才能继续工作。


package LatchAndCyclicBarrier;

import java.util.concurrent.CyclicBarrier;

public class CycWork implements Runnable {

        private CyclicBarrier cyclicBarrier ;
        private String name ;

        public CycWork(CyclicBarrier cyclicBarrier,String name)
       {
               this .name =name;
               this .cyclicBarrier =cyclicBarrier;
       }

        @Override
        public void run() {
               // TODO Auto-generated method stub

              System. out .println(name +"正在打桩,毕竟不轻松。。。。。" );

               try {
                     Thread. sleep(5000);
                     System. out .println(name +"不容易,终于把桩打完了。。。。" );
                      cyclicBarrier .await();

              } catch (Exception e) {
                      // TODO: handle exception
                     e.printStackTrace();
              }

              System. out .println(name +":其他逗b把桩都打完了,又得忙活了。。。" );

       }

}

测试程序

package LatchAndCyclicBarrier;

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CycTest {

        public static void main(String[] args)
       {
              ExecutorService executorpool=Executors. newFixedThreadPool(3);
              CyclicBarrier cyclicBarrier= new CyclicBarrier(3);

              CycWork work1= new CycWork(cyclicBarrier, "张三" );
              CycWork work2= new CycWork(cyclicBarrier, "李四" );
              CycWork work3= new CycWork(cyclicBarrier, "王五" );

              executorpool.execute(work1);
              executorpool.execute(work2);
              executorpool.execute(work3);

              executorpool.shutdown();

       }

}

运行结果:

李四正在打桩,毕竟不轻松。。。。。
张三正在打桩,毕竟不轻松。。。。。
王五正在打桩,毕竟不轻松。。。。。
李四不容易,终于把桩打完了。。。。
张三不容易,终于把桩打完了。。。。
王五不容易,终于把桩打完了。。。。
王五:其他逗b把桩都打完了,又得忙活了。。。
李四:其他逗b把桩都打完了,又得忙活了。。。
张三:其他逗b把桩都打完了,又得忙活了。。。

CountDownlatch和CyclicBarrierde 源码部分

1、CountDownLatch中的两个关键方法

  public void countDown() {    //对计数器减一 表示有一个事件已经发生了
        sync.releaseShared(1);
    }

 public void await() throws InterruptedException { //等到计数器为0
        sync.acquireSharedInterruptibly(1);
    }

await方法调用了AbstractQueuedSynchronizer中的acquireSharedInterruptibly

 public final void acquireSharedInterruptibly (int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }
 public final boolean releaseShared (int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true ;
        }
        return false ;
    }
protected boolean tryReleaseShared (int arg) {
        throw new UnsupportedOperationException();
    }

2、CyclicBarrier中的await()方法

  public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen;
        }
    }
 private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            final Generation g = generation;

            if (g.broken)
                throw new BrokenBarrierException();

            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }

           int index = --count;
           if (index == 0) {  // tripped
               boolean ranAction = false;
               try {
                   final Runnable command = barrierCommand;
                   if (command != null)
                       command.run();
                   ranAction = true;
                   nextGeneration();
                   return 0;
               } finally {
                   if (!ranAction)
                       breakBarrier();
               }
           }

            // loop until tripped, broken, interrupted, or timed out
            for (;;) {
                try {
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We‘re about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }

                if (g.broken)
                    throw new BrokenBarrierException();

                if (g != generation)
                    return index;

                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }

上面dowait方法中有一个index,index=--count而count的值在源码中来自

count = parties;

提到 parties就不得不看看构造函数了

 public CyclicBarrier(int parties) {
        this(parties, null);
    }

如上例子,我们构造了CyclicBarrier(3)那么此时的 count值为3,接着dowait源码,当index==0时,后面执行的

final Runnable command = barrierCommand;

其实是可以设置的,这个Runnable可以传进来,当我们希望所有线程都达到某一时刻之后,用什么线程执行接下来的工作,当没有传Runnable进来时,就继续执行(唤醒其他线程),否则就runnable.run()(唤醒其他线程之前执行)

时间: 2024-11-06 01:03:10

java多线程并发系列之闭锁(Latch)和栅栏(CyclicBarrier)的相关文章

java多线程并发系列之锁的深入了解

上一篇博客中 : java多线程.并发系列之 (synchronized)同步与加锁机制 .介绍了java中Synchronized和简单的加锁机制,在加锁的模块中介绍了 轮询锁和定时锁,简单回顾下 轮询锁:利用tryLock来获取两个锁,如果不能同时获得,那么回退并重新尝试. 定时锁:索取锁的时候可以设定一个超时时间,如果超过这个时间还没索取到锁,则不会继续堵塞而是放弃此次任务. 锁的公平性 在公平的锁上,线程将按照它们发出请求的顺序来获取锁 上面似乎忘记了还有一种可中断锁和可选择粒度锁 可中

java 多线程并发系列之 生产者消费者模式的两种实现

在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题.该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度. 为什么要使用生产者和消费者模式 在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程.在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据.同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者.为了解决这种生产消费能力不均衡的问题,所以便有了生产者和消费者模式. 什么是生

[转]Java多线程干货系列—(一)Java多线程基础

Java多线程干货系列—(一)Java多线程基础 字数7618 阅读1875 评论21 喜欢86 前言 多线程并发编程是Java编程中重要的一块内容,也是面试重点覆盖区域,所以学好多线程并发编程对我们来说极其重要,下面跟我一起开启本次的学习之旅吧. 正文 线程与进程 1 线程:进程中负责程序执行的执行单元线程本身依靠程序进行运行线程是程序中的顺序控制流,只能使用分配给程序的资源和环境 2 进程:执行中的程序一个进程至少包含一个线程 3 单线程:程序中只存在一个线程,实际上主方法就是一个主线程 4

java高并发系列 - 第15天:JUC中的Semaphore,最简单的限流工具类,必备技能

这是java高并发系列第15篇文章 Semaphore(信号量)为多线程协作提供了更为强大的控制方法,前面的文章中我们学了synchronized和重入锁ReentrantLock,这2种锁一次都只能允许一个线程访问一个资源,而信号量可以控制有多少个线程可以访问特定的资源. Semaphore常用场景:限流 举个例子: 比如有个停车场,有5个空位,门口有个门卫,手中5把钥匙分别对应5个车位上面的锁,来一辆车,门卫会给司机一把钥匙,然后进去找到对应的车位停下来,出去的时候司机将钥匙归还给门卫.停车

java高并发系列 - 第32天:高并发中计数器的实现方式有哪些?

这是java高并发系列第32篇文章. java环境:jdk1.8. 本文主要内容 4种方式实现计数器功能,对比其性能 介绍LongAdder 介绍LongAccumulator 需求:一个jvm中实现一个计数器功能,需保证多线程情况下数据正确性. 我们来模拟50个线程,每个线程对计数器递增100万次,最终结果应该是5000万. 我们使用4种方式实现,看一下其性能,然后引出为什么需要使用LongAdder.LongAccumulator. 方式一:synchronized方式实现 package

Java多线程并发09——如何实现线程间与线程内数据共享

本文将为各位带来 Java 阻塞队列相关只是.关注我的公众号「Java面典」了解更多 Java 相关知识点. 线程间数据共享 Java 里面进行多线程通信的主要方式就是共享内存的方式,共享内存主要的关注点有两个:可见性和有序性原子性.Java 内存模型(JMM)解决了可见性和有序性的问题,而锁解决了原子性的问题,理想情况下我们希望做到"同步"和"互斥".有以下常规实现方法: 将数据抽象成一个类 将数据抽象成一个类,并将对这个数据的操作作为这个类的方法,这么设计可以和

java 多线程并发问题总结

java 多线程并发主要通过关键字synchronized实现 Java语言的关键字,当它用来修饰一个方法或者一个代码块的时候,能够保证在同一时刻最多只有一个线程执行该段代码. 一.当两个并发线程访问同一个对象object中的这个synchronized(this)同步代码块时,一个时间内只能有一个线程得到执行.另一个线程必须等待当前线程执行完这个代码块以后才能执行该代码块. 二.然而,当一个线程访问object的一个synchronized(this)同步代码块时,另一个线程仍然可以访问该ob

对JAVA多线程 并发编程的理解

对JAVA多线程并发编程的理解 Java多线程编程关注的焦点主要是对单一资源的并发访问,本文从Java如何实现支持并发访问的角度,浅析对并发编程的理解,也算是对前段时间所学的一个总结. 线程状态转换 Java语言定义了5中线程状态,在任何一个时间点,一个线程只能有且只有其中一种状态,这5中状态分别是: ?  新建(New):创建后尚未启动的线程处于这种状态 ?  运行(Runable):Runable包括了操作系统线程状态中的Running和Ready,也就是处于此状态的线程可能正在执行,也有可

Java多线程并发技术

Java多线程并发技术 参考文献: http://blog.csdn.net/aboy123/article/details/38307539 http://blog.csdn.net/ghsau/article/category/1707779 http://www.iteye.com/topic/366591 JAVA多线程实现方式主要有三种:继承Thread类.实现Runnable接口.使用ExecutorService.Callable.Future实现有返回结果的多线程.其中前两种方式