Java 7的并发编程-Phaser

Java 7的并发包中推出了Phaser,其功能跟CyclicBarrier和CountDownLatch有些重叠,但是提供了更灵活的用法,例如支持动态调整注册任务的数量等。本文在Phaser自带的示例代码基础上进行一下简单的分析。

注册(Registration)

Phaser支持通过register()和bulkRegister(int parties)方法来动态调整注册任务的数量,此外也支持通过其构造函数进行指定初始数量。在适当的时机,Phaser支持减少注册任务的数量,例如 arriveAndDeregister()。单个Phaser实例允许的注册任务数的上限是65535。

到达(Arrival)

正如Phaser类的名字所暗示,每个Phaser实例都会维护一个phase number,初始值为0。每当所有注册的任务都到达Phaser时,phase number累加,并在超过Integer.MAX_VALUE后清零。arrive()和arriveAndDeregister()方法用于记录到 达,arriveAndAwaitAdvance()方法用于记录到达,并且等待其它未到达的任务。

终止(Termination)

Phaser支持终止。Phaser终止之后,调用register()和bulkRegister(int parties)方法没有任何效果,arriveAndAwaitAdvance()方法也会立即返回。触发终止的时机是在protected boolean onAdvance(int phase, int registeredParties)方法返回时,如果该方法返回true,那么Phaser会被终止。默认实现是在注册任务数为0时返回true(即 return registeredParties == 0;)。此外,forceTermination()方法用于强制终止,isTerminated()方法用于判断是否已经终止。

层次结构(Tiering)

Phaser支持层次结构,即通过构造函数Phaser(Phaser parent)和Phaser(Phaser parent, int parties)构造一个树形结构。这有助于减轻因在单个的Phaser上注册过多的任务而导致的竞争,从而提升吞吐量,代价是增加单个操作的开销。

使用案例

样例1

在有些场景下,我们希望控制多个线程的启动时机:例如在并发相关的单元测试中,有时需要控制线程的启动时机,以期获得最大程度的并发,通常我们会使用CountDownLatch,以下是使用Phaser的版本。

Java代码  

  1. import java.util.concurrent.Phaser;
  2. public class PhaserTest1 {
  3. public static void main(String args[]) {
  4. //
  5. final int count = 5;
  6. final Phaser phaser = new Phaser(count);
  7. for(int i = 0; i < count; i++) {
  8. System.out.println("starting thread, id: " + i);
  9. final Thread thread = new Thread(new Task(i, phaser));
  10. thread.start();
  11. }
  12. }
  13. public static class Task implements Runnable {
  14. //
  15. private final int id;
  16. private final Phaser phaser;
  17. public Task(int id, Phaser phaser) {
  18. this.id = id;
  19. this.phaser = phaser;
  20. }
  21. @Override
  22. public void run() {
  23. phaser.arriveAndAwaitAdvance();
  24. System.out.println("in Task.run(), phase: " + phaser.getPhase() + ", id: " + this.id);
  25. }
  26. }
  27. }

以上例子中,由于线程是在一个循环中start,因此start的时机有一定的间隔。本例中这些线程实际开始工作的时机是在所有的线程都调用了phaser.arriveAndAwaitAdvance()之后。

此外,如果留心arriveAndAwaitAdvance()方法的签名,会发现它并没有抛出InterruptedException,实际上,即使 当前线程被中断,arriveAndAwaitAdvance()方法也不会返回,而是继续等待。如果在等待时希望可中断,或者可超时,那么需要使用以下 方法:

Java代码  

  1. awaitAdvance(arrive())  // 等效于arriveAndAwaitAdvance()
  2. awaitAdvanceInterruptibly(int phase)
  3. awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit)

样例2

有些时候我们希望只有在某些外部条件满足时,才真正开始任务的执行,例如:

Java代码  

  1. import java.io.BufferedReader;
  2. import java.io.InputStreamReader;
  3. import java.util.concurrent.Phaser;
  4. public class PhaserTest2 {
  5. public static void main(String args[]) throws Exception {
  6. //
  7. final Phaser phaser = new Phaser(1);
  8. for(int i = 0; i < 5; i++) {
  9. phaser.register();
  10. System.out.println("starting thread, id: " + i);
  11. final Thread thread = new Thread(new Task(i, phaser));
  12. thread.start();
  13. }
  14. //
  15. System.out.println("Press ENTER to continue");
  16. BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
  17. reader.readLine();
  18. phaser.arriveAndDeregister();
  19. }
  20. public static class Task implements Runnable {
  21. //
  22. private final int id;
  23. private final Phaser phaser;
  24. public Task(int id, Phaser phaser) {
  25. this.id = id;
  26. this.phaser = phaser;
  27. }
  28. @Override
  29. public void run() {
  30. phaser.arriveAndAwaitAdvance();
  31. System.out.println("in Task.run(), phase: " + phaser.getPhase() + ", id: " + this.id);
  32. }
  33. }
  34. }

以上例子中,只有当用户按下回车之后,任务才真正开始执行。需要注意的是,arriveAndDeregister()方法不会被阻塞,并且返回到达时的phase number(arrive方法也是如此)。

样例3

CyclicBarrier支持barrier action, Phaser同样也支持。不同之处是Phaser的barrier action需要改写onAdvance方法来进行定制。

Java代码  

  1. import java.util.concurrent.Phaser;
  2. public class PhaserTest3 {
  3. public static void main(String args[]) throws Exception {
  4. //
  5. final int count = 5;
  6. final int phaseToTerminate = 3;
  7. final Phaser phaser = new Phaser(count) {
  8. @Override
  9. protected boolean onAdvance(int phase, int registeredParties) {
  10. System.out.println("====== " + phase + " ======");
  11. return phase >= phaseToTerminate || registeredParties == 0;
  12. }
  13. };
  14. //
  15. for(int i = 0; i < count; i++) {
  16. System.out.println("starting thread, id: " + i);
  17. final Thread thread = new Thread(new Task(i, phaser));
  18. thread.start();
  19. }
  20. }
  21. public static class Task implements Runnable {
  22. //
  23. private final int id;
  24. private final Phaser phaser;
  25. public Task(int id, Phaser phaser) {
  26. this.id = id;
  27. this.phaser = phaser;
  28. }
  29. @Override
  30. public void run() {
  31. do {
  32. try {
  33. Thread.sleep(500);
  34. } catch(InterruptedException e) {
  35. // NOP
  36. }
  37. System.out.println("in Task.run(), phase: " + phaser.getPhase() + ", id: " + this.id);
  38. phaser.arriveAndAwaitAdvance();
  39. } while(!phaser.isTerminated());
  40. }
  41. }
  42. }

本例中的barrier action只是简单地打印了一条信息,此外在超过指定的迭代次数后终止了Phaser。

样例4

在Smaple 3的例子中,主线程在其它工作线程结束之前已经终止。如果希望主线程等待这些工作线程结束,除了使用Thread.join()之外,也可以尝试以下的方式:

Java代码  

  1. import java.util.concurrent.Phaser;
  2. public class PhaserTest4 {
  3. public static void main(String args[]) throws Exception {
  4. //
  5. final int count = 5;
  6. final int phaseToTerminate = 3;
  7. final Phaser phaser = new Phaser(count) {
  8. @Override
  9. protected boolean onAdvance(int phase, int registeredParties) {
  10. System.out.println("====== " + phase + " ======");
  11. return phase == phaseToTerminate || registeredParties == 0;
  12. }
  13. };
  14. //
  15. for(int i = 0; i < count; i++) {
  16. System.out.println("starting thread, id: " + i);
  17. final Thread thread = new Thread(new Task(i, phaser));
  18. thread.start();
  19. }
  20. //
  21. phaser.register();
  22. while (!phaser.isTerminated()) {
  23. phaser.arriveAndAwaitAdvance();
  24. }
  25. System.out.println("done");
  26. }
  27. public static class Task implements Runnable {
  28. //
  29. private final int id;
  30. private final Phaser phaser;
  31. public Task(int id, Phaser phaser) {
  32. this.id = id;
  33. this.phaser = phaser;
  34. }
  35. @Override
  36. public void run() {
  37. while(!phaser.isTerminated()) {
  38. try {
  39. Thread.sleep(500);
  40. } catch(InterruptedException e) {
  41. // NOP
  42. }
  43. System.out.println("in Task.run(), phase: " + phaser.getPhase() + ", id: " + this.id);
  44. phaser.arriveAndAwaitAdvance();
  45. }
  46. }
  47. }
  48. }

如果希望主线程在特定的phase结束之后终止,那么可以在主线程中调用下述方法:

