1.spark的wordcount解析

一、Eclipse(scala IDE)开发local和cluster

(一). 配置开发环境

  1. 要在本地安装好java和scala。

    由于spark1.6需要scala 2.10.X版本的。推荐 2.10.4,java版本最好是1.8。所以提前我们要需要安装好java和scala并在环境变量中配置好。

  2. 下载scala IDE for eclipse安装 连接:http://scala-ide.org/download/sdk.html 

    打开ide新建scala project

    点击file -> new ->Scala Project ,在弹出的对话框中弹性project name 为“WordCount”,默认点击next,点击finish的。

  3. 修改Scala版本 

    项目创建完成后默认使用的是scala的2.11.7
    版本。要手动将版本换成2.10.X。在项目名称右击选择properties,在弹出窗口点击,scala Compiler,在右侧窗口,选中Use
    Project settings, 将scala Installation 修改为Latest 2.10
    bundle(dynamic).点击apply,点击ok。scala版本变成2.10.6。

  4. 找到依赖的spark jar文件并导入到eclipse中。 

    所依赖的jar文件是

    spark-1.6.0-bin-hadoop2.6\lib\spark-assembly-1.6.0-hadoop2.6.0.jar。

    在项目名称上右击,选择build path ->configure build path。在弹出框中点击library,点击右侧的addExternalJARs,然后选择

    park-assembly-1.6.0-hadoop2.6.0.jar点击打开,然后点击ok。

(二)、spark程序开发步骤

1. 在src下建立spark程序工程包

在src上右击new ->package 填入package的name为com.dt.spark。

2. 创建scala的入口类。

在包的名字上右击选择new ->scala class 。在弹出框中Name 中,在增加WordCount。点击finish。

在方法内部讲关键字class 改成object ,然后创建main方法。

