大数据教程:Transformation和Action算子演示

一、Transformation算子演示


val conf = new SparkConf().setAppName("Test").setMaster("local")
      val sc = new SparkContext(conf)

//通过并行化生成rdd
    val rdd = sc.parallelize(List(5,6,4,7,3,8,2,9,10))

//map:对rdd里面每一个元乘以2然后排序
    val rdd2: RDD[Int] = rdd.map(_ * 2)
    //collect以数组的形式返回数据集的所有元素(是Action算子)
    println(rdd2.collect().toBuffer)

//filter:该RDD由经过func函数计算后返回值为true的输入元素组成
    val rdd3: RDD[Int] = rdd2.filter(_ > 10)
    println(rdd3.collect().toBuffer)

val rdd4 = sc.parallelize(Array("a b c","b c d"))
    //flatMap:将rdd4中的元素进行切分后压平
    val rdd5: RDD[String] = rdd4.flatMap(_.split(" "))
    println(rdd5.collect().toBuffer)
    //假如: List(List(" a,b" ,"b c"),List("e c"," i o"))
    //压平 flatMap(_.flatMap(_.split(" ")))
    
    //sample随机抽样
    //withReplacement表示是抽出的数据是否放回,true为有放回的抽样,false为无放回的抽样
    //fraction抽样比例例如30% 即0.3 但是这个值是一个浮动的值不准确
    //seed用于指定随机数生成器种子 默认参数不传
    val rdd5_1 = sc.parallelize(1 to 10)
    val sample = rdd.sample(false,0.5)
    println(sample.collect().toBuffer)

//union:求并集
    val rdd6 = sc.parallelize(List(5,6,7,8))
    val rdd7 = sc.parallelize(List(1,2,5,6))
    val rdd8 = rdd6 union rdd7
    println(rdd8.collect.toBuffer)

//intersection:求交集
    val rdd9 = rdd6 intersection rdd7
    println(rdd9.collect.toBuffer)

//distinct:去重出重复
    println(rdd8.distinct.collect.toBuffer)

//join相同的key会被合并
    val rdd10_1 = sc.parallelize(List(("tom",1),("jerry" ,3),("kitty",2)))
    val rdd10_2 = sc.parallelize(List(("jerry" ,2),("tom",2),("dog",10)))
    val rdd10_3 = rdd10_1 join rdd10_2
    println(rdd10_3.collect().toBuffer)
    
    //左连接和右连接
    //除基准值外是Option类型,因为可能存在空值所以使用Option
    val rdd10_4 = rdd10_1 leftOuterJoin rdd10_2 //以左边为基准没有是null
    val rdd10_5 = rdd10_1 rightOuterJoin rdd10_2 //以右边为基准没有是null
    println(rdd10_4.collect().toList)
    println(rdd10_5.collect().toBuffer)

val rdd11_1 = sc.parallelize(List(("tom",1),("jerry" ,3),("kitty",2)))
    val rdd11_2 = sc.parallelize(List(("jerry" ,2),("tom",2),("dog",10)))
    //笛卡尔积
    val rdd11_3 = rdd11_1 cartesian rdd11_2
    println(rdd11_3.collect.toBuffer)
  
   //根据传入的参数进行分组
    val rdd11_5_1 = rdd11_4.groupBy(_._1)
    println(rdd11_5_1.collect().toList)

//按照相同key进行分组,并且可以制定分区
    val rdd11_5_2 = rdd11_4.groupByKey
    println(rdd11_5_2.collect().toList)

//根据相同key进行分组[分组的话需要二元组]
    //cogroup 和 groupBykey的区别
    //cogroup不需要对数据先进行合并就以进行分组 得到的结果是 同一个key 和不同数据集中的数据集合
    //groupByKey是需要先进行合并然后在根据相同key进行分组
    val rdd11_6: RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd11_1 cogroup rdd11_2
    println(rdd11_6)

二、Action算子演示


