Java Concurrency - Phaser, Controlling phase change in concurrent phased tasks

The Phaser class provides a method that is executed each time the phaser changes the phase. It‘s the onAdvance() method. It receives two parameters: the number of the current phase and the number of registered participants; it returns a Boolean value, false if the phaser continues its execution, or true if the phaser has finished and has to enter into the termination state.

The default implementation of this method returns true if the number of registered participants is zero, and false otherwise. But you can modify this behavior if you extend the Phaser class and you override this method. Normally, you will be interested in doing this when you have to execute some actions when you advance from one phase to the next one.

In this recipe, you will learn how to control the phase change in a phaser that is implementing your own version of the Phaser class that overrides the onAdvance() method to execute some actions in every phase change. You are going to implement a simulation of an exam, where there will be some students who have to do three exercises. All the students have to finish one exercise before they can proceed with the next one.

1. Create a class named MyPhaser and specify that it extends from the Phaser class, override the onAdvance() method.

package com.packtpub.java7.concurrency.chapter3.recipe6.task;

import java.util.concurrent.Phaser;

/**
 * Implements a subclass of the Phaser class. Overrides the onAdvance method to control
 * the change of phase
 *
 */
public class MyPhaser extends Phaser {

    /**
     * This method is called when the last register thread calls one of the advance methods
     * in the actual phase
     * @param phase Actual phase
     * @param registeredParties Number of registered threads
     * @return false to advance the phase, true to finish
     */
    @Override
    protected boolean onAdvance(int phase, int registeredParties) {
        switch (phase) {
        case 0:
            return studentsArrived();
        case 1:
            return finishFirstExercise();
        case 2:
            return finishSecondExercise();
        case 3:
            return finishExam();
        default:
            return true;
        }
    }

    /**
     * This method is called in the change from phase 0 to phase 1
     * @return false to continue with the execution
     */
    private boolean studentsArrived() {
        System.out.printf("Phaser: The exam are going to start. The students are ready.\n");
        System.out.printf("Phaser: We have %d students.\n",getRegisteredParties());
        return false;
    }

    /**
     * This method is called in the change from phase 1 to phase 2
     * @return false to continue with the execution
     */
    private boolean finishFirstExercise() {
        System.out.printf("Phaser: All the students has finished the first exercise.\n");
        System.out.printf("Phaser: It‘s turn for the second one.\n");
        return false;
    }

    /**
     * This method is called in the change form phase 2 to phase 3
     * @return false to continue with the execution
     */
    private boolean finishSecondExercise() {
        System.out.printf("Phaser: All the students has finished the second exercise.\n");
        System.out.printf("Phaser: It‘s turn for the third one.\n");
        return false;
    }

    /**
     * This method is called in the change from phase 3 to phase 4
     * @return true. There are no more phases
     */
    private boolean finishExam() {
        System.out.printf("Phaser: All the students has finished the exam.\n");
        System.out.printf("Phaser: Thank you for your time.\n");
        return true;
    }

}

2. Create a class named Student and specify that it implements the Runnable interface. This class will simulate the students of the exam.

package com.packtpub.java7.concurrency.chapter3.recipe6.task;

import java.util.Date;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;

/**
 * This class implements an student in the exam
 *
 */
public class Student implements Runnable {

    /**
     * Phaser to control the execution
     */
    private Phaser phaser;

    /**
     * Constructor of the class. Initialize its objects
     * @param phaser Phaser to control the execution
     */
    public Student(Phaser phaser) {
        this.phaser=phaser;
    }