3. local模式代码方法

    1. import org.apache.spark.SparkConf
    2. import org.apache.spark.rdd.RDD
    3. def main(args: Array[String]): Unit ={
    4. * 集群的master的URL,如果设置为local则在本地运行。
    5. val conf = new SparkConf()
    6. conf.setMaster("local")
    7. /**第2步,创建SparkContext对象,SparkContext是spark程序所有功能的唯一入口,其作用是初始化spark应用程序的
    8. * */
    9. * 数据被RDD划分为一系列的Partitions,分配到每个partition的数据属于一个Task的处理范畴
    10. val lines = sc.textFile("G://datarguru spark//tool//spark-1.4.0-bin-hadoop2.6//README.md", 1) //读取本地文件并设置一个partition
    11. /**第4步,对初始的RDD进行Transformation级别的处理,如map、filter高阶函数编程,进行具体计算
    12. val words = lines.flatMap{ line => line.split(" ")}//对每行字符串进行单词拆分,并把所有拆分结果通过flat合并成一个大的单词集合
    13. (word, 1)} //在单词拆分基础上对每个单词实例计数为1
    14. wordCounts.foreach(wordNumberPair => println(wordNumberPair._1 + ":" + wordNumberPair._2))
    15. }
    16. 在运行过程中会出现WARN NativeCodeLoader: Unable to load native-Hadoop library for your platform... using builtin-Javaclasses where applicable。Java.io.IOException: Could not locate executable null\bin\winutils.exe in the
      Hadoop binaries. 这个错误。但是在local模式下,这个是正常的。因为spark是和hadoop编译在一起的,我们在window 下开发,缺少hadoop的配置。这不是程序错误,也不影响我们的任何功能。

      4.编写Cluster模式代码

      1. import org.apache.spark.SparkContext
      2. def main(args: Array[String]){
      3. * 集群的master的URL,如果设置为local则在本地运行。
      4. val conf = new SparkConf() //创建SparkConf对象
      5. // conf.setMaster("spark://master:7077")
      6. /**第2步,创建SparkContext对象,SparkContext是spark程序所有功能的唯一入口,其作用是初始化spark应用程序的
      7. * */
      8. /**第3步,根据数据源(HDFS,HBase,Local FS)通过SparkContext来创建RDD
      9. * */
      10. val lines = sc.textFile("/library/wordcount/input/Data") //读取HDFS文件并切分成不同的Partions
      11. /**第4步,对初始的RDD进行Transformation级别的处理,如map、filter高阶函数编程,进行具体计算
      12. val words = lines.flatMap { line =>line.split(" ")} //对每一行的字符串进行单词拆分并把所有行的拆分结果通过flat合并成为一个大的单词集合
      13. val pairs = words.map { word => (word, 1) }
      14. wordCounts.collect.foreach(wordNumberPair =>println(wordNumberPair._1 + " : " + wordNumberPair._2))
      15. }
      16. 将程序达成jar 包 

        在项目名称上右击点击export选择java 下的jar file,点击next,选择输出目录,输入文件名,点击next,点击next,然后点击完成。导出jar 包。

        将jar 放到Linux系统某个目录中。执行 
        ./spark-submit --class com.dt.spark.WordCount_Cluster --master spark://worker1:7077 ./wordcount.jar

        也可以将以上命令保存到.sh文件中,直接执行sh文件即可。

        二、使用idea开发spark的Local和Cluster

        (一)、配置开发环境

        1. 要在本地安装好java和scala。

        由于spark1.6需要scala 2.10.X版本的。推荐 2.10.4,java版本最好是1.8。所以提前我们要需要安装好java和scala并在环境变量中配置好

        2. 下载IDEA 社区版本,选择windows 版本并按照配置。

        安装完成以后启动IDEA,并进行配置,默认即可,然后点击ok以后,设置ui风格,然后点击next 会出现插件的选择页面,默认不需求修改,点击next,选择安装scala语言,点击install 按钮(非常重要,以为要开发spark程序所以必须安装),等安装完成以后点击start启动IDEA。

        3. 创建scala项目

        点击 create new project ,然后填写project name为“Wordcount”,选择项目的保存地址project location。

        然后设置project sdk即java 的安装目录。点击右侧的new 按钮,选择jdk,然后选择java 的安装路径即可。

        然后选择scalasdk。点击右侧的create ,默认出现时2.10.x 版本的scala,点击ok即可。然后点击finish。

        4. 设置spark的jar 依赖。

        点击file->project
        structure 来设置工程的libraries。核心是添加spark的jar依赖。选择Libraries
        ,点击右侧的加号,选择java,选择spark1.6.0
        的spark-1.6.0-bin-hadoop2.6\lib\spark-assembly-1.6.0-hadoop2.6.0.jar。点击ok。稍等片刻后然后点击ok(Libraries作用于WordCount),然后点击apply,点击ok。(这一步很重要,如果没有无法编写spark的代码)

        (二)、编写代码

        1. 在src下建立spark程序工程包

        在src上右击new ->package 填入package的name为com.dt.spark。

        2. 创建scala的入口类。

        在包的名字上右击选择new ->scala class 。在弹出框中填写Name ,并制定kind为object ,点击ok。

        3. 编写local代码

        1. import org.apache.spark.SparkConf
        2. import org.apache.spark.rdd.RDD
        3. def main(args: Array[String]): Unit ={
        4. * 集群的master的URL,如果设置为local则在本地运行。
        5. val conf = new SparkConf()
        6. conf.setMaster("local")
        7. /**第2步,创建SparkContext对象,SparkContext是spark程序所有功能的唯一入口,其作用是初始化spark应用程序的
        8. * */
        9. * 数据被RDD划分为一系列的Partitions,分配到每个partition的数据属于一个Task的处理范畴
        10. val lines = sc.textFile("G://datarguru spark//tool//spark-1.4.0-bin-hadoop2.6//README.md", 1) //读取本地文件并设置一个partition
        11. /**第4步,对初始的RDD进行Transformation级别的处理,如map、filter高阶函数编程,进行具体计算
        12. val words = lines.flatMap{ line => line.split(" ")}//对每行字符串进行单词拆分,并把所有拆分结果通过flat合并成一个大的单词集合
        13. (word, 1)} //在单词拆分基础上对每个单词实例计数为1
        14. wordCounts.foreach(wordNumberPair => println(wordNumberPair._1 + ":" + wordNumberPair._2))
        15. }
        16. 在代码去右击选择点击run”wordCount”来运行程序。在生成环境下肯定是写自动化shell 脚本自动提交程序的。 

          注意:如果val sc = new SparkContext(conf)报错,并且没有运行结果,需要将scala的module改成scala
          2.10版本的。具体操作:File->project structure -> Dependencies ->删除scala
          2.11.x的module.-> 左上角的“+” -> scala ->选中scala2.10.4 -> apply

          4. 编写Cluster模式代码

          1. import org.apache.spark.SparkConf
          2. import org.apache.spark.rdd.RDD
          3. def main(args: Array[String]): Unit ={
          4. * 集群的master的URL,如果设置为local则在本地运行。
          5. val conf = new SparkConf()
          6. //conf.setMaster("spark://master:7077")
          7. * 核心组件,包括DAGScheduler,TaskScheduler,SchedulerBackend
          8. val sc = new SparkContext(conf)
          9. /**第3步,根据数据源(HDFS,HBase,Local FS)通过SparkContext来创建RDD
          10. * */
          11. /**第4步,对初始的RDD进行Transformation级别的处理,如map、filter高阶函数编程,进行具体计算
          12. val words = lines.flatMap{ line => line.split(" ")}//对每行字符串进行单词拆分,并把所有拆分结果通过flat合并成一个大的单词集合
          13. (word, 1)} //在单词拆分基础上对每个单词实例计数为1
          14. pairs._2, pairs._1)).sortByKey(false).map(pair=>(pair._1, pair._2))//相同的key,value累加并且排名
          15. println(wordNumberPair._1 + ":" + wordNumberPair._2))
          16. }
          17. 将程序达成jar 包 

            点击file->project structure,在弹出的页面点击Artifacts,点击右侧的“+”,选择jar –> from
            modules with dependencies,在弹出的页面中,设置好main class
            然后点击ok,在弹出页面修改Name(系统生成的name不规范)、导出位置并删除scala和spark的jar(因为集群环境中已经存在)点击ok
            。然后在菜单栏中点击build –> Artifacts ,在弹出按钮中,点击bulid,会自动开始打包。

            在spark中执行wordcount方法。

            将jar 放到linux系统某个目录中。执行

            1. 注意事项: 

              为什么不能再ide开发环境中,直接发布spark程序到spark集群中?

              1. 开发机器的内存和cores的限制,默认情况情况下,spark程序的dirver在提交spark程序的机器上,如果在idea中提交程序的话,那idea机器就必须非常强大。

              2. Dirver要指挥workers的运行并频繁的发生同学,如果开发环境和spark集群不在同样一个网络下,就会出现任务丢失,运行缓慢等多种不必要的问题。

              3. 这是不安全的。

              三、WordCount的java开发版本

            2. 安装jdk并配置环境变量 

              系统变量→新建 JAVA_HOME 变量。

              变量值填写jdk的安装目录(本人是 E:\Java\jdk1.7.0)

              系统变量→寻找 Path 变量→编辑

              在变量值最后输入 %JAVA_HOME%\bin;%JAVA_HOME%\jre\bin;(注意原来Path的变量值末尾有没有;号,如果没有,先输入;号再输入上面的代码)

              系统变量→新建 CLASSPATH 变量值填写 .;%JAVA_HOME%\lib;%JAVA_HOME%\lib\tools.jar(注意最前面有一点)

            3. Maven的安装和配置 

              解压apache-maven-3.1.1-bin.zip,并把解压后的文件夹下的apache-maven-3.1.1文件夹移动到D:\Java下,如果没有Java这个文件夹的话,请自行创建

              新建系统变量 MAVEN_HOME 变量值:D:\Java\apache-maven-3.1.1。编辑系统变量 Path 添加变量值: ;%MAVEN_HOME%\bin。

              在mave 的目录中修改conf/settings.xml,在localRepository属性后添加D:/repository修改maven下载jar 的位置。

            4. eclipse 中java 和maven 的配置 

              点击 window ->java ->Installed JREs ->add ->standard vm ,点击next ,然后选择jdk 的安装路径点击finish即可。

              点击window ->Maven ->Installations ->add 在弹出页面选择mave 的安装路径,然后点击finish。然后在列表中选择我们自己刚添加的那个maven信息。

              然后点击window ->Maven ->User Setings 在右侧的User Settings 点击browse 现在mavenconf目录下的setttings.xml .(主要是修改maven下载依赖包存放的位置)

            (二). 创建maven项目

            1. 创建maven项目

              点击file ?->new ->others ->maven project
              点击next,选择maven-archetype-quickstart ,点击next,group id 为
              com.dt.spark,artifact id 为 sparkApps,然后点击finish。

            2. 修改jdk和pom文件 

              创建maven项目后,默认的jdk是1.5要改成我们前面安装好的jdk1.8。在项目上右击build path ->configure
              build path 。在弹出页面点击Libraries,选中jre system library
              。点击edit,在弹出框选择workspace default jre
              ,然后点击finish。然后在点击ok。将pom文件修改为如下内容,然后等待eclipse下载好maven依赖的jar包,并编译工程。编译好工程后有个错误提示,在此错误列上,右击选择quick
              fix ,在弹出页面点击finish即可。

            1. xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
            2. 4.0.0</modelVersion>
            3. <groupId>com.dt.spark</groupId>
            4. SparkApps</artifactId>
            5. 0.0.1-SNAPSHOT</version>
            6. jar</packaging>
            7. <name>SparkApps</name>
            8. http://maven.apache.org</url>
            9. <properties>
            10. UTF-8</project.build.sourceEncoding>
            11. <dependencies>
            12. junit</groupId>
            13. junit</artifactId>
            14. 3.8.1</version>
            15. test</scope>
            16. org.apache.spark</groupId>
            17. spark-core_2.10</artifactId>
            18. 1.6.0</version>
            19. org.apache.spark</groupId>
            20. spark-sql_2.10</artifactId>
            21. 1.6.0</version>
            22. org.apache.spark</groupId>
            23. spark-hive_2.10</artifactId>
            24. 1.6.0</version>
            25. org.apache.spark</groupId>
            26. spark-streaming_2.10</artifactId>
            27. 1.6.0</version>
            28. org.apache.hadoop</groupId>
            29. hadoop-client</artifactId>
            30. 2.6.0</version>
            31. org.apache.spark</groupId>
            32. spark-streaming-kafka_2.10</artifactId>
            33. 1.6.0</version>
            34. org.apache.spark</groupId>
            35. spark-graphx_2.10</artifactId>
            36. 1.6.0</version>
            37. <build>
            38. src/main/java</sourceDirectory>
            39. src/main/test</testSourceDirectory>
            40. <plugins>
            41. maven-assembly-plugin</artifactId>
            42. jar-with-dependencies</descriptorRef>
            43. make-assembly</id>
            44. package</phase>
            45. single</goal>
            46. org.codehaus.mojo</groupId>
            47. exec-maven-plugin</artifactId>
            48. 1.3.1</version>
            49. exec</goal>
            50. java</executable>
            51. false</includeProjectDependencies>
            52. compile</classpathScope>
            53. com.dt.spark.SparkApps.WordCount</mainClass>
            54. org.apache.maven.plugins</groupId>
            55. maven-compiler-plugin</artifactId>
            56. 1.6</source>
            57. 1.6</target>
            58. </project>
            1. 创建包路径以及java代码

              在包路径com.dt.spark.SparkApps上右击 new ->package 在弹出页面name中填写com.dt.spark.SparkApps.cores,点击finish的。

              在包路径下com.dt.spark.SparkApps.cores上右击 new ->class ,在弹出窗口中name中填写WordCount ,点击finish。然后在WordCount 中编写如下代码。

            (三). local版本

            1. import java.util.Arrays;
            2. import scala.Function;
            3. public static void main(String[] args){
            4. //其底层就是scala的SparkContext
            5. String> lines = sc.textFile("G://datarguru spark//tool//spark-1.4.0-bin-hadoop2.6//README.md");
            6. String> words = lines.flatMap(new FlatMapFunction<String, String>(){
            7. public Iterable<String> call(String line)throws Exception{
            8. });
            9. JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>(){
            10. public Tuple2<String, Integer> call(String word)throws Exception{
            11. String, Integer>(word, 1);
            12. });
            13. JavaPairRDD<String, Integer> wordsCount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>(){ //对相同的Key,进行Value的累计(包括Local和Reducer级别同时Reduce)
            14. public Integer call(Integer v1, Integer v2)throws Exception{
            15. });
            16. wordsCount.foreach(new VoidFunction<Tuple2<String, Integer>>(){
            17. public void call(Tuple2<String, Integer>pair)throws Exception{
            18. });
            19. }

          在代码区右击run as -> java application 。来运行此程序并查看运行结果。

          (四). cluster版本的代码

          1. import java.util.Arrays;
          2. import scala.Function;
          3. public static void main(String[] args){
          4. String> lines = sc.textFile("/library/wordcount/input/Data");
          5. String> words = lines.flatMap(new FlatMapFunction<String, String>(){
          6. public Iterable<String> call(String line)throws Exception{
          7. });
          8. JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>(){
          9. public Tuple2<String, Integer> call(String word)throws Exception{
          10. String, Integer>(word, 1);
          11. });
          12. JavaPairRDD<String, Integer> wordsCount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>(){
          13. public Integer call(Integer v1, Integer v2)throws Exception{
          14. });
          15. wordsCount.foreach(new VoidFunction<Tuple2<String, Integer>>(){
          16. public void call(Tuple2<String, Integer>pair)throws Exception{
          17. });
          18. }

        四、彻底解析wordcount运行原理

        1. 从数据流动视角解密WordCount

        即用Spark作单词计数统计,数据到底是怎么流动的,参看一图:

        从数据流动的视角分析数据到底是怎么被处理

        1. word,1)).reduceByKey(_+_).saveAsTextFile(outputPathwordcount)

        简单实验

        (1)在IntelliJ IDEA中编写下面代码:

        1. import org.apache.spark.SparkConf
        2. object WordCount {
        3. valconf = new SparkConf()
        4. conf.setMaster("local")
        5. val lines = sc.textFile("D://tmp//helloSpark.txt", 1)
        6. line.split(" ") }
        7. (word,1) }
        8. wordCounts.foreach(wordNumberPair =>println(wordNumberPair._1 + " : " + wordNumberPair._2))
        9. }
        10. (2)在D盘下地tmp文件夹下新建helloSpark.txt文件,内容如下:
        11. Hello Hadoop
        12. Spark is awesome
        13. Flink : 1
        14. is : 1
        15. awesome : 1
        16. Scala : 1

        Spark有三大特点:

        1. 分布式。无论数据还是计算都是分布式的。默认分片策略:Block多大,分片就多大。但这种说法不完全准确,因为分片切分时有的记录可能跨两个Block,所以一个分片不会严格地等于Block的大小,例如HDFS的Block大小是128MB的话,分片可能多几个字节或少几个字节。一般情况下,分片都不会完全与Block大小相等。

          分片不一定小于Block大小,因为如果最后一条记录跨两个Block的话,分片会把最后一条记录放在前一个分片中。

        2. 基于内存(部分基于磁盘)
        3. 迭代

        查看在SparkContext.scala中的testFile源码

        1. path: String,
        2. assertNotStopped()
        3. minPartitions).map(pair => pair._2.toString)
        4. 可以看出在进行了hadoopFile之后又进行了map操作。 

          HadoopRDD从HDFS上读取分布式文件,并且以数据分片的方式存在于集群之中。

          RDD.scala中的map源码

          1. * Return a new RDD by applying a function to all elements of this RDD.
          2. def map[U: ClassTag](f: T => U): RDD[U] = withScope {
          3. new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
          4. 读取到的一行数据(key,value的方式),对行的索引位置不感兴趣,只对其value事情兴趣。pair时有个匿名函数,是个tuple,取第二个元素。 

            此处又产生了MapPartitionsRDD。MapPartitionsRDD基于hadoopRDD产生的Parition去掉行的KEY。

            注:可以看出一个操作可能产生一个RDD也可能产生多个RDD。如sc.textFile就产生了两个RDD:hadoopRDD和MapParititionsRDD。

            下一步:

            1. line.split(" ") }

            对每个Partition中的每行进行单词切分,并合并成一个大的单词实例的集合。

            FlatMap做的一件事就是对RDD中的每个Partition中的每一行的内容进行单词切分。

            这边有4个Partition,对单词切分就变成了一个一个单词,

            下面是FlatMap的源码(RDD.scala中)

            1. * Return a new RDD by first applying a function to all elements of this
            2. */
            3. TraversableOnce[U]): RDD[U] = withScope {
            4. new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
            5. 可以看出flatMap又产生了一个MapPartitionsRDD,此时的各个Partition都是拆分后的单词。 

              下一步:

              1. (word,1) }

              将每个单词实例变为形如word=>(word,1)

              map操作就是把切分后的每个单词计数为1。

              根据源码可知,map操作又会产生一个MapPartitonsRDD。此时的MapPartitionsRDD是把每个单词变成Array(""Hello",1),("Spark",1)等这样的形式。

              下一步:

              1. reduceByKey是进行全局单词计数统计,对相同的key的value相加,包括local和reducer同时进行reduce。所以在map之后,本地又进行了一次统计,即local级别的reduce。

                shuffle前的Local Reduce操作,主要负责本地局部统计,并且把统计后的结果按照分区策略放到不同的File。

                下一Stage就叫Reducer了,下一阶段假设有3个并行度的话,每个Partition进行Local Reduce后都会把数据分成三种类型。最简单的方式就是用HashCode对其取模。

                至此都是stage1。

                Stage内部完全基于内存迭代,不需要每次操作都有读写磁盘,所以速度非常快。

                reduceByKey的源码(PairRDDFunctions.scala中):

                1. V): RDD[(K, V)] = self.withScope {
                2. v, func, func, partitioner)
                3. * Merge the values for each key using an associative and commutative reduce function. This will
                4. * to a "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions.
                5. def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = self.withScope {
                6. }
                7. /**
                8. * also perform the merging locally on each mapper before sending results to a reducer, similarly
                9. * parallelism level.
                10. def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
                11. }

                可以看到reduceByKey内部有combineByKeyWithClassTag。combineByKeyWithClassTag的源码如下:

                1. createCombiner: V => C,
                2. C,
                3. C,
                4. mapSideCombine: Boolean = true,
                5. require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
                6. if (mapSideCombine) {
                7. }
                8. throw new SparkException("Default partitioner cannot partition array keys.")
                9. }
                10. self.context.clean(createCombiner),
                11. self.context.clean(mergeCombiners))
                12. self.mapPartitions(iter => {
                13. new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
                14. } else {
                15. .setSerializer(serializer)
                16. .setMapSideCombine(mapSideCombine)
                17. }

                可以看出在combineByKeyWithClassTag内又new 了一个ShuffledRDD。

                ReduceByKey有两个作用:

                1. 进行Local级别的Reduce,减少网络传输。

                2. 把当前阶段的内容放到本地磁盘上供shuffle使用。

                下一步是shuffledRDD,

                产生Shuffle数据就需要进行分类,MapPartitionsRDD时其实已经分好类了,最简单的分类策略就是Hash分类。

                ShuffledRDD需要从每台机上抓取同一单词。

                reduceByKey发生在哪里?

                Stage2全部都是reduceByKey

                最后一步:保存数据到HDFS(MapPartitionsRDD)

                统计完的结果:(“Hello”,4)只是一个Value,而不是Key:"Hello",value:4。但输出到文件系统时需要KV的格式,现在只有Value,所以需要造个KEY。

                saveAsTextFile的源码:

                1. this.map(x => (NullWritable.get())),new Text(x.toStirng))
                2. }

                this.map把当前的值(x)变成tuple。tuple的Key是Null,Value是(“Hello”,4)。

                为什么要为样?因为saveAsHadoopFile时要求以这样的格式输出。Hadoop需要KV的格式!!

                map操作时把key舍去了,输出时就需要通过生成Key。

                第一个Stage有哪些RDD?HadoopRDD、MapPartitionsRDD、MapPartitionsRDD、MapPartitionsRDD、MapPartitionsRDD

                第二个Stage有哪些RDD?ShuffledRDD、MapPartitionsRDD

                只有Collect 或saveAsTextFile会触发作业,其他的时候都没有触发作业(Lazy)

                2. 从RDD依赖关系的视角解密WordCount。Spark中的一切操作皆RDD,后面的RDD对前面的RDD有依赖关系。

                3. DAG与Lineage的思考。依赖关系会形成DAG。

时间: 2024-10-12 04:21:30

1.spark的wordcount解析的相关文章

spark 的 wordcount

记录spark的Wordcount小程序: 前提:hdfs已经打开 创建一个name为wc.input的文件,上传到hdfs中的/user/hadoop/spark/中,内容如上图 [[email protected] hadoop-2.6.0-cdh5.4.0]# bin/hdfs dfs -put wc.input /user/hadoop/spark/            上传 [[email protected] hadoop-2.6.0-cdh5.4.0]# bin/hdfs dfs

Spark的wordcount程序产生多少个RDD?

val rdd = sc.textFile("hdfs://Master.hdp:9000/wc").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).sortBy(_._2,false).collectrdd.saveAsTextFile("hdfs://Master.hdp:9000/out01") 思考:在spark的wordcount过程一共产生多少个RDD? 通过该命令(scala>

编写Spark的WordCount程序并提交到集群运行[含scala和java两个版本]

编写Spark的WordCount程序并提交到集群运行[含scala和java两个版本] 1. 开发环境 Jdk 1.7.0_72 Maven 3.2.1 Scala 2.10.6 Spark 1.6.2 Hadoop 2.6.4 IntelliJ IDEA 2016.1.1 2. 创建项目1) 新建Maven项目 2) 在pom文件中导入依赖pom.xml文件内容如下: <?xml version="1.0" encoding="UTF-8"?> &l

Spark 源码解析:TaskScheduler的任务提交和task最佳位置算法

上篇文章< Spark 源码解析 : DAGScheduler中的DAG划分与提交 >介绍了DAGScheduler的Stage划分算法. 本文继续分析Stage被封装成TaskSet,并将TaskSet提交到集群的Executor执行的过程 在DAGScheduler的submitStage方法中,将Stage划分完成,生成拓扑结构,当一个stage没有父stage时候,会调用DAGScheduler的submitMissingTasks方法来提交该stage包含tasks. 首先来分析一下

Spark之SQL解析(源码阅读十)

如何能更好的运用与监控sparkSQL?或许我们改更深层次的了解它深层次的原理是什么.之前总结的已经写了传统数据库与Spark的sql解析之间的差别.那么我们下来直切主题~ 如今的Spark已经支持多种多样的数据源的查询与加载,兼容了Hive,可用JDBC的方式或者ODBC来连接Spark SQL.下图为官网给出的架构.那么sparkSql呢可以重用Hive本身提供的元数据仓库(MetaStore).HiveQL.以及用户自定义函数(UDF)及序列化和反序列化的工具(SerDes). 下来我们来

hadoop 启动后执行wordcount解析(No such file or directory错误)

hadoop 启动后执行wordcount解析 第一个  hadoop fs -mkdir input 结果出现了错误No such file or directory 查资料,应该是 执行命令为:hadoop fs -mkdir /input 1.x是可以执行的,而2.x的执行命令为:hadoop fs -mkdir /

用SBT编译Spark的WordCount程序

问题导读: 1.什么是sbt? 2.sbt项目环境如何建立? 3.如何使用sbt编译打包scala? sbt介绍 sbt是一个代码编译工具,是scala界的mvn,可以编译scala,java等,需要java1.6以上. sbt项目环境建立 sbt编译需要固定的目录格式,并且需要联网,sbt会将依赖的jar包下载到用户home的.ivy2下面,目录结构如下: |--build.sbt |--lib |--project |--src | |--main | | |--scala | |--tes

spark 例子wordcount topk

spark 例子wordcount topk 例子描述: [单词计算wordcount ] [词频排序topk] 单词计算在代码方便很简单,基本大体就三个步骤 拆分字符串 以需要进行记数的单位为K,自己拼个数字1为V,组成一个映射或者元组 分组(groupByKey) 词频排序 将分组后的数据进行排序 代码片段: /* 单词计算wordcount */ val input = Source.fromFile("E:/test.txt").getLines.toArray val wc

Spark 加强版WordCount ,统计日志中文件访问数量

原文地址:http://blog.csdn.net/whzhaochao/article/details/72416956 写在前面 学习Scala和Spark基本语法比较枯燥无味,搞搞简单的实际运用可以有效的加深你对基本知识点的记忆,前面我们完成了最基本的WordCount功能的http://blog.csdn.net/whzhaochao/article/details/72358215,这篇主要是结合实际生产情况编写一个简单的功能,功能就是通过分析CDN或者Nginx的日志文件,统计出访问