多线程-CountDownLatch,CyclicBarrier,Semaphore,Exchanger,Phaser

CountDownLatch 
一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。用给定的计数初始化CountDownLatch。调用countDown()计数减一,当计数到达零之前await()方法会一直阻塞,计数无法被重置。

public class CountDownLatch {
    private final Sync sync;
    public CountDownLatch(int count);
    public void countDown() {
        sync.releaseShared(1);
    }
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }
}

CountDownLatch中主要有countDown()和await()方法。 
countDown()递减计数,如果计数达到零,则是否所有等待的线程。 
1. 如果当前计数大于零,则计数减一; 
2. 如果减一之后计数为零,则重新调度所有等待该计数为零的线程; 
3. 如果计数已经为零,则不发生任何操作; 
await()使当前线程在计数为零之前一直阻塞,除非线程被中断或超出指定的等待时间; 
如果计数为零,则立刻返回true 
在进入此方法时,当前线程已经设置了中断状态或在等待时被中断,则抛出InterruptedException异常,并且清除当前线程的中断状态。如果超出了指定等待时间,则返回false,如果该时间小于等于零,则此方法根本不会等待。

package org.github.lujiango;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class Test16 {

    public static void main(String[] args) throws InterruptedException {
        final CountDownLatch begin = new CountDownLatch(1);
        final CountDownLatch end = new CountDownLatch(10);
        final ExecutorService exec = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 10; i++) {
            final int no = i + 1;
            Runnable run = new Runnable() {
                @Override
                public void run() {
                    try {
                        begin.await();
                        TimeUnit.MILLISECONDS.sleep((long) (Math.random() * 10000));
                        System.out.println("No." + no + " arrived");
                    } catch (Exception e) {

                    } finally {
                        end.countDown();
                    }
                }
            };
            exec.submit(run);
        }

        System.out.println("Game start");
        begin.countDown();
        end.await();
        System.out.println("Game over");
        exec.shutdown();
    }

}

CyclicBarrier 
一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点。在涉及一组固定大小的线程的程序中,这些线程必须不时的互相等待。

package org.github.lujiango;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class Test16 {

    public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
        final CyclicBarrier end = new CyclicBarrier(10);
        final ExecutorService exec = Executors.newFixedThreadPool(10);
        System.out.println("Game start");
        for (int i = 0; i < 10; i++) {
            final int no = i + 1;
            Runnable run = new Runnable() {
                @Override
                public void run() {
                    try {
                        end.await();
                        TimeUnit.MILLISECONDS.sleep((long) (Math.random() * 10000));
                        System.out.println("No." + no + " arrived");
                    } catch (Exception e) {

                    } finally {
                    }
                }
            };
            exec.submit(run);
        }
        System.out.println("Game over");
        exec.shutdown();

    }

}

需要所有的子任务都完成时,才执行主任务,这个时候可以选择使用CyclicBarrier。

Semaphore 
一个计数信号量,信号量维护了一个许可集,在许可可用之前会阻塞每一个acquire(),然后获取该许可。每个release()释放许可,从而可能释放一个正在阻塞的获取者。 
Semaphore只对可用许可的号码进行计数,并采取相应的行动,拿到信号的线程可以进入代码,否则就等待。

package org.github.lujiango;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class Test17 {

    public static void main(String[] args) {
        ExecutorService exec = Executors.newCachedThreadPool();
        final Semaphore semp = new Semaphore(5);
        for (int i = 0; i < 20; i++) {
            final int no = i;
            Runnable run = new Runnable() {

                @Override
                public void run() {
                    try {
                        semp.acquire();
                        System.out.println("Accessing: " + no);
                        TimeUnit.MILLISECONDS.sleep((long) (Math.random() * 10000));
                    } catch (Exception e) {

                    } finally {
                        semp.release();
                    }
                }
            };
            exec.submit(run);
        }
        exec.shutdown();
    }

}

Exchanger 
Exchanger可以在两个线程之间交换数据,只能在两个线程,不支持更多的线程之间互换数据。 
当线程A调用Exchanger对象的exchage()方法后,会阻塞;直到B线程也调用exchange()方法,然后线程以安全的方式交换数据,之后A和B线程继续执行。

