再识spark

一.示例

1.统计PV和UV

1.1统计PV

 val conf = new SparkConf()    conf.setMaster("local").setAppName("pvuv")    val sc = new SparkContext(conf)    val lineRDD = sc.textFile("./pvuv.txt")?    lineRDD.map(x=>{      val sp=x.split("\\s")      (sp(5),1)    }).reduceByKey(_+_).foreach(println)

1.2统计UV

lineRDD.map(x=>{     val sp=x.split("\\s")     (sp(5),sp(0))   }).distinct().countByKey().foreach(println)

2.二次排序

?SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("SecondarySortTest");final JavaSparkContext sc = new JavaSparkContext(sparkConf);?JavaRDD<String> secondRDD = sc.textFile("secondSort.txt");?JavaPairRDD<SecondSortKey, String> pairSecondRDD = secondRDD.mapToPair(new PairFunction<String, SecondSortKey, String>() {?    /**     *      */    private static final long serialVersionUID = 1L;?    @Override    public Tuple2<SecondSortKey, String> call(String line) throws Exception {           String[] splited = line.split(" ");           int first = Integer.valueOf(splited[0]);           int second = Integer.valueOf(splited[1]);           SecondSortKey secondSortKey = new SecondSortKey(first,second);           return new Tuple2<SecondSortKey, String>(secondSortKey,line);    }});?pairSecondRDD.sortByKey(false).foreach(new                 VoidFunction<Tuple2<SecondSortKey,String>>() {

    /**     *      */    private static final long serialVersionUID = 1L;?    @Override    public void call(Tuple2<SecondSortKey, String> tuple) throws Exception {             System.out.println(tuple._2);    }});?public class SecondSortKey  implements Serializable,Comparable<SecondSortKey>{    /**     *      */    private static final long serialVersionUID = 1L;    private int first;    private int second;    public int getFirst() {        return first;    }    public void setFirst(int first) {        this.first = first;    }    public int getSecond() {        return second;    }    public void setSecond(int second) {        this.second = second;    }    public SecondSortKey(int first, int second) {        super();        this.first = first;        this.second = second;    }    @Override    public int compareTo(SecondSortKey o1) {        if(getFirst() - o1.getFirst() ==0 ){            return getSecond() - o1.getSecond();        }else{            return getFirst() - o1.getFirst();        }    }}?

3.分组取topN

SparkConf conf = new SparkConf().setMaster("local").setAppName("TopOps");JavaSparkContext sc = new JavaSparkContext(conf);JavaRDD<String> linesRDD = sc.textFile("scores.txt");?JavaPairRDD<String, Integer> pairRDD = linesRDD.mapToPair(new PairFunction<String, String, Integer>() {?/** *  */private static final long serialVersionUID = 1L;?@Overridepublic Tuple2<String, Integer> call(String str) throws Exception {    String[] splited = str.split("\t");    String clazzName = splited[0];    Integer score = Integer.valueOf(splited[1]);    return new Tuple2<String, Integer> (clazzName,score);        }});?pairRDD.groupByKey().foreach(new             VoidFunction<Tuple2<String,Iterable<Integer>>>() {?    /**     *      */    private static final long serialVersionUID = 1L;?    @Override    public void call(Tuple2<String, Iterable<Integer>> tuple) throws Exception {    String clazzName = tuple._1;    Iterator<Integer> iterator = tuple._2.iterator();

    Integer[] top3 = new Integer[3];

    while (iterator.hasNext()) {         Integer score = iterator.next();?           for (int i = 0; i < top3.length; i++) {         if(top3[i] == null){                top3[i] = score;                break;          }else if(score > top3[i]){                 for (int j = 2; j > i; j--) {                top3[j] = top3[j-1];                 }                top3[i] = score;                break;         }       } } System.out.println("class Name:"+clazzName); for(Integer sscore : top3){      System.out.println(sscore);  }}});

一.广播变量和累加器

1.广播变量

  • 广播变量理解图

  • 源码
    • val conf = new SparkConf()conf.setMaster("local").setAppName("brocast")val sc = new SparkContext(conf)val list = List("hello xasxt")val broadCast = sc.broadcast(list)val lineRDD = sc.textFile("./words.txt")lineRDD.filter { x => broadCast.value.contains(x) }.foreach { println}sc.stop()
  • 注意
    • 广播变量只能在driver端定义,不能Executor端定义
    • 只能在driver端修改变量的值

