一.示例
1.统计PV和UV
1.1统计PV
val conf = new SparkConf() conf.setMaster("local").setAppName("pvuv") val sc = new SparkContext(conf) val lineRDD = sc.textFile("./pvuv.txt")? lineRDD.map(x=>{ val sp=x.split("\\s") (sp(5),1) }).reduceByKey(_+_).foreach(println)
1.2统计UV
lineRDD.map(x=>{ val sp=x.split("\\s") (sp(5),sp(0)) }).distinct().countByKey().foreach(println)
2.二次排序
?SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("SecondarySortTest");final JavaSparkContext sc = new JavaSparkContext(sparkConf);?JavaRDD<String> secondRDD = sc.textFile("secondSort.txt");?JavaPairRDD<SecondSortKey, String> pairSecondRDD = secondRDD.mapToPair(new PairFunction<String, SecondSortKey, String>() {? /** * */ private static final long serialVersionUID = 1L;? @Override public Tuple2<SecondSortKey, String> call(String line) throws Exception { String[] splited = line.split(" "); int first = Integer.valueOf(splited[0]); int second = Integer.valueOf(splited[1]); SecondSortKey secondSortKey = new SecondSortKey(first,second); return new Tuple2<SecondSortKey, String>(secondSortKey,line); }});?pairSecondRDD.sortByKey(false).foreach(new VoidFunction<Tuple2<SecondSortKey,String>>() { /** * */ private static final long serialVersionUID = 1L;? @Override public void call(Tuple2<SecondSortKey, String> tuple) throws Exception { System.out.println(tuple._2); }});?public class SecondSortKey implements Serializable,Comparable<SecondSortKey>{ /** * */ private static final long serialVersionUID = 1L; private int first; private int second; public int getFirst() { return first; } public void setFirst(int first) { this.first = first; } public int getSecond() { return second; } public void setSecond(int second) { this.second = second; } public SecondSortKey(int first, int second) { super(); this.first = first; this.second = second; } @Override public int compareTo(SecondSortKey o1) { if(getFirst() - o1.getFirst() ==0 ){ return getSecond() - o1.getSecond(); }else{ return getFirst() - o1.getFirst(); } }}?
3.分组取topN
SparkConf conf = new SparkConf().setMaster("local").setAppName("TopOps");JavaSparkContext sc = new JavaSparkContext(conf);JavaRDD<String> linesRDD = sc.textFile("scores.txt");?JavaPairRDD<String, Integer> pairRDD = linesRDD.mapToPair(new PairFunction<String, String, Integer>() {?/** * */private static final long serialVersionUID = 1L;?@Overridepublic Tuple2<String, Integer> call(String str) throws Exception { String[] splited = str.split("\t"); String clazzName = splited[0]; Integer score = Integer.valueOf(splited[1]); return new Tuple2<String, Integer> (clazzName,score); }});?pairRDD.groupByKey().foreach(new VoidFunction<Tuple2<String,Iterable<Integer>>>() {? /** * */ private static final long serialVersionUID = 1L;? @Override public void call(Tuple2<String, Iterable<Integer>> tuple) throws Exception { String clazzName = tuple._1; Iterator<Integer> iterator = tuple._2.iterator(); Integer[] top3 = new Integer[3]; while (iterator.hasNext()) { Integer score = iterator.next();? for (int i = 0; i < top3.length; i++) { if(top3[i] == null){ top3[i] = score; break; }else if(score > top3[i]){ for (int j = 2; j > i; j--) { top3[j] = top3[j-1]; } top3[i] = score; break; } } } System.out.println("class Name:"+clazzName); for(Integer sscore : top3){ System.out.println(sscore); }}});
一.广播变量和累加器
1.广播变量
- 广播变量理解图
- 源码
-
val conf = new SparkConf()conf.setMaster("local").setAppName("brocast")val sc = new SparkContext(conf)val list = List("hello xasxt")val broadCast = sc.broadcast(list)val lineRDD = sc.textFile("./words.txt")lineRDD.filter { x => broadCast.value.contains(x) }.foreach { println}sc.stop()
-
- 注意
- 广播变量只能在driver端定义,不能Executor端定义
- 只能在driver端修改变量的值
2.累加器
- 累加器理解图
- 源码
-
val conf = new SparkConf()conf.setMaster("local").setAppName("accumulator")val sc = new SparkContext(conf)val accumulator = sc.longAccumulatorsc.textFile("./words.txt").foreach { x =>{accumulator.add(1)}}println(accumulator.value)sc.stop()
-
二.调度源码分析
三.SparkShuffle
1.shuffle概念
- reduceByKey会将上一个RDD中的每一个key对应的所有value聚合成一个value,然后生成一个新的RDD,元素类型是<key,value>,每一个可以对应一个聚合起来的value
- 由于聚合之key对应的value有可能在不同的partition上,name该如何聚合??
- shuffle write:上一个stage的每个map task就必须保证将自己处理的当前分区的数据相同的key写入一个分区文件中,可能会写入多个不同的分区文件中。
- shuffle read:reduce task就会从上一个stage的所有task所在的机器上寻找属于己的那些分区文件,这样就可以保证每一个key所对应的value都会汇聚到同一个节点上去处理和聚合
- 常见的shuffle有两种类型:HaskShuffle和SortShuffle
2.HashShuffle
1.普通机制
示意图
执行流程
每一个map task将不同结果写到不同的buffer中,每个buffer的大小为32K,buffer起到数据缓存的作用
每一个buffer文件最后对应一个磁盘小文件
reduce task来拉取对应的磁盘小文件
总结
map task的计算结果会根据分区器(默认是hashPartitioner)来决定写入到哪一个磁盘小文件中去,ReduceTask会 去map端拉取响应的磁盘小文件
产生磁盘小文件的个数
M(map task的个数)*R(reduce task的个数)
存在的问题
在shuffle write过程中会产生很多写磁盘小文件的对象
在shuffle read过程中会产生很多读取磁盘小文件的对象
在JVM堆内存中对象过多就会造成频繁的GC,若GC还无法解决运行所需要的内存的话,就会产生OOM问题
在数据传输中会有频繁的网络通信,出现通信故障的可能性大大增加,通信故障a) 导致的task失败,TaskScheduler不负责重试,由DAGScheduler负责重试Stage。
2.合并机制
示意图
总结
产生磁盘小文件的个数:C(core的个数)*R(reduce的个数)
3.SortShuffle
1.普通机制
示意图
执行流程
map task的计算结果会写入到一个内存数据结构里面,内存数据结构默认5M
在shuffle的时候还有一个定时器,不定期的取估算这个内存结构的大小,当内存结构中的数据超过5M,会申请更多的资源给内存数据结构
如果申请成功不会进行溢写,如果申请不成功,就会发生溢写磁盘
在溢写之前内存结构中的数据会进行排序分区
开始溢写磁盘,写磁盘是以batch的形式去写,一个batch是一万条数据
map task执行完成后,会将这些磁盘小文件合并成一个大的磁盘文件,同时生成一个索引文件
reduce task去map端拉取数据时,首先解析索引文件,根据索引文件再去拉取对应的数据
产生磁盘小文件的个数: 2*M(map task的个数)
2.bypass机制
示意图
总结
bypass运行机制的触发条件: shuffle reduce task 的数量小于spark.shuffle.sort.bypassMergeThreshold的参数值。默认值是200
不需要进行map端的预聚合
产生的磁盘小文件为: 2*M(map task的个数)
4.shuffle文件寻址
1.主要对象
mapoutputtracker spark架构中的一个模块,是一个主从架构,管理磁盘小文件的地址
mapoutputtrackermaster是主队象,存在于driver中
mapoutputtrackerworker是从队象,存在于executor中
blockmanagerpark架构中的一个模块,是一个主从架构,块管理者
BlockManagerMaster,主对象,存在于Driver中。会在及群众有用到广播变量或缓存数据或删除缓存数据的时候,通知BlockManagerSlave传输或者删除数据。
BlockManagerworker,从对象,存在于executor中。BlockManagerworker与BlockManagerworker之间通信无论在Driver端的BlockManager还是在Excutor端的BlockManager都含有四个对象:
diskstore:负责磁盘的管理
memorystore:负责内存的管理
connectionmanager:负责连接其他的blockmanagerworker
blocktransferservice:负责数据的传输
2.shuffle寻址流程
寻址图
寻址流程
当map task执行完成后,会将task的执行情况和磁盘小文件的地址封装到MpStatus对象中通过mapoutputtrackerworker对象向mapoutputtrackermaster汇报
在所有的map task执行完毕后,driver中就掌握所有的磁盘小文件的地址
在reduce task执行之前,会通过executor中mapoutputtrackerworker向driver端的mapoutputtrackermaster获取磁盘小文件的地址
获取到磁盘小文件的地址后,会通过blockmanager中的connectionmanager连接数据所在节点上的connectionmanager,然后通过blocktransferservice进行数据的传输
blocktransferservice默认启动5个task去节点拉取数据.默认情况下,5个task拉取数据量不能超过48M
5.内存管理
Spark执行应用程序时,Spark集群会启动Driver和Executor两种JVM进程,Driver负责创建SparkContext上下文,提交任务,task的分发等。Executor负责task的计算任务,并将结果返回给Driver。同时需要为需要持久化的RDD提供储存。Driver端的内存管理比较简单,这里所说的Spark内存管理针对Executor端的内存管理。
使用静态内存可以通过参数spark.memory.useLegacyMode 设置为true(默认为false)使用静态内存管理。
1.静态内存管理
中存储内存、执行内存和其他内存的大小在 Spark 应用程序运行期间均为固定的,但用户可以应用程序启动前进行配置。
整个内存被分为三部分:
task计算占20%
shuffle聚合内存占20%,其中的10%预留,防止OOM
剩下60%中的10%预留,防止OOM问题,其中这60%中的90%中的80%用于存储RDD的缓存数据和广播变量,剩下的90%中的20%用于解压序列化数据
2.统一内存管理
与静态内存管理的区别在于储存内存和执行内存共享同一块空间,可以互相借用对方的空间。
主要分四部分:
预留总内存的重的300M,用于JVM自身运行
总内存-300M的25%用于task计算(spark2.0以后是40%)
总内存-300M的75%中的50%用于shuffle聚合,剩下的用于存储RDD缓存数据和广播变量.这里的两部分可以相互动态借用
reduce 中OOM如何处理?
减少每次拉取的数据量
提高shuffle聚合的内存比例
提高Excutor的总内存
6.shuffle调优
SparkShuffle调优配置项如何使用?
在代码中,不推荐使用,硬编码。
new SparkConf().set(“spark.shuffle.file.buffer”,”64”)
在提交spark任务的时候,推荐使用。
spark-submit --conf spark.shuffle.file.buffer=64 –conf ….
在conf下的spark-default.conf配置文件中,不推荐,因为是写死后所有应用程序都要用。
部分调优参数
spark.shuffle.file.buffer默认值:32k参数说明:该参数用于设置shuffle write task的BufferedOutputStream的buffer缓冲大小。将数据写到磁盘文件之前,会先写入buffer缓冲中,待缓冲写满之后,才会溢写到磁盘。调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如64k),从而减少shuffle write过程中溢写磁盘文件的次数,也就可以减少磁盘IO次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。?spark.reducer.maxSizeInFlight默认值:48m参数说明:该参数用于设置shuffle read task的buffer缓冲大小,而这个buffer缓冲决定了每次能够拉取多少数据。调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如96m),从而减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。?spark.shuffle.io.maxRetries默认值:3参数说明:shuffle read task从shuffle write task所在节点拉取属于自己的数据时,如果因为网络异常导致拉取失败,是会自动进行重试的。该参数就代表了可以重试的最大次数。如果在指定次数之内拉取还是没有成功,就可能会导致作业执行失败。调优建议:对于那些包含了特别耗时的shuffle操作的作业,建议增加重试最大次数(比如60次),以避免由于JVM的full gc或者网络不稳定等因素导致的数据拉取失败。在实践中发现,对于针对超大数据量(数十亿~上百亿)的shuffle过程,调节该参数可以大幅度提升稳定性。shuffle file not find taskScheduler不负责重试task,由DAGScheduler负责重试stage?spark.shuffle.io.retryWait默认值:5s参数说明:具体解释同上,该参数代表了每次重试拉取数据的等待间隔,默认是5s。调优建议:建议加大间隔时长(比如60s),以增加shuffle操作的稳定性。?spark.shuffle.memoryFraction默认值:0.2参数说明:该参数代表了Executor内存中,分配给shuffle read task进行聚合操作的内存比例,默认是20%。调优建议:在资源参数调优中讲解过这个参数。如果内存充足,而且很少使用持久化操作,建议调高这个比例,给shuffle read的聚合操作更多内存,以避免由于内存不足导致聚合过程中频繁读写磁盘。在实践中发现,合理调节该参数可以将性能提升10%左右。?spark.shuffle.manager默认值:sort|hash参数说明:该参数用于设置ShuffleManager的类型。Spark 1.5以后,有三个可选项:hash、sort和tungsten-sort。HashShuffleManager是Spark 1.2以前的默认选项,但是Spark 1.2以及之后的版本默认都是SortShuffleManager了。tungsten-sort与sort类似,但是使用了tungsten计划中的堆外内存管理机制,内存使用效率更高。调优建议:由于SortShuffleManager默认会对数据进行排序,因此如果你的业务逻辑中需要该排序机制的话,则使用默认的SortShuffleManager就可以;而如果你的业务逻辑不需要对数据进行排序,那么建议参考后面的几个参数调优,通过bypass机制或优化的HashShuffleManager来避免排序操作,同时提供较好的磁盘读写性能。这里要注意的是,tungsten-sort要慎用,因为之前发现了一些相应的bug。?spark.shuffle.sort.bypassMergeThreshold默认值:200参数说明:当ShuffleManager为SortShuffleManager时,如果shuffle read task的数量小于这个阈值(默认是200),则shuffle write过程中不会进行排序操作,而是直接按照未经优化的HashShuffleManager的方式去写数据,但是最后会将每个task产生的所有临时磁盘文件都合并成一个文件,并会创建单独的索引文件。调优建议:当你使用SortShuffleManager时,如果的确不需要排序操作,那么建议将这个参数调大一些,大于shuffle read task的数量。那么此时就会自动启用bypass机制,map-side就不会进行排序了,减少了排序的性能开销。但是这种方式下,依然会产生大量的磁盘文件,因此shuffle write性能有待提高。?spark.shuffle.consolidateFiles默认值:false参数说明:如果使用HashShuffleManager,该参数有效。如果设置为true,那么就会开启consolidate机制,会大幅度合并shuffle write的输出文件,对于shuffle read task数量特别多的情况下,这种方法可以极大地减少磁盘IO开销,提升性能。调优建议:如果的确不需要SortShuffleManager的排序机制,那么除了使用bypass机制,还可以尝试将spark.shffle.manager参数手动指定为hash,使用HashShuffleManager,同时开启consolidate机制。在实践中尝试过,发现其性能比开启了bypass机制的SortShuffleManager要高出10%~30%。
原文地址:https://www.cnblogs.com/ruanjianwei/p/12119356.html