Spark 学习(四)RDD自定义分区和缓存

一,简介

二,自定义分区规则

  2.1 普通的分组TopN实现

  2.2 自定义分区规则TopN实现

三,RDD的缓存

  3.1 RDD缓存简介

  3.2 RDD缓存方式

正文

一,简介

  在之前的文章中,我们知道RDD的有一个特征:就是一组分片(Partition),即数据集的基本组成单位。对于RDD来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建RDD时指定RDD的分片个数,如果没有指定,那么就会采用默认值。默认值就是程序所分配到的CPU Core的数目。这个分配的规则我们是可以自己定制的。同时我们一直在讨论Spark快,快的方式有那些方面可以体现,RDD缓存就是其中的一个形式,这里将对这两者进行介绍。

二,自定义分区规则

  分组求TopN的方式有多种,这里进行简单的几种。这里尊卑一些数据:点击下载

  2.1 普通的分组TopN实现

  实现思路一:先对数据进行处理,然后聚合。最后进行分组排序。

package cn.edu360.sparkTwo

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object SubjectTopNone {

    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("SubjectTopNone").setMaster("local[4]")
        val sc: SparkContext = new SparkContext(conf)
        val lines: RDD[String] = sc.textFile("hdfs://hd1:9000/sparkLearn/", 2)
        // 对每一行数据进行整理
        val sbToTeacherOne: RDD[((String, String), Int)] = lines.map(line => {
            val words: Array[String] = line.split("/")
            val teacher: String = words(3)
            val subject: String = words(2).split("[.]")(0)
            ((subject, teacher), 1)
        })
        // 聚合,将学科和老师联合当做key
        val reduced: RDD[((String, String), Int)] = sbToTeacherOne.reduceByKey(_+_)
        //分组排序(按学科进行分组)
        //[学科,该学科对应的老师的数据]
        val grouped: RDD[(String, Iterable[((String, String), Int)])] = reduced.groupBy(_._1._1)
        // 这里取出的是每一组的数据
        // 为什么可以调用scala的sortby方法呢?因为一个学科的数据已经在一台机器上的一个scala集合里面了
        // 弊端,调用scala的sortBy当数据量过大时,有内存溢出的缺陷
        val result: RDD[(String, List[((String, String), Int)])] = grouped.mapValues(_.toList.sortBy(_._2).reverse.take(4))
        println(result.collect.toBuffer)
    }
}

  实现思路二:先对数据进行处理,然后聚合,然后对数据进行单学科过滤,最后进行排序,提交。

package cn.edu360.sparkTwo

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object SubjectTopNtwo {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("SubjectTwo").setMaster("local[4]")
        val sc: SparkContext = new SparkContext(conf)
        val lines: RDD[String] = sc.textFile("hdfs://hd1:9000/sparkLearn")
        val sbToTeacherOne: RDD[((String, String), Int)] = lines.map(line => {
            val words: Array[String] = line.split("/")
            val teacher: String = words(3)
            val subject: String = words(2).split("[.]")(0)
            ((subject, teacher), 1)
        })
        val reduced: RDD[((String, String), Int)] = sbToTeacherOne.reduceByKey(_+_)
        // 获取所有学科
        val subjects: Array[String] = reduced.map(_._1._1).distinct().collect()
        // 对所有的reduce后的数据进行单学科过滤,在进行排序
        for(sb <- subjects){
            val filter: RDD[((String, String), Int)] = reduced.filter(_._1._1 == sb)
            // 这里进行了多次提交
            val result: Array[((String, String), Int)] = filter.sortBy(_._2, false).take(3)
            print(result.toBuffer)
        }
        sc.stop()
    }
}

  2.2 自定义分区规则TopN实现

  实现方式一:先对数据进行处理,然后聚合,而后对按照学科进行分区,然后对每一个分区进行排序。

package cn.edu360.sparkTwo

import org.apache.spark.{Partitioner, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

import scala.collection.mutable

object SubjectTopNthree {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("SubjectTopNone").setMaster("local[4]")
        val sc: SparkContext = new SparkContext(conf)
        val lines: RDD[String] = sc.textFile("hdfs://hd1:9000/sparkLearn/")
        val sbToTeacherOne: RDD[((String, String), Int)] = lines.map(line => {
            val words: Array[String] = line.split("/")
            val teacher: String = words(3)
            val subject: String = words(2).split("[.]")(0)
            ((subject, teacher), 1)
        })
        //聚合,将学科和老师联合当做key ---> 这里有一次shuffle
        val reduced: RDD[((String, String), Int)] = sbToTeacherOne.reduceByKey(_+_)
        //计算有多少学科
        val subjects: Array[String] = reduced.map(_._1._1).distinct().collect()
        //partitionBy按照指定的分区规则进行分区
        //调用partitionBy时RDD的Key是(String, String) --->这里也有一次shuffle
        val partioned: RDD[((String, String), Int)] = reduced.partitionBy(new SubPartitioner(subjects))
        //如果一次拿出一个分区(可以操作一个分区中的数据了)
        val sorted: RDD[((String, String), Int)] = partioned.mapPartitions(it => {
            //将迭代器转换成list,然后排序,在转换成迭代器返回
            it.toList.sortBy(_._2).reverse.take(3).iterator
        })
        val result: Array[((String, String), Int)] = sorted.collect()
        print(result.toBuffer)
    }
}

