import java.io.File; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; /** * 多线程抓取数据的简单程序 */ public class MultithreadFetcher { /** 阻塞队列的最大长度,防止内存溢出。 */ public static final int MAX_QUEUE_SIZE = 100; /** 最大递归深度,防止递归死循环 */ public static final int RECURSION_LEVEL = Integer.MAX_VALUE; /** 致命毒药,优雅关闭后续的工作线程 */ private static final File DEADLY_POISON = new File("./deadly.tmp"); /** * 递归遍历文件夹,将遍历的文件放入队列中。 * @param folder 目标文件夹 * @param fileQueue 文件队列 * @param level 递归深度 */ private static void visitFolder(File folder, BlockingQueue<File> fileQueue, int level) throws InterruptedException{ if(level<=0){//控制递归深度,防止递归死循环。 return; } File[] files = folder.listFiles(); for(File file : files){ if(file.isDirectory()){ visitFolder(file,fileQueue,level-1); }else if(file.getName().toLowerCase().endsWith(".xml")){ fileQueue.put(file); }else{ //do nothing ... } } } /** * 创建目标文件。通过原文件的名称创建一个新的文件。 * @param file 原始文件 * @param targetFolder 目标文件夹 * @return 新的文件,目标文件 */ private static File createTargetFile(File file, File targetFolder){ String targetFileName = file.getName(); return new File(targetFolder,targetFileName); } /** * 处理文件的操作,可以在这个里面读取文件数据,解析文件,抓取网页,写入备份。 * @param file 原始文件,待解析的文件 * @param target 目标文件,备份文件 */ private static void travelFile(File file, File target) throws Throwable{ //详细操作从略 } /** 递归文件夹的线程。不支持多线程并发递归。 */ static class VisitFolderThread extends Thread{ private File folder; private BlockingQueue<File> fileQueue; public VisitFolderThread(File folder, BlockingQueue<File> fileQueue) { super("visit-folder-thread"); this.folder = folder; this.fileQueue = fileQueue; } @Override public void run() { try { visitFolder(folder, fileQueue, RECURSION_LEVEL); fileQueue.put(DEADLY_POISON);//放置毒药,优雅关闭 } catch (InterruptedException e) { // 在这里可以做一些异常处理 e.printStackTrace(); } } } /** 处理文件的线程,可以多线程并发处理,每个线程处理一个文件 */ static class TravelFileThread extends Thread{ private static final AtomicInteger ThreadCount = new AtomicInteger(); private File targetFolder; private BlockingQueue<File> fileQueue; public TravelFileThread(File targetFolder, BlockingQueue<File> fileQueue) { super("travel-file-thread-"+ThreadCount.incrementAndGet()); this.targetFolder = targetFolder; this.fileQueue = fileQueue; } @Override public void run() { File file = null; try { while((file=fileQueue.take())!=DEADLY_POISON){ File target = createTargetFile(file, targetFolder); try { travelFile(file, target); } catch (Throwable e) { onException(e,file,target); } } fileQueue.put(DEADLY_POISON);//放置毒药,优雅关闭 } catch (InterruptedException e) { // 在这里可以做一些异常处理 e.printStackTrace(); } } /** 在处理文件的过程中,如果抛出异常,则进入下面的处理程序,从略。 */ private void onException(Throwable e, File file, File target) { // 如果travelFile抛出异常,则在此处进行处理。 e.printStackTrace(); } } private BlockingQueue<File> fileQueue = new LinkedBlockingQueue<File>(MAX_QUEUE_SIZE); private Thread visitFolderThread; private Thread[] travelFileThreads; public MultithreadFetcher(File sourceFolder, File targetFolder, int travelThreads) { super(); visitFolderThread = new VisitFolderThread(sourceFolder, fileQueue); travelFileThreads = new TravelFileThread[travelThreads]; for(int i=0;i<travelFileThreads.length;i++){ travelFileThreads[i] = new TravelFileThread(targetFolder, fileQueue); } } /** * 开始执行 */ public void start(){ visitFolderThread.start(); for(int i=0;i<travelFileThreads.length;i++){ travelFileThreads[i].start(); } } /** * 强行终止。请慎用。程序会自动关闭 */ public void terminate(){ visitFolderThread.interrupt(); for(int i=0;i<travelFileThreads.length;i++){ travelFileThreads[i].interrupt(); } } /** * 测试用例 */ public static void main(String[] args) { final File sourceFolder = new File(""); final File targetFolder = new File(""); final int travelThreads = 20; MultithreadFetcher fetcher = new MultithreadFetcher(sourceFolder,targetFolder,travelThreads); fetcher.start(); } }
采取了数据分离的形式,消除了线程间互斥。效率不错。
时间: 2024-11-05 16:30:36