多线程并发常用类:condition,semaphore,CyclicBarrier,countdownlatch,exchanger使用整理

condition 类:

作为一个示例,假定有一个绑定的缓冲区,它支持 put 和 take 方法。如果试图在空的缓冲区上执行 take 操作,则在某一个项变得可用之前,线程将一直阻塞;如果试图在满的缓冲区上执行 put 操作,则在有空间变得可用之前,线程将一直阻塞。我们喜欢在单独的等待
set 中保存 put 线程和 take 线程,这样就可以在缓冲区中的项或空间变得可用时利用最佳规划,一次只通知一个线程。可以使用两个 Condition 实例来做到这一点。

class BoundedBuffer {
   final Lock lock = new ReentrantLock();
   final Condition notFull  = lock.newCondition();
   final Condition notEmpty = lock.newCondition(); 

   final Object[] items = new Object[100];
   int putptr, takeptr, count;

   public void put(Object x) throws InterruptedException {
     lock.lock();
     try {
       while (count == items.length)
         notFull.await();
       items[putptr] = x;
       if (++putptr == items.length) putptr = 0;
       ++count;
       notEmpty.signal();
     } finally {
       lock.unlock();
     }
   }

   public Object take() throws InterruptedException {
     lock.lock();
     try {
       while (count == 0)
         notEmpty.await();
       Object x = items[takeptr];
       if (++takeptr == items.length) takeptr = 0;
       --count;
       notFull.signal();
       return x;
     } finally {
       lock.unlock();
     }
   }
 }

semaphore类:

参见:http://www.cnblogs.com/linjiqin/archive/2013/07/25/3214676.html

信号量(Semaphore),有时被称为信号灯,是在多线程环境下使用的一种设施,
它负责协调各个线程, 以保证它们能够正确、合理的使用公共资源

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

/**
 * 信号量
 *
 * @author 林计钦
 * @version 1.0 2013-7-25 下午02:03:40
 */