// 自定义分区规则,需要继承Partitioner
class SubPartitioner(subs: Array[String]) extends Partitioner{
    //相当于主构造器(new的时候回执行一次)
    //用于存放规则的一个map
    private val rules = new mutable.HashMap[String, Int]()
    var i = 0
    for(sub <- subs){
        rules.put(sub, i)
        i += 1
    }
    //返回分区的数量(下一个RDD有多少分区)
    override def numPartitions: Int = subs.length
    //根据传入的key计算分区标号
    //key是一个元组(String, String)
    override def getPartition(key: Any): Int = {
        //获取学科名称
        val s: String = key.asInstanceOf[(String, String)]._1
        //根据规则计算分区编号
        rules(s)
    }
}

  实现方式二:上面的过程可以将聚合和分区操作进行合并,减少shuffle的次数

package cn.edu360.sparkTwo

import org.apache.spark.{Partitioner, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

import scala.collection.mutable

object SubjectTopNfour {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("SubjectTopNone").setMaster("local[4]")
        val sc: SparkContext = new SparkContext(conf)
        val lines: RDD[String] = sc.textFile("hdfs://hd1:9000/sparkLearn/")
        val sbToTeacherOne: RDD[((String, String), Int)] = lines.map(line => {
            val words: Array[String] = line.split("/")
            val teacher: String = words(3)
            val subject: String = words(2).split("[.]")(0)
            ((subject, teacher), 1)
        })
        val subjects: Array[String] = sbToTeacherOne.map(_._1._1).distinct().collect()
        // 在这里传入分区规则,即聚合时就分区
        val reduced: RDD[((String, String), Int)] = sbToTeacherOne.reduceByKey(new SubPartinerTwo(subjects), _+_)
        // 对每个分区进行排序
        val result: RDD[((String, String), Int)] = reduced.mapPartitions(it => {
            it.toList.sortBy(_._2).reverse.take(3).iterator
        })
        print(result.collect().toBuffer)
    }
}

class SubPartinerTwo(subs: Array[String]) extends Partitioner{
    private val rules = new mutable.HashMap[String, Int]()
    var i = 0
    for(sub <- subs){
        rules.put(sub, i)
        i += 1
    }
    override def numPartitions: Int = subs.length
    override def getPartition(key: Any): Int = {
        val subject: String = key.asInstanceOf[(String, String)]._1
        rules(subject)
    }
}

三,RDD的缓存

  3.1 RDD缓存简介

  Spark速度非常快的原因之一,就是在不同操作中可以在内存中持久化或缓存数据集。当持久化某个RDD后,每一个节点都将把计算的分片结果保存在内存中,并在对此RDD或衍生出的RDD进行的其他动作中重用。这使得后续的动作变得更加迅速。RDD相关的持久化和缓存,是Spark最重要的特征之一。可以说,缓存是Spark构建迭代式算法和快速交互式查询的关键。

  3.2 RDD缓存方式

  RDD通过persist方法或cache方法可以将前面的计算结果缓存,但是并不是这两个方法被调用时立即缓存,而是触发后面的action时,该RDD将会被缓存在计算节点的内存中,并供后面重用。

  

  通过查看源码发现cache最终也是调用了persist方法,默认的存储级别都是仅在内存存储一份,Spark的存储级别还有好多种,存储级别在object StorageLevel中定义的。

  

  缓存有可能丢失,或者存储存储于内存的数据由于内存不足而被删除,RDD的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于RDD的一系列转换,丢失的数据会被重算,由于RDD的各个Partition是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部Partition。

  实例:

package cn.edu360.sparkTwo

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object SubjectTopNtwo {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("SubjectTwo").setMaster("local[4]")
        val sc: SparkContext = new SparkContext(conf)
        val lines: RDD[String] = sc.textFile("hdfs://hd1:9000/sparkLearn")
        val sbToTeacherOne: RDD[((String, String), Int)] = lines.map(line => {
            val words: Array[String] = line.split("/")
            val teacher: String = words(3)
            val subject: String = words(2).split("[.]")(0)
            ((subject, teacher), 1)
        })
        val reduced: RDD[((String, String), Int)] = sbToTeacherOne.reduceByKey(_+_)
        // 这里讲reduced的数据集到缓存中
        val cached: RDD[((String, String), Int)] = cached.cache()
        // 获取所有学科
        val subjects: Array[String] = cached.map(_._1._1).distinct().collect()
        // 对所有的reduce后的数据进行单学科过滤,在进行排序
        for(sb <- subjects){
            // 因为这里的多次提交和过滤,所以添加到缓存就有必要了
            val filter: RDD[((String, String), Int)] = cached.filter(_._1._1 == sb)
            // 这里进行了多次提交
            val result: Array[((String, String), Int)] = filter.sortBy(_._2, false).take(3)
            print(result.toBuffer)
        }
        sc.stop()
    }
}

