Spark 算子实现

1.map,flatmap,filter用的是scala的内部实现。

2.cogroup,intersection,join,leftOuterJoin,rightOuterJoin,fullOuterJoin

rdd1:[(1,2,3),(2,3,4)]

rdd2:[(1,3,5),(2,4,6)]

rdd1.cogroup(rdd2)

对rdd1调用cogroup: rdd1->cogroup(rdd2)->CoGroupRDD(rdd1,rdd2)->mapValues()->MapPartitionsRDD

cogroup首先会用rdd1和rdd2来new一个CoGroupRDD,然后在对这个CoGroupRDD调用mapValues生成MapPartitionsRDD。

2.1intersection的实现

map()->MapPartitionsRDD->cogroup()->CoGroupRDD->mapvalues()->MapPartitionsRDD->filter()->MapPartitionsRDD->keys()

先用map操作,将key转换为<key,null>,因为cogroup只对key-value做处理。然后在对<key,null>进行cogroup算子操作,最后筛选出非空的元素。

2.2join的实现

5.combineByKey

reduceByKey

groupByKey

每一个RDD由如下几个部分组成:

1.Partition[]

2.compute(Parition,context)

3.Dependency[]

4.prefreed locations

5.Partitioner.

Dependency

RDD之间的依赖:一个RDD可以依赖于一个或者多个RDD。

Partition之间的依赖:一个RDD可能依赖一个或者多个Partition

每一个RDD都有一个Dependency[],Dependency有如下几种:OneToOneDependency,RangeDependency,NarrowDependency,ShuffleDependency。

map操作生成的MapPartitionsRDD里的Partition,依赖于父RDD中的一个Partition。1:1关系。

reduceByKey生成的ShuffleRDD里的Partition,依赖于父RDD中的所有Partition。N:1

RDD依赖关系的建立

rdd1->rdd2->rdd3.

RDD的物理执行

task.run():rdd.iterator(parttion,context),如果使用了本地化级别,那么久去缓存里面找。如果使用了checkpoint,就使用它。否则的就调用rdd.compute(partition)来计算了。compute是一个抽象方法,具体实现在派生的RDD中。

我们来看一下这两个方法:

  /**
   * Internal method to this RDD; will read from cache if applicable, or otherwise compute it.
   * This should ‘‘not‘‘ be called by users directly, but is available for implementors of custom
   * subclasses of RDD.
   */
  final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
    if (storageLevel != StorageLevel.NONE) {
      SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
    } else {
      computeOrReadCheckpoint(split, context)
    }
  }
  def compute(split: Partition, context: TaskContext): Iterator[T]

iterator(partition)对partition中的record进行计算,最后返回一个Iterator迭代器,也就是说呢,他将结果抽象成一个scala容器了。compute也是如此。

现在的问题是,compute如何对partition中的数据进行计算,并生成Iterator呢。我们以map操作为例:firstParent.iterator(partition,context)拿到父RDD

的计算结果(Iterator),然后再对这个Iterator调用map来生成结果。

比如rdd1.map(f)这个操作,map里面首先生成MapPartitionsRDD。然后当执行MapPartitionsRDD的compute(partition)时,就会执行如下操作:

1.firstParent.iterator(partition)拿到父RDD也就是rdd1的结算结果,这个结果是一个Iterator

2.f(Iterator),这个f就是用户向map传递的那个闭包函数。

时间: 2024-10-09 21:21:32

Spark 算子实现的相关文章

UserView--第二种方式(避免第一种方式Set饱和),基于Spark算子的java代码实现

UserView--第二种方式(避免第一种方式Set饱和),基于Spark算子的java代码实现 测试数据 java代码 1 package com.hzf.spark.study; 2 3 import java.util.Map; 4 import java.util.Set; 5 6 import org.apache.spark.SparkConf; 7 import org.apache.spark.api.java.JavaPairRDD; 8 import org.apache.s

Spark算子---实战应用