val conf = new SparkConf().setAppName("Test").setMaster("local[*]")
    val sc = new SparkContext(conf)
    /* Action 算子*/
    //集合函数
    val rdd1 = sc.parallelize(List(2,1,3,6,5),2)
    val rdd1_1 = rdd1.reduce(_+_)
    println(rdd1_1)
    //以数组的形式返回数据集的所有元素
    println(rdd1.collect().toBuffer)
    //返回RDD的元素个数
    println(rdd1.count())
    //取出对应数量的值 默认降序, 若输入0 会返回一个空数组
    println(rdd1.top(3).toBuffer)
    //顺序取出对应数量的值
    println(rdd1.take(3).toBuffer)
    //顺序取出对应数量的值 默认生序
    println(rdd1.takeOrdered(3).toBuffer)
    //获取第一个值 等价于 take(1)
    println(rdd1.first())
    //将处理过后的数据写成文件(存储在HDFS或本地文件系统)
    //rdd1.saveAsTextFile("dir/file1")
    //统计key的个数并生成map k是key名 v是key的个数
    val rdd2 = sc.parallelize(List(("key1",2),("key2",1),("key3",3),("key4",6),("key5",5)),2)
    val rdd2_1: collection.Map[String, Long] = rdd2.countByKey()
    println(rdd2_1)
    //遍历数据
    rdd1.foreach(x => println(x))

/*其他算子*/
    //统计value的个数 但是会将集合中的一个元素看做是一个vluae
    val value: collection.Map[(String, Int), Long] = rdd2.countByValue
    println(value)
    //filterByRange:对RDD中的元素进行过滤,返回指定范围内的数据
    val rdd3 = sc.parallelize(List(("e",5),("c",3),("d",4),("c",2),("a",1)))
    val rdd3_1: RDD[(String, Int)] = rdd3.filterByRange("c","e")//包括开始和结束的
    println(rdd3_1.collect.toList)
    //flatMapValues对参数进行扁平化操作,是value的值
    val rdd3_2 = sc.parallelize(List(("a","1 2"),("b","3 4")))
    println( rdd3_2.flatMapValues(_.split(" ")).collect.toList)
    //foreachPartition 循环的是分区数据
    // foreachPartiton一般应用于数据的持久化,存入数据库,可以进行分区的数据存储
    val rdd4 = sc.parallelize(List(1,2,3,4,5,6,7,8,9),3)
    rdd4.foreachPartition(x => println(x.reduce(_+_)))
    //keyBy 以传入的函数返回值作为key ,RDD中的元素为value 新的元组
    val rdd5 = sc.parallelize(List("dog","cat","pig","wolf","bee"),3)
    val rdd5_1: RDD[(Int, String)] = rdd5.keyBy(_.length)
    println(rdd5_1.collect.toList)
    //keys获取所有的key  values 获取所有的values
    println(rdd5_1.keys.collect.toList)
    println(rdd5_1.values.collect.toList)
    //collectAsMap  将需要的二元组转换成Map
    val map: collection.Map[String, Int] = rdd2.collectAsMap()
    println(map)

原文地址:https://www.cnblogs.com/gcghcxy/p/11040688.html

时间: 2024-08-02 18:02:37

大数据教程:Transformation和Action算子演示的相关文章

大数据教程Scala系列之方法的嵌套和方法多态

