多线程处理大量文件

上周做了一个多线程处理大量文件的功能 一是记录 二是分享 三是请博友指出不足 更多的了解多线程。

1.任务:将大量(大约5G)一目录下有日期规则命名的html文件按照年月日三个层次目录存放,目的是为了提高文件检索效率。

2.具体实现:开启10个线程 将文件拷贝到目标文件夹;不符合要求的文件拷贝到别处;记录错误信息和不符合要求的信息;循环判断状态 执行完毕给出提示。

3.开始设想和后来出现问题:

开了10个线程 处理所有文件,如果一文件已在目标文件下存在 则不处理,但是线程间几乎是同时进行的 这样就会报错"该文件已被另一线程占用"。这样处理是不行的 于是就改善了。让每个线程按顺序处理固定条数的文件,则每个线程不会处理处理同个文件。

4.代码实现:

 声明变量

    /    /App.config 配置路径
        //源文件路径
        string sourcePath = ConfigurationManager.AppSettings["sourcePath"];
        //目标路径
        string toPath = ConfigurationManager.AppSettings["toPath"];
        //错误文件存放路径
        string errorLogPath = "\\log";
        //文件总数
        int totalFileCount;
        //复制文件数
        private int operaFileCount;
        //未复制文件数
        int notCopyFileCount;
        //文件名称过长文件数
        int longNameFileCount;
        //线程数
        int threadCount = 10;
将所有文件名称进行分批处理 分页方法
        /// <summary>
        /// 将所有文件名称进行分页
        /// </summary>
        /// <param name="PageSize">每页条数</param>
        /// <param name="CurPage">当前页</param>
        /// <param name="objs">集合</param>
        /// <returns></returns>
        public static List<string> QueryByPage(int PageSize, int CurPage, List<string> objs)
        {
            return objs.Take(PageSize * CurPage).Skip(PageSize * (CurPage - 1)).ToList();
        }

        public class Page
        {
            public int pageIndex { get; set; }
            public int pageCount { get; set; }
            public int pageSize { get; set; }
            public List<string> list { get; set; }
        }

拷贝文件的方法

        #region DoWork方法
        private string errorFieName;
        public void DoWork(object obj)
        {
            Object locker = new object();
            Page page = (Page)obj;
            Console.WriteLine(ThreadName());
            int PageSize = page.pageSize;
            int CurPage = page.pageIndex;
            var grouList = QueryByPage(PageSize, CurPage, page.list);
            foreach (string file in grouList)
            {
                string fileName = Path.GetFileName(file);
                errorFieName = fileName;

                if (fileName != null && fileName.Length != 18)
                {

                    Console.WriteLine(fileName + "文件名称不符合要求!");
                    CopyErrorFile(fileName);
                    Write(fileName + "文件名称不符合要求!");
                    continue;
                }
                //Console.WriteLine(fileName.Length);
                try
                {
                    //截取文件名称 源文件名称规则 ABC2014200...html 意思是:2014年第200天 可以推算出年月日
                    string subYearMonth = fileName.Substring(3, 5);
                    int yearNum = 0;
                    int dayNum = 0;
                    int.TryParse(subYearMonth.Substring(0, 2), out yearNum);
                    int.TryParse(subYearMonth.Substring(2, 3), out dayNum);
                    int.TryParse("20" + yearNum, out yearNum);
                    if (yearNum < 1 || dayNum < 1)
                    {
                        Console.WriteLine(fileName + "文件名称不符合要求!");
                        CopyErrorFile(fileName);
                        //Write(fileName + "文件名称不符合要求!");
                        continue;
                    }
                    //声明日期
                    DateTime date = new DateTime();
                    date = date.AddYears(yearNum).AddYears(-1);
                    date = date.AddDays(dayNum).AddDays(-1);
                    string fullSavePath = string.Format("{0}\\{1}\\{2}\\{3}\\", toPath, yearNum, date.Month, date.Day);
                    if (!Directory.Exists(fullSavePath))
                    {
                        Directory.CreateDirectory(fullSavePath);
                    }
                    lock (fullSavePath)
                    {

                        File.Copy(file, fullSavePath + fileName, true);
                        operaFileCount++;
                        //Write("处理完成:" + fileName);
                    }
                }
                catch (Exception ex)
                {
                    Console.WriteLine("文件名称:" + errorFieName + "处理错误:" + ex.Message);
                    Write(ex.Message);
                    throw new Exception(ex.Message);
                }
            }
        }
        #endregion    