2.累加器

  • 累加器理解图

  • 源码
    • val conf = new SparkConf()conf.setMaster("local").setAppName("accumulator")val sc = new SparkContext(conf)val accumulator = sc.longAccumulatorsc.textFile("./words.txt").foreach { x =>{accumulator.add(1)}}println(accumulator.value)sc.stop()

二.调度源码分析

三.SparkShuffle

1.shuffle概念

  • reduceByKey会将上一个RDD中的每一个key对应的所有value聚合成一个value,然后生成一个新的RDD,元素类型是<key,value>,每一个可以对应一个聚合起来的value
  • 由于聚合之key对应的value有可能在不同的partition上,name该如何聚合??
    • shuffle write:上一个stage的每个map task就必须保证将自己处理的当前分区的数据相同的key写入一个分区文件中,可能会写入多个不同的分区文件中。
    • shuffle read:reduce task就会从上一个stage的所有task所在的机器上寻找属于己的那些分区文件,这样就可以保证每一个key所对应的value都会汇聚到同一个节点上去处理和聚合
  • 常见的shuffle有两种类型:HaskShuffle和SortShuffle

2.HashShuffle

1.普通机制

示意图

执行流程

每一个map task将不同结果写到不同的buffer中,每个buffer的大小为32K,buffer起到数据缓存的作用

每一个buffer文件最后对应一个磁盘小文件

reduce task来拉取对应的磁盘小文件

总结

map task的计算结果会根据分区器(默认是hashPartitioner)来决定写入到哪一个磁盘小文件中去,ReduceTask会 去map端拉取响应的磁盘小文件

产生磁盘小文件的个数

M(map task的个数)*R(reduce task的个数)

存在的问题

在shuffle write过程中会产生很多写磁盘小文件的对象

在shuffle read过程中会产生很多读取磁盘小文件的对象

在JVM堆内存中对象过多就会造成频繁的GC,若GC还无法解决运行所需要的内存的话,就会产生OOM问题

在数据传输中会有频繁的网络通信,出现通信故障的可能性大大增加,通信故障a) 导致的task失败,TaskScheduler不负责重试,由DAGScheduler负责重试Stage。

2.合并机制

示意图

总结

产生磁盘小文件的个数:C(core的个数)*R(reduce的个数)

3.SortShuffle

1.普通机制

示意图

执行流程

map task的计算结果会写入到一个内存数据结构里面,内存数据结构默认5M

在shuffle的时候还有一个定时器,不定期的取估算这个内存结构的大小,当内存结构中的数据超过5M,会申请更多的资源给内存数据结构

如果申请成功不会进行溢写,如果申请不成功,就会发生溢写磁盘

在溢写之前内存结构中的数据会进行排序分区

开始溢写磁盘,写磁盘是以batch的形式去写,一个batch是一万条数据

map task执行完成后,会将这些磁盘小文件合并成一个大的磁盘文件,同时生成一个索引文件

reduce task去map端拉取数据时,首先解析索引文件,根据索引文件再去拉取对应的数据

产生磁盘小文件的个数: 2*M(map task的个数)

2.bypass机制

示意图

总结

bypass运行机制的触发条件: shuffle reduce task 的数量小于spark.shuffle.sort.bypassMergeThreshold的参数值。默认值是200

不需要进行map端的预聚合

产生的磁盘小文件为: 2*M(map task的个数)

4.shuffle文件寻址

1.主要对象

mapoutputtracker spark架构中的一个模块,是一个主从架构,管理磁盘小文件的地址

mapoutputtrackermaster是主队象,存在于driver中

mapoutputtrackerworker是从队象,存在于executor中

blockmanagerpark架构中的一个模块,是一个主从架构,块管理者

BlockManagerMaster,主对象,存在于Driver中。会在及群众有用到广播变量或缓存数据或删除缓存数据的时候,通知BlockManagerSlave传输或者删除数据。

BlockManagerworker,从对象,存在于executor中。BlockManagerworker与BlockManagerworker之间通信无论在Driver端的BlockManager还是在Excutor端的BlockManager都含有四个对象:

diskstore:负责磁盘的管理

memorystore:负责内存的管理

connectionmanager:负责连接其他的blockmanagerworker

blocktransferservice:负责数据的传输

2.shuffle寻址流程

寻址图

寻址流程

当map task执行完成后,会将task的执行情况和磁盘小文件的地址封装到MpStatus对象中通过mapoutputtrackerworker对象向mapoutputtrackermaster汇报

在所有的map task执行完毕后,driver中就掌握所有的磁盘小文件的地址

在reduce task执行之前,会通过executor中mapoutputtrackerworker向driver端的mapoutputtrackermaster获取磁盘小文件的地址

获取到磁盘小文件的地址后,会通过blockmanager中的connectionmanager连接数据所在节点上的connectionmanager,然后通过blocktransferservice进行数据的传输

blocktransferservice默认启动5个task去节点拉取数据.默认情况下,5个task拉取数据量不能超过48M

5.内存管理

Spark执行应用程序时,Spark集群会启动Driver和Executor两种JVM进程,Driver负责创建SparkContext上下文,提交任务,task的分发等。Executor负责task的计算任务,并将结果返回给Driver。同时需要为需要持久化的RDD提供储存。Driver端的内存管理比较简单,这里所说的Spark内存管理针对Executor端的内存管理。

使用静态内存可以通过参数spark.memory.useLegacyMode 设置为true(默认为false)使用静态内存管理。

1.静态内存管理

中存储内存、执行内存和其他内存的大小在 Spark 应用程序运行期间均为固定的,但用户可以应用程序启动前进行配置。

整个内存被分为三部分:

task计算占20%

shuffle聚合内存占20%,其中的10%预留,防止OOM

剩下60%中的10%预留,防止OOM问题,其中这60%中的90%中的80%用于存储RDD的缓存数据和广播变量,剩下的90%中的20%用于解压序列化数据

2.统一内存管理

与静态内存管理的区别在于储存内存和执行内存共享同一块空间,可以互相借用对方的空间。

主要分四部分:

预留总内存的重的300M,用于JVM自身运行

总内存-300M的25%用于task计算(spark2.0以后是40%)

总内存-300M的75%中的50%用于shuffle聚合,剩下的用于存储RDD缓存数据和广播变量.这里的两部分可以相互动态借用

reduce 中OOM如何处理?

减少每次拉取的数据量

提高shuffle聚合的内存比例

提高Excutor的总内存

6.shuffle调优

SparkShuffle调优配置项如何使用?

在代码中,不推荐使用,硬编码。

new SparkConf().set(“spark.shuffle.file.buffer”,”64”)

在提交spark任务的时候,推荐使用。

spark-submit --conf spark.shuffle.file.buffer=64 –conf ….

在conf下的spark-default.conf配置文件中,不推荐,因为是写死后所有应用程序都要用。

部分调优参数

