java并发之同步辅助类Phaser

Phaser含义:

更加复杂和强大的同步辅助类。它允许并发执行多阶段任务。当我们有并发任务并且需要分解成几步执行时,(CyclicBarrier是分成两步),就可以选择使用Phaser。Phaser类机制是在每一步结束的位置对线程进行同步,当所有的线程都完成了这一步,才允许执行下一步。
跟其他同步工具一样,必须对Phaser类中参与同步操作的任务数进行初始化,不同的是,可以动态的增加或者减少任务数。

函数:
arriveAndAwaitAdvance():类似于CyclicBarrier的await()方法,等待其它线程都到来之后同步继续执行。
arriveAndDeregister():把执行到此的线程从Phaser中注销掉。
isTerminated():判断Phaser是否终止。
register():将一个新的参与者注册到Phaser中,这个新的参与者将被当成没有执行完本阶段的线程。
forceTermination():强制Phaser进入终止态。
... ...

例子
使用Phaser类同步三个并发任务。这三个任务将在三个不同的文件夹及其子文件夹中查找过去24小时内修改过扩展为为.log的文件。这个任务分成以下三个步骤:
1、在执行的文件夹及其子文件夹中获取扩展名为.log的文件
2、对每一步的结果进行过滤,删除修改时间超过24小时的文件
3、将结果打印到控制台
在第一步和第二步结束的时候,都会检查所查找到的结果列表是不是有元素存在。如果结果列表是空的,对应的线程将结束执行,并从Phaser中删除。(也就是动态减少任务数)

文件查找类:

import java.io.File;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;

public class FileSearch implements Runnable {

private String initPath;

private String end;

private List<String> results;

private Phaser phaser;

public FileSearch(String initPath, String end, Phaser phaser) {
    this.initPath = initPath;
    this.end = end;
    this.phaser=phaser;
    results=new ArrayList<>();
}
@Override
public void run() {

    phaser.arriveAndAwaitAdvance();//等待所有的线程创建完成,确保在进行文件查找的时候所有的线程都已经创建完成了

    System.out.printf("%s: Starting.\n",Thread.currentThread().getName());

    // 1st Phase: 查找文件
    File file = new File(initPath);
    if (file.isDirectory()) {
        directoryProcess(file);
    }

    // 如果查找结果为false,那么就把该线程从Phaser中移除掉并且结束该线程的运行
    if (!checkResults()){
        return;
    }

    // 2nd Phase: 过滤结果,过滤出符合条件的(一天内的)结果集
    filterResults();

    // 如果过滤结果集结果是空的,那么把该线程从Phaser中移除,不让它进入下一阶段的执行
    if (!checkResults()){
        return;
    }

    // 3rd Phase: 显示结果
    showInfo();
    phaser.arriveAndDeregister();//任务完成,注销掉所有的线程
    System.out.printf("%s: Work completed.\n",Thread.currentThread().getName());
}
private void showInfo() {
    for (int i=0; i<results.size(); i++){
        File file=new File(results.get(i));
        System.out.printf("%s: %s\n",Thread.currentThread().getName(),file.getAbsolutePath());
    }
    // Waits for the end of all the FileSearch threads that are registered in the phaser
    phaser.arriveAndAwaitAdvance();
}
private boolean checkResults() {
    if (results.isEmpty()) {
        System.out.printf("%s: Phase %d: 0 results.\n",Thread.currentThread().getName(),phaser.getPhase());
        System.out.printf("%s: Phase %d: End.\n",Thread.currentThread().getName(),phaser.getPhase());
        //结果为空,Phaser完成并把该线程从Phaser中移除掉
        phaser.arriveAndDeregister();
        return false;
    } else {
        // 等待所有线程查找完成
        System.out.printf("%s: Phase %d: %d results.\n",Thread.currentThread().getName(),phaser.getPhase(),results.size());
        phaser.arriveAndAwaitAdvance();
        return true;
    }
}
private void filterResults() {
    List<String> newResults=new ArrayList<>();
    long actualDate=new Date().getTime();
    for (int i=0; i<results.size(); i++){
        File file=new File(results.get(i));
        long fileDate=file.lastModified();

        if (actualDate-fileDate<TimeUnit.MILLISECONDS.convert(1,TimeUnit.DAYS)){
            newResults.add(results.get(i));
        }
    }
    results=newResults;
}
private void directoryProcess(File file) {
    // Get the content of the directory
    File list[] = file.listFiles();
    if (list != null) {
        for (int i = 0; i < list.length; i++) {
            if (list[i].isDirectory()) {
                // If is a directory, process it
                directoryProcess(list[i]);
            } else {
                // If is a file, process it
                fileProcess(list[i]);
            }
        }
    }
}
private void fileProcess(File file) {
    if (file.getName().endsWith(end)) {
        results.add(file.getAbsolutePath());
    }
}
}

测试主类:

import java.util.concurrent.Phaser;

