Java多线程之~~~Phaser类实现任务的同步

在多线程开发中,经常会碰到将多个任务分配给多个线程,每个线程执行他的任务,但是,每个任务又分为好几个

阶段,每个阶段期望各个线程同时达到,意思是,每一步每个线程都要同步,当有一个线程走完第一步的时候,他得等

待其他的线程都完成第一步了才能继续下一步,步调一致能解决很多问题。下面我们使用一个例子,这个例子是模拟遍

历机器上的一些文件,找出以log结尾的文件,并且他的最后修改时间为24小时以内,我们开启3个线程去完成这个任

务。并且使用Phaser来同步各个任务。

package com.bird.concursey.charpet5;

import java.io.File;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;

/**
 * This class implements the operation of searching for files with a determined
 * extension modified in the last 24 hours in a folder and its subfolders
 *
 * @author bird 2014年9月22日 下午10:12:18
 */
public class FileSearch implements Runnable {
	// store the folder in which the search operation will begin.
	private String initPath;
	// store the extension of the files we are going to look for.
	private String end;
	// store the full path of the files we will find with the desired
	// characteristics.
	private List<String> results;

	private Phaser phaser;

	public FileSearch(String initPath, String end, Phaser phaser) {
		this.initPath = initPath;
		this.end = end;
		this.phaser = phaser;
		this.results = new ArrayList<String>();
	}

	@Override
	public void run() {
		//The search won't begin until all the threads have been created.
		phaser.arriveAndAwaitAdvance();
		System.out.printf("%s: Starting.\n",Thread.currentThread().getName());
		File file = new File(initPath);
		if(file.isDirectory()) {
			directoryProcess(file);
		}
		if(!checkResults()) {
			return;
		}
		filterResults();
		if(!checkResults()) {
			return;
		}
		showInfo();
		phaser.arriveAndDeregister();
		System.out.printf("%s: Work completed.\n",Thread.currentThread().getName());
	}

	/**
	 * It receives a File object as a parameter and it processes all its files
	 * and subfolders.
	 *
	 * @param file
	 */
	private void directoryProcess(File file) {
		File files[] = file.listFiles();
		if (files != null) {
			for (int i = 0; i < files.length; i++) {
				if (files[i].isDirectory()) {
					directoryProcess(files[i]);
				} else {
					fileProcess(files[i]);
				}
			}
		}
	}

	/**
	 * checks if its extension is equal to the one we are looking for
	 *
	 * @param file
	 */
	private void fileProcess(File file) {
		if (file.getName().endsWith(end)) {
			results.add(file.getAbsolutePath());
		}
	}

	/**
	 * deleting the files that were modified more than 24 hours ago
	 */
	private void filterResults() {
		List<String> newResults = new ArrayList<String>();
		long actualDate = new Date().getTime();
		for (int i = 0; i < results.size(); i++) {
			File file = new File(results.get(i));
			long fileDate = file.lastModified();
			if (actualDate - fileDate < TimeUnit.MILLISECONDS.convert(1,
					TimeUnit.DAYS)) {
				newResults.add(results.get(i));
			}
		}
		results = newResults;
	}

	private boolean checkResults() {
		if (results.isEmpty()) {
			System.out.printf("%s: Phase %d: 0 results.\n", Thread.currentThread().getName(), phaser.getPhase());
			System.out.printf("%s: Phase %d: End.\n", Thread.currentThread().getName(), phaser.getPhase());
			//停止phaser
			phaser.arriveAndDeregister();
			return false;
		}else{
			System.out.printf("%s: Phase %d: %d results.\n",Thread.currentThread().getName(),phaser.getPhase(),results.size());
			//this thread has finished the actual
			//phase and it wants to be blocked until all the participant threads in the phased
			//operation finish the actual phase.
			phaser.arriveAndAwaitAdvance();
			return true;
		}
	}

	private void showInfo() {
		for(int i = 0; i < results.size(); i++) {
			File file = new File(results.get(i));
			System.out.printf("%s: %s\n",Thread.currentThread().getName(),file.getAbsolutePath());
		}
		phaser.arriveAndAwaitAdvance();
	}