spark.shuffle.file.buffer默认值:32k参数说明:该参数用于设置shuffle write task的BufferedOutputStream的buffer缓冲大小。将数据写到磁盘文件之前,会先写入buffer缓冲中,待缓冲写满之后,才会溢写到磁盘。调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如64k),从而减少shuffle write过程中溢写磁盘文件的次数,也就可以减少磁盘IO次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。?spark.reducer.maxSizeInFlight默认值:48m参数说明:该参数用于设置shuffle read task的buffer缓冲大小,而这个buffer缓冲决定了每次能够拉取多少数据。调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如96m),从而减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。?spark.shuffle.io.maxRetries默认值:3参数说明:shuffle read task从shuffle write task所在节点拉取属于自己的数据时,如果因为网络异常导致拉取失败,是会自动进行重试的。该参数就代表了可以重试的最大次数。如果在指定次数之内拉取还是没有成功,就可能会导致作业执行失败。调优建议:对于那些包含了特别耗时的shuffle操作的作业,建议增加重试最大次数(比如60次),以避免由于JVM的full gc或者网络不稳定等因素导致的数据拉取失败。在实践中发现,对于针对超大数据量(数十亿~上百亿)的shuffle过程,调节该参数可以大幅度提升稳定性。shuffle file not find    taskScheduler不负责重试task,由DAGScheduler负责重试stage?spark.shuffle.io.retryWait默认值:5s参数说明:具体解释同上,该参数代表了每次重试拉取数据的等待间隔,默认是5s。调优建议:建议加大间隔时长(比如60s),以增加shuffle操作的稳定性。?spark.shuffle.memoryFraction默认值:0.2参数说明:该参数代表了Executor内存中,分配给shuffle read task进行聚合操作的内存比例,默认是20%。调优建议:在资源参数调优中讲解过这个参数。如果内存充足,而且很少使用持久化操作,建议调高这个比例,给shuffle read的聚合操作更多内存,以避免由于内存不足导致聚合过程中频繁读写磁盘。在实践中发现,合理调节该参数可以将性能提升10%左右。?spark.shuffle.manager默认值:sort|hash参数说明:该参数用于设置ShuffleManager的类型。Spark 1.5以后,有三个可选项:hash、sort和tungsten-sort。HashShuffleManager是Spark 1.2以前的默认选项,但是Spark 1.2以及之后的版本默认都是SortShuffleManager了。tungsten-sort与sort类似,但是使用了tungsten计划中的堆外内存管理机制,内存使用效率更高。调优建议:由于SortShuffleManager默认会对数据进行排序,因此如果你的业务逻辑中需要该排序机制的话,则使用默认的SortShuffleManager就可以;而如果你的业务逻辑不需要对数据进行排序,那么建议参考后面的几个参数调优,通过bypass机制或优化的HashShuffleManager来避免排序操作,同时提供较好的磁盘读写性能。这里要注意的是,tungsten-sort要慎用,因为之前发现了一些相应的bug。?spark.shuffle.sort.bypassMergeThreshold默认值:200参数说明:当ShuffleManager为SortShuffleManager时,如果shuffle read task的数量小于这个阈值(默认是200),则shuffle write过程中不会进行排序操作,而是直接按照未经优化的HashShuffleManager的方式去写数据,但是最后会将每个task产生的所有临时磁盘文件都合并成一个文件,并会创建单独的索引文件。调优建议:当你使用SortShuffleManager时,如果的确不需要排序操作,那么建议将这个参数调大一些,大于shuffle read task的数量。那么此时就会自动启用bypass机制,map-side就不会进行排序了,减少了排序的性能开销。但是这种方式下,依然会产生大量的磁盘文件,因此shuffle write性能有待提高。?spark.shuffle.consolidateFiles默认值:false参数说明:如果使用HashShuffleManager,该参数有效。如果设置为true,那么就会开启consolidate机制,会大幅度合并shuffle write的输出文件,对于shuffle read task数量特别多的情况下,这种方法可以极大地减少磁盘IO开销,提升性能。调优建议:如果的确不需要SortShuffleManager的排序机制,那么除了使用bypass机制,还可以尝试将spark.shffle.manager参数手动指定为hash,使用HashShuffleManager,同时开启consolidate机制。在实践中尝试过,发现其性能比开启了bypass机制的SortShuffleManager要高出10%~30%。

原文地址:https://www.cnblogs.com/ruanjianwei/p/12119356.html

时间: 2024-10-12 21:41:28

再识spark的相关文章

再识数据库

      好长时间没有总结数据库了,温习一下 数据库到底是用来干嘛的?带着这个问题来进行我们的探讨 一.简介 数据库是从文件管理系统发展而来的,是对数据的管理的高级阶段.我们都知道它是用来存储数据的,可 以说是电子的文件柜,能对数据进行增删改查操作,但是它不仅仅是用来存储数据的,随着数据库技术的发 展,海量数据库和大型数据库系统已经广泛的应用. 二.三层 这三层是物理数据层.概念数据层和用户数据层,想想和现在学习的三层还是很像的 1.物理数据层:存储最原始的数据 可参考数据访问层(D) 2.逻

Play再识 - 不放弃的执着

从写Play初识时,前面各种称赞play如何如何解放java web开发,最后因为网络被墙而无法正常编译,从而想到放弃.从来都有成为web开发高手的想法,今天又再一次进行尝试,惊喜的是有新的进展. 首先,你得FQ,得打破国内网络的桎梏,推荐使用mxvpn,免费得话使用日本线路,但是用的人特别多,导致网络特别慢,建议购买一个月的套餐试试,我反正是买了,效果不错. 其次,你最好把activator的完整包下载下来.这个完整包是包含一个本地的play开发的网站ide,输入activator ui即可,