public class SemaphoreTest {
    public static void main(String[] args) {
        // 线程池
        ExecutorService exec = Executors.newCachedThreadPool();
        // 只能5个线程同时访问
        final Semaphore semp = new Semaphore(5);
        // 模拟20个客户端访问
        for (int index = 0; index < 50; index++) {
            final int NO = index;
            Runnable run = new Runnable() {
                public void run() {
                    try {
                        // 获取许可
                        semp.acquire();
                        System.out.println("Accessing: " + NO);
                        Thread.sleep((long) (Math.random() * 10000));
                        // 访问完后,释放
                        semp.release();
                        //availablePermits()指的是当前信号灯库中有多少个可以被使用
                        System.out.println("-----------------" + semp.availablePermits());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
            exec.execute(run);
        }
        // 退出线程池
        exec.shutdown();
    }
}

CyclicBarrier介绍 (二)

张孝祥视频学习笔记:

CyclicBarrier 表示大家彼此等待,大家集合好后才开始出发,分散活动后又在i指定地点集合碰面,这就好比整个公司的人员利用周末时间集体郊游一样,先各自从家出发到公司集合后,再同时出发到公园游玩,在指定地点集合后再同时开始就餐……

import java.util.concurrent.BrokenBarrierException;

import java.util.concurrent.CyclicBarrier;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

public class CyclicBarrierTest {

public static void main(String [] args){

ExecutorService service=Executors.newCachedThreadPool();

final CyclicBarrier cb=new CyclicBarrier(3);  //三个线程同时到达

for(int i=0;i<3;i++){

Runnable runnable=new Runnable(){

public void run(){

try {

Thread.sleep((long)(Math.random()*10000));

System.out.println("线程"+Thread.currentThread().getName()+

"即将到达集合地点1,当前已有"+(cb.getNumberWaiting()+1)+"个已到达"+

(cb.getNumberWaiting()==2?"都到齐了,继续走啊":"正在等候"));

try {

cb.await();

} catch (BrokenBarrierException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

Thread.sleep((long)(Math.random()*10000));

System.out.println("线程"+Thread.currentThread().getName()+

"即将到达集合地点2,当前已有"+(cb.getNumberWaiting()+1)+"个已到达"+

(cb.getNumberWaiting()==2?"都到齐了,继续走啊":"正在等候"));

try {

cb.await();

} catch (BrokenBarrierException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

Thread.sleep((long)(Math.random()*10000));

System.out.println("线程"+Thread.currentThread().getName()+

"即将到达集合地点3,当前已有"+(cb.getNumberWaiting()+1)+"个已到达"+

(cb.getNumberWaiting()==2?"都到齐了,继续走啊":"正在等候"));

try {

cb.await();

} catch (BrokenBarrierException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

} catch (InterruptedException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

}

};

service.execute(runnable);

}

service.shutdown();

}

}

运行结果:

线程pool-1-thread-3即将到达集合地点1,当前已有1个已到达正在等候

线程pool-1-thread-2即将到达集合地点1,当前已有2个已到达正在等候

线程pool-1-thread-1即将到达集合地点1,当前已有3个已到达都到齐了,继续走啊

线程pool-1-thread-1即将到达集合地点2,当前已有1个已到达正在等候

线程pool-1-thread-2即将到达集合地点2,当前已有2个已到达正在等候

线程pool-1-thread-3即将到达集合地点2,当前已有3个已到达都到齐了,继续走啊

线程pool-1-thread-2即将到达集合地点3,当前已有1个已到达正在等候

线程pool-1-thread-1即将到达集合地点3,当前已有2个已到达正在等候

线程pool-1-thread-3即将到达集合地点3,当前已有3个已到达都到齐了,继续走啊


countdownlatch类:

也可参考:http://www.iteye.com/topic/1002652

/**
CountDownLatch类是一个同步计数器,构造时传入int参数,该参数就是计数器的初始值,每调用一次countDown()方法,计数器减1,计数器大于0 时,await()方法会阻塞程序继续执行
CountDownLatch如其所写,是一个倒计数的锁存器,当计数减至0时触发特定的事件。利用这种特性,可以让主线程等待子线程的结束。下面以一个模拟运动员比赛的例子加以说明。
*/
import java.util.concurrent.CountDownLatch;  import java.util.concurrent.Executor;  import java.util.concurrent.ExecutorService;  import java.util.concurrent.Executors;

public class CountDownLatchDemo {     private static final int PLAYER_AMOUNT = 5;     public CountDownLatchDemo() {         // TODO Auto-generated constructor stub          }     /**      * @param args      */     public static void main(String[] args) {         // TODO Auto-generated method stub         //对于每位运动员,CountDownLatch减1后即结束比赛         CountDownLatch begin = new CountDownLatch(1);         //对于整个比赛,所有运动员结束后才算结束         CountDownLatch end = new CountDownLatch(PLAYER_AMOUNT);         Player[] plays = new Player[PLAYER_AMOUNT];

for(int i=0;i<PLAYER_AMOUNT;i++)             plays[i] = new Player(i+1,begin,end);

//设置特定的线程池,大小为5         ExecutorService exe = Executors.newFixedThreadPool(PLAYER_AMOUNT);         for(Player p:plays)             exe.execute(p);            //分配线程         System.out.println("Race begins!");         begin.countDown();         try{             end.await();            //等待end状态变为0,即为比赛结束         }catch (InterruptedException e) {             // TODO: handle exception             e.printStackTrace();         }finally{             System.out.println("Race ends!");         }         exe.shutdown();     } }
public class Player implements Runnable {

private int id;     private CountDownLatch begin;     private CountDownLatch end;     public Player(int i, CountDownLatch begin, CountDownLatch end) {         // TODO Auto-generated constructor stub         super();         this.id = i;         this.begin = begin;         this.end = end;     }

@Override     public void run() {         // TODO Auto-generated method stub         try{             begin.await();        //等待begin的状态为0             Thread.sleep((long)(Math.random()*100));    //随机分配时间,即运动员完成时间             System.out.println("Play"+id+" arrived.");         }catch (InterruptedException e) {             // TODO: handle exception             e.printStackTrace();         }finally{             end.countDown();    //使end状态减1,最终减至0         }     } }

exchanger类:

Exchanger可以在两个线程之间交换数据,只能是2个线程,他不支持更多的线程之间互换数据。

当线程A调用Exchange对象的exchange()方法后,他会陷入阻塞状态,直到线程B也调用了exchange()方法,然后以线程安全的方式交换数据,之后线程A和B继续运行

  1. public class ThreadLocalTest {
  2. public static void main(String[] args) {
  3. Exchanger<List<Integer>> exchanger = new Exchanger<>();
  4. new Consumer(exchanger).start();
  5. new Producer(exchanger).start();
  6. }
  7. }
  8. class Producer extends Thread {
  9. List<Integer> list = new ArrayList<>();
  10. Exchanger<List<Integer>> exchanger = null;
  11. public Producer(Exchanger<List<Integer>> exchanger) {
  12. super();
  13. this.exchanger = exchanger;
  14. }
  15. @Override
  16. public void run() {
  17. Random rand = new Random();
  18. for(int i=0; i<10; i++) {
  19. list.clear();
  20. list.add(rand.nextInt(10000));
  21. list.add(rand.nextInt(10000));
  22. list.add(rand.nextInt(10000));
  23. list.add(rand.nextInt(10000));
  24. list.add(rand.nextInt(10000));
  25. try {
  26. list = exchanger.exchange(list);
  27. catch (InterruptedException e) {
  28. // TODO Auto-generated catch block
  29. e.printStackTrace();
  30. }
  31. }
  32. }
  33. }
  34. class Consumer extends Thread {
  35. List<Integer> list = new ArrayList<>();
  36. Exchanger<List<Integer>> exchanger = null;
  37. public Consumer(Exchanger<List<Integer>> exchanger) {
  38. super();
  39. this.exchanger = exchanger;
  40. }
  41. @Override
  42. public void run() {
  43. for(int i=0; i<10; i++) {
  44. try {
  45. list = exchanger.exchange(list);
  46. catch (InterruptedException e) {
  47. // TODO Auto-generated catch block
  48. e.printStackTrace();
  49. }
  50. System.out.print(list.get(0)+", ");
  51. System.out.print(list.get(1)+", ");
  52. System.out.print(list.get(2)+", ");
  53. System.out.print(list.get(3)+", ");
  54. System.out.println(list.get(4)+", ");
  55. }
  56. }
  57. }
时间: 2024-08-28 16:36:41

多线程并发常用类:condition,semaphore,CyclicBarrier,countdownlatch,exchanger使用整理的相关文章

并发工具类:倒计时器-CountDownLatch

title: 并发工具类:倒计时器-CountDownLatch author: Enjoyitlife.top date: 2019-10-01 10:51:33 summary: JUC包中的工具类CountDownLatch到底给我们提供了什么功能? categories: Concurrent tags: Concurrent-Tools 并发工具类:倒计时器-CountDownLatch CountDownLatch是JDK1.5提供一个并发编程辅助类,用于控制一个和多个线程进行等待,

【Java并发工具类】Semaphore

前言 1965年,荷兰计算机科学家Dijkstra提出的信号量机制成为一种高效的进程同步机制.这之后的15年,信号量一直都是并发编程领域的终结者.1980年,管程被提出,成为继信号量之后的在并发编程领域的第二个选择.目前几乎所有的语言都支持信号量机制,Java也不例外.Java中提供了Semaphore并发工具类来支持信号量机制.下面我们就来了解Java实现的信号量机制. 首先介绍信号量模型,然后介绍如何使用,最后使用信号量来实现一个限流器. 信号量模型 信号量模型图(图来自参考[1]): 信号

JUC——线程同步辅助工具类(Semaphore,CountDownLatch,CyclicBarrier)

CountDownLatch CountDownLatch是一个计数器闭锁,通过它可以完成类似于阻塞当前线程的功能,即:一个线程或多个线程一直等待,直到其他线程执行的操作完成.CountDownLatch用一个给定的计数器来初始化,该计数器的操作是原子操作,即同时只能有一个线程去操作该计数器.调用该类await方法的线程会一直处于阻塞状态,直到其他线程调用countDown方法使当前计数器的值变为零,每次调用countDown计数器的值减1.当计数器值减至零时,所有因调用await()方法而处于

java多线程并发(一)——Semaphore,volatile,synchronized ,Lock

在并发编程中,我们通常会遇到以下三个问题:原子性问题,可见性问题,有序性问题.我们先看具体看一下这三个概念: 1.原子性 原子性:即一个操作或者多个操作 要么全部执行并且执行的过程不会被任何因素打断,要么就都不执行. 一个很经典的例子就是银行账户转账问题 2.可见性 可见性是指当多个线程访问同一个变量时,一个线程修改了这个变量的值,其他线程能够立即看得到修改的值. 3.有序性 有序性:即程序执行的顺序按照代码的先后顺序执行. Semaphore 简介 信号量(Semaphore),有时被称为信号

并发工具类:CountDownLatch、CyclicBarrier、Semaphore

在多线程的场景下,有些并发流程需要人为来控制,在JDK的并发包里提供了几个并发工具类:CountDownLatch.CyclicBarrier.Semaphore. 一.CountDownLatch 1 import java.util.concurrent.CountDownLatch; 2 3 4 public class CountDownLatchTest 5 { //设置N为2 6 static CountDownLatch c = new CountDownLatch(2); 7 p

25.大白话说java并发工具类-CountDownLatch,CyclicBarrier,Semaphore,Exchanger

1. 倒计时器CountDownLatch 在多线程协作完成业务功能时,有时候需要等待其他多个线程完成任务之后,主线程才能继续往下执行业务功能,在这种的业务场景下,通常可以使用Thread类的join方法,让主线程等待被join的线程执行完之后,主线程才能继续往下执行.当然,使用线程间消息通信机制也可以完成.其实,java并发工具类中为我们提供了类似"倒计时"这样的工具类,可以十分方便的完成所说的这种业务场景. 为了能够理解CountDownLatch,举一个很通俗的例子,运动员进行跑

java.util.concurrent常用类(CountDownLatch,Semaphore,CyclicBarrier,Future)

CyclicBarrier CyclicBarrier是用来一个关卡来阻挡住所有线程,等所有线程全部执行到关卡处时,再统一执行下一步操作.假设一个场景:每个线程代表一个跑步运动员,当运动员都准备好后,才一起出发,只要有一个人没有准备好,大家就等待 . 代码示例: public class UseCyclicBarrier { static class Runner implements Runnable { private CyclicBarrier barrier; private Strin

java并发的艺术-读书笔记-第八章常用的并发工具类

jdk中提供了几个非常有用的工具类,分别是CountDownLatch,CyclicBarrier和semaphore exchanger CountDownLatch:允许一个或者多个线程等待其他线程完成操作 public class CountDownLatchTest{ static CountDownLatch c = new CountDownLatch(2); public static void main(String[] args){ new Thread(new Runnabl

Java学习笔记—多线程(并发工具类,java.util.concurrent.atomic包)

在JDK的并发包里提供了几个非常有用的并发工具类.CountDownLatch.CyclicBarrier和Semaphore工具类提供了一种并发流程控制的手段,Exchanger工具类则提供了在线程间交换数据的一种手段.本章会配合一些应用场景来介绍如何使用这些工具类. CountDownLatch CountDownLatch允许一个或多个线程等待其他线程完成操作.假如有这样一个需求:我们需要解析一个Excel里多个sheet的数据,此时可以考虑使用多线程,每个线程解析一个sheet里的数据,