从.net parallel角度解读spark

对于我这样一个一直工作在.net平台上的developer来讲,Hadoop,Spark,HBase等这些大数据名词比较陌生,对于分布式计算,.net上也有类似的Parallel(我说的不是HDInsight), 这篇文章是我尝试从.net上的Parallel类库的角度去讲述什么是spark。

我们先从C#的一个烂大街的例子(不是Helloworld),统计一篇文章单词出现的频率。

下面C#代码是利用.net Parallel来写的统计单词出现频率。

 1 using System;
 2 using System.Collections.Generic;
 3 using System.Linq;
 4 using System.Text;
 5 using System.Threading.Tasks;
 6
 7 namespace WordCountDemo
 8 {
 9     using System.IO;
10     using System.Threading;
11     class Program
12     {
13         /// <summary>
14         /// 我们以计算一篇文章中单词的个数为例子
15         /// (计算文章单词个数的demo简直就是各种大数据计算的HelloWorld)。
16         ///
17         /// WordCountFlow是数单词程序
18         /// WordCountDetail对WordCountFlow函数每一行进行拆解并做了详细解释。
19         /// </summary>
20         /// <param name="args"></param>
21         static void Main(string[] args)
22         {
23             string filePath = @"D:\BigDataSoftware\spark-2.1.0-bin-hadoop2.7\README.md";
24
25             WordCountFlow(filePath);
26             Console.WriteLine("----------------------");
27             WordCountDetail(filePath);
28         }
29
30         /// <summary>
31         /// 数单词的程序流程
32         /// </summary>
33         /// <param name="filePath"></param>
34         static void WordCountFlow(string filePath)
35         {
36             File.ReadAllLines(filePath).AsParallel()
37                 .SelectMany(t => t.Split(‘ ‘))
38                 .Select(t => new { word = t, tag = 1 })
39                 .GroupBy(t => t.word).Select(t => new { word = t.Key, count = t.Select(p => p.tag).Aggregate((a, b) => a + b) })
40                 // 如果对Aggregate函数不熟悉,上面代码等同于下行
41                 //.GroupBy(t => t.word).Select(t => new { word = t.Key, count = t.Sum(p => p.tag) });
42                 .ForAll(t => Console.WriteLine($"ParationId:{Thread.CurrentThread.ManagedThreadId}   ({t.word}-{t.count})"));
43         }
44
45         /// <summary>
46         /// 数单词程序流程的详细解释
47         /// </summary>
48         /// <param name="filePath"></param>
49         static void WordCountDetail(string filePath)
50         {
51             // 读取整篇文章,文章每一行将作为一个string存储到数组lines
52             string[] lines = File.ReadAllLines(filePath);
53             // AsParallel()是Parallel类库的核心方法,具体的意思是将string[] lines这个数组分割成几个分区(Partition)。
54             // 假设这篇文章有500行,那么这个方法会会把string[500]-lines分解成 (string[120] partitionA),
55             // (string[180] partitionB), (string[150] partitionC),(...) 等几个Partition
56             // .net runtime将当前程序的负载(主要是cpu使用情况)情况为依据的分区算法来确定到底要分成几个Partition,
57             // 我们可以大概认为cpu有几个逻辑核(不准确),就会被分解成几个Partition。
58             // 后续的计算中.net runtime将会针对每一个partition申请一个单独的线程来处理.
59             // 比如:partitionA由001号线程处理,partitionB由002号线程处理。。。
60             ParallelQuery<string> parallelLines = lines.AsParallel();
61             // linesA,linesB,linesC...数组中存储的每一行根据空格分割成单词,结果仍然是存放在ParallelQuery<string>这种分块的结构中
62             // 下面带有****的注释,如果对函数式编程没有了解,可以直接忽略。
63             // ****如果对函数式编程有所了解,会知道lambda天生lazy的,如果下面这行代码打个断点,当debug到这行代码的时候,
64             // ****鼠标移动到parallelWords上时,我们不会看到每一个单词,
65             // ****runtime并没有真正将每一行分解成单词,这行代码仅仅是一种计算逻辑。
66             ParallelQuery<string> parallelWords = parallelLines.SelectMany(t => t.Split(‘ ‘));
67             // 将每一个单子加上标记1,这行代码返回的类型为ParallelQuery<var>,var为runtime自动判断,此处var的类型的实际应该为
68             // class 匿名类型
69             // {
70             //        public word {get;set;}
71             //        public tag {get;set}
72             //}
73             var wordparis = parallelWords.Select(t => new { word = t, tag = 1 });
74             // 根据单词进行分组,同一个分组中的单词个数求和,类似于如下sql  select word,count(tag) from wordparis group by word
75             // 注意,此处同样的单词可能分布在不同的分区中,比如英语中常见的"the",可能partitionA中有3个"the",partitionB中有2个“the",
76             // 但是partitionA和partitionB分别被不同的线程处理,如果runtime足够聪明的话,他应该先计算partitionA的the的个数(the,3),
77             // 然后计算partitionB的the的个数(the,2),最后将整个partition合并并且重新分割(shuffle),在做后续的计算
78             // shuffle后partition的分区和之前partition里面的数据会不同。
79             // 此处wordcountParis的类型为
80             // class 匿名类型
81             // {
82             //        public word {get;set;}
83             //        public count {get;set}
84             //}
85             var wordcountParis = wordparis.GroupBy(t => t.word).Select(t => new { word = t.Key, count = t.Select(p => p.tag).Aggregate((a, b) => a + b) });
86             // 打印结果。由于线程执行的乱序,可以看到输出的partitionId也是乱序。
87             wordcountParis.ForAll(t => Console.WriteLine($"ParationId:{Thread.CurrentThread.ManagedThreadId}   ({t.word}-{t.count})"));
88         }
89     }
90 }

  程序运行结果

  

  通过上面的c#的例子,我们看到parallel如何将一篇文章分解成多个Partition来并且在不同Partition上进行并行计算的,在计算过程中,可能需要"shuffle",需要对原来的Partition进行重新洗牌。

  我们假设,如果这个程序运行在集群上,这些Partition分布在不同的机器上,这样就可以利用多台机器的力量而非一台机器多个线程的力量去做计算了,yeah!,你猜对了,这就是spark,下面的scala的wordCountFlow函数是在spark上统计单词出现频率的函数,与c#的WordCountFlow一样,也是五行代码,并且这五行代码的逻辑也完全相同。只不过spark将数据分布在不同的机器上,并且让机器进行计算,当然,如你所想,某些情况下需要shuffle,不同机器上的数据将会被汇聚并重新分割成新的分区。虽然Spark中的partition和net parallel中的partition并不完全对应(spark中的一台机器上可能有多个paratition) ,shuffle也是spark的专用词汇,但基本的原理是类似的。