Spark算子实战应用 数据集 :http://grouplens.org/datasets/movielens/ MovieLens 1M Datase 相关数据文件 : users.dat ---UserID::Gender::Age::Occupation::Zip-code movies.dat --- MovieID::Title::Genres ratings.dat ---UserID::MovieID::Rating::Timestamp SogouQ.mini 完成以下业务需求

Spark算子:RDD基本转换操作(1)–map、flatMap、distinct

Spark算子:RDD基本转换操作(1)–map.flatMap.distinct 关键字:Spark算子.Spark RDD基本转换.map.flatMap.distinct map 将一个RDD中的每个数据项,通过map中的函数映射变为一个新的元素. 输入分区与输出分区一对一,即:有多少个输入分区,就有多少个输出分区. hadoop fs -cat /tmp/lxw1234/1.txt hello world hello spark hello hive //读取HDFS文件到RDD sca

Spark算子选择策略

摘要  1.使用reduceByKey/aggregateByKey替代groupByKey 2.使用mapPartitions替代普通map 3.使用foreachPartitions替代foreach 4.使用filter之后进行coalesce操作 5.使用repartitionAndSortWithinPartitions替代repartition与sort类操作 6.使用broadcast使各task共享同一Executor的集合替代算子函数中各task传送一份集合 7.使用相同分区方

(三)spark算子 分为3大类

ation算子通过sparkContext执行提交作业的runJob,触发rdd的DAG执行 (foreach) foreach(f) 会对rdd中的每个函数进行f操作,下面的f操作就是打印输出没有元素 saveAsTextFile 将rdd保存到hdfs指定的路径,将rdd中每一个分区保存到hdfs上的block saveAsObjectFile 将rdd中每10个元素组成一个array,然后将这个array序列化,映射为(null,bytesWritable(y)) 写入hdfs为Seque

spark算子介绍

1.spark的算子分为转换算子和Action算子,Action算子将形成一个job,转换算子RDD转换成另一个RDD,或者将文件系统的数据转换成一个RDD 2.Spark的算子介绍地址:http://spark.apache.org/docs/2.3.0/rdd-programming-guide.html 3.Spark操作基本步骤[java版本,其他语言可以根据官网的案例进行学习] (1)创建配置文件,将集群的运行模式设置好,给作业起一个名字,可以使用set方法其他配置设入. SparkC

(一)spark算子 分为3大类

value类型的算子 处理数据类型为value型的算子(也就是这个算子只处理数据类型为value的数据),可以根据rdd的输入分区与输出分区的关系分为以下几个类型 (1)输入分区与输出分区一对一型 map型:对rdd的每个数据项,通过用户自定义的函数映射转换成一个新的rdd 上面4个方框表示4个rdd分区,当第一个方框中的rdd经过用户自定义的map函数从v1映射为v,1.这种操作只有等到action算子触发后,这个函数才会和其他的函数在一个stage中对数据进行运算 flagMap型:将原来的

(二)spark算子 分为3大类

transgormation的算子对key-value类型的数据有三种: (1)输入 与 输出为一对一关系 mapValue();针对key-value类型的数据并只对其中的value进行操作,不对key进行操作 (2)对单个rdd聚集 combineByKey 相当于将(v1,2 v1,1)转为(v1,Seq(1,2))的rdd reduceByKey 就是将相同的key合并,算出他们的和 partitionBy 对rdd进行分区,如果原有的rdd与现在的rdd一致则不进行分区:如果不一致则根

spark 算子分析

别的不说先上官网: action 这些算子中需要注意: 1.reduce 和 reduceByKey 虽说都有reduce,但是一个是action级别,一个是transformation级别,速度上会有很大的差异 2.groupBy的使用如下 groupBy :将元素通过函数生成相应的 Key,数据就转化为 Key-Value 格式,之后将 Key 相同的元素分为一组. val a = sc.parallelize(1 to 9, 3) a.groupBy(x => { if (x % 2 ==