Java阻塞队列线程集控制的实现

队列以一种先进先出的方式管理数据。如果你试图向一个已经满了的阻塞队列中添加一个元素,或是从一个空的阻塞队列中移除一个元素,将导致线程阻塞。 在多线程进行合作时,阻塞队列是很有用的工具。工作者线程可以定期的把中间结果存到阻塞队列中。而其他工作者线程把中间结果取出并在将来修改它们。队列会 自动平衡负载。如果第一个线程集运行的比第二个慢,则第二个线程集在等待结果时就会阻塞。如果第一个线程集运行的快,那么它将等待第二个线程集赶上来。

下面的程序展示了如何使用阻塞队列来控制线程集。程序在一个目录及它的所有子目录下搜索所有文件,打印出包含指定关键字的文件列表。

java.util.concurrent包提供了阻塞队列的4个变种:LinkedBlockingQueue、 ArrayBlockingQueue、PriorityBlockingQueue和DelayQueue。我们用的是 ArrayBlockingQueue。ArrayBlockingQueue在构造时需要给定容量,并可以选择是否需要公平性。如果公平参数被设置了, 等待时间最长的线程会优先得到处理。通常,公平性会使你在性能上付出代价,只有在的确非常需要的时候再使用它。

生产者线程枚举在所有子目录下的所有文件并把它们放到一个阻塞队列中。这个操作很快,如果队列没有设上限的话,很快它就包含了没有找到的文件。

我们同时还启动了大量的搜索线程。每个搜索线程从队列中取出一个文件,打开它,打印出包含关键字的所有行,然后取出下一个文件。我们使用了一个小技 巧来在工作结束后终止线程。为了发出完成信号,枚举线程把一个虚拟对象放入队列。(这类似于在行李输送带上放一个写着“最后一个包”的虚拟包。)当搜索线 程取到这个虚拟对象时,就将其放回并终止。

注意,这里不需要人任何显示的线程同步。在这个程序中,我们使用队列数据结构作为一种同步机制。

import java.io.*;
import java.util.*;
import java.util.concurrent.*;  

public class BlockingQueueTest
{
   public static void main(String[] args)
   {
      Scanner in = new Scanner(System.in);
      System.out.print("Enter base directory (e.g. /usr/local/jdk1.6.0/src): ");
      String directory = in.nextLine();
      System.out.print("Enter keyword (e.g. volatile): ");
      String keyword = in.nextLine();  

      final int FILE_QUEUE_SIZE = 10;
      final int SEARCH_THREADS = 100;  

      BlockingQueue<File> queue = new ArrayBlockingQueue<File>(FILE_QUEUE_SIZE);  

      FileEnumerationTask enumerator = new FileEnumerationTask(queue, new File(directory));
      new Thread(enumerator).start();
      for (int i = 1; i <= SEARCH_THREADS; i++)
         new Thread(new SearchTask(queue, keyword)).start();
   }
}  

/**
 * This task enumerates all files in a directory and its subdirectories.
 */
class FileEnumerationTask implements Runnable
{
   /**
    * Constructs a FileEnumerationTask.
    * @param queue the blocking queue to which the enumerated files are added
    * @param startingDirectory the directory in which to start the enumeration
    */
   public FileEnumerationTask(BlockingQueue<File> queue, File startingDirectory)
   {
      this.queue = queue;
      this.startingDirectory = startingDirectory;
   }  

   public void run()
   {
      try
      {
         enumerate(startingDirectory);
         queue.put(DUMMY);
      }
      catch (InterruptedException e)
      {
      }
   }  

   /**
    * Recursively enumerates all files in a given directory and its subdirectories
    * @param directory the directory in which to start
    */
   public void enumerate(File directory) throws InterruptedException
   {
      File[] files = directory.listFiles();
      for (File file : files)
      {
         if (file.isDirectory()) enumerate(file);
         else queue.put(file);
      }
   }  

   public static File DUMMY = new File("");  

   private BlockingQueue<File> queue;
   private File startingDirectory;
}  

/**
 * This task searches files for a given keyword.
 */
class SearchTask implements Runnable
{
   /**
    * Constructs a SearchTask.
    * @param queue the queue from which to take files
    * @param keyword the keyword to look for
    */
   public SearchTask(BlockingQueue<File> queue, String keyword)
   {
      this.queue = queue;
      this.keyword = keyword;
   }  

   public void run()
   {
      try
      {
         boolean done = false;
         while (!done)
         {
            File file = queue.take();
            if (file == FileEnumerationTask.DUMMY)
            {
               queue.put(file);
               done = true;
            }
            else search(file);
         }
      }
      catch (IOException e)
      {
         e.printStackTrace();
      }
      catch (InterruptedException e)
      {
      }
   }  

   /**
    * Searches a file for a given keyword and prints all matching lines.
    * @param file the file to search
    */
   public void search(File file) throws IOException
   {
      Scanner in = new Scanner(new FileInputStream(file));
      int lineNumber = 0;
      while (in.hasNextLine())
      {
         lineNumber++;
         String line = in.nextLine().trim();
         if (line.contains(keyword)) System.out.printf("%s:%d    %s%n", file.getPath(), lineNumber, line);
      }
      in.close();
   }  

