Spark高级排序彻底解密(DT大数据梦工厂)

内容:

1、基础排序算法实战;

2、二次排序算法实战;

3、更高局级别排序算法;

4、排序算法内幕解密;

为啥讲排序?因为在应用的时候都有排序要求。

海量数据经常排序之后要我们想要的内容。

==========基础排序算法============

scala> sc.setLogLevel("WARN")

scala> val x = sc.textFile("/historyserverforSpark/README.md", 3).flatMap(_.split(" ")).map(word=>(word,1)).reduceByKey(_+_,1)

x: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[5] at reduceByKey at <console>:27

scala> x.map(pair=>(pair._2,pair._1)).sortByKey(false).map(pair=>(pair._2,pair._1)).collect

16/02/08 20:34:00 WARN client.AppClient$ClientEndpoint: Connection to Worker2:7077 failed; waiting for master to reconnect...

16/02/08 20:34:01 WARN cluster.SparkDeploySchedulerBackend: Disconnected from Spark cluster! Waiting for reconnection...

16/02/08 20:34:01 WARN client.AppClient$ClientEndpoint: Connection to Worker2:7077 failed; waiting for master to reconnect...

res11: Array[(String, Int)] = Array(("",67), (the,21), (to,14), (Spark,13), (for,11), (and,10), (##,8), (a,8), (run,7), (is,6), (can,6), (on,5), (of,5), (in,5), (if,4), (also,4), (you,4), (build,3), (including,3), (Please,3), (use,3), (or,3), (Hadoop,3), (documentation,3), (example,3), (an,3), (You,3), (with,3), (For,2), (This,2), (Hive,2), (To,2), (SparkPi,2), (refer,2), (Interactive,2), (be,2), (./bin/run-example,2), (1000:,2), (tests,2), (examples,2), (at,2), (using,2), (Shell,2), (class,2), (`examples`,2), (set,2), (Hadoop,,2), (cluster,2), (supports,2), (Python,2), (particular,2), (general,2), (locally,2), (following,2), (which,2), (should,2), ([project,2), (do,2), (how,2), (It,2), (Scala,2), (detailed,2), (return,2), (one,2), (Python,,2), (building,2), (that,2), (SQL,2), (guidance...

/**
 * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
 * `collect` or `save` on the resulting RDD will return or output an ordered list of records
 * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
 * order of the keys).
 */
// TODO: this currently doesn‘t work on P other than Tuple2!
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
    : RDD[(K, V)] = self.withScope
{
  val part = new RangePartitioner(numPartitions, self, ascending)
  new ShuffledRDD[K, V, V](self, part)
    .setKeyOrdering(if (ascending) ordering else ordering.reverse)
}

reduceByKey(_+_,1),RDD的并发是继承的

==========二次排序实战============

所谓二次排序就是排序的时候考虑两个纬度

建立HelloSort.txt,内容:

2 3

4 1

3 2

4 3

9 7

2 1

第一列有2个4,无法排了,这个时候就要借助第二列了,这个就是二次排序

二次排序知识入门,如果是10次排序,20次排序,100次排序呢?怎么办?所以二次排序要把这些都考虑了。。。

自定义Key才是以不变应万变!

JAVA版本代码

package com.dt.spark.SparkApps.cores;

import java.io.Serializable;

import scala.math.Ordered;

/**

* 自定义二次排序

* @author 威

*

*/

public class SecondarySortKey implements Ordered<SecondarySortKey>,Serializable {

//需要二次排序的key

private int first;

public int getFirst() {

return first;

}

public void setFirst(int first) {

this.first = first;

}

public int getSecond() {

return second;

}

public void setSecond(int second) {

this.second = second;

}

private int second;

public SecondarySortKey(int first,int second){

this.first = first;

this.second = second;

}

@Override

public boolean $greater(SecondarySortKey other) {

if(this.first>other.getFirst()){

return true;

}else if(this.first == other.getFirst()&&this.second>other.getSecond()){

return true;

}

return false;

}

@Override

public boolean $greater$eq(SecondarySortKey other) {

if(this.$greater(other)){

return true;

}else if(this.first==other.getFirst()&&this.second==other.getSecond()){

return true;

}

return false;

}

@Override

public boolean $less(SecondarySortKey other) {

if(this.first<other.getFirst()){

return true;

}else if(this.first==other.getFirst()&&this.second<other.getSecond()){

return true;

}

return false;

}

@Override

public boolean $less$eq(SecondarySortKey other) {

if(this.$less(other)){

return true;

}else if(this.first==other.getFirst()&&this.second==other.getSecond()){

return true;

}

return false;

}

@Override

public int compare(SecondarySortKey other) {

if(this.first-other.getFirst() != 0){

return this.first-other.getFirst();

}else{

return this.second-other.getSecond();

}

}

@Override

public int compareTo(SecondarySortKey other) {

if(this.first-other.getFirst() != 0){

return this.first-other.getFirst();

}else{

return this.second-other.getSecond();

}

}

@Override

public int hashCode() {

final int prime = 31;

int result = 1;

result = prime * result + first;

result = prime * result + second;

return result;

}

@Override

public boolean equals(Object obj) {

if (this == obj)

return true;

if (obj == null)

return false;

if (getClass() != obj.getClass())

return false;

SecondarySortKey other = (SecondarySortKey) obj;

if (first != other.first)

return false;

if (second != other.second)

return false;

return true;

}

}

package com.dt.spark.SparkApps.cores;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaPairRDD;

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.api.java.function.Function;

import org.apache.spark.api.java.function.PairFunction;

import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

/**

* 二次排序

* @author

* 第一步:按照ordered和 Serializable接口实现自定义排序的key

* 第二步:将要进行二次排序的文件加载进来生成key,value类型的RDD

* 第三步:基于自定义的使用sortByKey进行二次排序

* 第四步:去除掉排序的key,只保留排序的结果

*/

public class SecondarySortApp {

public static void main(String[] args) {

SparkConf conf = new SparkConf().setAppName("SecondarySortApp" ).setMaster("local");

JavaSparkContext sc = new JavaSparkContext(conf);// 其地产曾实际上就是Scala的SparkContext

JavaRDD<String> lines = sc.textFile("F:/exepro/scala/HelloSort.txt" );

JavaPairRDD<SecondarySortKey, String> pairs = lines.mapToPair( new PairFunction<String, SecondarySortKey, String>() {

@Override

public Tuple2<SecondarySortKey, String> call(String line) throws Exception {

String[] splited = line.split( " ");

SecondarySortKey key = new SecondarySortKey(Integer.valueOf(splited[0]),Integer. valueOf(splited[1]));

return new Tuple2<SecondarySortKey,String>(key,line);

}

});

JavaPairRDD<SecondarySortKey, String> sorted = pairs.sortByKey();

//过滤掉排序后自定义的key,保留排序的结果

JavaRDD<String> secondarySorted = sorted .map(new Function<Tuple2<SecondarySortKey,String>, String>() {

@Override

public String call(Tuple2<SecondarySortKey, String> sortedContent) throws Exception {

return sortedContent ._2 ;

}

});

secondarySorted.foreach(new VoidFunction<String>() {

@Override

public void call(String str) throws Exception {

// TODO Auto-generated method stub

System. out.println(str );

}

});

}

}

结果:

16/02/08 21:37:49 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 3 ms

2 1

2 3

3 2

4 1

4 3

9 7

16/02/08 21:37:49 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 1165 bytes result sent to driver

Scala版本代码:

package com.dt.spark.cores

/**
  * 自定义二次排序
  * Created by 威 on 2016/2/9.
  */
class SecondarySortKey(val first:Int,val second:Int) extends Ordered[SecondarySortKey] with Serializable {
  def compare(other:SecondarySortKey): Int ={
    if(this.first-other.first!=0){
      this.first-other.first
    }else{
      this.second-other.second
    }
  }
}

package com.dt.spark.cores

import org.apache.spark.{SparkContext, SparkConf}

/**
  * 二次排序
  *
  * @author 威
  * 第一步:按照ordered和Serializable接口实现自定义排序的key
  * 第二步:将要进行二次排序的文件加载进来生成key,value类型的RDD
  * 第三步:基于自定义的使用sortByKey进行二次排序
  * 第四步:去除掉排序的key,只保留排序的结果
  */
object SecondarySortApp {
  def main(args: Array[String]) {
    val conf = new SparkConf()//创建SparkConf对象
    conf.setAppName("SecondarySortApp")//设置应用程序的名称,在程序运行的监控界面可以看到名称
    conf.setMaster("local")//此时程序在本地运行,不需要安装Spark集群
    val sc = new SparkContext(conf)//通过创建SparkContext对象,通过传入SparkConf实例来定制Spark运行的具体参数和配置信息
    val lines = sc.textFile("F:/exepro/scala/HelloSort.txt")//通过HadoopRDD以及MapPartitionRDD获取文件中每一行的内容本身

val pairWithSortKey = lines.map(line=>{
      val splited = line.split(" ")
      (new SecondarySortKey(splited(0).toInt,splited(1).toInt),line)
    })

val sorted = pairWithSortKey.sortByKey(false)

val secondarySorted = sorted.map(sortedLine=>sortedLine._2)

secondarySorted.collect().foreach(println)

}
}

Scala太简洁了。

作业:

1、用Scala实现二次排序(三次排序),key用object的apply来实现;

2、自己去阅读RangePartitioner的源代码;

时间: 2024-08-16 15:52:49

Spark高级排序彻底解密(DT大数据梦工厂)的相关文章

Spark on Yarn彻底解密(DT大数据梦工厂)

内容: 1.Hadoop Yarn的工作流程解密: 2.Spark on Yarn两种运行模式实战: 3.Spark on Yarn工作流程解密: 4.Spark on Yarn工作内幕解密: 5.Spark on Yarn最佳实践: 资源管理框架Yarn Mesos是分布式集群的资源管理框架,和大数据没关系,但是可以管理大数据的资源 ==========Hadoop Yarn解析============ 1.Yarn是Hadoop推出的资源管理器,是负责分布式(大数据)集群计算的资源管理的,负

Spark Executor内幕彻底解密(DT大数据梦工厂)

内容: 1.Spark Executor工作原理图: 2.ExecutorBackend注册源码解密: 3.Executor实例化内幕: 4.Executor具体是如何工作的? 1.Master发指令给Worker启动Executor: 2.Worker接受到Master发送来的指令,通过ExecutorRunner启动另外一个进程来运行Executor: 3.此时会启动粗粒度的ExecutorBackend(CoarseGrainedExecutorBackend): 4.CoarseGrai

Spark内核架构解密(DT大数据梦工厂)

只有知道内核架构的基础上,才知道为什么要这样写程序? 手工绘图来解密Spark内核架构 通过案例来验证Spark内核架构 Spark架构思考 ==========Spark Runtime的几个概念============ 下载下来运行,基本都是standalone模式,如果掌握了standalone,则yarn和mesos,以后不做特别说明,一律是standalone模式 application=driver+executor,executor是具体处理数据分片,里面是线程池并发的处理数据分片

Spark Sort-Based Shuffle内幕彻底解密(DT大数据梦工厂)

内容: 1.为什么使用Sorted-Based Shuffle: 2.Sorted-Based Shuffle实战: 3.Sorted-Based Shuffle内幕: 4.Sorted-Based Shuffle的不足: 最常用的Shuffle方式,Sorted-Based Shuffle涉及了大规模Spark开发.运维时核心问题,以及答案的要害所在. 必须掌握这一讲内容. 本课是从Spark初级人才成功升级为Spark中级人才的通道. 稍有水平的大公司,面试内容本讲肯定会涉及. ======

SparkRDD解密(DT大数据梦工厂)

第一阶段,彻底精通Spark 第二阶段,从0起步,操作项目 Hadoop是大数据的基础设施,存储等等 Spark是计算核心所在 1.RDD:基于工作集的应用抽象 2.RDD内幕解密 3.RDD思考 不掌握RDD的人,不可能成为Spark的高手 绝对精通RDD,解决问题的能力大大提高 各种框架底层封装的都是RDD,RDD提供了通用框架 RDD是Spark的通用抽象基石 顶级SPark高手, 1.能解决问题.性能调优: 2.Spark高手拿Spark过来就是修改的 ==========基于工作集的应

Spark Runtime(Driver、Masster、Worker、Executor)内幕解密(DT大数据梦工厂)

内容: 1.再论Spark集群部署: 2.Job提交解密: 3.Job的生成和接受: 4.Task的运行: 5.再论Shuffle: 从一个作业视角,透过Master.Drvier.Executor来透视Spark Runtime ==========再论Spark集群部署============ 官网中关于集群的部署: 默认情况下,每个Worker下有一个Executor,会最大化的使用内存和CPU. Master发指令给Worker来分配资源,不关心Worker能不能分配到这个资源,他发给多

Spark天堂之门(SparkContext)解密(DT大数据梦工厂)

内容: 1.Spark天堂之门: 2.SparkContext使用案例鉴赏: 3.SparkContext内幕: 4.SparkContext源码解密: SparkContext是编写任意Spark程序的第一个对象,用SparkConf为传入的参数 ==========Spark天堂之门:SparkContext !!!============ 1.Spark程序在运行的时候分为Driver和Executors: 2.Spark程序编写是基于SparkContext的,具体来说包含两个方面: 1

Master HA彻底解密(DT大数据梦工厂)

内容: 1.Master HA解析: 2.Master HA的四种方式: 3.Master HA的内部工作机制: 4.Master HA的源码解密: 本讲主要源码角度分析Master HA,因为在生产环境必然要做的 ==========Master HA解析============ Spark是Master-Slave的结构 现在业界是1个Master Active,2个以上standby 如果有HA的话,切换active的时候,会在上次运行的基础上继续运行 Drvier提交程序.申请资源,是跟

HA下Spark集群工作原理(DT大数据梦工厂)

Spark高可用HA实战 Spark集群工作原理详解 资源主要指内存.CPU 如果是单点的话,Master如果出现故障,则集群不能对外工作 Spark通过Zookeeper做HA,一般做HA是一个active级别,standby active就是当前工作 standby是随时准备active的挂了之后会切换成为active级别 以前一般是2台机器,一个active,一个standby 现在一般是3台机器,一个active,两个standby,甚至3台以上 Zookeeper包含了哪些内容:所有的