大数据-spark理论(2)算子,shuffle优化

导读目录

第一节:代码层面

  1:RDD创建

  2:算子

  3:数据持久化算子

  4:广播变量

  5:累加器

  6:开发流程

第二节:Shuffle优化层面

  1:Shuffle

  2:调优

第一节:代码层面

  (1)RDD创建:

    Java:

      sc.textfile

      sc.parallelize()

      sc.parallelizePairs(得到KV格式的RDD)

    Scala:

      sc.textfile

      sc.parallelize //如果不指定分区数,用的是系统的默认分区数

      makeRDD //如果不指定分区数,会为每个集合对象创建最佳分区

    窄依赖:

      一个Partition中的数据,或多个Partition中的数据放在一个Partition中。

    宽依赖:

      一个Partition中的数据分发到多个Partition中。

  (2)算子:Transformation(转换算子)与 Action(执行算子)

    32个算子:https://blog.csdn.net/Fortuna_i/article/details/81170565

  (3)数据持久化算子:

    3.1 意义:

      对于迭代计算,交互运算之类的,应用可以不用再运行之前的RDD,这样效率会大大提升。

      即:源数据---经过操作--->action A

        源数据---经过操作--->action B

        如果在经过操作之后把数据持久化了,那么后面的action B就不用从源数据再走一遍了。

    3.2 概念:

        控制算子有三种,cache,persist,checkpoint,以上算子都可以将RDD持久化,持久化的单位是partitioncache和persist都是懒执行的

      必须有一个action类算子触发执行。checkpoint算子不仅能将RDD持久化到磁盘,还能切断RDD之间的依赖关系

    3.3 使用:

      3.3.1 cache

        直接将数据保存到内存,相当于无参数的persist()

          cache()=persist()=persist(StorageLevel.memory_only)

        例子:

          JavaRDD<String> lines = jsc.textFile("./NASA_access_log_Aug95");

          lines = lines.cache(); //使用cache()算子。

          long count = lines.count(); //count是action算子,到这里才能触发cache执行,所以这一次coun加载是从磁盘读数据, 然后拉回到drive端。

          long countrResult = lines.count(); //这一次是从内存中读数据。

      3.3.2 Persist

        可以设置数据保存的级别

          1.MEMORY_ONLY

              使用未序列化的Java对象格式,将数据保存在内存中如果内存不够存放所有的数据,则数据可能就不会进行持久化。那么下次对这个RDD执行算子操作时,

            那些没有被持久化的数据,需要从源头处重新计算一遍。这是默认的持久化策略,使用cache()方法时,实际就是使用的这种持久化策略。

          2.MEMORY_AND_DISK

              使用未序列化的Java对象格式,优先尝试将数据保存在内存中。如果内存不够存放所有的数据,会将数据写入磁盘文件中,下次对这个RDD执行算子时,

            持久化在磁盘文件中的数据会被读取出来使用。

          3.MEMORY_ONLY_SER

              基本含义同MEMORY_ONLY。唯一的区别是,会将RDD中的数据进行序列化,RDD的每个partition会被序列化成一个字节数组。这种方式更加节省内存,

            从而可以避免持久化的数据占用过多内存导致频繁GC。

          4.MEMORY_AND_DISK_SER

              基本含义同MEMORY_AND_DISK。唯一的区别是,会将RDD中的数据进行序列化,RDD的每个partition会被序列化成一个字节数组。这种方式更加节省内存,

            从而可以避免持久化的数据占用过多内存导致频繁GC。

          5.DISK_ONLY

              使用未序列化的Java对象格式,将数据全部写入磁盘文件中。

          6.MEMORY_ONLY_2, MEMORY_AND_DISK_2, 等等

        总结:对于上述任意一种持久化策略,如果加上后缀_2,代表的是将每个持久化的数据,都复制一份副本,并将副本保存到其他节点上。这种基于副本的持久化机制

           主要用于进行容错。假如某个节点挂掉,节点的内存或磁盘中的持久化数据丢失了,那么后续对RDD计算时还可以使用该数据在其他节点上的副本。

           如果没有副本的话,就只能将这些数据从源头处重新计算一遍了。

            1、MEMORY_AND_DISK 意思是先往内存中放数据,内存不够再放磁盘

            2、最常用的是MEMORY_ONLY和MEMORY_AND_DISK。”_2”表示有副本数。

            3、选择的原则是:首先考虑内存,然后考虑序列化之后再放入内存,最后考虑内存加磁盘。

            4、尽量避免使用“_2”和DISK_ONLY级别。

            5、deserialized是不序列化的意思。

      3.3.3 Checkpoint:sc.setCheckpointPath(“持久化的路径”)

        应用:

          将数据保存到制定的目录中,使用于非常长的RDD迭代的情况

        原理:

          1.Spark job执行完之后,spark会从finalRDD从后往前回溯。

          2.当回溯到对某个RDD进行了checkpoint,会对这个RDD标记。

          3.回溯完成之后,Spark会重新计算标记RDD的结果,然后将结果保存到Checkpint目录中。

        优化:

          1.因为最后是要触发当前application的action算子,所以在触发之前加一层cache操作,一样会往前执行cache操作,实现对数据的cache ,

        所以考虑将cache优化到checkpoin的优化流程里。

          2.对RDD执行checkpoint之前,最好对这个RDD先执行cache,这样新启动的job(回溯完成之后重新开的job)只需要将内存中的数据

        (cache缓存好的checkpoint那个点的数据)拷贝到HDFS上就可以。

          3.省去了重新计算这一步,不需要重头开始来走到checkpoint这个点了。

      以上持久化算子的不同:

        Cache

          内存中,如果数据丢失,可以依靠前面的RDD血统关系恢复;

          属于懒执行,需要action算子才能触发;

          cache算子的返回值可以赋值给一个变量,在其他job中直接使用这个变量就是使用持久化的数据了。单位是partition。

          cache算子后面不能直接跟执行算子

          rdd.cache().count()(因为cache算子返回的是一个RDD,直接跟执行算子返回值就变了,Persist同是)。

        Persist

          有级别,可以持久化磁盘,但是实际是持久化到了blockManager中,如果执行结束,数据将会消失;

          属于懒执行,需要action算子才能触发;

          Persist算子的返回值可以赋值给一个变量,在其他job中直接使用这个变量就是使用持久化的数据了。单位是partition。

          Persist算子后面不能直接跟执行算子。

        Checkpoint

          磁盘中,永久保存在hdfs文件系统中,只有手动删除;

          懒执行;

          Checkpoin不仅存储结果,还会存储逻辑,还可以存储元数据;

          切断了RDD之间的依赖关系;

  (4)广播变量

    4.1 定义:

      一个变量,大家要用,那么就要每个节点都要有一份,这样不好,所以广播一下,这样就只会有一份,大家共用一份。

    4.2 语法:

      broadcast:广播变量

      broadcast.val:获取广播变量的值

    4.3 例子:

val list = List("hello java")
val broadcast = sc.broadcast(list) //广播
broadcast.value //调用

    4.4 注意:

      1:只有变量值才会被广播,RDD不会

      2:广播的变量在Driver生成,只有修改了driver的变量才会生效,如果executor端修改了,不会产生影响。

  (5)累加器

      5.1 定义:因为executor值的变化,不会对driver产生影响,所以有了累加器,可以统计所有executor上值的和

      5.2 语法:

        accumulator:定义

        accumulator.val:获取值

      5.3 例子:

val accumulator = sc.accumulator(0); //创建accumulator并初始化为0
accumulator.add(1) //有一条数据就增加1
accumulator.value //获取值

  (6)开发流程

    6.1 Jar包引入:spark-assembly-1.6.0-hadoop2.6.0

    6.2 创建:main:

SparkConf() //配置
  .setAppName(“wc”) //api名称
  .setMaster(“local”) //运行模式:local本地(用于测试),standalone自身的分布式,yarn,mesos不用
SparkContext() //上下文

SQLContext() //启动sparksql
Sqlc.read().format(“json”).load(“path”) //读取json格式文件,生成dataframe

HiveContext() //读取hive中的数据

Sc.stop() //关闭上下文对象

第二节:优化层面

  1、Shuffle

    源码阅读:https://blog.csdn.net/weixin_41705780/article/details/79399192

    2.0可以用sparkRDMA,优化shuffle

    (1)shuffle寻址 https://blog.csdn.net/LHWorldBlog/article/details/80822717

    (2)内存管理

      静态内存管理(将被淘汰)

      统一资源管理

  

    (3)方式

      HashShuffle //已经淘汰,内存比例固定,容易导致OOM,也会导致内存浪费

      SortShuffle

        普通模式:

        Bypass模式:partition个数小于200时候触发,触发这个是不需要进行聚合操作

  2、调优

      1:sparkconf.set("spark.shuffle.file.buffer","64K") --不建议使用,因为这么写相当于硬编码 --最高

      2:在conf/spark-defaults.conf ---不建议使用,相当于硬编码 --第三

      3:./spark-submit --conf spark.shuffle.file.buffer=64 --conf spark.reducer.maxSizeInFlight=96 --建议使用 --第二

      spark.shuffle.file.buffer

      默认值:32k

      参数说明:该参数用于设置shuffle write task的BufferedOutputStream的buffer缓冲大小。将数据写到磁盘文件之前,会先写入buffer缓冲中,待缓冲写满之后,才会溢写到磁盘。

      调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如64k),从而减少shuffle write过程中溢写磁盘文件的次数,也就可以减少磁盘IO次数,

           进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。

      spark.reducer.maxSizeInFlight

      默认值:48m

      参数说明:该参数用于设置shuffle read task的buffer缓冲大小,而这个buffer缓冲决定了每次能够拉取多少数据。

      调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如96m),从而减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。

           在实践中发现,合理调节该参数,性能会有1%~5%的提升。

      spark.shuffle.io.maxRetries

      默认值:3

      参数说明:shuffle read task从shuffle write task所在节点拉取属于自己的数据时,如果因为网络异常导致拉取失败,是会自动进行重试的。该参数就代表了可以重试的最大次数。

           如果在指定次数之内拉取还是没有成功,就可能会导致作业执行失败。

      调优建议:对于那些包含了特别耗时的shuffle操作的作业,建议增加重试最大次数(比如60次),以避免由于JVM的full gc或者网络不稳定等因素导致的数据拉取失败。

           在实践中发现,对于针对超大数据量(数十亿~上百亿)的shuffle过程,调节该参数可以大幅度提升稳定性。

      shuffle file not find    taskScheduler不负责重试task,由DAGScheduler负责重试stage

      spark.shuffle.io.retryWait

      默认值:5s

      参数说明:具体解释同上,该参数代表了每次重试拉取数据的等待间隔,默认是5s。

      调优建议:建议加大间隔时长(比如60s),以增加shuffle操作的稳定性。

      spark.shuffle.memoryFraction 静态 |统一 spark.memory.storageFraction 0.5

      默认值:0.2

      参数说明:该参数代表了Executor内存中,分配给shuffle read task进行聚合操作的内存比例,默认是20%。

      调优建议:如果内存充足,而且很少使用持久化操作,建议调高这个比例,给shuffle read的聚合操作更多内存,以避免由于内存不足导致聚合过程中频繁读写磁盘。

           在实践中发现,合理调节该参数可以将性能提升10%左右。

      spark.shuffle.manager

      默认值:sort|hash

      参数说明:该参数用于设置ShuffleManager的类型。Spark 1.5以后,有三个可选项:hash、sort和tungsten-sort。HashShuffleManager是Spark 1.2以前的默认选项,

           但是Spark 1.2以及之后的版本默认都是SortShuffleManager了。tungsten-sort与sort类似,但是使用了tungsten计划中的堆外内存管理机制,内存使用效率更高。

      调优建议:由于SortShuffleManager默认会对数据进行排序,因此如果你的业务逻辑中需要该排序机制的话,则使用默认的SortShuffleManager就可以;而如果你的业务逻辑

           不需要对数据进行排序,那么建议参考后面的几个参数调优,通过bypass机制或优化的HashShuffleManager来避免排序操作,同时提供较好的磁盘读写性能。

           这里要注意的是,tungsten-sort要慎用,因为之前发现了一些相应的bug。

      spark.shuffle.sort.bypassMergeThreshold----针对SortShuffle

      默认值:200

      参数说明:当ShuffleManager为SortShuffleManager时,如果shuffle read task的数量小于这个阈值(默认是200),则shuffle write过程中不会进行排序操作,

           而是直接按照未经优化的HashShuffleManager的方式去写数据,但是最后会将每个task产生的所有临时磁盘文件都合并成一个文件,并会创建单独的索引文件。

      调优建议:当你使用SortShuffleManager时,如果的确不需要排序操作,那么建议将这个参数调大一些,大于shuffle read task的数量。那么此时就会自动启用bypass机制,

           map-side就不会进行排序了,减少了排序的性能开销。但是这种方式下,依然会产生大量的磁盘文件,因此shuffle write性能有待提高。

      spark.shuffle.consolidateFiles----针对HashShuffle

      默认值:false

      参数说明:如果使用HashShuffleManager,该参数有效。如果设置为true,那么就会开启consolidate机制,会大幅度合并shuffle write的输出文件,

           对于shuffle read task数量特别多的情况下,这种方法可以极大地减少磁盘IO开销,提升性能。

      调优建议:如果的确不需要SortShuffleManager的排序机制,那么除了使用bypass机制,还可以尝试将spark.shffle.manager参数手动指定为hash,

           使用HashShuffleManager,同时开启consolidate机制。在实践中尝试过,发现其性能比开启了bypass机制的SortShuffleManager要高出10%~30%。