Java代码  

  1. public static void awaitPhase(Phaser phaser, int phase) {
  2. int p = phaser.register(); // assumes caller not already registered
  3. while (p < phase) {
  4. if (phaser.isTerminated()) {
  5. break; // ... deal with unexpected termination
  6. } else {
  7. p = phaser.arriveAndAwaitAdvance();
  8. }
  9. }
  10. phaser.arriveAndDeregister();
  11. }

需要注意的是,awaitPhase方法中的if (phaser.isTerminated()) 分支里需要能够正确处理Phaser终止的情况。否则由于在Phaser终止之后, phaser.register()和arriveAndAwaitAdvance()方法均返回负值,那么上述方法可能陷入死循环。

样例5

以下对Phaser进行分层的例子:

Java代码  

  1. import java.util.concurrent.Phaser;
  2. public class PhaserTest6 {
  3. //
  4. private static final int TASKS_PER_PHASER = 4;
  5. public static void main(String args[]) throws Exception {
  6. //
  7. final int phaseToTerminate = 3;
  8. final Phaser phaser = new Phaser() {
  9. @Override
  10. protected boolean onAdvance(int phase, int registeredParties) {
  11. System.out.println("====== " + phase + " ======");
  12. return phase == phaseToTerminate || registeredParties == 0;
  13. }
  14. };
  15. //
  16. final Task tasks[] = new Task[10];
  17. build(tasks, 0, tasks.length, phaser);
  18. for (int i = 0; i < tasks.length; i++) {
  19. System.out.println("starting thread, id: " + i);
  20. final Thread thread = new Thread(tasks[i]);
  21. thread.start();
  22. }
  23. }
  24. public static void build(Task[] tasks, int lo, int hi, Phaser ph) {
  25. if (hi - lo > TASKS_PER_PHASER) {
  26. for (int i = lo; i < hi; i += TASKS_PER_PHASER) {
  27. int j = Math.min(i + TASKS_PER_PHASER, hi);
  28. build(tasks, i, j, new Phaser(ph));
  29. }
  30. } else {
  31. for (int i = lo; i < hi; ++i)
  32. tasks[i] = new Task(i, ph);
  33. }
  34. }
  35. public static class Task implements Runnable {
  36. //
  37. private final int id;
  38. private final Phaser phaser;
  39. public Task(int id, Phaser phaser) {
  40. this.id = id;
  41. this.phaser = phaser;
  42. this.phaser.register();
  43. }
  44. @Override
  45. public void run() {
  46. while (!phaser.isTerminated()) {
  47. try {
  48. Thread.sleep(200);
  49. } catch (InterruptedException e) {
  50. // NOP
  51. }
  52. System.out.println("in Task.run(), phase: " + phaser.getPhase()    + ", id: " + this.id);
  53. phaser.arriveAndAwaitAdvance();
  54. }
  55. }
  56. }
  57. }

需要注意的是,TASKS_PER_PHASER的值取决于具体的Task实现。对于Task执行时间很短的场景(也就是竞争相对激烈),可以考虑使用较小的TASKS_PER_PHASER值,例如4。反之可以适当增大TASKS_PER_PHASER。

时间: 2024-10-13 11:30:43

Java 7的并发编程-Phaser的相关文章

Java线程与并发编程实践----同步器(交换器、信号量)

一.交换器 交换器提供了一个线程之间能够交换对象的同步点.每个线程都会往这个 交换器的exchange()方法传入一些对象,匹配伙伴线程,同时接受伙伴对象作为返 回值.java.util.conurrent.Exchange<V>实现了交换器. 下面是一个代码小实例: import java.util.concurrent.Exchanger;   import java.util.concurrent.ExecutorService;   import java.util.concurren

Java线程与并发编程实践----锁框架

Java.util.concurrent.locks包提供了一个包含多种接口和类的框架,它 针对条件进行加锁和等待.不同于对象的内置加锁同步以及java.lang.Object的等 待/通知机制,包含锁框架的并发工具类通过轮询锁.显示等待及其它方式改善这种 机制. 锁框架包含了经常使用的锁.重入锁.条件.读写锁以及冲入读写锁等类别. 一.锁(Lock) Lock 实现提供了比使用 synchronized 方法和语句可获得的更广泛的锁定操作.此实 现允许更灵活的结构,可以具有差别很大的属性,可以

