统计大文件里,频数最高的10个单词,(C# TPL DataFlow版)

最近公司搞了一个写程序的比赛,要求从2G的文件里统计出出现频率最高的10个单词。

最开始的想法是使用字典树,后来发现字典树更适合用在找前缀上,在查找没有hash表效率高。

之后使用Hash表+DataFlow完成了功能,2G的文件处理在20秒以内(其实我有信心优化到10秒以内,但是太折腾了)。

这是我的设计图:

为什么要形成那么多结果?因为我不想写锁,写锁会降低很多效率,而且也失去了线程的意义,每个线程做自己的工作,

最后在把每个线程处理的结果汇总起来,这样也符合fork join 的设计。

而且我也试过,如果写锁的话,效率会降低10秒以上,我也尝试过微软提供的ConcurrentDictionary 原子哈希表,但是效果都不是

很理想,而且,在并行的年代,在写锁这个东西,感觉很恶心,好像在代码里加了一坨屎一样,我以前就很讨厌锁,也出现过代码死锁的情况。

最后我选择了使用微软的TPL 库来解决并行的问题。

使用DataFlow解决了我处理时多线程管理的问题,还有线程等待消息队列的问题,

使用BufferBlock 进行主控与工作线程之间消息传递,这是我的设计图:

读取文件之后使用BufferBlock.Post发送给工作线程,工作线程使用TryReceive接收消息,并且处理。

在MSDNhttps://msdn.microsoft.com/zh-cn/library/hh228601(v=vs.110).aspx 里有详细的介绍。

这是典型的单生产者,多使用者的列子。

代码方面首先是读取文件:

  public class FileBufferBlock
    {

        private string _fileName;
        BufferBlock<WordStream> _buffer = null;
        public FileBufferBlock(BufferBlock<WordStream> buffer,string fileName)
        {
            this._fileName = fileName;
            this._buffer = buffer;
        }

        /// <summary>
        /// 按32M读取文件,循环发送给WordBufferBlock
        /// </summary>
        public void ReadFile()
        {
            using (FileStream fs = new FileStream(_fileName, FileMode.Open, FileAccess.Read))
            {
                using (StreamReader sr = new StreamReader(fs))
                {
                    while (!sr.EndOfStream)
                    {

                        char[] charBuffer = new char[32 * 1024 * 1024];
                        sr.ReadBlock(charBuffer, 0, charBuffer.Length);
                        _buffer.Post(new WordStream(charBuffer));
                    }
                }
            }
            _buffer.Complete();
        }

在这里使用BufferBlock.Post 发送消息给工作线程,如果不用它,你得去找个能阻塞的消息队列。

下面是我的接收方的代码,使用BufferBlock.TryReceive 接收消息,然后处理,在这里可以开多个个线程去处理。

而且线程是它帮你管理的:

// --------------------------------------------------------------------------------------------------------------------
// <copyright file="WordProcessBufferBlock.cs" company="yada">
//   Copyright (c) yada Corporation. All rights reserved.
// </copyright>
// change by qugang 2015.4.18
// 描述:用于截取单词的工作线程
// --------------------------------------------------------------------------------------------------------------------
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace WordStatistics
{
    public class WordProcessBufferBlock
    {
        private int _taskCount = 1;
        BufferBlock<WordStream> _buffer = null;
        private List<Task<Dictionary<string, int>>> _list = new List<Task<Dictionary<string, int>>>();

        /// <summary>
        /// 单词处理类
        /// </summary>
        /// <param name="taskCount">工作线程数</param>
        /// <param name="buffer">DataFlow的BufferBlock</param>
        public WordProcessBufferBlock(int taskCount, BufferBlock<WordStream> buffer)
        {
            _taskCount = taskCount;
            this._buffer = buffer;
        }

        public void StartWord()
        {
            for (int i = 0; i < _taskCount; i++)
            {
                _list.Add(Process());
            }
        }
        /// <summary>
        /// 等待所有工作完成
        /// </summary>
        /// <param name="f">完成后的工作函数</param>
        public void WaitAll(Action<Dictionary<string,int>> f)
        {
            Task.WaitAll(_list.ToArray());
            foreach (var row in _list)
            {
                f(row.Result);
            }
        }

        /// <summary>
        /// 使用BufferBlock.TryReceive循环从消息里取从FileBufferBlock发送的buffer
        /// </summary>
        /// <returns>工作结果</returns>
        private async Task<Dictionary<string, int>> Process()
        {
            Dictionary<string, int> dic = new Dictionary<string, int>();
            while (await _buffer.OutputAvailableAsync())
            {
                WordStream ws;
                while (_buffer.TryReceive(out ws))
                {
                    foreach (string value in ws)
                    {
                        if (dic.ContainsKey(value))
                        {
                            dic[value]++;
                        }
                        else
                        {
                            dic.Add(value, 1);
                        }
                    }
                }
            }
            return dic;
        }
    }
}

WordStrem是我自己写的一个单词枚举流,继承了IEnumerable接口,将找单词的算法写到枚举器里面,实现流化。

// --------------------------------------------------------------------------------------------------------------------
// <copyright file="WordStatistics.cs" company="yada">
//   Copyright (c) yada Corporation. All rights reserved.
// </copyright>
// change by qugang 2015.4.18
// 单词枚举器:算法从开始找字母,如果不是字母,则返回从pos 到end 的组成单词
// --------------------------------------------------------------------------------------------------------------------
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace WordStatistics
{
    /// <summary>
    /// 单词枚举器
    /// </summary>
    public class WordStream : IEnumerable
    {
        private char[] buffer;
        public WordStream(char[] buffer)
        {
            this.buffer = buffer;
        }

        IEnumerator IEnumerable.GetEnumerator()
        {
            return (IEnumerator)GetEnumerator();
        }

        public WordStreamEnum GetEnumerator()
        {
            return new WordStreamEnum(this.buffer);
        }

    }

    public class WordStreamEnum : IEnumerator
    {
        private char[] buffer;
        int pos = 0;
        int endCount = 0;
        int index = -1;

        public WordStreamEnum(char[] buffer)
        {
            this.buffer = buffer;
        }

        public bool MoveNext()
        {
            while (index < buffer.Length - 1)
            {
                index++;
                char buff = buffer[index];
                if ((buff >= ‘a‘ && buff <= ‘z‘) || (buff >= ‘A‘ && buff <= ‘Z‘))
                {
                    if (endCount == 0)
                    {
                        pos = index;
                        endCount++;
                    }
                    else
                    {
                        endCount++;
                    }
                }
                else
                {
                    if (endCount != 0)
                        return true;
                }
                if (buff == ‘\0‘)
                {
                    return false;
                }
            }
            return false;
        }

        public object Current
        {
            get
            {
                int tempInt = endCount;
                endCount = 0;
                return new string(buffer, pos, tempInt);
            }
        }

        public void Reset()
        {
            index = -1;
        }
    }

}

到这里就完成了,然后再Main函数里添加调用

  static void Main(string[] args)
        {
            DateTime dt = DateTime.Now;

            var buffer = new BufferBlock<WordStream>();

            //创建工作BufferBlock
            WordProcessBufferBlock wb = new WordProcessBufferBlock(8, buffer);
            wb.StartWord();

            //创建读取文件,发送的BufferBlock
            FileBufferBlock fb = new FileBufferBlock(buffer, @"D:\content.txt");
            fb.ReadFile();

            Dictionary<string,int> dic = new Dictionary<string,int>();

            //等待工作完成汇总结果
            wb.WaitAll(p =>
                {
                    foreach (var row in p)
                    {
                        if (!dic.ContainsKey(row.Key))
                            dic.Add(row.Key, row.Value);
                        else
                        {
                            dic[row.Key] += row.Value;
                        }
                    }
                }
                );

            var myList = dic.ToList();
            myList.Sort((p, v) => v.Value.CompareTo(p.Value));
            foreach (var row in myList.Take(10))
            {
                Console.WriteLine(row);
            }

            Console.WriteLine(DateTime.Now - dt);

        }

最后2G的文件,我的机器跑出来是19秒多。

如果代码没有包,请从NuGet上下载Dataflow包。

代码下载:http://files.cnblogs.com/files/qugangf/WordStatistics.rar

时间: 2024-10-18 07:34:20

统计大文件里,频数最高的10个单词,(C# TPL DataFlow版)的相关文章

统计大文件里单词

转载统计大文件里,频数最高的10个单词,(C# TPL DataFlow版) 最近公司搞了一个写程序的比赛,要求从2G的文件里统计出出现频率最高的10个单词. 最开始的想法是使用字典树,后来发现字典树更适合用在找前缀上,在查找没有hash表效率高. 之后使用Hash表+DataFlow完成了功能,2G的文件处理在20秒以内(其实我有信心优化到10秒以内,但是太折腾了). 这是我的设计图: 为什么要形成那么多结果?因为我不想写锁,写锁会降低很多效率,而且也失去了线程的意义,每个线程做自己的工作,

一篇英文文档中找出频数最多的10个单词

"""一篇英文文档中找出频数最多的10个单词collections: Counter 提供计数器工具以支持方便和快速的计数 most_common(n) 返回n个最常见元素及其计数的列表,从最常见到最少. 如果省略nNone,则 most_common()返回计数器中的所有元素."""import refrom collections import Counter# print(dir(Counter))with open('english.tx

读写文件:每次读入大文件里的一行、读写.CSV文件

读文件: 传统的读法.所有读出,按行处理: fp=open("./ps.txt", "r"); alllines=fp.readlines(); fp.close(); for eachline in alllines: print eachline 推荐读取方法,使用文件迭代器 , 每次仅仅读取和显示一行.读取大文件时应该这样: fp=open("./ps.txt", "r"); for eachline in fp: pr

统计日志文件里访问量前十的ip并按从多到少排列

Apache座位web服务器.访问日志名为ex.log 先找出IP段,然后进行数量统计,最后对数量进行数量排序,取出最大值. # cut -d "" -f 1 ex.log|uniq -c|sort -r|head -1|awk '{print $2}' 1.取出IP段: # cut -d " " -f 1 clientuser.log 127.0.0.1 127.0.0.1 192.168.1.100 192.168.1.100 192.168.1.100 192

统计大文件日志访问量

日志格式: #Version: 1.0 #Fields: date time time-taken x-c_username bytes c-ip s-ip sc-status cs-method cs-uri-stem cs-uri x-c_sessionid #Software: WebLogic #Start-Date: 2017-03-27 00:01:13 2017-03-27 00:01:13 0.043 - 32 10.33.215.218 10.12.216.147:30573

实现大文件里的快速排序

#define _CRT_SECURE_NO_WARNINGS #include <stdio.h> #include <stdlib.h> struct csdn { char name[22]; char password[43]; char email[52]; }; int namemax = -1; int passmax = -1; int mailmax = -1; void init(struct csdn *pdata, char *str) { for (cha

Facebook图片存储系统Haystack——存小文件,本质上是将多个小文件合并为一个大文件来降低io次数,meta data里存偏移量

转自:http://yanyiwu.com/work/2015/01/04/Haystack.html 一篇14页的论文Facebook-Haystack, 看完之后我的印象里就四句话: 因为[传统文件系统的弊端] 因为[缓存无法解决长尾问题] 所以[多个图片信息(Needle)存在同一个文件(SuperBlock)中] 所以[显著提高性能] 传统文件系统的弊端 传统的 POSIX 文件系统不适合高性能的图片存储, 主要原因是基于该文件系统来存储的话,是讲每个图片存储成某目录下的一个文件, 每次

PHP几个几十个G大文件数据统计并且排序处理

诸多大互联网公司的面试都会有这么个问题,有个4G的文件,如何用只有1G内存的机器去计算文件中出现次数最多的数字(假设1行是1个数组,例如QQ号 码).如果这个文件只有4B或者几十兆,那么最简单的办法就是直接读取这个文件后进行分析统计.但是这个是4G的文件,当然也可能是几十G甚至几百G的文 件,这就不是直接读取能解决了的. 同样对于如此大的文件,单纯用PHP做是肯定行不通的,我的思路是不管多大文件,首先要切割为多个应用可以承受的小文件,然后批量或者依次分析统计小文件后再把总的结果汇总后统计出符合要

【学习】大文件统计与排序(转载)

学习:大文件统计与排序 这篇主要记录一下学习陈硕同学的对下面这道题的算法思想与代码. 题目是这样的: 有10个文件,每个文件1G,每个文件的每行存放的都是用户的query(请自己随机产生),每个文件的query都可能重复.要求你按照query的频度排序. (当然,这里的重点是大文件,所以10个1G的文件,或者1个10G的文件,原理都是一样的) 陈硕的代码在这里: https://gist.github.com/4009225 这是一段非常漂亮的代码,解法与代码都非常值得一看. [解法] 基本步骤