探索C#之微型MapReduce

MapReduce近几年比较热的分布式计算编程模型,以C#为例简单介绍下MapReduce分布式计算。

阅读目录

  1. 背景
  2. Map实现
  3. Reduce实现
  4. 支持分布式
  5. 总结

背景

某平行世界程序猿小张接到Boss一项任务,统计用户反馈内容中的单词出现次数,以便分析用户主要习惯。文本如下:

        const string hamlet = @"Though yet of Hamlet our dear brother‘s death
The memory be green, and that it us befitted
To bear our hearts in grief and our whole kingdom
To be contracted in one brow of woe,
Yet so far hath discretion fought with nature
That we with wisest sorrow think on him,
Together with remembrance of ourselves.
Therefore our sometime sister, now our queen,
The imperial jointress to this warlike state,
Have we, as ‘twere with a defeated joy,--
With an auspicious and a dropping eye,
With mirth in funeral and with dirge in marriage,
In equal scale weighing delight and dole,--
Taken to wife: nor have we herein barr‘d
Your better wisdoms, which have freely gone
With this affair along. For all, our thanks.
Now follows, that you know, young Fortinbras,
Holding a weak supposal of our worth,
Or thinking by our late dear brother‘s death
Our state to be disjoint and out of frame,
Colleagued with the dream of his advantage,
He hath not fail‘d to pester us with message,
Importing the surrender of those lands
Lost by his father, with all bonds of law,
To our most valiant brother. So much for him.
Now for ourself and for this time of meeting:
Thus much the business is: we have here writ
To Norway, uncle of young Fortinbras,--
Who, impotent and bed-rid, scarcely hears
Of this his nephew‘s purpose,--to suppress
His further gait herein; in that the levies,
The lists and full proportions, are all made
Out of his subject: and we here dispatch
You, good Cornelius, and you, Voltimand,
For bearers of this greeting to old Norway;
Giving to you no further personal power
To business with the king, more than the scope
Of these delated articles allow.
Farewell, and let your haste commend your duty.";

小张作为蓝翔高材生,很快就实现了:

   var content = hamlet.Split(new[] { " ", Environment.NewLine }, StringSplitOptions.RemoveEmptyEntries);
            var wordcount=new Dictionary<string,int>();
            foreach (var item in content)
            {
                if (wordcount.ContainsKey(item))
                    wordcount[item] += 1;
                else
                    wordcount.Add(item, 1);
            }

作为有上进心的青年,小张决心对算法进行抽象封装,并支持多节点计算。小张把这个统计次数程序分成两个大步骤:分解和计算。
第一步:先把文本以某维度分解映射成最小独立单元。 (段落、单词、字母维度)。
第二部:把最小单元重复的做合并计算。
小张参考MapReduce论文设计Map、Reduce如下:

Map实现

Mapping

Mapping函数把文本分解映射key,value形式的最小单元,即<单词,出现次数(1)>、<word,1>。

    public IEnumerable<Tuple<T, int>> Mapping(IEnumerable<T> list)
        {
            foreach (T sourceVal in list)
                yield return Tuple.Create(sourceVal, 1);
        }

使用,输出为(brow, 1), (brow, 1), (sorrow, 1), (sorrow, 1):

            var spit = hamlet.Split(new[] { " ", Environment.NewLine }, StringSplitOptions.RemoveEmptyEntries);
            var mp = new MicroMapReduce<string>(new Master<string>());
            var result= mp.Mapping(spit);

Combine

为了减少数据通信开销,mapping出的键值对数据在进入真正的reduce前,进行重复键合并。也相对于提前进行预计算一部分,加快总体计算速度。 输出格式为(brow, 2), (sorrow, 2):

 public Dictionary<T, int> Combine(IEnumerable<Tuple<T, int>> list)
        {
            Dictionary<T, int> dt = new Dictionary<T, int>();
            foreach (var val in list)
            {
                if (dt.ContainsKey(val.Item1))
                    dt[val.Item1] += val.Item2;
                else
                    dt.Add(val.Item1, val.Item2);
            }
            return dt;
        }

Partitioner

Partitioner主要用来分组划分,把不同节点的统计数据按照key进行分组。
其输出格式为: (brow, {(brow,2)},(brow,3)), (sorrow, {(sorrow,10)},(brow,11)):

public IEnumerable<Group<T, int>> Partitioner(Dictionary<T, int> list)
        {
            var dict = new Dictionary<T, Group<T, int>>();
            foreach (var val in list)
            {
                if (!dict.ContainsKey(val.Key))
                    dict[val.Key] = new Group<T, int>(val.Key);
                dict[val.Key].Values.Add(val.Value);
            }
            return dict.Values;
        }