package org.github.lujiango;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Exchanger;

public class Test18 {

    public static void main(String[] args) {
        Exchanger<List<Integer>> ex = new Exchanger<List<Integer>>();
        new A(ex).start();
        new B(ex).start();
    }

}

class A extends Thread {
    List<Integer> list = new ArrayList<Integer>();
    Exchanger<List<Integer>> ex;

    public A(Exchanger<List<Integer>> ex) {
        this.ex = ex;
    }

    @Override
    public void run() {
        Random random = new Random();
        for (int i = 0; i < 10; i++) {
            list.clear();
            list.add(random.nextInt(10));
            list.add(random.nextInt(10));
            list.add(random.nextInt(10));
            try {
                list = ex.exchange(list);
            } catch (Exception e) {

            }
        }
    }
}

class B extends Thread {
    List<Integer> list = new ArrayList<Integer>();
    Exchanger<List<Integer>> ex;

    public B(Exchanger<List<Integer>> ex) {
        this.ex = ex;
    }

    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            try {
                list = ex.exchange(list);
            } catch (Exception e) {

            }
            System.out.println(list);
        }
    }
}

Phaser 
Phaser是一个灵活的线程同步工具,它包含了CountDownLatch和CyclicBarrier的相关功能。 
CountDownLatch的countDown()和await()可以通过Phaser的arrive()和awaitAdvance(int n)代替 
而CyclicBarrier的await可以使用Phaser的arriveAndAwaitAdvance()方法代替 
用Phaser代替CountDownLatch:

package org.github.lujiango;

import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;

public class Test19 {

    public static void main(String[] args) throws InterruptedException {
        final Phaser latch = new Phaser(10);
        for (int i = 1; i <= 10; i++) {
            final int id = i;
            Thread t = new Thread(new Runnable() {

                @Override
                public void run() {
                    try {
                        TimeUnit.SECONDS.sleep((long) (Math.random() * 10));
                        System.out.println("thread: " + id + " is running");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        latch.arrive();
                    }
                }
            });
            t.start();
        }
        latch.awaitAdvance(latch.getPhase());
        System.out.println("all thread has run");

    }

}
package org.github.lujiango;

import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;

public class Test19 {

    public static void main(String[] args) throws InterruptedException {
        final Phaser latch = new Phaser(10);
        for (int i = 1; i <= 10; i++) {
            final int id = i;
            Thread t = new Thread(new Runnable() {

                @Override
                public void run() {
                    try {
                        TimeUnit.SECONDS.sleep((long) (Math.random() * 10));
                        latch.arriveAndAwaitAdvance(); // 所有线程都执行到这里,才会继续执行,否则全部阻塞
                        System.out.println("thread: " + id + " is running");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        latch.arrive();
                    }
                }
            });
            t.start();
        }
    }

}

  

时间: 2024-11-29 10:05:23

多线程-CountDownLatch,CyclicBarrier,Semaphore,Exchanger,Phaser的相关文章

并发包下常见的同步工具类详解(CountDownLatch,CyclicBarrier,Semaphore)

目录 1. 前言 2. 闭锁CountDownLatch 2.1 CountDownLatch功能简介 2.2 使用CountDownLatch 2.3 CountDownLatch原理浅析 3.循环屏障CyclicBarrier 3.1 CyclicBarrier功能简介 3.2 使用CyclicBarrier 3.3 CyclicBarrier原理浅析 4. 信号量Semaphore 4.1 Semaphore功能简介 4.2 使用Semaphore进行最大并发数的控制 4.3 Semaph

CountDownLatch/CyclicBarrier/Semaphore 使用过吗?

CountDownLatch/CyclicBarrier/Semaphore 使用过吗?下面详细介绍用法: 一,CountDownLatch  背景; countDownLatch(同步援助)是在java1.5被引入,跟它一起被引入的工具类还有CyclicBarrier(同步援助).Semaphore(计数信号量).concurrentHashMap和BlockingQueue(阻塞队列). 存在于java.util.cucurrent包下.   概念理解: 让一些线程阻塞,直到另外一些线程完成

Join,CountDownLatch,CyclicBarrier,Semaphore和Exchanger