JS魔法堂:再识Number type

Brief 本来只打算理解JS中0.1 + 0.2 == 0.30000000000000004的原因,但发现自己对计算机的数字表示和运算十分陌生,于是只好恶补一下.以下是恶补后的成果: 基础野:细说原码.反码和补码(http://www.cnblogs.com/fsjohnhuang/p/5060242.html) 基础野:细说无符号整数(http://www.cnblogs.com/fsjohnhuang/p/5078290.html) 基础野:细说有符号整数(http://www.cnbl

软工视频再识

前一段时间已经对视频刚开始的一段做了一个总结,最近一直忙着自考视频进度有点慢,现在才做总结.每一章只有总结才能有收获要不然感觉跟没有看差不多,总结让我把知识串成一串珍珠. 第四.五章讲的面向过程的设计方法,从工程管理的角度分为概要设计和详细设计,概要设计是在总体设计的基础上对系统总体结构的细化,将系统分为很多的子系统和模块,就好像我们要爬上一样,一看这么高的上,就感觉很恐惧,但我们可以把它分成若干段,然后就是再分为每一小节,这样当到达山顶的时候就会特别有成就感.我们设计系统先进行概要设计分为多个

JS魔法堂:再识IE的内存泄露

一.前言 IE6~8除了不遵守W3C标准和各种诡异外,我想最让人诟病的应该是内存泄露的问题了.这阵子趁项目技术调研的机会好好的再认识一回,以下内容若有纰漏请大家指正,谢谢! 目录一大坨! 二.内存泄漏到底是哪里漏了? 2.1. JS Engine Object.DOM Element 和 BOM Element 2.2. JS Engine Object的内存回收机制 2.3. DOM Element的内存回收机制 2.4. 两种泄漏方式 三.4种泄漏模式  3.1. Circular Refe

VB.net学习笔记(二十三)再识委托

一.调用静态方法 1.声明 委托须使用前声明定义,可以带参数(一个或多个),可以有返回值. '位于一个模块或类的声明部分 Delegate Sub OneArgSub{ByVal msg As String) '带一个参数,且无返回类型 定义了一个委托的类.后台创建了一个名为OneArgSub的新类,这个类是从System.Delegate类继承而来的.(更准确地说从 Systetn.MuhicastDelegate 继承而来的,而 System.MulticastDelegate 则又是从 S

C#再识委托

C# 1 1.什么是委托 委托是一种定义方法签名的类型.当实例化委托时,您可以将其实例与任何具有兼容签名的方法相关联. 您可以通过委托实例调用方法.(MSDN) 委托类似于 C++函数指针,但它们是类型安全的 委托允许将方法作为参数进行传递 委托可用于定义回调方法 委托可以链接在一起 方法不必与委托签名完全匹配.(协变与逆变) C# 2.0 版引入了匿名方法的概念,此类方法允许将代码块作为参数传递,以代替单独定义的方法. C#3.0引入了Lambda表达式,利用它们可以更简练地编写内联代码块.匿

再识网络编程

---------------------不定时的更新开始了,且更且珍惜------------------------ 我要每次都写一遍:前面的还没补完,以此催促不定时更新的我 ----------------- 嗯,今天没带耳机,没边听歌边更博,没灵感改歌词 ------------------ socket(套接字)编程 基于socket实现客户端与服务端编程: 想象成一个打电话的过程,要想打电话我们必须要先有手机,所以要先去买手机,然后插手机卡,开机等一系列步骤 服务端必备的三要素: 1

再识iptables规则

实验机器 测试机:192.168.1.140 CentOS release 6.5 客户端:192.168.1.179 CentOS release 6.5 描述,客户端通过nmap知悉测试机的22.80.443.3306tcp接口有服务. 1.基础部分 查看是否开启状态:service iptables status -nvL 就是查看规则, -F 是临时清除当前规则,重启系统或者重启 iptalbes 服务后还会加载已经保存的规则,所以需要使用 /etc/init.d/iptables sa