Java Concurrency - A Brief Look at Using Phaser

One of the most complex and powerful functionalities offered by the Java concurrency API is the ability to execute concurrent-phased tasks using the Phaser class. This mechanism is useful when we have some concurrent tasks divided into steps. The Phaser class provides us with the mechanism to synchronize the threads at the end of each step, so no thread starts its second step until all the threads have finished the first one.

As with other synchronization utilities, we have to initialize the Phaser class with the number of tasks that participate in the synchronization operation, but we can dynamically modify this number by increasing or decreasing it.

In this recipe, you will learn how to use the Phaser class to synchronize three concurrent tasks. The three tasks look for files with the extension .log modified in the last 24 hours in three different folders and their subfolders. This task is divided into three steps:

  1. Get a list of the files with the extension .log in the assigned folder and its subfolders.
  2. Filter the list created in the first step by deleting the files modified more than 24 hours ago.
  3. Print the results in the console.

At the end of steps 1 and 2, we check whether the list has any elements. If it has none, the thread ends its execution and is removed from the phaser.

1. Create a class named FileSearch and specify that it implements the Runnable interface. This class searches a folder and its subfolders for files with a given extension that were modified in the last 24 hours.

package com.packtpub.java7.concurrency.chapter3.recipe5.task;

import java.io.File;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;

/**
 * This class searches a directory for files with a given extension
 */
public class FileSearch implements Runnable {

    /**
     * Initial path for the search
     */
    private String initPath;

    /**
     * Extension of the file we are searching for
     */
    private String end;

    /**
     * List that stores the full path of the files that have the extension we are searching for
     */
    private List<String> results;

    /**
     * Phaser to control the execution of the FileSearch objects. Their execution is divided
     * into three phases:
     *  1st: Look in the folder and its subfolders for the files with the extension
     *  2nd: Filter the results. We only want the files modified in the last 24 hours
     *  3rd: Print the results
     */
    private Phaser phaser;

    /**
     * Constructor of the class. Initializes its attributes
     * @param initPath Initial path for the search
     * @param end Extension of the files we are searching for
     * @param phaser Phaser object to control the execution
     */
    public FileSearch(String initPath, String end, Phaser phaser) {
        this.initPath = initPath;
        this.end = end;
        this.phaser=phaser;
        results=new ArrayList<>();
    }

    /**
     * Main method of the class. See the comments inside for a more detailed description
     */
    @Override
    public void run() {

        // Waits for the creation of all the FileSearch objects
        phaser.arriveAndAwaitAdvance();

        System.out.printf("%s: Starting.\n",Thread.currentThread().getName());

        // 1st Phase: Look for the files
        File file = new File(initPath);
        if (file.isDirectory()) {
            directoryProcess(file);
        }

        // If there are no results, checkResults() has already deregistered this thread, so end here
        if (!checkResults()){
            return;
        }

        // 2nd Phase: Filter the results
        filterResults();

        // If nothing is left after the filter, checkResults() has already deregistered this thread, so end here
        if (!checkResults()){
            return;
        }

        // 3rd Phase: Show info
        showInfo();
        // All phases are done: deregister this thread from the phaser
        phaser.arriveAndDeregister();
        System.out.printf("%s: Work completed.\n",Thread.currentThread().getName());

    }

    /**
     * This method prints the final results of the search
     */
    private void showInfo() {
        for (int i=0; i<results.size(); i++){
            File file=new File(results.get(i));
            System.out.printf("%s: %s\n",Thread.currentThread().getName(),file.getAbsolutePath());
        }
        // Waits for the end of all the FileSearch threads that are registered in the phaser
        phaser.arriveAndAwaitAdvance();
    }

    /**
     * This method checks if there are results after the execution of a phase. If there aren't
     * any results, it deregisters the thread from the phaser.
     * @return true if there are results, false if not
     */
    private boolean checkResults() {
        if (results.isEmpty()) {
            System.out.printf("%s: Phase %d: 0 results.\n",Thread.currentThread().getName(),phaser.getPhase());
            System.out.printf("%s: Phase %d: End.\n",Thread.currentThread().getName(),phaser.getPhase());
            // No results. Phase is completed but there is no more work to do. Deregister from the phaser
            phaser.arriveAndDeregister();
            return false;
        } else {
            // There are results. Phase is completed. Wait to continue with the next phase
            System.out.printf("%s: Phase %d: %d results.\n",Thread.currentThread().getName(),phaser.getPhase(),results.size());
            phaser.arriveAndAwaitAdvance();
            return true;
        }
    }

    /**
     * Method that filters the results, removing the files modified more than a day ago
     */
    private void filterResults() {
        List<String> newResults=new ArrayList<>();
        long actualDate=new Date().getTime();
        for (int i=0; i<results.size(); i++){
            File file=new File(results.get(i));
            long fileDate=file.lastModified();

            if (actualDate-fileDate<TimeUnit.MILLISECONDS.convert(1,TimeUnit.DAYS)){
                newResults.add(results.get(i));
            }
        }
        results=newResults;
    }

    /**
     * Method that processes a directory
     *
     * @param file
     *            : Directory to process
     */
    private void directoryProcess(File file) {

        // Get the content of the directory (listFiles() returns null on an I/O error)
        File[] list = file.listFiles();
        if (list != null) {
            for (int i = 0; i < list.length; i++) {
                if (list[i].isDirectory()) {
                    // If it is a directory, process it
                    directoryProcess(list[i]);
                } else {
                    // If it is a file, process it
                    fileProcess(list[i]);
                }
            }
        }
    }

    /**
     * Method that processes a file
     *
     * @param file
     *            : File to process
     */
    private void fileProcess(File file) {
        if (file.getName().endsWith(end)) {
            results.add(file.getAbsolutePath());
        }
    }

}

2. Implement the main class of the example by creating a class named Main.

package com.packtpub.java7.concurrency.chapter3.recipe5.core;

import java.util.concurrent.Phaser;

import com.packtpub.java7.concurrency.chapter3.recipe5.task.FileSearch;

/**
 * Main class of the example
 *
 */
public class Main {

    /**
     * Main method of the example
     * @param args
     */
    public static void main(String[] args) {

        // Creates a Phaser with three participants
        Phaser phaser=new Phaser(3);

        // Creates 3 FileSearch objects. Each of them searches in a different directory
        FileSearch system=new FileSearch("C:\\Windows", "log", phaser);
        FileSearch apps=new FileSearch("C:\\Program Files","log",phaser);
        FileSearch documents=new FileSearch("C:\\Documents And Settings","log",phaser);

        // Creates a thread to run the system FileSearch and starts it
        Thread systemThread=new Thread(system,"System");
        systemThread.start();

        // Creates a thread to run the apps FileSearch and starts it
        Thread appsThread=new Thread(apps,"Apps");
        appsThread.start();

        // Creates a thread to run the documents FileSearch and starts it
        Thread documentsThread=new Thread(documents,"Documents");
        documentsThread.start();
        try {
            systemThread.join();
            appsThread.join();
            documentsThread.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.printf("Terminated: %s\n",phaser.isTerminated());

    }

}

The program starts by creating a Phaser object that controls the synchronization of the threads at the end of each phase. The constructor of Phaser receives the number of participants as a parameter; in our case, three. This number tells the Phaser how many threads have to call arriveAndAwaitAdvance() before it advances to the next phase and wakes up the threads that were waiting.
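The barrier behavior described here can be illustrated with a minimal sketch. The class PhaserBarrierSketch and its method runTwoSteps are hypothetical names introduced only for illustration (they are not part of the recipe), and the lambda syntax assumes Java 8 or later.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Phaser;

// Hypothetical demo class (not part of the recipe).
class PhaserBarrierSketch {

    // Runs `workers` threads through two steps and returns the recorded events in order.
    static List<String> runTwoSteps(int workers) {
        Phaser phaser = new Phaser(workers);            // one party per worker thread
        List<String> events = new CopyOnWriteArrayList<>();
        Thread[] threads = new Thread[workers];
        for (int i = 0; i < workers; i++) {
            threads[i] = new Thread(() -> {
                events.add("step1");
                phaser.arriveAndAwaitAdvance();         // blocks until every worker has arrived
                events.add("step2");
                phaser.arriveAndDeregister();           // done: leave the phaser
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return events;
    }

    public static void main(String[] args) {
        // Every "step1" entry is recorded before any "step2" entry.
        System.out.println(runTwoSteps(3));
    }
}
```

Because each thread records "step1" before it arrives, and the phase only advances once all parties have arrived, no "step2" entry can appear before the last "step1" entry.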

Once Phaser has been created, we launch three threads that execute three different FileSearch objects.

The first instruction in the run() method of each FileSearch object is a call to the arriveAndAwaitAdvance() method of the Phaser object. As we mentioned earlier, the Phaser knows the number of threads that we want to synchronize. When a thread calls this method, the Phaser decrements the number of threads that still have to finish the current phase and puts the calling thread to sleep until all the remaining threads finish the phase. Calling this method at the beginning of run() ensures that none of the FileSearch threads begins its work until all the threads have been created.

At the end of phase one and phase two, we check whether the phase has generated results, that is, whether the list of results has elements or is empty. If it has elements, the checkResults() method calls arriveAndAwaitAdvance() as explained earlier. If the list is empty, there's no point in the thread continuing with its execution, so it returns. But you have to notify the phaser that there will be one less participant. For this, we used arriveAndDeregister(). This notifies the phaser that this thread has finished the current phase but won't participate in future phases, so the phaser won't have to wait for it to continue.
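As a hedged illustration of a participant leaving early, the following sketch registers three parties and lets one of them deregister while the other two wait for the phase to advance. DeregisterSketch and oneParticipantLeaves are hypothetical names that do not appear in the recipe.

```java
import java.util.concurrent.Phaser;

// Hypothetical demo class (not part of the recipe).
class DeregisterSketch {

    // Three parties take part in phase 0; one of them leaves at the end of it.
    // Returns {registered parties, current phase} once all threads are done.
    static int[] oneParticipantLeaves() {
        Phaser phaser = new Phaser(3);
        Thread quitter = new Thread(() -> phaser.arriveAndDeregister());   // arrives, then leaves
        Thread first = new Thread(() -> phaser.arriveAndAwaitAdvance());   // arrives, then waits
        Thread second = new Thread(() -> phaser.arriveAndAwaitAdvance());  // arrives, then waits
        Thread[] threads = {quitter, first, second};
        for (Thread t : threads) {
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return new int[] {phaser.getRegisteredParties(), phaser.getPhase()};
    }

    public static void main(String[] args) {
        int[] state = oneParticipantLeaves();
        System.out.printf("Registered parties: %d, phase: %d%n", state[0], state[1]);
    }
}
```

The departing thread still counts as an arrival for phase 0, so the two remaining threads are released, and the next phase only needs two arrivals.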

At the end of phase three, implemented in the showInfo() method, there is a call to the arriveAndAwaitAdvance() method of the phaser. This call guarantees that all the threads finish at the same time. When showInfo() returns, run() calls the arriveAndDeregister() method of the phaser, which deregisters the thread as explained before, so when all the threads have finished, the phaser has zero participants.

Finally, the main() method waits for the completion of the three threads and calls the isTerminated() method of the phaser. When deregistration leaves a phaser with zero participants, it enters the so-called termination state and this method returns true. Because every thread deregisters from the phaser, it ends up in the termination state and this call prints true to the console.
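A small single-threaded sketch can make the termination rule concrete. It relies on the fact that arriveAndDeregister() does not block, so one thread can play every party. TerminationSketch and deregisterAll are hypothetical names used only here.

```java
import java.util.concurrent.Phaser;

// Hypothetical demo class (not part of the recipe).
class TerminationSketch {

    // Returns {isTerminated() before any deregistration, isTerminated() afterwards}.
    static boolean[] deregisterAll(int parties) {
        Phaser phaser = new Phaser(parties);
        boolean before = phaser.isTerminated();
        for (int i = 0; i < parties; i++) {
            phaser.arriveAndDeregister();   // non-blocking: arrive and leave
        }
        return new boolean[] {before, phaser.isTerminated()};
    }

    public static void main(String[] args) {
        boolean[] three = deregisterAll(3);
        System.out.printf("3 parties: before=%b, after=%b%n", three[0], three[1]);
        // A phaser created with zero parties has had no deregistration, so it is not terminated.
        boolean[] none = deregisterAll(0);
        System.out.printf("0 parties: before=%b, after=%b%n", none[0], none[1]);
    }
}
```

Note the second case: having zero participants is not enough by itself. Termination is triggered when a deregistration brings the count down to zero.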

A Phaser object can be in two states:

Active: Phaser is in this state while it accepts the registration of new participants and synchronizes them at the end of each phase. In this state, Phaser works as explained in this recipe. The Java concurrency API documentation does not name this state.

Termination: By default, Phaser enters this state when all its participants have been deregistered, so it has zero participants. More precisely, Phaser enters the termination state when the onAdvance() method returns true. If you override that method, you can change the default behavior. When Phaser is in this state, the synchronization method arriveAndAwaitAdvance() returns immediately without performing any synchronization.
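The following sketch shows one way onAdvance() might be overridden, here to terminate the phaser after a fixed number of phases. It is an illustration under stated assumptions rather than part of the recipe: OnAdvanceSketch, phasesUntilTermination and the maxPhases parameter are hypothetical names.

```java
import java.util.concurrent.Phaser;

// Hypothetical demo class (not part of the recipe).
class OnAdvanceSketch {

    // Counts how many phases a single party completes before the phaser terminates.
    static int phasesUntilTermination(final int maxPhases) {
        Phaser phaser = new Phaser(1) {
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                // Returning true terminates the phaser: stop after maxPhases phases
                // or, as in the default behavior, when nobody is registered.
                return phase >= maxPhases - 1 || registeredParties == 0;
            }
        };
        int completed = 0;
        while (!phaser.isTerminated()) {
            phaser.arriveAndAwaitAdvance();   // single party, so the phase advances at once
            completed++;
        }
        return completed;
    }

    public static void main(String[] args) {
        System.out.println("Phases completed: " + phasesUntilTermination(3));
    }
}
```

The phaser calls onAdvance() with the number of the phase that has just finished, which is why the condition compares against maxPhases - 1.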

A notable feature of the Phaser class is that you did not have to handle any exception from its methods. Unlike other synchronization utilities, threads that are sleeping in a phaser don't respond to interruption events and don't throw an InterruptedException.
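To see the difference in practice, the sketch below interrupts a thread blocked in arriveAndAwaitAdvance() and, for contrast, a thread blocked in the interruptible variant awaitAdvanceInterruptibly(int) that the Phaser class also offers. InterruptionSketch, its methods and the 300 ms and 5000 ms timeouts are hypothetical choices made for the illustration.

```java
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class (not part of the recipe).
class InterruptionSketch {

    // Returns true if a thread in arriveAndAwaitAdvance() is still waiting after an interrupt.
    static boolean keepsWaitingAfterInterrupt() {
        Phaser phaser = new Phaser(2);              // the worker plus the calling thread
        Thread worker = new Thread(() -> phaser.arriveAndAwaitAdvance());
        worker.start();
        worker.interrupt();                         // does not release the worker
        joinQuietly(worker, 300);                   // give it a moment; it cannot finish yet
        boolean stillWaiting = worker.isAlive();
        phaser.arriveAndDeregister();               // second party arrives: worker is released
        joinQuietly(worker, 5000);
        return stillWaiting;
    }

    // Returns true if awaitAdvanceInterruptibly() throws InterruptedException on interrupt.
    static boolean interruptibleWaitThrows() {
        Phaser phaser = new Phaser(1);              // its only party never arrives
        AtomicBoolean thrown = new AtomicBoolean(false);
        Thread worker = new Thread(() -> {
            try {
                phaser.awaitAdvanceInterruptibly(0);
            } catch (InterruptedException e) {
                thrown.set(true);
            }
        });
        worker.start();
        worker.interrupt();
        joinQuietly(worker, 5000);
        phaser.forceTermination();                  // safety net so nothing stays blocked
        return thrown.get();
    }

    private static void joinQuietly(Thread t, long millis) {
        try {
            t.join(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("Still waiting after interrupt: " + keepsWaitingAfterInterrupt());
        System.out.println("Interruptible wait threw: " + interruptibleWaitThrows());
    }
}
```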

The Phaser class provides other methods related to the change of phase. These methods are as follows:

arrive(): This method notifies the phaser that one participant has finished the current phase, but the participant does not wait for the rest of the participants before continuing with its execution. Be careful when using this method, because it doesn't synchronize with other threads.

awaitAdvance(int phase): This method puts the current thread to sleep until all the participants of the phaser have finished the current phase, provided that the number we pass as the parameter is equal to the current phase of the phaser. If the parameter and the current phase of the phaser aren't equal, the method returns immediately.

awaitAdvanceInterruptibly(int phase): This method works like awaitAdvance(int phase), but it throws an InterruptedException if the thread that is sleeping in this method is interrupted.
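A minimal sketch of arrive() and awaitAdvance(int) used together: a worker signals that it has finished phase 0 without waiting, while the calling thread, which is not a registered party, waits for that phase to end. ArriveAwaitSketch and signalWithoutWaiting are hypothetical names, not part of the recipe.

```java
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class (not part of the recipe).
class ArriveAwaitSketch {

    // Returns {phase the worker arrived at, result of awaitAdvance(0),
    //          result of a second awaitAdvance(0) once phase 0 is over}.
    static int[] signalWithoutWaiting() {
        Phaser phaser = new Phaser(1);                  // only the worker is a registered party
        AtomicInteger arrivalPhase = new AtomicInteger(-1);
        Thread worker = new Thread(() -> {
            // arrive() returns the phase it arrived at and does not block
            arrivalPhase.set(phaser.arrive());
        });
        worker.start();
        int next = phaser.awaitAdvance(0);              // waits until phase 0 has finished
        int stale = phaser.awaitAdvance(0);             // phase 0 is over: returns immediately
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {arrivalPhase.get(), next, stale};
    }

    public static void main(String[] args) {
        int[] r = signalWithoutWaiting();
        System.out.printf("arrived at phase %d, awaitAdvance returned %d, stale call returned %d%n",
                r[0], r[1], r[2]);
    }
}
```

Both awaitAdvance(0) calls return the number of the phase the phaser has moved on to. The second call does not block because its argument no longer matches the current phase.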

Registering participants in the Phaser

When you create a Phaser object, you indicate how many participants that phaser will have. But the Phaser class has two methods to increment the number of participants of a phaser. These methods are as follows:

  • register(): This method adds a new participant to Phaser. The new participant is considered as not yet arrived at the current phase.
  • bulkRegister(int parties): This method adds the specified number of participants to the phaser. The new participants are considered as not yet arrived at the current phase.

The only method provided by the Phaser class to decrement the number of participants is the arriveAndDeregister() method, which notifies the phaser that the thread has finished the current phase and doesn't want to continue with the phased operation.
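The registration methods can be tried out in a short single-threaded sketch that watches the party count change. RegistrationSketch and growAndShrink are hypothetical names used only for this illustration.

```java
import java.util.concurrent.Phaser;

// Hypothetical demo class (not part of the recipe).
class RegistrationSketch {

    // Returns the registered party count after register(), bulkRegister(3)
    // and arriveAndDeregister(), in that order.
    static int[] growAndShrink() {
        Phaser phaser = new Phaser();                   // starts with no registered parties
        phaser.register();                              // adds one unarrived party
        int afterRegister = phaser.getRegisteredParties();
        phaser.bulkRegister(3);                         // adds three more unarrived parties
        int afterBulk = phaser.getRegisteredParties();
        phaser.arriveAndDeregister();                   // one party arrives and leaves
        int afterDeregister = phaser.getRegisteredParties();
        return new int[] {afterRegister, afterBulk, afterDeregister};
    }

    public static void main(String[] args) {
        int[] counts = growAndShrink();
        System.out.printf("register: %d, bulkRegister: %d, arriveAndDeregister: %d%n",
                counts[0], counts[1], counts[2]);
    }
}
```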

Forcing the termination of a Phaser

When a phaser has zero participants, it enters the Termination state. The Phaser class provides forceTermination() to put the phaser in the Termination state regardless of the number of participants registered in it. This mechanism may be useful when one of the participants runs into an error and the phased operation has to be abandoned.

When a phaser is in the Termination state, the awaitAdvance() and arriveAndAwaitAdvance() methods return a negative number immediately, instead of the non-negative phase number they normally return. If you know that your phaser could be terminated, you should check the return value of those methods to find out whether the phaser has been terminated.
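As a hedged sketch of this check, the code below blocks one thread on a phaser whose second participant never arrives, then forces termination from the calling thread, standing in for a participant that hit an error. ForceTerminationSketch and waitThenTerminate are hypothetical names.

```java
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class (not part of the recipe).
class ForceTerminationSketch {

    // Returns the value arriveAndAwaitAdvance() gave to the waiting thread.
    static int waitThenTerminate() {
        Phaser phaser = new Phaser(2);                  // the second party never arrives
        AtomicInteger result = new AtomicInteger(0);
        Thread waiter = new Thread(() -> result.set(phaser.arriveAndAwaitAdvance()));
        waiter.start();
        phaser.forceTermination();                      // releases the waiter, now or on arrival
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return result.get();
    }

    public static void main(String[] args) {
        int value = waitThenTerminate();
        if (value < 0) {
            System.out.println("Phaser was terminated; abandoning the phased work.");
        } else {
            System.out.println("Advanced to phase " + value);
        }
    }
}
```

The return value is negative whether forceTermination() runs before or after the waiter reaches the phaser, so the check does not depend on thread timing.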

Time: 2024-11-07 23:55:00