public class PhaserMain {

public static void main(String[] args) {

    Phaser phaser = new Phaser(3);

    FileSearch system = new FileSearch("C:\\Windows", "log", phaser);
    FileSearch apps = new FileSearch("C:\\Program Files", "log", phaser);
    FileSearch documents = new FileSearch("C:\\Documents And Settings", "log", phaser);

    Thread systemThread = new Thread(system, "System");
    systemThread.start();
    Thread appsThread = new Thread(apps, "Apps");
    appsThread.start();
    Thread documentsThread = new Thread(documents, "Documents");
    documentsThread.start();
    try {
        systemThread.join();
        appsThread.join();
        documentsThread.join();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.printf("Terminated: %s\n", phaser.isTerminated());
}

}

海量视频获取 vue angular

原文地址:http://blog.51cto.com/13538361/2088745

时间: 2024-08-30 04:49:24

java并发之同步辅助类Phaser的相关文章

java并发之同步辅助类CountDownLatch

CountDownLatch 含义: CountDownLatch可以理解为一个计数器在初始化时设置初始值,当一个线程需要等待某些操作先完成时,需要调用await()方法.这个方法让线程进入休眠状态直到等待的所有线程都执行完成.每调用一次countDown()方法内部计数器减1,直到计数器为0时唤醒.这个可以理解为特殊的CyclicBarrier.线程同步点比较特殊,为内部计数器值为0时开始. 方法:核心方法两个:countDown()和await()countDown():使CountDown

java并发之同步辅助类CyclicBarrier

CyclicBarrier含义: 栅栏允许两个或者多个线程在某个集合点同步.当一个线程到达集合点时,它将调用await()方法等待其它的线程.线程调用await()方法后,CyclicBarrier将阻塞这个线程并将它置入休眠状态等待其它线程的到来.等最后一个线程调用await()方法时,CyclicBarrier将唤醒所有等待的线程然后这些线程将继续执行.CyclicBarrier可以传入另一个Runnable对象作为初始化参数.当所有的线程都到达集合点后,CyclicBarrier类将Run

java并发之同步辅助类

CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier).它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活.CyclicBarrier默认的构造方法是 CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞. cBarrierTest { public

java并发之同步辅助类semaphore

semaphore(sem??f?r)含义: 信号量就是可以声明多把锁(包括一把锁:此时为互斥信号量).举个例子:一个房间如果只能容纳5个人,多出来的人必须在门外面等着.如何去做呢?一个解决办法就是:房间外面挂着五把钥匙,每进去一个人就取走一把钥匙,没有钥匙的不能进入该房间而是在外面等待.每出来一个人就把钥匙放回原处以方便别人再次进入. 常用方法acquire():获取信号量,信号量内部计数器减1release():释放信号量,信号量内部计数器加1tryAcquire():这个方法试图获取信号量

Java并发编程-同步辅助类之Exchanger

Exchanger是自jdk1.5起开始提供的工具套件,一般用于两个工作线程之间交换数据.在本文中我将采取由浅入深的方式来介绍分析这个工具类.首先我们来看看官方的api文档中的叙述: A synchronization point at which threads can pair and swap elements within pairs. Each thread presents some object on entry to the exchange method, matches wi

Java中的5种同步辅助类

当你使用synchronized关键字的时候,是通过互斥器来保障线程安全以及对共享资源的同步访问.线程间也经常需要更进一步的协调执行,来完成复杂的并发任务,比如wait/notify模式就是一种在多线程环境下的协调执行机制. 通过API来获取和释放锁(使用互斥器)或者调用wait/notify等方法都是底层调用的方式.进一步来说,有必要为线程同步创建更高层次的抽象.通常用到的同步辅助类,就是对2个或多个线程间的同步活动机制做进一步封装,其内部原理是通过使用现有的底层API来实现复杂的线程间的协调

JAVA线程同步辅助类CyclicBarrier循环屏障

CyclicBarrier是一个同步辅助类,主要作用是让一组线程互相等待,知道都到达一个公共障点,在一起走.在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier 很有用.因为该 barrier 在释放等待线程后可以重用,所以称它为循环 的 barrier. CyclicBarrier 支持一个可选的 Runnable 命令,在一组线程中的最后一个线程到达之后(但在释放所有线程之前),该命令只在每个屏障点运行一次.若在继续所有参与线程之前更新共享状态,此屏

【Java多线程】CountDownLatch同步辅助类

CountDownLatch,一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待. 主要方法: public CountDownLatch(int count);  //构造方法参数, 指定了计数的次数 public void countDown();        //调用此方法,则计数减一 public void await();        //调用此方法会一直阻塞当前线程,直到计时器的值为0 代码: import java.util.concurren

JAVA线程同步辅助类CountDownLatch

一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待. 用给定的计数 初始化 CountDownLatch.由于调用了 countDown() 方法,所以在当前计数到达零之前,await 方法会一直受阻塞.之后,会释放所有等待的线程,await 的所有后续调用都将立即返回.这种现象只出现一次——计数无法被重置.如果需要重置计数,请考虑使用 CyclicBarrier. CountDownLatch 是一个通用同步工具,它有很多用途.将计数 1 初始化的 Count