spark中的scalaAPI之RDDAPI常用操作

package com.XXX
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}
//spark中的RDD测试
object RddTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("rdd api test")
    val sc = SparkContext.getOrCreate(conf)
//    mapTest(sc)
//    distinctTest(sc)
//    filterTest(sc)
//    keyByTest(sc)
//    sortByTest(sc)
//    topNTest(sc)
//    repartitionTest(sc)
//    groupByTest(sc)
    aggSumTest(sc)
    sc.stop()
  }

  def mapTest(sc:SparkContext) = {
    val file = sc.textFile("file:///G:\\bd14\\user-logs-large.txt",3)
    val mapResult = file.map(x =>{//map的特点是一个输入对应一条输出,没有返回值,对应的返回值会是() NIL
      val info = x.split("\\t")
      (info(0),info(1))//转换成了元组
    })
    //take是一个action,作用是取出前n条数据发送到driver,一般用于开发测试
    mapResult.take(10).foreach(println)

    //map和mapPartition的区别:map是一条记录一条记录的转换,mapPartition是
    //一个partition(分区)转换一次
    val mapPartitionResult = file.mapPartitions(x => {//一个分区对应一个分区
    var info = new Array[String](3)
     for(line <- x) yield{//yield:作用:有返回值,所有的记录返回之后是一个集合
        info = line.split("\\t")
        (info(0),info(1))
      }
    })
    mapPartitionResult.take(10).foreach(println)
    // 把一行转为多行记录,使用flatMap展平,把一条new_tweet记录转成两条login记录
    val flatMapTest = file.flatMap(x=>{
      val info = x.split("\\t")
      info(1) match {
        case "new_tweet"=> for (i <- 1 to 2) yield s"${info(0)} login ${info(2)}"
        case _ => Array(x)
      }
    })
    flatMapTest.take(10).foreach(println)
    println(file.count())
    println(flatMapTest.count())
  }
  //distinct:排重,把重复的数据去掉,不是数据的转换,属于数据的聚合
  def distinctTest(sc:SparkContext) = {
    val file = sc.textFile("file:///G:\\bd14\\user-logs-large.txt",3)
    val userRdd = file.map(x=>x.split("\\t")(0)).distinct()
    userRdd.foreach(println)
  }
  //filter:过滤
  def filterTest(sc:SparkContext) = {
    val file = sc.textFile("file:///G:\\bd14\\user-logs-large.txt",3)
    val loginFilter = file.filter(x=>x.split("\\t")(1)=="login")
    loginFilter.take(10).foreach(println)
    println(loginFilter.count())
  }

  //keyBy,输入作为value,key由算计计算而来
  def keyByTest(sc:SparkContext) = {
    val file = sc.textFile("file:///G:\\bd14\\user-logs-large.txt",3)
    val userActionType = file.keyBy(x=>{
      val info = x.split("\\t")
      s"${info(0)}--${info(1)}"
    })
    userActionType.take(10).foreach(println)
  }
  //sortBy排序
  def sortByTest(sc:SparkContext) = {
    val file = sc.textFile("file:///C:\\Users\\zuizui\\Desktop\\README.txt")
    //数据量小的话,想进行群排序,吧numPartitions设置成1
    //默认为圣墟,姜旭吧第二个参数设置为false
//    val sortBy = file.sortBy(x=>x.split("\\s+")(1).toInt,numPartitions = 1)//后面有不同数量的空格时,使用\\s+来split
    val sortBy = file.sortBy(x=>x.split("\\s+")(1).toInt,false,numPartitions = 1)//后面有不同数量的空格时,使用\\s+来split
    sortBy.foreach(println)
  }

  def topNTest(sc:SparkContext) = {
    val list = List(1,23,34,54,56,100)//把集合转化为RDD使用parallelize,或者mkRDD
    val rdd = sc.parallelize(list,2)
//添加饮食准换,使takeOrdered,和top的排序顺序变反
    implicit  val tonordered = new Ordering[Int]{
      override def compare(x: Int, y: Int): Int = y.compareTo(x)
    }
    val takeOrdered = rdd.takeOrdered(3)//从小到大取出前三条
    takeOrdered.foreach(println)
    val topN = rdd.top(3)//从大到小取出前三条
    topN.foreach(println)
  }
  //重新分区
  def repartitionTest(sc:SparkContext) = {
    val file = sc.textFile("file:///G:\\bd14\\user-logs-large.txt")
    val result  = file.repartition(5)//repartition是宽依赖,所谓宽依赖就是
    //原来RDD的每一个分区中的数据都会分别吧部分数据写入到新的RDD的每个分区中
    //窄依赖:就是原来RDD的分区中的一个分区数据完全写入到新的RDD中的一个分区中
    //窄依赖减少网络间的传输
    file.foreachPartition(x=>{
      var sum = 0
      x.foreach(x=>sum+=1)
      println(s"该分区的数据有${sum}")
    })

    result.foreachPartition(x=>{
      var sum = 0
      x.foreach(x=>sum+=1)
      println(s"该分区的数据有${sum}")
    })

    val coalesce = result.coalesce(3)//使用窄依赖,原来有五个分区,现在变成三个的话,
    //其中的一个不变,另外四个分区中的两两分别通过窄依赖添加到另外两个新的分区中
    coalesce.foreachPartition(x=>{
      var sum = 0
      x.foreach(x=>sum+=1)
      println(s"coalesce该分区的数据有${sum}")
    })
  }

  def groupByTest(sc:SparkContext)= {
    val file = sc.textFile("file:///G:\\bd14\\user-logs-large.txt")
    val groupedBy = file.groupBy(x=>x.split("\\t")(0))
    //group by 容易发生数倾斜
    groupedBy.foreachPartition(x=>{
      println(s"groupByRDD分区,该分区共有:${x.size}条记录")
    })
    groupedBy.foreach(x=>{
      println(s"groupByRDD的一条记录,key为${x._1},value上集合记录条数是:${x._2.size}")
    })
    groupedBy.foreach(x => {
      var sum = 0
      x._2.foreach(line => {
        line.split("\\t")(1) match {
          case "login" => sum += 1
          case _ =>
        }
      })
      println(s"用户:${x._1}的登录次数是:$sum")
    })
  }

  def aggSumTest(sc:SparkContext) = {
    val list = List(1,2,4,5)
    val rdd = sc.parallelize(list,3)
      //reduce 计算sum
    val reduceResult = rdd.reduce((v1,v2)=>v1+v2)
    //fold计算sum
    val flodResult = rdd.fold(0)((v1,v2)=>v1+v2)
    //aggregate把元素连接成一个字符串
    val aggResult = rdd.aggregate("")((c,v)=>{
      c match {
        case "" => v.toString
        case _ => s"$c,$v"
      }
    },(c1,c2)=>{
      c1 match {
        case ""=> c2
        case _=>s"$c1,$c2"
      }
    })

    println(s"reduceResult:$reduceResult")
    println(s"flodResult:$flodResult")
    println(s"aggResult:$aggResult")
  }

  def persistTest(sc:SparkContext) = {
    val file = sc.textFile("file:///G:\\bd14\\user-logs-large.txt")
//    file.cache()
    file.persist(StorageLevel.MEMORY_ONLY)//相当于cache(),智加载在内存中
    //计算用户数量
    //计算ip数量
    //计算每个用户在每一个ip上的数量
  }
}
时间: 2024-10-27 06:47:50