大数据教程为大家分享Scala系列之方法的嵌套和方法多态方法里嵌套定义其他方法示例1object EmbedDemo { def add3(x:Int,y:Int,z:Int)={def add2(x:Int,y:Int)={x+y}add2(add2(x,y),z)} def main(args: Array[String]): Unit = {println(add3(1,2,3)) //6}}示例2def factorial(x: Int): Int = {def fact(x: Int,

好程序员大数据教程分享Scala系列之Option_偏函数_String

好程序员大数据教程分享Scala系列之Option_偏函数_StringOption类型在Scala中Option类型样例类用来表示可能存在或也可能不存在的值(Option的子类有Some和None).Some包装了某个值,None表示没有值. object OptionDemo {def main(args: Array[String]) {val map = Map("a" -> 1, "b" -> 2)val v = map.get("b

好程序员大数据教程Scala系列之样例类_Option_偏函数

好程序员大数据教程Scala系列之样例类_Option_偏函数,在Scala中Option类型样例类用来表示可能存在或也可能不存在的值(Option的子类有Some和None).Some包装了某个值,None表示没有值. object?OptionDemo {??def?main(args: Array[String]) {????val?map = Map("a"?-> 1, "b"?-> 2)????val?v = map.get("b&q

好程序员大数据教程分享Scala系列之模式匹配和样例类

好程序员大数据教程分享Scala系列之模式匹配和样例类1.样例类在Scala中样例类是一中特殊的类,样例类是不可变的,可以通过值进行比较,可用于模式匹配.定义一个样例类:1.构造器中每一个参数都是val,除非显示地声明为var 2.伴生对象提供apply ,让你不使用new关键字就能构造出相应的对象case class Point(x: Int, y: Int)创建样例类对象:val point = Point(1, 2)val anotherPoint = Point(1, 2)val yet

好程序员分享大数据教程之线程高级部分

好程序员分享大数据教程之线程高级部分,首先讲一下线程的生命周期 对于一个线程, 在被创建后, 不是立即就进入到了运行状态, 也不是一直处于运行状态, 在线程的声明周期中, 一个线程会在多种状态之间进行切换 new : 新生状态, 线程被实例化, 但是还没有开始执行(start) runnable: 就绪状态, 已经执行过start, 线程已经启动了, 只是没有抢到CPU时间片 running: 运行状态, 抢到了CPU时间片 blocked: 阻塞状态, 线程执行的过程中, 遇到一些特殊情况,

Spark的transformation和action算子简介

transformation算子 map(func) 返回一个新的分布式数据集,由每个原元素经过func函数处理后的新元素组成 filter(func) 返回一个新的数据集,由经过func函数处理后返回值为true的原元素组成 flatMap(func) 类似于map,但是每一个输入元素,会被映射为0个或多个输出元素,(因此,func函数的返回值是一个seq,而不是单一元素) mapPartitions(func) 类似于map,对RDD的每个分区起作用,在类型为T的RDD上运行时,func的函

大数据教程

这是一个为了帮助更多人了解.入门.提升大数据相关领域知识的系列教程,此系列教程是针对已经掌握了一些编程概念的同学朋友而设计的,具有如下特点: ** 对数据分析挖掘有浓厚兴趣,但又无从下手 ** 基于Linux平台开发,从Linux基础讲起 ** 最好拥有一点Java基础,对计算机的操作有些认识. ** 大数据思维.框架学习 ** 举例丰富 大数据能做什么? 请自行百度谷歌. OK,假设你已经百度谷歌了很久,看了很多案例故事和新闻,那我们就开始吧~ 本次开发学习,基于Linux,下一节:Linux

黑马12期大数据教程(hadoop,storm,kafka,hbase,hive,sqoop)

课程目录:weekend110-第1天 01-hadoop职位需求状况 02-hadoop课程安排 03-hadoop应用场景 04-hadoop对海量数据处理的解决思路 05-hadoop版本选择和伪分布式安装 06-hadoop版本选择和伪分布式安装2 07-hdfs&mapreduce测试 08-hdfs的实现机制初始 09-hdfs的shell操作 10-hadoop集群搭建的无密登陆配置weekend110-第2天 01-NN元数据管理机制 02-NN工作机制2 03-DN工作原理 0

RDD案例(DT大数据梦工厂)

内容: 1.map.filter.flatmap等操作回顾: 2.reduceBykey.groupBykey: 3.jion.cogroug: 算子共同特点:都是最常用的算子,构建复杂算法的基石,都是lazy级别的,不属于action 创建SparkContext是Spark的起点,只有创建SparkContext,才能创建RDD ==========map============ 适用于任何元素且对其作用的集合中的每一个元素循环遍历,并调用其作为参数的函数对每一个遍历的元素进行具体化处理 =