package wordCountExample

import org.apache.spark.{SparkConf, SparkContext, TaskContext}

/**
  * Created by StevenChennet on 2017/3/10.
  */
object WordCount {
  def main(args: Array[String]): Unit = {
    // 文件路径
    val filePath="D:\\BigDataSoftware\\spark-2.1.0-bin-hadoop2.7\\README.md"

    wordCountFlow(filePath)
  }
  def wordCountFlow(filePath:String ):Unit={
    // sparkContext对象使用一个SparkConf对象来构造
    // SparkConf主要进行一些设置,比如说local【*】表示尽量开启更多线程并行处理
    // SparkContext是spark执行任务的核心对象
    // 下面五行代码与C#的WordCountFlow五行代码一一对应
    new SparkContext(new SparkConf().setAppName("WordCount").setMaster("local[*]")).textFile(filePath)
      .flatMap(_.split(" "))
      .map((_,1))
      .reduceByKey(_+_)
      .foreach(t=>println( s"Partition: ${ TaskContext.getPartitionId() }  (${t._1}}-${t._2}})"))
  }
}

  程序运行结果

  

  在net parallel中,如果某个线程在计算过程中崩溃了,那可能导致整个程序都crash掉,如果是集群运算,因为一台宕机而让整个集群崩溃可不是一个好决策,spark可以在计算之前先对要计算的内容持久化,如果一台机器crash,可以将这台机器的计算任务拉到另外一台机器上进行重新计算。

时间: 2024-10-24 03:26:07

从.net parallel角度解读spark的相关文章

第3课:解读spark –streaming运行机制

