1.同步器之CountDownLatch
类的名称的中文翻译为:倒数闩(倒数锁存器)。操作线程时,有时候我们希望这个线程进行等待,直到一定数量的事件发生之后为止。为了处理这种情况,并发API提供了CountDownLatch类,CountDownLatch在创建时指定要等待的事件的数量,在释放锁存器(闩)之前,必须发生指定数量的事件。每发生一个事件,计数器递减,当计数器减为0时,锁存器打开,等待的线程被唤醒,接着走。
由上可知,使用的方式很简单:创建一个公用倒数锁存器,在一个线程的需要等待的位置进行await(可以指定等待的最长时间,使用方法await(long wait,TimeUnit tu)wait指定等待的时长,tu指定时长的时间单位,是一个枚举类型的一个值,在后续博文介绍这个枚举类型),直到锁存器打开;其他线程使用同样的锁存器,只是完成一个指定的事件后,调用countDown,使计数器递减。
实例代码如下:
import java.util.concurrent.CountDownLatch; public class CountDownLetchTest { public static void main(String[] args) { //创建一个锁存器,要开启该锁存器必须等待5个事件的发生 CountDownLatch countDownLetch = new CountDownLatch(5); //线程一是使用锁存器的线程 new Thread(()->{ try { countDownLetch.await(); } catch (Exception e) { e.printStackTrace(); } System.out.println("五个事件执行完毕,所以才输出了这句话哦.."); }).start(); new Thread(()->{ System.out.println("第一个事件执行完毕"); countDownLetch.countDown(); }).start(); new Thread(()->{ System.out.println("第二个事件执行完毕"); countDownLetch.countDown(); }).start(); new Thread(()->{ System.out.println("第三个事件执行完毕"); countDownLetch.countDown(); }).start(); new Thread(()->{ System.out.println("第四个事件执行完毕"); countDownLetch.countDown(); }).start(); new Thread(()->{ System.out.println("第五个事件执行完毕"); countDownLetch.countDown(); }).start(); } // 输出结果: // 第一个事件执行完毕 // 第二个事件执行完毕 // 第三个事件执行完毕 // 第四个事件执行完毕 // 第五个事件执行完毕 // 五个事件执行完毕,所以才输出了这句话哦.. //根据输出结果,如果没有等待的话,出现这种结果的可能性很小很小很小很小 }
2.同步器之CyclicBarrier
类的名称的中文翻译为:生命周期拦截栅栏(周期屏障)。『真心感觉还是不要翻译的好。。。』在并发编程中,多个线程必须在预定的执行点进行等待,直到所有的线程都到达了各自的执行点,然后,屏障打开,所有线程都接着执行,只要有一个还没到就接着等,直到所有都到了。
由上可知,使用方式很简单:这个类有两个构造器分别为:CyclicBarrier(int numThreads) CyclicBarrier(int numThreads,Runnable action)第一个参数指定需要到达执行点的线程数量,第二个参数指定全部线程到达之后要执行的操作,然后需要同步的线程使用同一个CyclicBarrier,每个到达执行点的线程进行等待await()(也可指定时间),等指定数量的线程到达后,屏障打开,如果有指定的操作执行,这个操作也会执行。
未指定屏障打开后后续操作的实例代码:
import java.util.concurrent.CyclicBarrier; public class CyclicBarrierWithoutSpecificAfterwardAction { public static void main(String[] args) { //不指定全部到达等待界点后要执行的操作 CyclicBarrier cyclicBarrier = new CyclicBarrier(3); new Thread(()->{ System.out.println("I'm thread-0,I'm on the await-point.."); try { cyclicBarrier.await(); } catch (Exception e) { e.printStackTrace(); } System.out.println("Ok,Every thread is here, I'm thread-0,I'll go..."); }).start(); new Thread(()->{ System.out.println("I'm thread-1,I'm on the await-point.."); try { cyclicBarrier.await(); } catch (Exception e) { e.printStackTrace(); } System.out.println("Ok,Every thread is here, I'm thread-1,I'll go..."); }).start(); new Thread(()->{ System.out.println("I'm thread-2,I'm on the await-point.."); try { cyclicBarrier.await(); } catch (Exception e) { e.printStackTrace(); } System.out.println("Ok,Every thread is here, I'm thread-2,I'll go..."); }).start(); // 输出结果: // I'm thread-0,I'm on the await-point.. // I'm thread-1,I'm on the await-point.. // I'm thread-2,I'm on the await-point.. // Ok,Every thread is here, I'm thread-2,I'll go... // Ok,Every thread is here, I'm thread-0,I'll go... // Ok,Every thread is here, I'm thread-1,I'll go... } }
指定了屏障打开后后续操作的实例代码:
public class CyclicBarrierWithSpecificAfterwardAction { public static void main(String[] args) { //指定全部到达等待界点后要执行的操作 CyclicBarrier cyclicBarrier = new CyclicBarrier(3,()->{ System.out.println("Ok,Every thread is here,Hope you every thread go dwon smoothly..."); }); new Thread(()->{ System.out.println("I'm thread-0,I'm on the await-point.."); try { cyclicBarrier.await(); } catch (Exception e) { e.printStackTrace(); } System.out.println("Ok,Every thread is here, I'm thread-0,I'll go..."); }).start(); new Thread(()->{ System.out.println("I'm thread-1,I'm on the await-point.."); try { cyclicBarrier.await(); } catch (Exception e) { e.printStackTrace(); } System.out.println("Ok,Every thread is here, I'm thread-1,I'll go..."); }).start(); new Thread(()->{ System.out.println("I'm thread-2,I'm on the await-point.."); try { cyclicBarrier.await(); } catch (Exception e) { e.printStackTrace(); } System.out.println("Ok,Every thread is here, I'm thread-2,I'll go..."); }).start(); // 输出结果: // I'm thread-0,I'm on the await-point.. // I'm thread-1,I'm on the await-point.. // I'm thread-2,I'm on the await-point.. // Ok,Every thread is here,Hope you every thread go dwon smoothly... // Ok,Every thread is here, I'm thread-2,I'll go... // Ok,Every thread is here, I'm thread-0,I'll go... // Ok,Every thread is here, I'm thread-1,I'll go... } }
3.同步器之Exchanger
Exchanger类的中文翻译为:交换器。可能最有趣的同步器就是Exchanger,其设计的目的是为了简化两个线程之间的数据交换。Exchanger对象的操作十分简单:简单的进行等待,直到两个独立的线程都调用了exchanger方法为止,此时,进行数据交换。
构造器为:Exchanger<V>,V指定要交换的数据的类型。
交换方法为:V exchanger(V Object)/V exchanger(V Object,long wait,TimeUnit tu)
由以上可知使用的过程大致为:创建一个共用的Exchanger,然后两个线程在各自该交换数据的执行点调用exchanger方法,使用相应的数据类型接收交换过来的数据即可。
实例代码如下:
import java.util.concurrent.Exchanger; public class ExchangeTest { public static void main(String[] args) { Exchanger<Integer> exchanger = new Exchanger<Integer>(); new Thread(()->{ int count = 1; while(count<5){ try { Integer receive = exchanger.exchange(count*2); System.out.println("Got from thread-1: "+receive); count++; } catch (Exception e) { e.printStackTrace(); } } }).start(); new Thread(()->{ int count = 1; while(count<5){ try { Integer receive = exchanger.exchange(count*1); System.out.println("Got form thread-0: "+receive); count++; } catch (Exception e) { e.printStackTrace(); } } }).start(); } // 运行结果: // Got from thread-1: 1 // Got form thread-0: 2 // Got from thread-1: 2 // Got form thread-0: 4 // Got form thread-0: 6 // Got from thread-1: 3 // Got from thread-1: 4 // Got form thread-0: 8 }
4.同步器之Phaser
Phaser的中文翻译为:移相器(阶段同步器)。简单来看,Phaser是CyclicBarrier的增强版,CyclicBarrier是在一个执行点进行等待,当指定数量的线程到达这个执行点之后,这些线程就接着往下走了,而Phaser不同的是,过了这个执行点之后还可以有下一个执行点等待,即是分阶段等待。主要应用场景是:允许表示一个或多个活动阶段的线程进行同步。
构造器:Phaser()/Phaser(int numThreads):参数表示需要同步的线程的数量,如果一开始没有指定也没有关系,在程序的运行过程中,需要同步的线程的数量是可以变化的,变化的方式见下文
注册到同步器上:int register()返回值是注册到这个阶段同步器的哪个阶段上了,这里虽然是调用了注册方法,实际上阶段同步器并不知道到底是哪个线程注册了,只知道某个阶段需要同步的线程的数量增加了一个
到达:int arrive()/int arriveAndAwaitAdvance()/int arriveAndDeregister()
三个到达方法,返回值都是当前阶段编号。第一个方法仅仅是告诉调用线程,我到咯,但是我不等其他线程哦,也就是不等这个阶段完成,接着往下走;第二个是我到咯,我还要等,直到指定数量的线程到达了执行点,也就是这个阶段完成了,我才接着往下走;第三个方式是,我到咯,而且巧的是我到这一阶段就不需要再同步了,所以在我到达这一阶段我的执行点之后就注销注册了,那下一阶段需要到达指定执行点的线程数量减一,这里的注销仅仅是告诉阶段同步器下一阶段需要同步的线程的数量减一,到底减少了哪个线程并不知道。
当然,也可以让阶段同步器提前失效,即在阶段同步器运行到一定条件下就失去同步效果,这个可以通过重写Phaser类的boolean onAdvance()方法,可用来控制当前阶段同步器是否要失效的两个参数分别是执行阶段和当前注册的需要同步线程的数量。在阶段同步器向下阶段推进的时候,都会调用这个方法,如果返回TRUE,就立即失效,如果返回FALSE,接着向下一阶段推进。
使用的实例代码如下:
import java.util.concurrent.Phaser; public class PhaserTest { public static void main(String[] args) { //也可以不重写OnAdvace()方法,重写是为了在执行了特定的阶段之后返回,这里是执行到第三步返回,如果没有重写,直到注册的所有线程运行完了所有阶段也能正常结束 //传到构造器中的3是总共有3个线程注册到这个同步器上,问题来了,有时候并不是三个线程一直协同到最后,可能走着走着就只剩一个线程在起作用了,当一个线程到达之后并且不需要再同步的时候,达到之后注销即可,调用的方法为: Phaser phaser = new Phaser(3){ @Override protected boolean onAdvance(int phase,int parties){ if(phase==2||parties==0)return true; return false; } }; new Thread(()->{ //开始第一阶段 //获取当前是在第几阶段,如果已经结束则返回负数 int i = phaser.getPhase(); System.out.println("thread-0 phaser--"+i); phaser.arriveAndAwaitAdvance(); //开始第二阶段 i = phaser.getPhase(); System.out.println("thread-0 phaser---"+i); phaser.arriveAndAwaitAdvance(); //开始第三阶段 i = phaser.getPhase(); System.out.println("thread-0 phaser----"+i); phaser.arriveAndAwaitAdvance(); //开始第四阶段 i = phaser.getPhase(); System.out.println("thread-0 phaser----- "+i); }).start(); new Thread(()->{ //开始第一阶段 int i = phaser.getPhase(); System.out.println("thread-1 phaser--"+i); phaser.arriveAndAwaitAdvance(); //开始第二阶段 i = phaser.getPhase(); System.out.println("thread-1 phaser---"+i); phaser.arriveAndAwaitAdvance(); //开始第三阶段 i = phaser.getPhase(); System.out.println("thread-1 phaser----"+i); phaser.arriveAndAwaitAdvance(); //开始第四阶段 i = phaser.getPhase(); System.out.println("thread-1 phaser----- "+i); }).start(); new Thread(()->{ //开始第一阶段 int i = phaser.getPhase(); System.out.println("thread-2 phaser--"+i); phaser.arriveAndAwaitAdvance(); //开始第二阶段 i = phaser.getPhase(); System.out.println("thread-2 phaser---"+i); //注销啦,其实这里的注销仅仅是告诉Phaser,注册的线程少了一个,具体少了哪一个它并不知道 //phaser.arriveAndDeregister(); phaser.arriveAndAwaitAdvance(); //开始第三阶段 i = phaser.getPhase(); System.out.println("thread-2 phaser----"+i); phaser.arriveAndAwaitAdvance(); //开始第四阶段 i = phaser.getPhase(); System.out.println("thread-2 phaser----- "+i); }).start(); //运行结果 // thread-0 phaser--0 // thread-1 phaser--0 // thread-2 phaser--0 // thread-2 phaser---1 // thread-1 phaser---1 // thread-0 phaser---1 // thread-1 phaser----2 // thread-0 phaser----2 // thread-2 phaser----2 // thread-1 phaser----- -2147483645 // thread-0 phaser----- -2147483645 // thread-2 phaser----- -2147483645 } }