循环执行线程

        public void CopyFile()
        {
            //开始方法时删除上次记录
            DeleteLog();
            List<Thread> threadList = new List<Thread>(); ;
            for (int i = 0; i < threadCount; i++)
            {
                try
                {
                    if (!Directory.Exists(toPath))
                    {
                        Directory.CreateDirectory(toPath);
                    }
                    //string[] fileNames = Directory.GetFileSystemEntries(sourcePath);
                    string[] fileNames = Directory.GetFiles(sourcePath);
                    var fileNameList = fileNames.ToList();
                    totalFileCount=fileNames.Length;

                    //共threadCount个线程 每个线程执行oneThreadNameCount条
                    int oneThreadNameCount = (int)Math.Ceiling(((double)fileNames.Length) / threadCount);
                    Page page;
                    //当前执行的线程
                    page = new Page { pageIndex = i + 1, pageSize = oneThreadNameCount, list = fileNameList };
                    threadList.Add(new Thread(new ParameterizedThreadStart(DoWork)));
                    threadList[i].Start(page);
                    threadList[i].Name = i + "_thread";
                }

                catch (Exception ex)
                {
                    Console.WriteLine("错误信息:" + ex.Message);
                    Write(ex.Message);
                }
            }
            //判断线程执行情况
            bool isRanning;
            bool isComplete = false;
            while (!isComplete)
            {
                //2秒判断一次
                Thread.Sleep(2000);
                isRanning = false;
                foreach (var item in threadList)
                {
                    if (item.ThreadState == ThreadState.Running)
                    {
                        isRanning = true;
                    }
                }
                if (!isRanning)
                {
                    isComplete = true;
                    Console.WriteLine();
                    Console.WriteLine("处理结果 共" + totalFileCount+";已处理:"+operaFileCount);
                    Console.WriteLine("不符合要求:" + notCopyFileCount + "(已拷贝到errorfile文件夹);无法拷贝名称过长文件:" + longNameFileCount);
                    Console.WriteLine("请查看文件夹" + toPath);
                }
            }
            Console.ReadKey();
        }

辅助方法

        #region copy不符合要求文件
        private void CopyErrorFile(string fileName)
        {
            //实际长度超过222报错,并且无法抛出异常
            if (fileName.Length > 180)
            {
                longNameFileCount += 1;
                Write(fileName + "名称过长!");
                return;
            }
            notCopyFileCount += 1;
            string strErrorPath  =toPath+"\\errorfile";
            if(!Directory.Exists(strErrorPath)) Directory.CreateDirectory(strErrorPath);
            File.Copy(sourcePath+"\\"+fileName, toPath+"\\errorfile\\" + fileName, true);
        }
        #endregion

        private void DeleteLog()
        {
            try
            {
                if (!Directory.Exists(toPath + errorLogPath)) return;
                foreach (string var in Directory.GetFiles(toPath + errorLogPath))
                {
                    File.Delete(var);
                }
            }
            catch
            {
                //Directory.CreateDirectory(toPath + errorLogPath);
            }

        }
        private void Write(string message)
        {
            object obj = new object();
            lock (obj)
            {
                try
                {
                    string logPath=toPath+errorLogPath;
                    if (!Directory.Exists(logPath)) Directory.CreateDirectory(logPath);
                    Thread primaryThread = Thread.CurrentThread;
                    //当前线程名称
                    string threadName = primaryThread.Name;
                    using (TextWriter sw = new StreamWriter(logPath+"\\"+ThreadName()+"_error.txt", true))
                    {
                        sw.WriteLine("错误信息:" + message);
                        sw.Dispose();
                    }

                }
                catch (Exception ex)
                {
                    Console.WriteLine("错误信息:" + ex.Message);
                }
            }

        }
时间: 2024-10-14 15:16:32

多线程处理大量文件的相关文章

java 导出 excel 最佳实践,java 大文件 excel 避免OOM(内存溢出) exce