感谢DT大数据梦工厂支持提供以下内容,DT大数据梦工厂专注于Spark发行版定制.详细信息请查看 联系邮箱[email protected] 电话:18610086859 QQ:1740415547 微信号:18610086859 定制班:第三课 解读spark –streaming运行机制 一 从实战出发 首先我们运行以下的程序,然后通过这个程序的运行过程进一步加深理解Spark Streaming流处理的Job的执行的过程,代码如下:   def main(args: Array[Strin

我看小程序系列文章:1 不一样的角度 解读微信小程序

大家好,我是Beta007. 最近一直在研究小程序,会在这里整理出一系列的文章,和大家交流. 第一篇文章首发在了知乎专栏:小楼昨夜又秋风:https://zhuanlan.zhihu.com/p/22891188 知乎ID:七月在夏天  (头像是只喵~) 不一样的角度 解读微信小程序 七月在夏天· 2 天前 前段时间看完了雨果奖中短篇获奖小说<北京折叠>.很有意思的是,张小龙最近也要把应用折叠到微信里,这些应用被他称为:小程序. 含着金钥匙的小程序,还未展现全貌,就已经成了开发界的头条大事儿.

Spark发行笔记8:解读Spark Streaming RDD的全生命周期

本节主要内容: 一.DStream与RDD关系的彻底的研究 二.StreamingRDD的生成彻底研究 Spark Streaming RDD思考三个关键的问题: RDD本身是基本对象,根据一定时间定时产生RDD的对象,随着时间的积累,不对其管理的话会导致内存会溢出,所以在BatchDuration时间内执行完RDD操作后,需对RDD进行管理. 1.DStream生成RDD的过程,DStream到底是怎么生成RDD的? 2.DStream和RDD到底什么关系? 3.运行之后怎么对RDD处理? 所

从物理执行的角度透视Spark Job(DT大数据梦工厂)

内容: 1.再次思考pipeline: 2.窄依赖物理执行内幕: 3.宽依赖物理执行内幕: 4.Job提交流程: 物理执行是更深层次的角度. ==========再次思考pipeline ============ 即使采用pipeline的方式,函数f对依赖的RDD中的数据集合的操作也会有两种方式: 1.f(record),f作用于集合的每一条记录,每次只作用于一条记录: 2.f(records), f一次性作用于集合的全部数据: Spark运行的时候用的是第一种方式,为什么呢? 1.无需等待,

从物理执行角度透视Spark Job(23)

一.再次思考pipeline 即使采用pipeline的方式,函数f对依赖的RDD中的数据集合的操作也会有两种方式: 1, f(record),f作用于集合的每一条记录,每次只作用于一条记录: 2, f(records),f一次性作用于集合的全部数据: Spark采用是是第一种方式,原因: 1, 无需等待,可以最大化的使用集群的计算资源: 2, 减少OOM的发生: 3, 最大化的有利于并发: 4, 可以精准的控制每一Partition本身(Dependency)及其内部的计算(compute):

从实现的角度解读区块链

前言 随着比特币的起起伏伏,区块链技术越来越受到关注.区块链和比特币是当下和人工智能一样风靡的领域.人们开始寻找区块链技术的用武之地,已经有了不少的尝试.但是区块链的价值所在众说纷纭,特别是一些媒体胡乱吹嘘或者是故意贬低,给大家都带来了不小的困惑.其实作为一名技术人员只需要理解其底层实现原理和运行机制,之后其应用场景和发展前景相信都会有自己的见解. 在阅读完本篇文章对区块链有了一定的了解后可以参考区块链的一个简单的模拟实现来加深自己的理解:(toychain) 欢迎star :) 区块链和比特币

详细解读Spark的数据分析引擎:Spark SQL

欢迎关注大数据和人工智能技术文章发布的微信公众号:清研学堂,在这里你可以学到夜白(作者笔名)精心整理的笔记,让我们每天进步一点点,让优秀成为一种习惯! 一.spark SQL:类似于Hive,是一种数据分析引擎 什么是spark SQL? spark SQL只能处理结构化数据 底层依赖RDD,把sql语句转换成一个个RDD,运行在不同的worker上 特点: 1.容易集成:SQL语句 2.对不同的数据源提供统一的访问方式:DataFrame 用DataFrame屏蔽数据源的差别 3.兼容Hive

海纳百川、有容乃大:从4个角度解读京东3C新方向

4月6日,京东集团在北京召开媒体沟通会并正式推出"3C购物节"活动,除了多位京东集团高管出席该会议之外,来自手机行业.PC领域等知名厂商的代表均悉数出席.在会议上,京东高管胡胜利.熊青云等核心人物还就京东3C在当前及未来的长远发展提出了新的4个发展方向,同时还就京东在2016年的政策.规则等细节条款作了详尽解答.笔者本人作为受邀出席的自媒体,有幸记录并思考了京东提及的这4个新方向,用现在的思路来看,笔者认为这4个新方向对于京东或整个电商产业而言,或许将意味着新的变革方向. 1.平台服务

从软件工程的角度解读任正非的新年公开信

转自:https://www.cnblogs.com/dotey/p/10220520.html 昨天被任正非的那封<全面提升软件工程能力与实践,打造可信的高质量产品>的公开信刷屏了,作为一个软件工程专业科班出身的软件开发从业者,自然是引起了我(@宝玉xp)的好奇,仔细阅读之下确实让我大吃一惊,看似八股官方文,但细看之下是作者对于软件工程的理解确实非常深刻,各种专业术语信手拈来,比喻恰到好处. 我对华为的研发其实一直挺好奇的,从传统的硬件公司,到现在软硬件齐头并进,华为手机销量都已经超过了苹果