    /**
     * Main method of the student. It arrives to the exam and does three exercises. After each
     * exercise, it calls the phaser to wait that all the students finishes the same exercise
     */
    public void run() {
        System.out.printf("%s: Has arrived to do the exam. %s\n",Thread.currentThread().getName(),new Date());
        phaser.arriveAndAwaitAdvance();
        System.out.printf("%s: Is going to do the first exercise. %s\n",Thread.currentThread().getName(),new Date());
        doExercise1();
        System.out.printf("%s: Has done the first exercise. %s\n",Thread.currentThread().getName(),new Date());
        phaser.arriveAndAwaitAdvance();
        System.out.printf("%s: Is going to do the second exercise. %s\n",Thread.currentThread().getName(),new Date());
        doExercise2();
        System.out.printf("%s: Has done the second exercise. %s\n",Thread.currentThread().getName(),new Date());
        phaser.arriveAndAwaitAdvance();
        System.out.printf("%s: Is going to do the third exercise. %s\n",Thread.currentThread().getName(),new Date());
        doExercise3();
        System.out.printf("%s: Has finished the exam. %s\n",Thread.currentThread().getName(),new Date());
        phaser.arriveAndAwaitAdvance();
    }

    /**
     * Does an exercise is to wait a random time
     */
    private void doExercise1() {
        try {
            Long duration=(long)(Math.random()*10);
            TimeUnit.SECONDS.sleep(duration);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * Does an exercise is wait a random time
     */
    private void doExercise2() {
        try {
            Long duration=(long)(Math.random()*10);
            TimeUnit.SECONDS.sleep(duration);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * Does an exercise is wait a random time
     */
    private void doExercise3() {
        try {
            Long duration=(long)(Math.random()*10);
            TimeUnit.SECONDS.sleep(duration);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

3. Implement the main class of the example by creating a class named Main.

package com.packtpub.java7.concurrency.chapter3.recipe6.core;

import com.packtpub.java7.concurrency.chapter3.recipe6.task.MyPhaser;
import com.packtpub.java7.concurrency.chapter3.recipe6.task.Student;

/**
 * Main class of the example
 *
 */
public class Main {

    /**
     * Main method of the example
     * @param args
     */
    public static void main(String[] args) {

        // Creates the Phaser
        MyPhaser phaser=new MyPhaser();

        // Creates 5 students and register them in the phaser
        Student students[]=new Student[5];
        for (int i=0; i<students.length; i++){
            students[i]=new Student(phaser);
            phaser.register();
        }

        // Create 5 threads for the students and start them
        Thread threads[]=new Thread[students.length];
        for (int i=0; i<students.length; i++) {
            threads[i]=new Thread(students[i],"Student "+i);
            threads[i].start();
        }

        // Wait for the finalization of the threads
        for (int i=0; i<threads.length; i++) {
            try {
                threads[i].join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        // Check that the Phaser is in the Terminated state
        System.out.printf("Main: The phaser has finished: %s.\n",phaser.isTerminated());

    }

}

This exercise simulates the realization of an exam that has three exercises. All the students have to finish one exercise before they can start the next one. To implement this synchronization requirement, we use the Phaser class, but you have implemented your own phaser extending the original class to override the onAdvance() method.

This method is called by the phaser before making a phase change and before waking up all the threads that were sleeping in the arriveAndAwaitAdvance() method. This method receives as parameters the number of the actual phase, where 0 is the number of the first phase and the number of registered participants. The most useful parameter is the actual phase. If you execute a different operation depending on the actual phase, you have to use an alternative structure (if/else or switch) to select the operation you want to execute. In the example, we used a switch structure to select a different method for each change of phase.

The onAdvance() method returns a Boolean value that indicates if the phaser has terminated or not. If the phaser returns a false value, it indicates that it hasn‘t terminated, so the threads will continue with the execution of other phases. If the phaser returns a true value, then the phaser still wakes up the pending threads, but moves the phaser to the terminated state, so all the future calls to any method of the phaser will return immediately, and the isTerminated() method returns the true value.

In the Core class, when you created the MyPhaser object, you didn‘t specify the number of participants in the phaser. You made a call to the register() method for every Student object created to register a participant in the phaser. This calling doesn‘t establish a relation between the Student object or the thread that executes it and the phaser. Really, the number of participants in a phaser is only a number. There is no relationship between the phaser and the participants.

时间: 2024-08-07 08:23:31

Java Concurrency - Phaser, Controlling phase change in concurrent phased tasks的相关文章

Java Concurrency - 浅析 Phaser 的用法

One of the most complex and powerful functionalities offered by the Java concurrency API is the ability to execute concurrent-phased tasks using the Phaser class. This mechanism is useful when we have some concurrent tasks divided into steps. The Pha

Java concurrency (multi-threading) - Tutorial

Java concurrency (multi-threading) This article describes how to do concurrent programming with Java. It covers the concepts of parallel programming, immutability, threads, the executor framework (thread pools), futures, callables and the fork-join f

深入浅出 Java Concurrency (35): 线程池 part 8 线程池的实现及原理 (3)[转]

线程池任务执行结果 这一节来探讨下线程池中任务执行的结果以及如何阻塞线程.取消任务等等. 1 package info.imxylz.study.concurrency.future;2 3 public class SleepForResultDemo implements Runnable {4 5     static boolean result = false;6 7     static void sleepWhile(long ms) {8         try {9      

深入浅出 Java Concurrency (33): 线程池 part 6 线程池的实现及原理 (1)[转]

线程池数据结构与线程构造方法 由于已经看到了ThreadPoolExecutor的源码,因此很容易就看到了ThreadPoolExecutor线程池的数据结构.图1描述了这种数据结构. 图1 ThreadPoolExecutor 数据结构 其实,即使没有上述图形描述ThreadPoolExecutor的数据结构,我们根据线程池的要求也很能够猜测出其数据结构出来. 线程池需要支持多个线程并发执行,因此有一个线程集合Collection<Thread>来执行线程任务: 涉及任务的异步执行,因此需要

深入浅出 Java Concurrency (34): 线程池 part 7 线程池的实现及原理 (2)[转]

线程池任务执行流程 我们从一个API开始接触Executor是如何处理任务队列的. java.util.concurrent.Executor.execute(Runnable) Executes the given task sometime in the future. The task may execute in a new thread or in an existing pooled thread. If the task cannot be submitted for execut

深入浅出 Java Concurrency (一) ----原子操作

[-] part1 从AtomicInteger开始 part 2 数组引用的原子操作 part3 指令重排序与happens-before法则 part 4 CAS操作 part1 从AtomicInteger开始 从相对简单的Atomic入手(java.util.concurrent是基于Queue的并发包,而Queue,很多情况下使用到了Atomic操作,因此首先从这里开始).很多情况下我们只是需要一个简单的.高效的.线程安全的递增递减方案.注意,这里有三个条件:简单,意味着程序员尽可能少

《深入浅出 Java Concurrency》——原子操作

part1 从AtomicInteger開始 从相对简单的Atomic入手(java.util.concurrent是基于Queue的并发包.而Queue.非常多情况下使用到了Atomic操作.因此首先从这里開始).非常多情况下我们仅仅是须要一个简单的.高效的.线程安全的递增递减方案. 注意,这里有三个条件:简单,意味着程序猿尽可能少的操作底层或者实现起来要比較easy:高效意味着耗用资源要少.程序处理速度要快.线程安全也非常重要,这个在多线程下能保证数据的正确性.这三个条件看起来比較简单,可是

Java Concurrency(二)——J.U.C atomic包源码解读

java5之后的java.util.concurrent包(J.U.C)是世界级并发大师Doug Lea的作品,里面主要实现了 atomic包里Integer/Long对应的原子类,主要基于CAS: 一些同步子,包括Lock,CountDownLatch,Semaphore,FutureTask等,这些都是基于AbstractQueuedSynchronizer类: 关于线程执行的Executors类等: 一些并发的集合类,比如ConcurrentHashMap,ConcurrentLinked

深入浅出 Java Concurrency (28): 线程池 part 1 简介[转]

从这一节开始正式进入线程池的部分.其实整个体系已经拖了很长的时间,因此后面的章节会加快速度,甚至只是一个半成品或者简单化,以后有时间的慢慢补充.完善. 其实线程池是并发包里面很重要的一部分,在实际情况中也是使用很多的一个重要组件. 下图描述的是线程池API的一部分.广义上的完整线程池可能还包括Thread/Runnable.Timer/TimerTask等部分.这里只介绍主要的和高级的API以及架构和原理. 大多数并发应用程序是围绕执行任务(Task)进行管理的.所谓任务就是抽象.离散的工作单元