Spark 调优之RDD持久化级别及kryo序列化性能测试

我们上篇文章中讲了,RDD的持久化是spark优化中必须掌握的,并且,在内存不足的情况下,我们可以将持久化类型选择为MEMORY_ONLY_SER,减少内存的占用,持久化更多的partition,并且不同的序列化方法也会影响序列化性能。
下面,我们就来测试下,持久化级别和序列化方法的选择对RDD持久化大小的影响。
我选择了一个170.9MB的日志文件,传到了百度网盘
提取码:ffae
测试环境是windows,
IDEA参数配置

MEMORY_ONLY

代码为

case class CleanedLog(cdn:String,region:String,level:String,date:String,ip:String, domain:String, url:String, traffic:String)

  object KyroTest {
    def main(args: Array[String]) {

      val inputPath=new Path(args(0))
      val outputPath=new Path(args(1))

      val fsConf=new Configuration()
      val fs= FileSystem.get(fsConf)

      if (fs.exists(outputPath)) {
        fs.delete(outputPath,true)
        val path=args(1).toString

        println(s"已删除已存在的路径$path")
      }

      val conf = new SparkConf().setMaster("local[2]").setAppName("KyroTest")
      //conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      //conf.set("spark.kryo.registrationRequired", "true")
      val sc = new SparkContext(conf)

      val logs = sc.textFile(args(0))
      //logs.filter(_.split("\t").length==8).take(10).foreach(println(_))
      val logsCache=logsCahe(logs)
      //序列化的方式将rdd存到内存
      saveAtLocal(logsCache,args(1))
      Thread.sleep(100000)
    }
    def logsCahe(logs:RDD[String]): RDD[CleanedLog] ={
      logs.filter(_.split("\t").length==8).map(x=>{
        val fields=x.split("\t")

        CleanedLog(fields(0),fields(1),fields(2),fields(3),fields(4),fields(5),fields(6),fields(7))
      }).cache()
    }
    def saveAtLocal(logsCache: RDD[CleanedLog], outputPath: String) = {
      logsCache.map(x=>{

        x.cdn+"\t"+x.region+"\t"+x.level+"\t"+x.date+"\t"+x.ip+"\t"+x.domain+"\t"+x.url+"\t"+x.traffic

      }).repartition(1).saveAsTextFile(outputPath)
    }
  }

代码逻辑就是输入是什么内容,输就是什么内容,在中间我将输入的文本RDD进行了memory_only持久化,我们就看这个持久化内存占多少


显然,input size大小是170.9 MB,但是持久化之后是908.5 MB,显然占据内存空间增大了好几倍,如果在生产上,内存资源不足的情况下,这种方式显然缓存不了不少partition
时间耗费14s

MEMORY_ONLY_SER 未使用kryo序列化