Group定义:

    public class Group<TKey, TValue> : Tuple<TKey, List<TValue>>
    {
        public Group(TKey key)
            : base(key, new List<TValue>())
        {
        }

        public TKey Key
        {
            get
            {
                return base.Item1;
            }
        }

        public List<TValue> Values
        {
            get
            {
                return base.Item2;
            }
        }
    }

Reduce实现

Reducing函数接收,分组后的数据进行最后的统计计算。

 public Dictionary<T, int> Reducing(IEnumerable<Group<T, int>> groups)
        {
            Dictionary<T, int> result=new Dictionary<T, int>();
            foreach (var sourceVal in groups)
            {
                result.Add(sourceVal.Key, sourceVal.Values.Sum());
            }
            return result;
        }

封装调用如下:

 public IEnumerable<Group<T, int>> Map(IEnumerable<T> list)
        {
            var step1 = Mapping(list);
            var step2 = Combine(step1);
            var step3 = Partitioner(step2);
            return step3;
        }

  public Dictionary<T, int> Reduce(IEnumerable<Group<T, int>> groups)
        {
            var step1 = Reducing(groups);
            return step1;
        }

  public  Dictionary<T, int> MapReduce(IEnumerable<T> list)
        {
            var map = Map(list);
            var reduce = Reduce(map);
            return reduce;
        }

整体计算步骤图如下:

支持分布式

小张抽象封装后,虽然复杂度上去了。但暴露给使用者是非常清晰的接口,满足MapReduce的数据格式要求,即可使用。

            var spit = hamlet.Split(new[] { " ", Environment.NewLine }, StringSplitOptions.RemoveEmptyEntries);
            var mp = new MicroMapReduce<string>(new Master<string>());
            var result1= mp.MapReduce(spit);

小张完成后脑洞大开,考虑到以后文本数据量超大。 所以fork了个分支,准备支持分布式计算,以后可以在多个服务器节点跑。

数据分片