spark中的scalaAPI之RDDAPI常用操作的相关文章

linux 中解压与压缩 常用操作详细讲解

平时有时候 会在服务器进行一些文件的操作,比如安装一些服务与软件等等,都有解压操作,一般在 导出一些简单的服务器文件,也是先压缩后再导出,因此,在这里根据平时用到解压与压缩命令的频率来记录下: 1.最常用的当属 tar 命令了,(常针对于 tar.gz 文件) 压缩 : tar -zcvf [被压缩后的文件名] [目录或者文件] eg: tar zcvf redis.tar.gz  redis-2.8.12 解压 :tar -zxvf  [压缩包的文件名] eg: tar zxvf redis-

Java中对String字符串的常用操作

这周遇到了一个需要处理String字符串的问题,用到了split将字符串解析为一个String的数组,还用到了某些替换字符的操作. 1 /* 2 **将String source按','间隔开,再分别对array的每个元素进行操作 3 **注意转义的换行符应该是'\\\\n' 4 */ 5 String[] array = source.split("\\,"); 6 7 /* 8 **将所有a替换为b或者删去所有换行符 9 */ 10 source = source.replacea

Js 中对 Json 数组的常用操作

我们首先定义一个json数组对象如下: var persons = [ {name: "tina", age: 14}, {name: "timo", age: 15}, {name: "lily", age: 16}, {name: "lucy", age: 16} ] 一. 根据对象属性值得到相应对象 //1. 获取 name 等于 lily 的对象 var lily = persons.filter((p) =>

