RDD的转换操作,分三种:单value,双value交互,(k,v)对

import org.apache.spark.rdd.RDDimport org.apache.spark.{Partitioner, SparkConf, SparkContext}
object Transformation {

  def main(args: Array[String]): Unit = {

    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Transformation")

    val sc = new SparkContext(config)

    val listRDD = sc.makeRDD(1 to 10)
    val listRDD2 = sc.makeRDD(Array(List(1, 2), List(3, 4)))
    val listRDD3 = sc.makeRDD(5 to 14)

    /***************************单value*****************************/

    /**
      * map(func)
      * 每次处理1条数据
      */

//    val mapRDD = listRDD.map(_ * 2)

    /**
      * mapPartitions(func)
      * 每次处理一组分区数据,效率高,但可能出现内存溢出(因为处理完一组分区后再释放)
      */

//     val mapPartitionsRDD = listRDD.mapPartitions(datas=>{
//       datas.map(data => data * 2)
//     })

    /**
      * mapPartitionsWithIndex(func)
      * 函数的输入多了分区号
      */

//    val tupleRDD: RDD[(Int, String)] = listRDD.mapPartitionsWithIndex {
//      case (num, datas) => {
//        datas.map((_, " 分区号:" + num))
//      }
//    }

    /**
      *  flatMap(func)
      *  将map后的数据扁平
      */

//    val flatMAPRDD: RDD[Int] = listRDD2.flatMap(datas => datas)

    /**
      * glom()
      * 将一个分区的数据放在一个数组里
      */

//    val glomRDD: RDD[Array[Int]] = listRDD.glom()

    /**
      * groupBy(func)
      * 按照函数的返回值进行分组,分组后的数据(K:分组的key,V:分组的集合)
      */

//    val groupByRDD: RDD[(Int, Iterable[Int])] = listRDD.groupBy(i => i%2)
//    groupByRDD.collect().foreach(println)

    /**
      * filter(func)
      * 按照返回值为true的过滤
      */

//    val filterRDD: RDD[Int] = listRDD.filter(x => x % 2 ==0)
//    filterRDD.collect().foreach(println)

    /**
      * sample(withReplacement : scala.Boolean, fraction : scala.Double, seed : scala.Long)
      * 随机抽样
      */

//    val sampleRDD: RDD[Int] = listRDD.sample(false, 0.4, 1)
//    sampleRDD.collect().foreach(println)

    /**
      * distinct()
      * 去重,且去重后会shuffler,可以指定去重后的分区数
      */

//    val distinctRDD: RDD[Int] = listRDD.distinct()
//    distinctRDD.collect().foreach(println)

    /**
      * coalesce(n)
      * 缩减分区的数量,可以简单的理解为合并分区,默认,没有shuffler,可以加参数true指定shuffler
      */

//    println("缩减分区前 = " + listRDD.partitions.size)
//    val coalesceRDD: RDD[Int] = listRDD.coalesce(2)
//    println("缩减分区前 = " + coalesceRDD.partitions.size)

    /**
      * repartition()
      * 重新分区,有shuffler。它其实就是带true的coalesce
      */

//    listRDD.glom().collect().foreach(arrays => {
//      println(arrays.mkString(","))
//    })
//    val repartitionRDD: RDD[Int] = listRDD.repartition(2)
//    repartitionRDD.glom().collect().foreach(arrays => {
//      println(arrays.mkString(","))
//    })

    /**
      * sortBy(f: (T) => K,ascending: Boolean = true,numPartitions: Int = this.partitions.length))
      * 根据函数排序
      */

//    val sortByRDD: RDD[Int] = listRDD.sortBy(n => n % 2, false)
//    sortByRDD.collect().foreach(println)

    /**************************双value交互*****************************/

    /**
      * 双value交互
      * A.union(B)         对A、B合并。(不去重)
      * A.subtract(B)      对A减去和B中的相同的
      * A.cartesian(B)     对A、B求笛卡尔乘积
      * A.zip(B)           将A、B组成(k,v),个数、分区数要相等
      * A.union(B) 对A、B求并集
      */

//    listRDD.union(listRDD3).collect().foreach(println)
//    listRDD.subtract(listRDD3).collect().foreach(println)
//    listRDD.intersection(listRDD3).collect().foreach(println)
//    listRDD.cartesian(listRDD3).collect().foreach(println)
//    listRDD.zip(listRDD3).collect().foreach(println)

    /**************************(k,v)对*******************************/

    val pairRDD1: RDD[(Int, String)] = sc.parallelize(Array((1, "aa"), (1, "bb"), (3, "cc"), (3, "dd")),  4)
    val pairRDD2: RDD[(String, Int)] = sc.parallelize(Array(("a", 3), ("a", 2), ("c", 4),
                                                            ("b", 3), ("c", 6), ("c", 8)),  2)
    val pairRDD3: RDD[(Int, String)] = sc.parallelize(Array((1, "zzz"), (3, "xxx")))

    /**
      * partitionBy(partitioner: Partitioner)
      * 按照分区器进行分区
      */

//    pairRDD1.partitionBy(new org.apache.spark.HashPartitioner(2))
//      .glom().collect().foreach(arrays => {
//      println(arrays.mkString(","))
//    })

//    pairRDD1.partitionBy(new MyPartitioner(3))
//      .glom().collect().foreach(arrays => {
//      println(arrays.mkString(","))
//    })

    /**
      * groupByKey()
      * 单纯把key相等的value放在一起,生成序列
      */
//    pairRDD1.groupByKey().collect().foreach(println)

    /**
      * reduceByKey(func)
      * 按key聚合,并且按函数对key相等的value进行操作
      */

//    pairRDD1.reduceByKey(_ + _)
//      .glom().collect().foreach(arrays => {
//      println(arrays.mkString(","))
//    })

    /**
      * aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U)
      * zeroValue:每个分区的每一个key的初始值
      * seqOp:每个分区里的聚合函数
      * seqOp:分区间的聚合函数
      */

    // 取出每个分区相同对key的最大值,在相加
//    pairRDD2.aggregateByKey(0)(math.max(_,_), _+_)
//      .glom().collect().foreach(arrays => {
//      println(arrays.mkString(","))
//    })

    /**
      * foldByKey(zeroValue: V)(func: (V, V) => V)
      * 其实就是aggregateByKey的简化版,seqOp和seqOp相同
      */

//    pairRDD2.foldByKey(0)(_ + _)
//      .glom().collect().foreach(arrays => {
//      println(arrays.mkString(","))
//    })

    /**
      * combineByKey[C](
      * createCombiner: V => C,
      * mergeValue: (C, V) => C,
      * mergeCombiners: (C, C) => C,
      * partitioner: Partitioner,
      * mapSideCombine: Boolean = true,
      * serializer: Serializer = null)
      *
      * 主要就是比aggregateByKey多了一个createCombiner,用于计算初始值
      */

    // 计算相同key的value的均值
//    pairRDD2.combineByKey(
//      (_, 1),
//      (acc:(Int, Int), v) => (acc._1 + v, acc._2 + 1),
//      (acc1:(Int, Int), acc2:(Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
//      .map{case (key, value) => (key, value._1 / value._2.toDouble)}
//      .collect().foreach(println)

    /**
      * sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
      * 按key排序
      */

//    pairRDD1.sortByKey(true)
//      .collect().foreach(println)

    /**
      * mapValues(func)
      * 只对value做转换
      */

//    pairRDD1.mapValues(value => value + "|||")
//      .collect().foreach(println)

    /**
      * A.join(B, numP)
      * 把key相同的value组合在一起(性能较低)
      */

//    pairRDD1.join(pairRDD3)
//      .collect().foreach(println)

    /**
      * A.cogroup(B)
      * (k, v1) 和 (k, v2)cogroup 后,得到(k, v1集合,v2集合)
      */

    pairRDD1.cogroup(pairRDD3)
      .collect().foreach(println)

    sc.stop()

  }
}