CountDownLatch允许一个或者多个线程等待其他线程完成操作,之后再对结果做统一处理: 适用场景,分布式系统中对多个微服务的调用,并发执行并且必须等待全部执行完成才能继续执行后续操作: 其实在java中默认的实现是join()方法,join()方法主要的作用是当前线程必须等待直到join线程执行完成之后才能继续执行后续的操作, 其本质就是轮询判断join线程是否存活,如果存活则主线程继续等待,否则,通过调用this.notifyAll()方法来继续执行主线程. 实例代码如下: publi

CountDownLatch CyclicBarrier Semaphore 比较

document CountDownLatch A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes. CyclicBarrier A synchronization aid that allows a set of threads to all wait for each other to

Java多线程20:多线程下的其他组件之CountDownLatch、Semaphore、Exchanger

前言 在多线程环境下,JDK给开发者提供了许多的组件供用户使用(主要在java.util.concurrent下),使得用户不需要再去关心在具体场景下要如何写出同时兼顾线程安全性与高效率的代码.之前讲过的线程池.BlockingQueue都是在java.util.concurrent下的组件,Timer虽然不在java.util.concurrent下,但也算是.后两篇文章将以例子的形式简单讲解一些多线程下其他组件的使用,不需要多深刻的理解,知道每个组件大致什么作用就行. 本文主要讲解的是Cou

java并发之CountDownLatch、Semaphore和CyclicBarrier

JAVA并发包中有三个类用于同步一批线程的行为,分别是CountDownLatch.Semaphore和CyclicBarrier. CountDownLatch CountDownLatch是一个计数器闭锁,主要的功能就是通过await()方法来阻塞住当前线程,然后等待计数器减少到0了,再唤起这些线程继续执行. 这个类里主要有两个方法,一个是向下减计数器的方法:countdown(),其实现的核心代码如下: public boolean tryReleaseShared(int release

转发---[沧海拾遗]java并发之CountDownLatch、Semaphore和CyclicBarrier

JAVA并发包中有三个类用于同步一批线程的行为,分别是CountDownLatch.Semaphore和CyclicBarrier. CountDownLatch CountDownLatch是一个计数器闭锁,主要的功能就是通过await()方法来阻塞住当前线程,然后等待计数器减少到0了,再唤起这些线程继续执行. 这个类里主要有两个方法,一个是向下减计数器的方法:countdown(),其实现的核心代码如下: public boolean tryReleaseShared(int release

Java核心-多线程-并发控制器-Semaphore信号量

Semaphore是非常有用的一个多线程并发控制组件(Java还有CountDownLatch.CyclicBarrier.Exchanger多线程组件),它相当于是一个并发控制器,是用于管理信号量的.构造的时候传入可供管理的信号量的数值,这个数值就是控制并发数量的,就是同时能几个线程访问.我们需要控制并发的代码,执行前先通过acquire方法获取信号,执行后通过release归还信号 .每次acquire返回成功后,Semaphore可用的信号量就会减少一个,如果没有可用的信号,acquire

CountDownLatch &amp; CyclicBarrier源码Android版实现解析

CountDownLatch CountDownLatch允许一条或者多条线程等待直至其它线程完成以系列的操作的辅助同步器. 用一个指定的count值对CountDownLatch进行初始化.await方法会阻塞,直至因为调用countDown方法把当前的count降为0,在这以后,所有的等待线程会被释放,并且在这以后的await调用将会立即返回.这是一个一次性行为--count不能被重置.如果你需要一个可以重置count的版本,考虑使用CyclicBarrier. 其实本类实现非常简单,和Re

C#多线程--信号量(Semaphore)

百度百科:Semaphore,是负责协调各个线程, 以保证它们能够正确.合理的使用公共资源.也是操作系统中用于控制进程同步互斥的量. Semaphore常用的方法有两个WaitOne()和Release(),Release()的作用是退出信号量并返回前一个计数,而WaitOne()则是阻止当前线程,直到当前线程的WaitHandle 收到信号.这里我举一个例子让大家更容易理解:当我们这样实例化Semaphore时候 Semaphore sema = new Semaphore( x , y );