在Phaser类中,我们在每个线程中,每个线程进行完一个阶段完成后都会等待其他线程完成后再一起进行,当所
有线程都完成了一个任务的时候,会调用Phaser的onAdvance方法,如果我们想在每个阶段,所有线程都完成他们的阶
段工作后做点啥事的话,那就得继承Phaser类来重写Onadvance这个方法来实现我们的目的,下面我们用一个例子来说
明,例子就是模拟多个学生考试,考试分为三个阶段,每个阶段完成后,都会等待所有的学生完成,同时我们希望在每
一个阶段,所有的学生完成一个阶段的任务后打印出几句话,下面看代码。
package com.bird.concursey.charpet5; import java.util.Date; import java.util.concurrent.Phaser; import java.util.concurrent.TimeUnit; /** * This class will simulate the students of the exam * * @author bird 2014年9月23日 下午8:01:47 */ public class Student implements Runnable { private Phaser phaser; public Student(Phaser phaser) { super(); this.phaser = phaser; } @Override public void run() { System.out.printf("%s: Has arrived to do the exam.%s\n", Thread .currentThread().getName(), new Date()); phaser.arriveAndAwaitAdvance(); System.out.printf("%s: Is going to do the first exercise.%s\n", Thread .currentThread().getName(), new Date()); doExercise1(); System.out.printf("%s: Has done the first exercise.%s\n", Thread .currentThread().getName(), new Date()); phaser.arriveAndAwaitAdvance(); System.out.printf("%s: Is going to do the second exercise.%s\n", Thread .currentThread().getName(), new Date()); doExercise2(); System.out.printf("%s: Has done the second exercise.%s\n", Thread .currentThread().getName(), new Date()); phaser.arriveAndAwaitAdvance(); System.out.printf("%s: Is going to do the third exercise.%s\n", Thread .currentThread().getName(), new Date()); doExercise3(); System.out.printf("%s: Has done the third exercise.%s\n", Thread .currentThread().getName(), new Date()); phaser.arriveAndAwaitAdvance(); } private void doExercise3() { try { long duration = (long) (Math.random() * 10); TimeUnit.SECONDS.sleep(duration); } catch (InterruptedException e) { e.printStackTrace(); } } private void doExercise2() { try { long duration = (long) (Math.random() * 10); TimeUnit.SECONDS.sleep(duration); } catch (InterruptedException e) { e.printStackTrace(); } } private void doExercise1() { try { long duration = (long) (Math.random() * 10); TimeUnit.SECONDS.sleep(duration); } catch (InterruptedException e) { e.printStackTrace(); } } }
package com.bird.concursey.charpet5; import java.util.concurrent.Phaser; public class MyPhaser extends Phaser { @Override protected boolean onAdvance(int phase, int registeredParties) { switch (phase) { case 0: return studentsArrived(); case 1: return finishFirstExercise(); case 2: return finishSecondExercise(); case 3: return finishExam(); default: return true; } } private boolean finishExam() { System.out.printf("Phaser: All the students have finished the exam.\n"); System.out.printf("Phaser: Thank you for your time.\n"); return false; } private boolean finishSecondExercise() { System.out .printf("Phaser: All the students have finished the second exercise.\n"); System.out.printf("Phaser: It's time for the third one.\n"); return false; } private boolean finishFirstExercise() { System.out .printf("Phaser: All the students have finished the first exercise.\n"); System.out.printf("Phaser: It's time for the second one.\n"); return false; } /** * It writes two log messages to the console and returns the false value to * indicate that the phaser continues with its execution. * * @return */ private boolean studentsArrived() { System.out .printf("Phaser: The exam are going to start. The students are ready.\n"); System.out.printf("Phaser: We have %d students.\n", getRegisteredParties()); return false; } public static void main(String[] args) { MyPhaser phaser = new MyPhaser(); Student students[] = new Student[5]; for (int i = 0; i < students.length; i++) { students[i] = new Student(phaser); phaser.register(); } Thread threads[] = new Thread[students.length]; for (int i = 0; i < students.length; i++) { threads[i] = new Thread(students[i], "Student " + i); threads[i].start(); } for (int i = 0; i < threads.length; i++) { try { threads[i].join(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.printf("Main: The phaser has finished: %s.\n",phaser.isTerminated()); } }
运行结果,
Student 0: Has arrived to do the exam.Tue Sep 23 20:11:50 CST 2014 Student 3: Has arrived to do the exam.Tue Sep 23 20:11:50 CST 2014 Student 2: Has arrived to do the exam.Tue Sep 23 20:11:50 CST 2014 Student 4: Has arrived to do the exam.Tue Sep 23 20:11:50 CST 2014 Student 1: Has arrived to do the exam.Tue Sep 23 20:11:50 CST 2014 Phaser: The exam are going to start. The students are ready. Phaser: We have 5 students. Student 1: Is going to do the first exercise.Tue Sep 23 20:11:50 CST 2014 Student 3: Is going to do the first exercise.Tue Sep 23 20:11:50 CST 2014 Student 2: Is going to do the first exercise.Tue Sep 23 20:11:50 CST 2014 Student 4: Is going to do the first exercise.Tue Sep 23 20:11:50 CST 2014 Student 0: Is going to do the first exercise.Tue Sep 23 20:11:50 CST 2014 Student 4: Has done the first exercise.Tue Sep 23 20:11:52 CST 2014 Student 0: Has done the first exercise.Tue Sep 23 20:11:52 CST 2014 Student 1: Has done the first exercise.Tue Sep 23 20:11:52 CST 2014 Student 2: Has done the first exercise.Tue Sep 23 20:11:59 CST 2014
This exercise simulates the realization of an exam that has three exercises. All the
students have to finish one exercise before they can start the next one. To implement this
synchronization requirement, we use the Phaser class, but you have implemented your own
phaser extending the original class to override the onAdvance() method.
This method is called by the phaser before making a phase change and before waking up all
the threads that were sleeping in the arriveAndAwaitAdvance() method. This method
receives as parameters the number of the actual phase, where 0 is the number of the first
phase and the number of registered participants. The most useful parameter is the actual
phase. If you execute a different operation depending on the actual phase, you have to use an
alternative structure (if/else or switch) to select the operation you want to execute. In the
example, we used a switch structure to select a different method for each change of phase.
The onAdvance() method returns a Boolean value that indicates if the phaser has
terminated or not. If the phaser returns a false value, it indicates that it hasn‘t terminated,
so the threads will continue with the execution of other phases. If the phaser returns a true
value, then the phaser still wakes up the pending threads, but moves the phaser to the
terminated state, so all the future calls to any method of the phaser will return immediately,
and the isTerminated() method returns the true value.
In the Core class, when you created the MyPhaser object, you didn‘t specify the number of
participants in the phaser. You made a call to the register() method for every Student
object created to register a participant in the phaser. This calling doesn‘t establish a relation
between the Student object or the thread that executes it and the phaser. Really, the
number of participants in a phaser is only a number. There is no relationship between the
phaser and the participants.