Java多线程视频教程并发编程面试知识

课程目录:  1-1.并发编程入门到实战课程简介1-2.什么是并发编程1-3.并发编程的挑战之频繁的上下文切换1-4.并发编程的挑战之死锁1-5.并发编程的挑战之线程安全1-6.并发编程的挑战之资源限制2-1.进程与线程的区别2-2.线程的状态及其相互转换2-3.创建线程的方式(上)2-4.创建线程的方式(下)2-5.线程的挂起及其恢复2-6.线程的中断操作2-7.线程的优先级2-8.守护线程3-1.什么是线程安全性3-2.从字节码角度剖析线程不安全操作3-3.原子性操作3-4.深入理解sync

Java线程与并发编程实践----并发工具类与Executor框架

java5之前,我们使用诸如synchronized,wait(),notify()方法对线程的操作属于对 底层线程的操作,这样会出现很多的问题: 低级的并发原语,比如synchronized,wait(),notify()经常难以正确使用.误用会导致 竞态条件,线程饿死,死锁等风险. 泰国依赖synchronized会影响程序性能以及程序的可扩展性 开发者经常需要高级线程结构,如线程池,信号量.java对底层线程的操作不包含这些结. 为解决这些问题,java5引入并发工具类,该工具类主要有下面

JAVA多线程之并发编程三大核心问题

概述 并发编程是Java语言的重要特性之一,它能使复杂的代码变得更简单,从而极大的简化复杂系统的开发.并发编程可以充分发挥多处理器系统的强大计算能力,随着处理器数量的持续增长,如何高效的并发变得越来越重要.但是开发难,并发更难,因为并发程序极易出现bug,这些bug是比较诡异的,跟踪难,且难以复现.如果要解决这些问题就要正确的发现这些问题,这就需要弄清并发编程的本质,以及并发编程要解决什么问题.本文主要讲解并发要解决的三大问题:原子性.可见性.有序性. 基本概念 硬件的发展 硬件的发展中,一直存

Java线程与并发编程实践----同步器(倒计时门闩)

Java提供的synchronized关键字对临界区进行线程同步访问.由于基于synchronized很难 正确编写同步代码,并发工具类提供了高级的同步器.倒计时门闩(countdown latch).同步屏 障(cyclic barrier).交换器(exchanger).信号量(semaphore)以及phaser同步器.下面主要 介绍倒计时门闩. 倒计时门闩会导致一条或多条线程在"门口"一直等待,直到另一条线程打开这扇门,线程 才得以继续运行.他是由一个计数变量和两个操作组成的,

Java多线程、并发编程知识点小结

1.线程的状态    1.1创建 线程 的两种方式,接口和线程类.利用接口的好处:更好的体现面向对象的思想,可以避免由于Java的单继承特性而带来的局限: 增强程序的健壮性,代码能够被多个线程共享,代码与数据是独立的:(同步问题)适合多个相同程序代码的线程区处理同一资源的情况.    1.2线程就绪等待调度运行start()方法. 1.3线程的中断 这里需要注意的是,如果只是单纯的调用interrupt()方法,线程并没有实际被中断,会继续往下执行. 1.4.线程挂起和恢复(挂起还拥有对象锁,死

Java内存模型---并发编程网 - ifeve.com

Java内存模型 转自:http://ifeve.com/java-memory-model-6/ 原文地址  作者:Jakob Jenkov 译者:张坤 Java内存模型规范了Java虚拟机与计算机内存是如何协同工作的.Java虚拟机是一个完整的计算机的一个模型,因此这个模型自然也包含一个内存模型——又称为Java内存模型. 如果你想设计表现良好的并发程序,理解Java内存模型是非常重要的.Java内存模型规定了如何和何时可以看到由其他线程修改过后的共享变量的值,以及在必须时如何同步的访问共享

Java线程与并发编程实践----额外的并发工具类

一.并发集合 java.util包下提供了很多的集合类,如ArrayList.TreeSet.HashMap,但是这些 集合都是非线程安全的,并且对于单列集合的迭代器,采用的是快速失败机制,当正在迭代 遍历的集合被其它线程修改时,便会抛出 java.util.ConcurrentModificationException. 这显然对于多线程操作的集合是十分不方便的,但早Colections这个工具类中有方法可以返回 线程安全的集合,然而这种集合对于高并发环境,性能十分低下. 于是,java.ut