// 自定义分区器
class MyPartitioner (partitions: Int) extends Partitioner {
  override def numPartitions: Int = {
    partitions
  }

  override def getPartition(key: Any): Int = {
    1
  }
}

  //只写代码不让我发出来--忽略这一行

原文地址:https://www.cnblogs.com/liangyan131/p/12019351.html

时间: 2024-12-10 23:20:48

RDD的转换操作,分三种:单value,双value交互,(k,v)对的相关文章

javascript浮点数转换成整数三种方法

将浮点数转换成整数方法有很多,分享三种常用方法. Summary 暂时我就想到3个方法而已.如果读者想到其他好用方法,也可以交流一下 parseInt位运算符Math.floor Math.ceil Description 一.parseInt 1. 实例 parseInt("13nash");//13 parseInt("")// NaN parseInt("0xA") //10(十六进制) parseInt(" 13")/

Spark算子:RDD基本转换操作(1)–map、flatMap、distinct

Spark算子:RDD基本转换操作(1)–map.flatMap.distinct 关键字:Spark算子.Spark RDD基本转换.map.flatMap.distinct map 将一个RDD中的每个数据项,通过map中的函数映射变为一个新的元素. 输入分区与输出分区一对一,即:有多少个输入分区,就有多少个输出分区. hadoop fs -cat /tmp/lxw1234/1.txt hello world hello spark hello hive //读取HDFS文件到RDD sca