	public static void main(String[] args) throws Exception {
		Phaser phaser = new Phaser(3);
		FileSearch system = new FileSearch("C:\\Windows", "log", phaser);
		FileSearch apps = new FileSearch("C:\\Program Files", "log", phaser);
		FileSearch documents = new FileSearch("C:\\ProgramData", "log", phaser);

		Thread systemThread = new Thread(system, "System");
		Thread appsThread = new Thread(apps, "appsThread");
		Thread documentsThread = new Thread(documents, "documentsThread");

		systemThread.start();
		appsThread.start();
		documentsThread.start();

		systemThread.join();
		appsThread.join();
		documentsThread.join();

		System.out.println("Terminated: "+ phaser.isTerminated());
	}
}

The program starts creating a Phaser object that will control the synchronization of the threads

at the end of each phase. The constructor of Phaser receives the number of participants as

a parameter. In our case, Phaser has three participants. This number indicates to Phaser

the number of threads that have to execute an arriveAndAwaitAdvance() method before

Phaser changes the phase and wakes up the threads that were sleeping.

Once Phaser has been created, we launch three threads that execute three different

FileSearch objects.

The first instruction in the run() method of this FileSearch object is a call to the

arriveAndAwaitAdvance() method of the Phaser object. As we mentioned earlier, the

Phaser knows the number of threads that we want to synchronize. When a thread calls this

method, Phaser decreases the number of threads that have to finalize the actual phase and

puts this thread to sleep until all the remaining threads finish this phase. Calling this method

at the beginning of the run() method makes none of the FileSearch threads begin their

job until all the threads have been created.

At the end of phase one and phase two, we check if the phase has generated results and the

list with the results has elements, or otherwise the phase hasn‘t generated results and the list

is empty. In the first case, the checkResults() method calls arriveAndAwaitAdvance()

as explained earlier. In the second case, if the list is empty, there‘s no point in the thread

continuing with its execution, so it returns. But you have to notify the phaser that there will be

one less participant. For this, we used arriveAndDeregister(). This notifies the phaser

that this thread has finished the actual phase, but it won‘t participate in the future phases, so

the phaser won‘t have to wait for it to continue.

At the end of the phase three implemented in the showInfo() method, there is a call to the

arriveAndAwaitAdvance() method of the phaser. With this call, we guarantee that all the

threads finish at the same time. When this method ends its execution, there is a call to the

arriveAndDeregister() method of the phaser. With this call, we deregister the threads

of the phaser as we explained before, so when all the threads finish, the phaser will have zero

participants.

Finally, the main() method waits for the completion of the three threads and calls the

isTerminated() method of the phaser. When a phaser has zero participants, it enters the

so called termination state and this method returns true. As we deregister all the threads of

the phaser, it will be in the termination state and this call will print true to the console.

时间: 2024-10-14 14:00:49

Java多线程之~~~Phaser类实现任务的同步的相关文章

Java多线程之~~~Phaser重写onAdvance方法

在Phaser类中,我们在每个线程中,每个线程进行完一个阶段完成后都会等待其他线程完成后再一起进行,当所 有线程都完成了一个任务的时候,会调用Phaser的onAdvance方法,如果我们想在每个阶段,所有线程都完成他们的阶 段工作后做点啥事的话,那就得继承Phaser类来重写Onadvance这个方法来实现我们的目的,下面我们用一个例子来说 明,例子就是模拟多个学生考试,考试分为三个阶段,每个阶段完成后,都会等待所有的学生完成,同时我们希望在每 一个阶段,所有的学生完成一个阶段的任务后打印出几

Java多线程01(Thread类、线程创建、线程池)

Java多线程(Thread类.线程创建.线程池) 第一章 多线程 1.1 多线程介绍 1.1.1 基本概念 进程:进程指正在运行的程序.确切的来说,当一个程序进入内存运行,即变成一个进程,进程是处于运行过程中的程序,并且具有一定独立功能. 线程:线程是进程中的一个执行单元,负责当前进程中程序的执行,一个进程中至少有一个线程.一个进程中是可以有多个线程的,这个应用程序也可以称之为多线程程序. 简而言之:一个程序运行后至少有一个进程,一个进程中可以包含多个线程 1.1.2 单线程程序 - 从入口m