产品需求 产品经理需要导出一个页面的所有的信息到 EXCEL 文件. 需求分析 对于 excel 导出,是一个很常见的需求. 最常见的解决方案就是使用 poi 直接同步导出一个 excel 文件. 客户体验 & 服务性能 客户体验 如果导出的文件比较大,比如几十万条数据,同步导出页面就会卡主,用户无法进行其他操作. 服务性能 导出的时候,任务比较耗时就会阻塞主线程. 如果导出的服务是暴露给外部(前后端分离),这种大量的数据传输十分消耗性能. 解决方案 使用异常处理导出请求,后台 MQ 通知自己进

最佳vim技巧

最佳vim技巧----------------------------------------# 信息来源----------------------------------------www.vim.org         : 官方站点comp.editors        : 新闻组http://www.newriders.com/books/opl/ebooks/0735710015.html : Vim书籍http://vimdoc.sourceforge.net/cgi-bin/vim

PYTHON多线程处理文件

一个几十G的文件想用Python多线程读取提高处理效率,得到的结果总是不如预期.在毛帅的提醒下才发现一个进程启动的线程将共享文件句柄,A线程对文件的操作(即使是读)也将影响到B线程.如图,图片来自毛帅: 测试代码如下: # -*- coding: UTF-8 -*- def threadFunc1(demo, threadnum, startlinenum, deallinenum):     # 行数计数器     line = 0     # skip若干行     while line <

多进程、多线程处理文件对比

分别通过多进程.多线程方式处理文件,将结果保存到一个list中: 1.多进程: import multiprocessing,cjson,os,collections from multiprocessing import Process,freeze_support,Manager,Pool,Queue def handlefile(lock,rst,fp): lst_tmp=[] #print type(rst) with open(fp,'rb') as fo: for line in f

C# 多线程处理文件

问题: 存在多组数据,每组数据保存成一个文件, 用多线程的方式实现 class MultiProcessDemo { // 创建数组容器 List<EventWaitHandle> _listWait = new List<EventWaitHandle>(); // 每条记录启动一个线程,将记录保存到csv文件 foreach (string entity in listRecord) { Thread th = new Thread(()=>TodoFun()); th.

MySQL数据库安装,配置My.ini文件

最近在做项目开发时用到了MySql数据库,在看了一些有关MySql的文章后,很快就上手使用了.在使用的过程中还是出现了一些问题,因为使用的是绿色免安装版的MySql所以在配置的时候出现了一些问题,该篇文章就主要针对MySql绿色版的配置及其使用进行讨论. 一.MySql概述 MySql数据库是有瑞典MySql AB公司开发,现在该公司被Oracle收购属于Oracle所有.同SQL Server类似,它也是基于关系型数据库的数据库管理系统,在Web应用方面MySQL是最好的RDBMS之一,因为它

java 中使用线程池处理文件夹下面的子文件

读取某个文件夹下面的所有文件,使用多线程处理,例如读取E盘下面的文件内容: package thread; import java.io.File; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue;

java并发读&写文件

最近在看Brian Goetz 的<<Java并发实战>>,这本书有两个版本,电子工业出版社的译本很糟糕,建议使用机械工业出版社出版出版的书籍. 在看到第三四章的时候突然想到了多线程读写文件,同时遇到一些书中没有的问题 1, 如何保证组合对象的安全性? 2, 如何判断不变性的约束条件 3, 如何不通过synchronized关键字和锁进行同步处理? 下面是一段代码, 用来从source 读取数据,通过多线程写入target文件中 思路: 1, 如何read/write文件? 2,

[02]WPF异步响应,自定义事件、委托——多线程处理

题记 在编写有GUI的程序时,会遇到这样一种情形:用户点击了一个按钮,程序处理这个事件,然而这个处理过程耗时间较长.我们不想让软件卡在这里,而是让用户可以继续使用其他的软件功能.这种问题可以用多线程的事件响应来解决.这里,我就WPF的多线程事件响应做一个简单的归纳. 一.简单的异步的事件响应 在WPF中,针对简单的多线程处理过程,我们可以使用.NET自带的BackgroundWork完成.BackgroundWork的处理过程就是异步的,不会让用户界面停止响应. using System.Com