原文地址:https://www.cnblogs.com/tashanzhishi/p/10989078.html

时间: 2024-10-05 16:11:50

Spark 学习(四)RDD自定义分区和缓存的相关文章

Spark学习四:网站日志分析案例

Spark学习四:网站日志分析案例 标签(空格分隔): Spark Spark学习四网站日志分析案例 一创建maven工程 二创建模板 三日志分析案例 一,创建maven工程 1,执行maven命令创建工程 mvn archetype:generate -DarchetypeGroupId=org.scala-tools.archetypes -DarchetypeArtifactId=scala-archetype-simple -DremoteRepositories=http://scal

Spark学习之RDD的理解

转自:http://www.infoq.com/cn/articles/spark-core-rdd/ 感谢张逸老师的无私分享 RDD,全称为Resilient Distributed Datasets,是一个容错的.并行的数据结构,可以让用户显式地将数据存储到磁盘和内存中,并能控制数据的分区.同时,RDD还提供了一组丰富的操作来操作这些数据.在这些操作中,诸如map.flatMap.filter等转换操作实现了monad模式,很好地契合了Scala的集合操作.除此之外,RDD还提供了诸如joi

大数据入门第二十二天——spark(三)自定义分区、排序与查找

一.自定义分区 1.概述 默认的是Hash的分区策略,这点和Hadoop是类似的,具体的分区介绍,参见:https://blog.csdn.net/high2011/article/details/68491115 2.实现 package cn.itcast.spark.day3 import java.net.URL import org.apache.spark.{HashPartitioner, Partitioner, SparkConf, SparkContext} import s

spark学习四

5.sc.textFiles() 与 sc.wholeTextFiles() 的区别 sc.textFile()是将path 里的所有文件内容读出,以文件中的每一行作为一条记录的方式,文件的每一行 相当于 列表 的一个元素,因此可以在每个partition中用for i in data的形式遍历处理数据. sc.wholeTextFiles()返回的是[(key, val), (key, val)...]的形式,其中key是文件路径,val是文件内容,每个文件作为一个记录!这说明这里的 val

spark 学习(二) RDD及共享变量

声明:本文基于spark的programming guide,并融合自己的相关理解整理而成 Spark应用程序总是包含着一个driver program(驱动程序),它执行着用户的main方法,并且执行大量的并行操作(parallel operations)在集群上. 概述 Spark最主要的抽象就是RDD(resilient distributed dataset) 弹性分布式数据集,RDD  就是分割元素的集合,他被分发在集群的各个节点上,并且能够进行并行操作. RDD的创建有三种方式: H

spark自定义分区及示例代码

有时自己的业务需要自己实现spark的分区函数 以下代码是实现一个自定义spark分区的demo 实现的功能是根据key值的最后一位数字,写到不同的文件 例如: 10写入到part-00000 11写入到part-00001 . . . 19写入到part-00009 给读者提供一个自定义分区的思路 import org.apache.spark.{Partitioner, SparkContext, SparkConf} //自定义分区类,需继承Partitioner类 class Usrid

Spark学习之路 (十七)Spark分区

讨论QQ:1586558083 目录 一.分区的概念 二.为什么要进行分区 三.Spark分区原则及方法 3.1 本地模式 3.2 YARN模式 四.分区器 正文 回到顶部 一.分区的概念 分区是RDD内部并行计算的一个计算单元,RDD的数据集在逻辑上被划分为多个分片,每一个分片称为分区,分区的格式决定了并行计算的粒度,而每个分区的数值计算都是在一个任务中进行的,因此任务的个数,也是由RDD(准确来说是作业最后一个RDD)的分区数决定. 回到顶部 二.为什么要进行分区 数据分区,在分布式集群里,

MVC系列——MVC源码学习:打造自己的MVC框架(四:自定义视图)

前言:通过之前的三篇介绍,我们基本上完成了从请求发出到路由匹配.再到控制器的激活,再到Action的执行这些个过程.今天还是趁热打铁,将我们的View也来完善下,也让整个系列相对完整,博主不希望烂尾.对于这个系列,通过学习源码,博主也学到了很多东西,在此还是把博主知道的先发出来,供大家参考. 本文原创地址:http://www.cnblogs.com/landeanfen/p/6019719.html MVC源码学习系列文章目录: MVC系列——MVC源码学习:打造自己的MVC框架(一) MVC

Spark自定义分区(Partitioner)

我们都知道Spark内部提供了HashPartitioner和RangePartitioner两种分区策略,这两种分区策略在很多情况下都适合我们的场景.但是有些情况下,Spark内部不能符合咱们的需求,这时候我们就可以自定义分区策略.为此,Spark提供了相应的接口,我们只需要扩展Partitioner抽象类,然后实现里面的三个方法: 01 package org.apache.spark 02 03 /** 04 * An object that defines how the element