PHP实现链式操作的三种方法详解

这篇文章主要介绍了PHP实现链式操作的三种方法,结合实例形式分析了php链式操作的相关实现技巧与使用注意事项,需要的朋友可以参考下 本文实例讲述了PHP实现链式操作的三种方法.分享给大家供大家参考,具体如下: 在php中有很多字符串函数,例如要先过滤字符串收尾的空格,再求出其长度,一般的写法是: strlen(trim($str)) 如果要实现类似js中的链式操作,比如像下面这样应该怎么写? $str->trim()->strlen() 下面分别用三种方式来实现: 方法一.使用魔法函数__ca

创建一对多表结构实例 /操作的三种方式

例 1.注册App01  完成各项配置 2. 写完后自动生成一个id自增列(主键) 如果不想生成 自己写 创建两张表 3.执行创建语句 (其中还进行了一个小修改) 4.按照之前的方法 打开数据库 并输入数据 5.修改表结构 法一: 在更新时 遇到选择 因为已经存入数据 新建列默认不能为Null 默认为sa 注意输入的是字符串 刷新 法二: 法三: ====================== 接下来进行view 应该先看到业务线  再看到主机 1.urls 注意:如果同时有 bussiness

使用单例时的三种单例写法

单例:一个类只有一个实例,在外部创建对象时,不能用alloc.(只要alloc,就会在堆区开辟空间,就意味着有多个对象)所以我们要提供一个创建对象的方法: 1.加号方法 2.返回值类型为当前类 3.方法名以default ,standared,main,shared等开头 + 当前类名 下面以Person类为例 在.h文件中声明 + (Person *)sharePerson; 在.m文件实现 第一种模式(单线程使用这种模式) + (Person *)sharePerson { 声明为stati

Spring实现初始化和销毁bean之前进行的操作,三种方式

关于在spring  容器初始化 bean 和销毁前所做的操作定义方式有三种: 第一种:通过@PostConstruct 和 @PreDestroy 方法 实现初始化和销毁bean之前进行的操作 第二种是:通过 在xml中定义init-method 和  destory-method方法 第三种是:通过bean实现InitializingBean和 DisposableBean接口 下面演示通过  @PostConstruct 和 @PreDestory 1:定义相关的实现类: [java] v

利用python找出两文件夹里相同的文件并保存在新的文件夹下(分三种情况)

原文件夹A,B,新文件夹C,下图中的情况以图片为例 A:00001.jpg  00002.jpg   00003.jpg  00147.jpg B : 00001.jpg  000000002.jpg   00147.json 第一种情况:找出两文件夹下相同内容的文件,保存并输出到文件夹C 思路:判断内容是否一致,因此需要读取整个文件,判断两者是否一样 由于文件内容错综复杂,而其md5是唯一的,如果两者内容一致,则两者的md5值应该为一样.由于图片是二进制存储,在读取时采用'rb'.这里是对文件

RDD的转换操作---RDD转换过程

1) union(otherRDD)RDD-->UnionRDD2) groupByKey(numPartitions)RDD-->ShuffledRDD-->MapPartitionsRDDgroupByKey() 只需要将 Key 相同的 records 聚合在一起,一个简单的 shuffle 过程就可以完成.3) reduceyByKey(func, numPartitions)reduceyByKey() 相当于传统的 MapReduceRDD-->MapPartition

Egret中的三种单例写法

1 普通的单例写法 class Single{ private static instance:Single; public static getInstance():Single{ if(this.instance == null){ this.instance = new Single(); } return this.instance; } public run(){ } } Single.getInstance().run(); 2 Module写法.仿照的Egret中Res资源类写法.