Java 多线程之 Thread 类 和 Runnable 接口初步使用

Thread 类 Thread 类是在 JDK1.0 时就存在的, 在 Java 中接触的多线程开发基本上都会从这个类开始. Thread之定义线程类 使用 Thread 创建线程的方法很简单, Thread 是一个类, 在需要创建线程时, 我们只需要继承这个类, 并将 run() 方法进行重写即可. class PrimeThread extends Thread { long minPrime; PrimeThread(long minPrime) { this.minPrime = min

Java多线程之~~~CyclicBarrier 类的使用

上一节说了CountDown的使用方法,对于用来同步多个线程之间的协作关系,Java更提供了更加高级的方法来实 现,这个类就是CyclicBarrier. 它可以实现当多个分支线程完成他们的工作后,调用await方法来等待,然后等所有的分 支线程工作完毕后,会自动的调用主线程的run方法,这个主线程是一个实现runnable接口的类,在CyclicBarrier实例化 的时候就调用了. 下面我们就用代码来说明这个问题.代码实现的效果是从一个二维数组中查询到我们需要的数字,然后分支成五个 线程来分

Java 多线程 (Thread 类)

1.多线程 1.多线程实现 两种方式可以实现多线程: 继承 Thread 类,重写 run 方法:定义对象,调用 start 方法 创建类实现 Runnable 接口,作为实参传递给 thread 的构造方法.定义对象,调用 start 方法. 1.1.继承 Thread 继承类,重写方法 class TDemo1 extends Thread { public String name; // 取个名字,便于识别 public TDemo1 (String name) { // 构造方法 thi

Java多线程之原子操作类

在并发编程中很容易出现并发安全问题,最简单的例子就是多线程更新变量i=1,多个线程执行i++操作,就有可能获取不到正确的值,而这个问题,最常用的方法是通过Synchronized进行控制来达到线程安全的目的.但是由于synchronized是采用的是悲观锁策略,并不是特别高效的一种解决方案.实际上,在J.U.C下的Atomic包提供了一系列的操作简单,性能高效,并能保证线程安全的类去更新多种类型.Atomic包下的这些类都是采用乐观锁策略CAS来更新数据. CAS原理与问题 CAS操作(又称为无

java多线程实现卖票程序

本文采用java多线程实现了模拟车站多个车票卖票的功能. 关键词:java多线程 共享变量 实现runnable接口 volatile  线程同步. 代码如下 Ticket类 package ex7_TicketSaler; /*同一对象的多个线程thread0/1/2,对共享变量count的操作,需要将count的值声明为volatile * 并且因为多个线程操作的是同一个对象ticket,因此count是资源共享的 * */ public class Ticket implements Ru

java基础知识回顾之java Thread类学习(八)--java多线程通信等待唤醒机制经典应用(生产者消费者)

 *java多线程--等待唤醒机制:经典的体现"生产者和消费者模型 *对于此模型,应该明确以下几点: *1.生产者仅仅在仓库未满的时候生产,仓库满了则停止生产. *2.消费者仅仅在有产品的时候才能消费,仓空则等待. *3.当消费者发现仓储没有产品可消费的时候,会唤醒等待生产者生产. *4.生产者在生产出可以消费的产品的时候,应该通知等待的消费者去消费. 下面先介绍个简单的生产者消费者例子:本例只适用于两个线程,一个线程生产,一个线程负责消费. 生产一个资源,就得消费一个资源. 代码如下: pub

java基础知识回顾之java Thread类学习(七)--java多线程通信等待唤醒机制(wait和notify,notifyAll)

1.wait和notify,notifyAll: wait和notify,notifyAll是Object类方法,因为等待和唤醒必须是同一个锁,不可以对不同锁中的线程进行唤醒,而锁可以是任意对象,所以可以被任意对象调用的方法,定义在Object基类中. wait()方法:对此对象调用wait方法导致本线程放弃对象锁,让线程处于冻结状态,进入等待线程的线程池当中.wait是指已经进入同步锁的线程,让自己暂时让出同步锁,以便使其他正在等待此锁的线程可以进入同步锁并运行,只有其它线程调用notify方