   private BlockingQueue<File> queue;
   private String keyword;
}
时间: 2024-11-08 11:16:23

Java阻塞队列线程集控制的实现的相关文章

Java多线程之线程的控制

Java多线程之线程的控制 线程中的7 种非常重要的状态:  初始New.可运行Runnable.运行Running.阻塞Blocked.锁池lock_pool.等待队列wait_pool.结束Dead 如果将"锁池"和"等待队列"都看成是"阻塞"状态的特殊情况,那么可以将线程归纳为5个状态: 新建,就绪,运行,阻塞,死亡. ┌--------------------< 阻塞 ↓                    (1)(2)(3)  

java阻塞队列

对消息的处理有些麻烦,要保证各种确认.为了确保消息的100%发送成功,笔者在之前的基础上做了一些改进.其中要用到多线程,用于重复发送信息. 所以查了很多关于线程安全的东西,也看到了阻塞队列,发现这个模式很不错,可惜我目前用不到. 关于这个的讲解已经很多了,阻塞这个,就是当队列中没有数据的时候,线程读取的话会等待.当队列中的数据满的时候,线程添加数据的时候,也会等待. 有个例子很生动形象,往盘子里面放鸡蛋,只能放固定数目的.盘子里面没有鸡蛋,无法从中拿出来.当盘子里满了,也放不进去.直到被拿出去才

Callable,阻塞队列,线程池问题

一.说说Java创建多线程的方法 1. 通过继承Thread类实现run方法   2. 通过实现Runnable接口 3. 通过实现Callable接口 4. 通过线程池获取 二. 可以写一个Callable的案例吗?如何调用Callable接口 /*是一个带返回值的多线程类,如果需要有线程返回的结果,就需要使用此类*/ class MyThread implements Callable<Integer> { @Override public Integer call() { return

java 阻塞队列 LinkedBlockingQueue ArrayBlockingQueue 分析

BlockingQueue是阻塞队列接口类,该接口继承了Queue接口 BlockingQueue实现类常见的有以下几种. ArrayBlockingQueue:ArrayBlockingQueue 是一个有界的阻塞队列,其内部实现是将对象放到一个数组里.有界也就意味着,它不能够存储无限多数量的元素.它有一个同一时间能够存储元素数量的上限.你可以在对其初始化的时候设定这个上限,但之后就无法对这个上限进行修改了(译者注:因为它是基于数组实现的,也就具有数组的特性:一旦初始化,大小就无法修改). D

Java阻塞队列实现原理分析-ArrayBlockingQueue和LinkedBlockingQueue

Java中的阻塞队列接口BlockingQueue继承自Queue接口. BlockingQueue接口提供了3个添加元素方法. add:添加元素到队列里,添加成功返回true,由于容量满了添加失败会抛出IllegalStateException异常 offer:添加元素到队列里,添加成功返回true,添加失败返回false put:添加元素到队列里,如果容量满了会阻塞直到容量不满 3个删除方法. poll:删除队列头部元素,如果队列为空,返回null.否则返回元素. remove:基于对象找到

java阻塞队列得实现

阻塞队列与普通队列的不同在于.当队列是空的时候,从队列中获取元素的操作将会被阻塞,或者当队列满时,往队列里面添加元素将会被阻塞.试图从空的阻塞队列中获取元素的线程将会被阻塞,直到其他的线程往空的队列插入新的元素.同样,试图往已满的阻塞队列中添加新元素的线程同样也会被阻塞,直到其他的线程使队列重新变得空闲起来,如从队列中移除一个或者多个元素,或者完全清空队列,下图展示了如何通过阻塞队列来合作: 线程1往阻塞队列中添加元素,而线程2从阻塞队列中移除元素 从5.0开始,JDK在Java.util.co

java 阻塞队列BlockingQueue

java中阻塞BlockingQueue 接口实现类中用的较多的通常是ArrayBlockingQueue,LinkedBlockingQueue.它们都是线程安全的.ArrayBlockingQueue以数组的形式存储,LinkedBlockingQueue以node节点的方式进行存储. 开发中如果队列的插入操作比较频繁建议使用LinkedBlockingQueue,因为每个node节点都有一个前后指针,插入新元素仅需要变更前后的指针引用即可, ArrayBlockingQueue插入新元素,

java 阻塞队列

import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** 本例介绍一个特殊的队列:BlockingQueue,如果BlockQueue是空的,从BlockingQueue取东西的操作将会被阻断进入等待

Java多线程 阻塞队列和并发集合

转载:大关的博客 Java多线程 阻塞队列和并发集合 本章主要探讨在多线程程序中与集合相关的内容.在多线程程序中,如果使用普通集合往往会造成数据错误,甚至造成程序崩溃.Java为多线程专门提供了特有的线程安全的集合类,通过下面的学习,您需要掌握这些集合的特点是什么,底层实现如何.在何时使用等问题. 3.1 BlockingQueue接口 java阻塞队列应用于生产者消费者模式.消息传递.并行任务执行和相关并发设计的大多数常见使用上下文. BlockingQueue在Queue接口基础上提供了额外