def logsCahe(logs:RDD[String]): RDD[CleanedLog] ={
      logs.filter(_.split("\t").length==8).map(x=>{
        val fields=x.split("\t")

        CleanedLog(fields(0),fields(1),fields(2),fields(3),fields(4),fields(5),fields(6),fields(7))
      }).persist(StorageLevel.MEMORY_ONLY_SER)

代码仅更改了persist(StorageLevel.MEMORY_ONLY_SER)


显然,input size大小是170.9 MB,但是持久化之后是有204.9MB,所以序列化对于节约内存空间是很有帮助的。

时间耗费11s

MEMORY_ONLY_SER 使用kryo序列化未注册

 val conf = new SparkConf().setMaster("local[2]").setAppName("KyroTest")
      conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

与上一代码相比,为SparkConf设置了开启kryo序列化,不是默认的java序列化了,但是没有进行具体的类注册!


显然,input size大小是170.9 MB,但是持久化之后是有230.8MB,使用未注册的kryo序列化竟然比使用java序列化还臃肿!原因是:每一个对象实例的序列化结果都会包含一份完整的类名,造成了大量的空间浪费!

时间是9s,比java序列化快了一些。

MEMORY_ONLY_SER 使用kryo序列化并注册

val conf = new SparkConf().setMaster("local[2]").setAppName("KyroTest")
      conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      conf.registerKryoClasses(Array(classOf[CleanedLog], classOf[String]))

添加了String类和自定义样例类的kryo注册


显然,input size大小是170.9 MB,使用注册的kryo序列化之后,只有175.7MB,时间也才9秒,很舒服!

所以在目前为止,使用kryo序列化并注册是性能最好得了!!!

如果CPU还是那么悠闲的话,我们还有另外一个进一步优化点!

注册kryo序列化并开启RDD压缩

注意:RDD压缩只能存在于序列化的情况下

val conf = new SparkConf().setMaster("local[2]").setAppName("KyroTest")
      conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      conf.registerKryoClasses(Array(classOf[CleanedLog], classOf[String]))
      conf.set("spark.rdd.compress","true")



持久化的大小仅有45.6MB!!!

spark.rdd.compress

这个参数决定了RDD Cache的过程中,RDD数据在序列化之后是否进一步进行压缩再储存到内存或磁盘上。当然是为了进一步减小Cache数据的尺寸,对于Cache在磁盘上而言,绝对大小大概没有太大关系,主要是考虑Disk的IO带宽。而对于Cache在内存中,那主要就是考虑尺寸的影响,是否能够Cache更多的数据,是否能减小Cache数据对GC造成的压力等。

这两者,前者通常不会是主要问题,尤其是在RDD Cache本身的目的就是追求速度,减少重算步骤,用IO换CPU的情况下。而后者,GC问题当然是需要考量的,数据量小,占用空间少,GC的问题大概会减轻,但是是否真的需要走到RDD Cache压缩这一步,或许用其它方式来解决可能更加有效。

所以这个值默认是关闭的,但是如果在磁盘IO的确成为问题或者GC问题真的没有其它更好的解决办法的时候,可以考虑启用RDD压缩。

原文地址:https://blog.51cto.com/14309075/2397012

时间: 2024-11-10 11:40:23

Spark 调优之RDD持久化级别及kryo序列化性能测试的相关文章

大数据-spark理论(3)sparkSql,sparkStreaming,spark调优

导读目录 第一节:sparksql 1:简介 2:核心 3:与hive整合 4:dataFrame 5:函数 第二节:spark Streaming 1:对比strom 2:DStream的算子 3:代码 4:driver HA 5:读取数据 第三节:spark调优 第一节:sparksql (1)简介: Shark:shark是sparksql的前身,hive是shark的前身 快的原因:不仅是内存,还有谓词下移(减少一定量的数据IO) 正常 谓词下移 (先关联表在切割) (先将表中的字段过滤

【Spark学习】Apache Spark调优

Spark调优 本文系根据官方文档翻译而来,转载请注明本文链接 http://www.oschina.net/translate/spark-tuning?print 数据序列化 内存优化 确定内存用量 调整数据结构 序列化RDD存储 垃圾收集调整 其他考虑因素 并行化水平 Reduce任务的内存用量 Broadcasting large variables 总结 因为大部分Spark程序都具有“内存计算”的天性,所以集群中的所有资源:CPU.网络带宽或者是内存都有可能成为Spark程序的瓶颈.

spark调优之开发调优

(1)避免重复的RDD 案例: val rdd1 = sc.textFile("hdfs://zzy/hello.txt") rdd1.map(...) val rdd2 = sc.textFile("hdfs://zzy/hello.txt") rdd2.reduce(...) 这里条用了两次textFile,并且读取的是同一个文件,造成了多次的磁盘读取,如果是hi同一个文件,读取一次即可. (2)尽可能多的复用一个RDD 错误演示: //由于业务需要,对rdd1

spark调优经验(待续)

spark调优是须要依据业务须要调整的,并非说某个设置是一成不变的,就比方机器学习一样,是在不断的调试中找出当前业务下更优的调优配置.以下零碎的总结了一些我的调优笔记. spark 存储的时候存在严重的分配不均的现象,有几台机器在过渡使用, 有几台机器却非常少被使用.有几台机器缓存了几十个上百个RDD blocks  有的机器一个RDD blocks 都没有.这样存储有RDD blocks 的能够进行运算.运算的tasks 最多为该机器core数. spark.storage.memoryFra

【Spark调优】小表join大表数据倾斜解决方案

[使用场景] 对RDD使用join类操作,或者是在Spark SQL中使用join语句时,而且join操作中的一个RDD或表的数据量比较小(例如几百MB或者1~2GB),比较适用此方案.. [解决方案] 小表join大表转为小表broadcast+map大表实现.具体为: 普通的join是会shuffle的,而一旦shuffle,就相当于会将相同key的数据拉取到一个shuffle read task中再进行join,此时就是reduce join,此时如果发生数据倾斜,影响处理性能,而此时恰好

【Spark调优】大表join大表,少数key导致数据倾斜解决方案

[使用场景] 两个RDD进行join的时候,如果数据量都比较大,那么此时可以sample看下两个RDD中的key分布情况.如果出现数据倾斜,是因为其中某一个RDD中的少数几个key的数据量过大,而另一个RDD中的所有key都分布比较均匀,此时可以考虑采用本解决方案. [解决方案] 对有数据倾斜那个RDD,使用sample算子采样出一份样本,统计下每个key的数量,看看导致数据倾斜数据量最大的是哪几个key. 然后将这几个key对应的数据从原来的RDD中拆分出来,形成一个单独的RDD,并给每个ke

【Spark篇】---Spark调优之代码调优,数据本地化调优,内存调优,SparkShuffle调优,Executor的堆外内存调优

一.前述 Spark中调优大致分为以下几种 ,代码调优,数据本地化,内存调优,SparkShuffle调优,调节Executor的堆外内存. 二.具体    1.代码调优 1.避免创建重复的RDD,尽量使用同一个RDD 2.对多次使用的RDD进行持久化 如何选择一种最合适的持久化策略? 默认情况下,性能最高的当然是MEMORY_ONLY,但前提是你的内存必须足够足够大,可以绰绰有余地存放下整个RDD的所有数据.因为不进行序列化与反序列化操作,就避免了这部分的性能开销:对这个RDD的后续算子操作,

Spark调优秘诀

1.诊断内存的消耗 在Spark应用程序中,内存都消耗在哪了? 1.每个Java对象都有一个包含该对象元数据的对象头,其大小是16个Byte.由于在写代码时候,可能会出现这种情况:对象头比对象本身占有的字节数更多,比如对象只有一个int的域.一般这样设计是不合理的,造成对象的"浪费",在实际开发中应避免这种情况. 2.Java的String对象,会比它内部的原始数据要多出40个字节.因为它内部使用char数组来保存内部的字符序列的,并且还得保存诸如数组长度之类的信息.而且String使

Spark学习笔记6:Spark调优与调试

1.使用Sparkconf配置Spark 对Spark进行性能调优,通常就是修改Spark应用的运行时配置选项. Spark中最主要的配置机制通过SparkConf类对Spark进行配置,当创建出一个SparkContext时,就需要创建出一个SparkConf实例. Sparkconf实例包含用户要重载的配置选项的键值对.调用set()方法来添加配置项的设置,然后把这个对象传给SparkContext的构造方法. 调用setAppName()和setMaster()来分别设置spark.app