数据分片就是把大量数据拆成一块一块的,分散到各个节点上,方便我们的mapReduce程序去计算。 分片主流的有mod、consistent hashing、vitual Buckets、Range Partition等方式。 关于consistent hashing上篇有介绍(探索c#之一致性Hash详解)。在Hadoop中Hdfs和mapreduce是相互关联配合的,一个存储和一个计算。如果自行实现的话还需要个统一的存储。所以这里的数据源可以是数据库也可以是文件。小张只是满足boss需求,通用计算框架的话可以直接用现成的。

模拟分片

public List<IEnumerable<T>> Partition(IEnumerable<T> list)
        {
            var temp =new List<IEnumerable<T>>();
            temp.Add(list);
            temp.Add(list);
            return temp;
        }

Worker节点

小张定义了Master,worker角色。 master负责汇集输出,即我们的主程序。 每一个worker我们用一个线程来模拟,最后输出到master汇总,master最后可以写到数据库或其他。

 public void WorkerNode(IEnumerable<T> list)
        {
            new Thread(() =>
            {
                var map = Map(list);
                var reduce = Reduce(map);
                master.Merge(reduce);
            }).Start();
        } 

 public class Master<T>
    {
        public Dictionary<T, int> Result = new Dictionary<T, int>();
        public  void Merge(Dictionary<T, int> list)
        {
            foreach (var item in list)
            {
                lock (this)
                {
                    if (Result.ContainsKey(item.Key))
                        Result[item.Key] += item.Value;
                    else
                        Result.Add(item.Key, item.Value);
                }
            }
        }
    }

分布式计算步骤图:

总结

MapReduce模型从性能速度来说并不是非常好的,它优势在于隐藏了分布式计算的细节、容灾错误、负载均衡及良好的编程API,包含HDFS、Hive等在内一整套大数据处理的生态框架体系。在数据量级不是很大的话,企业自行实现一套轻量级分布式计算会有很多优点,比如性能更好、可定制化、数据库也不需要导入导出。从成本上也节省不少,因为hadoop开发、运维、服务器都需要不少人力物力。

时间: 2024-12-08 06:41:00

探索C#之微型MapReduce的相关文章

探索C#之xxx系列--导航篇

1. 探索c#之函数创建和闭包 2. 探索c#之尾递归编译器优化 3. 探索c#之不可变数据类型 4. 探索c#之递归CPS和APS 5. 探索c#之微型MapReduce 6. 探索c#之Jacobi迭代法 7. ...... 这个系列没有什么计划或目标,想到那就写到那吧. 不过主要应该是C#中不常见的技术点,及用C#去写各种比较好玩的,比如函数式风格,数据分析,科学计算等.

浅谈分布式计算的开发与实现(一)

阅读目录: 介绍 利用分片算法 利用消息队列 Hadoop简介 MapReduce 离线计算 介绍 分布式计算简单来说,是把一个大计算任务拆分成多个小计算任务分布到若干台机器上去计算,然后再进行结果汇总. 目的在于分析计算海量的数据,从雷达监测的海量历史信号中分析异常信号(外星文明),淘宝双十一实时计算各地区的消费习惯等. 海量计算最开始的方案是提高单机计算性能,如大型机,后来由于数据的爆发式增长.单机性能却跟不上,才有分布式计算这种妥协方案. 因为计算一旦拆分,问题会变得非常复杂,像一致性.数

MapReduce Shuffle过程详解

Shuffle过程是MapReduce的核心,也被称为奇迹发生的地方.要想理解MapReduce,Shuffle是必须要了解的.我看过很多相关方面的资料,但每次看完都云里雾里的绕着,很难理清大致的逻辑,反而越搅越乱.前端时间在做MapReduce job性能调优的工作,需要深入代码研究MapReduce的运行机制,这才对Shuffle探了个究竟.考虑到之前我在看相关资料而看不懂时很恼火,所以在这里我尽最大的可能试着把Shuffle说清楚,让每一位想了解它原理的朋友都能有所收获.如果你对这篇文章有

【转】MapReduce:详解Shuffle过程

——转自:{http://langyu.iteye.com/blog/992916} Shuffle过程是MapReduce的核心,也被称为奇迹发生的地方.要想理解MapReduce, Shuffle是必须要了解的.我看过很多相关的资料,但每次看完都云里雾里的绕着,很难理清大致的逻辑,反而越搅越混.前段时间在做MapReduce job 性能调优的工作,需要深入代码研究MapReduce的运行机制,这才对Shuffle探了个究竟.考虑到之前我在看相关资料而看不懂时很恼火,所以在这里我尽最大的可能

MapReduce:详解Shuffle过程

MapReduce:详解Shuffle过程[转] 博客分类: mapreduce MapreduceITeye数据结构多线程Hadoop Shuffle过程是MapReduce的核心,也被称为奇迹发生的地方.要想理解MapReduce, Shuffle是必须要了解的.我看过很多相关的资料,但每次看完都云里雾里的绕着,很难理清大致的逻辑,反而越搅越混.前段时间在做MapReduce job 性能调优的工作,需要深入代码研究MapReduce的运行机制,这才对Shuffle探了个究竟.考虑到之前我在

对于操作系统的探索和minix的思考

首先声明这里的文字都是我个人的一些菜鸟型的思考,没有任何保证,不喜勿喷! 这个学期学习操作系统,有点喜欢,写系列文章记之!先做一些理论复习,后结合minix谈! 用的是现代操作系统,第一章无非是对于操作系统的一些历史探索: 学习操作系统前,首先提出一个问题,什么是操作系统?为什么会有操作系统这样一个东西存在? 我觉得明白这点是很重要的,你不知道这个东西有什么意义,一味的跟着老师学,最后越学越怀疑自己,你TMD到底在学什么?我们先假设没有操作系统,这个是可以的,现在的很多嵌入式设备(相当于微型PC

【Big Data - Hadoop - MapReduce】通过腾讯shuffle部署对shuffle过程进行详解

摘要: 通过腾讯shuffle部署对shuffle过程进行详解 摘要:腾讯分布式数据仓库基于开源软件Hadoop和Hive进行构建,TDW计算引擎包括两部分:MapReduce和Spark,两者内部都包含了一个重要的过程—Shuffle.本文对Shuffle过程进行解析,并对两个计算引擎的Shuffle过程进行比较. 腾讯分布式数据仓库(Tencent distributed Data Warehouse, 简称TDW)基于开源软件Hadoop和Hive进行构建,并且根据公司数据量大.计算复杂等

Hadoop学习笔记—11.MapReduce中的排序和分组

一.写在之前的 1.1 回顾Map阶段四大步凑 首先,我们回顾一下在MapReduce中,排序和分组在哪里被执行: 从上图中可以清楚地看出,在Step1.4也就是第四步中,需要对不同分区中的数据进行排序和分组,默认情况下,是按照key进行排序和分组. 1.2 实验场景数据文件 在一些特定的数据文件中,不一定都是类似于WordCount单次统计这种规范的数据,比如下面这类数据,它虽然只有两列,但是却有一定的实践意义. 3 3 3 2 3 1 2 2 2 1 1 1 (1)如果按照第一列升序排列,当

Hadoop学习基础之三:MapReduce

现在是讨论这个问题的不错的时机,因为最近媒体上到处充斥着新的革命所谓“云计算”的信息.这种模式需要利用大量的(低端)处理器并行工作来解决计算问题.实际上,这建议利用大量的低端处理器来构建数据中心,而不是利用数目少的多的高端服务器来构建. 举例来说,IBM和Google已经宣布计划用1000台处理器构建的集群提供给部分大学,传授学生们如何使用MapReduce工具在这些集群上编程.加利福尼亚大学伯克利分校甚至打算开设使用MapReduce框架编程的课程.我们对MapReduce支持者大肆炒作它如何