原文地址:https://www.cnblogs.com/dblog/p/12172215.html

时间: 2024-08-07 06:46:37

大数据-spark理论(2)算子,shuffle优化的相关文章

大数据-spark理论(3)sparkSql,sparkStreaming,spark调优

导读目录 第一节:sparksql 1:简介 2:核心 3:与hive整合 4:dataFrame 5:函数 第二节:spark Streaming 1:对比strom 2:DStream的算子 3:代码 4:driver HA 5:读取数据 第三节:spark调优 第一节:sparksql (1)简介: Shark:shark是sparksql的前身,hive是shark的前身 快的原因:不仅是内存,还有谓词下移(减少一定量的数据IO) 正常 谓词下移 (先关联表在切割) (先将表中的字段过滤

王家林 大数据Spark超经典视频链接全集[转]

压缩过的大数据Spark蘑菇云行动前置课程视频百度云分享链接 链接:http://pan.baidu.com/s/1cFqjQu SCALA专辑 Scala深入浅出经典视频 链接:http://pan.baidu.com/s/1i4Gh3Xb 密码:25jc DT大数据梦工厂大数据spark蘑菇云Scala语言全集(持续更新中) http://www.tudou.com/plcover/rd3LTMjBpZA/ 1 Spark视频王家林第1课:大数据时代的“黄金”语言Scala 2 Spark视

(转)大数据量高并发的数据库优化与sql优化

大数据量高并发的数据库优化 一.数据库结构的设计 如果不能设计一个合理的数据库模型,不仅会增加客户端和服务器段程序的编程和维护的难度,而且将会影响系统实际运行的性能.所以,在一个系统开始实施之前,完备的数据库模型的设计是必须的. 在一个系统分析.设计阶段,因为数据量较小,负荷较低.我们往往只注意到功能的实现,而很难注意到性能的薄弱之处,等到系统投入实际运行一段时间后,才发现系统的性能在降低,这时再来考虑提高系统性能则要花费更多的人力物力,而整个系统也不可避免的形成了一个打补丁工程. 所以在考虑整

以慕课网日志分析为例 进入大数据 Spark SQL 的世界

详情请交流  QQ  709639943 01.以慕课网日志分析为例 进入大数据 Spark SQL 的世界 02.漫谈spring cloud分布式服务架构 03.Spring Cloud微服务实战视频课程 04.漫谈spring cloud 与 spring boot 基础架构 05.Java秒杀系统方案优化 高性能高并发实战 06.Java深入微服务原理改造房产销售平台 07.快速上手Linux 玩转典型应用 08.快速上手Ionic3 多平台开发企业级问答社区 09.Java Sprin

[转]浅析大数据量高并发的数据库优化

链接:http://www.uml.org.cn/sjjm/201308264.asp 高并发数据库可以同时处理海量信息,应用范围很广.今天我们将讨论的是大数据量高并发的数据库优化,希望对大家有所帮助. 一.数据库结构的设计 如果不能设计一个合理的数据库模型,不仅会增加客户端和服务器段程序的编程和维护的难度,而且将会影响系统实际运行的性能.所以,在一个系统开始实施之前,完备的数据库模型的设计是必须的. 在一个系统分析.设计阶段,因为数据量较小,负荷较低.我们往往只注意到功能的实现,而很难注意到性

2016年大数据Spark“蘑菇云”行动之flume整合spark streaming

近期,听了王家林老师的2016年大数据Spark"蘑菇云"行动,需要将flume,kafka和Spark streaming进行整合. 感觉一时难以上手,还是先从简单着手吧:我的思路是这样的,flume产生数据,然后输出到spark streaming,flume的源数据是netcat(地址:localhost,端口22222),输出是avro(地址:localhost,端口是11111).Spark streaming的处理是直接输出有几个events. 一.配置文件 Flume 配

DevExpress ChartControl大数据加载时有哪些性能优化方法

DevExpress ChartControl加载大数据量数据时的性能优化方法有哪些? 关于图表优化,可从以下几个方面解决: 1.关闭不需要的可视化的元素(如LineMarkers, Labels等): Series.View.LineMarkerOptions.Visible =false. 2. 关闭图表的滚动与缩放功能,手动调整范围,这样将大大减少所需计算的个数. 3. 将 ChartControl.RefreshDataOnRepaint属性设为false 4. 将 ChartContr

如何成为云计算大数据Spark高手

Spark是发源于美国加州大学伯克利分校AMPLab的集群计算平台,它立足于内存计算,性能超过Hadoop百倍,从多迭代批量处理出发,兼收并蓄数据仓库.流处理和图计算等多种计算范式,是罕见的全能选手.Spark采用一个统一的技术堆栈解决了云计算大数据的如流处理.图技术.机器学习.NoSQL查询等方面的所有核心问题,具有完善的生态系统,这直接奠定了其一统云计算大数据领域的霸主地位. 伴随Spark技术的普及推广,对专业人才的需求日益增加.Spark专业人才在未来也是炙手可热,轻而易举可以拿到百万的

2016年大数据Spark“蘑菇云”行动之spark streaming消费flume采集的kafka数据Directf方式

王家林老师的课程:2016年大数据Spark"蘑菇云"行动之spark streaming消费flume采集的kafka数据Directf方式作业.     一.基本背景 Spark-Streaming获取kafka数据的两种方式Receiver与Direct的方式,本文介绍Direct的方式.具体的流程是这样的: 1.Direct方式是直接连接到kafka的节点上获取数据了. 2.基于Direct的方式:周期性地查询Kafka,来获得每个topic+partition的最新的offs