五、mysql中sql语句分类及常用操作

1.sql语句分类: DQL语句 数据查询语言 select DML语句 数据操作语言 insert delete update DDL语句 数据定义语言 create drop alter TCL语句 事务控制语言 commit rollback 2.创建一个新的数据库,create database database_name; 3.导入数据库脚本,source url(该路径可直接拖动文件到dos命令窗口获得) 4.查看该数据中所有的表,show tables; 5.查看某张表的结构,de

Java中对Array数组的常用操作

目录: 声明数组: 初始化数组: 查看数组长度: 遍历数组: int数组转成string数组: 从array中创建arraylist: 数组中是否包含某一个值: 将数组转成set集合: 将数组转成list集合: Arrays.fill()填充数组: 数组排序: 复制数组: 比较两个数组: 去重复: 查询数组中的最大值和最小值: 备注:文内代码具有关联性. 1.声明数组: String [] arr; int arr1[]; String[] array=new String[5]; int sc

Java中对List集合的常用操作

目录: list中添加,获取,删除元素: list中是否包含某个元素: list中根据索引将元素数值改变(替换): list中查看(判断)元素的索引: 根据元素索引位置进行的判断: 利用list中索引位置重新生成一个新的list(截取集合): 对比两个list中的所有元素: 判断list是否为空: 返回Iterator集合对象: 将集合转换为字符串: 将集合转换为数组: 集合类型转换: 备注:内容中代码具有关联性. 1.list中添加,获取,删除元素: 添加方法是:.add(e): 获取方法是:

Spark中GraphX图运算pregel详解

由于本人文字表达能力不足,还是多多以代码形式表述,首先展示测试代码,然后解释: package com.txq.spark.test import org.apache.spark.graphx.util.GraphGeneratorsimport org.apache.spark.graphx._import org.apache.spark.rdd.RDDimport org.apache.spark.{SparkConf, SparkContext, SparkException, gra

spark 中的RDD编程 -以下基于Java api

1.RDD介绍:     RDD,弹性分布式数据集,即分布式的元素集合.在spark中,对所有数据的操作不外乎是创建RDD.转化已有的RDD以及调用RDD操作进行求值.在这一切的背后,Spark会自动将RDD中的数据分发到集群中,并将操作并行化. Spark中的RDD就是一个不可变的分布式对象集合.每个RDD都被分为多个分区,这些分区运行在集群中的不同节点上.RDD可以包含Python,Java,Scala中任意类型的对象,甚至可以包含用户自定义的对象. 用户可以使用两种方法创建RDD:读取一个

Spark中的键值对操作-scala

1.PairRDD介绍     Spark为包含键值对类型的RDD提供了一些专有的操作.这些RDD被称为PairRDD.PairRDD提供了并行操作各个键或跨节点重新进行数据分组的操作接口.例如,PairRDD提供了reduceByKey()方法,可以分别规约每个键对应的数据,还有join()方法,可以把两个RDD中键相同的元素组合在一起,合并为一个RDD. 2.创建Pair RDD 程序示例:对一个英语单词组成的文本行,提取其中的第一个单词作为key,将整个句子作为value,建立 PairR