【spark】示例:二次排序

我们有这样一个文件

        

首先我们的思路是把输入文件数据转化成键值对的形式进行比较不就好了嘛!

但是你要明白这一点,我们平时所使用的键值对是不具有比较意义的,也就说他们没法拿来直接比较。

我们可以通过sortByKey,sortBy(pair._2)来进行单列的排序,但是没法进行两列的同时排序。

那么我们该如何做呢?

我们可以自定义一个键值对的比较类来实现比较,

类似于JAVA中自定义类实现可比较性实现comparable接口。

我们需要继承Ordered和Serializable特质来实现自定义的比较类。

1.读取数据创建rdd

2.根据要求来定义比较类

  任务要求,先根据key进行排序,相同再根据value进行排序。

  我们可以把键值对当成一个数据有两个数字,先通过第一个数字比大小,再通过第二个数字比大小。

  (1)我们定义两个Int参数的比较类

  (2)继承Ordered 和 Serializable 接口 实现 compare 方法实现可以比较

class UDFSort (val first:Int,val second:Int) extends Ordered[UDFSort] with Serializable {
    override def compare(that: UDFSort): Int = {
        if(this.first - that.first != 0){//第一个值不相等的时候,直接返回大小
            this.first - that.first //返回值
        }
        else {//第一个值相等的时候,比较第二个值
            this.second - that.second
        }
    }
}

其实,懂java的人能看出来这个跟实现comparable很类似。

3.处理rdd

我们将原始数据按照每行拆分成一个含有两个数字的数组,然后传入我们自定义的比较类中

不是可以通过UDFSort就可以比较出结果了吗,

但是我们不能把结果给拆分掉,也就是说,我们只能排序,不能改数据。

我们这样改怎么办?

我们可以生成键值对的形式,key为UDFSort(line(0),line(1)),value为原始数据lines。

这样,我们通过sortByKey就能完成排序,然后通过取value就可以保持原始数据不变。

4.排序取结果

完整代码

package SparkDemo

import org.apache.spark.{SparkConf, SparkContext}

class UDFSort (val first:Int,val second:Int) extends Ordered[UDFSort] with Serializable {//自定义比较类
    override def compare(that: UDFSort): Int = {
        if(this.first - that.first != 0){//第一个值不相等的时候,直接返回大小
            this.first - that.first //返回值
        }
        else {//第一个值相等的时候,比较第二个值
            this.second - that.second
        }
    }
}
object Sort{
    def main(args:Array[String]): Unit ={
        //初始化配置:设置主机名和程序主类的名字
        val conf = new SparkConf().setAppName("UdfSort");
        //通过conf来创建sparkcontext
        val sc = new SparkContext(conf);
        val lines = sc.textFile("file:///...")
        //转换为( udfsort( line(0),line(1) ),line ) 的形式
        val pair = lines.map(line => (new UDFSort(line.split(" ")(0).toInt,line.split(" ")(1).toInt),line))
        //对key进行排序,然后取value
        val result = pair.sortByKey().map( x => x._2)
    }
}

  

原文地址:https://www.cnblogs.com/zzhangyuhang/p/9038580.html

时间: 2024-11-11 06:27:55

【spark】示例:二次排序的相关文章

Spark的二次排序

1.数据样本: 1 52 43 61 32 11 142 454 113 235 126 13 2.排序规则:先按照第一个字符排序,如果第一个相同,再按照第二个字符排序 3.排序后的结果 1 31 51 142 12 42 453 63 234 115 126 13 4.spark二次排序实现 4.1.自定义key package com.test.spark /** * @author admin * scala处理二次排序的类 * 自定义key */ class SecondSortByKe

Spark用Java实现二次排序的自定义key

本人在研究Spak,最近看了很多网上的对于SPARK用Java实现二次排序的方法,对于自定义key的做法 基本上都是实现Ordered<>接口,重写$greater.$greater$eq.$less.$less$eq.compare.compareTo方法,定义hashCode.equals····· 感觉好麻烦,其实我们自定义key只是用了里面的compareTo方法,其他的$greater.$greater$eq.$less.$less$eq.compare 不用做任何改动,hashCo

Spark(二)算子详解

目录 Spark(二)算子讲解 一.wordcountcount 二.编程模型 三.RDD数据集和算子的使用 Spark(二)算子讲解 @ 一.wordcountcount 基于上次的wordcount,我们来写一个wordcountcount,来对wc程序进行第二次计数,我们来分析一下性能. package com.littlepage.wc import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkCon

python 实现Hadoop的partitioner和二次排序

我们知道,一个典型的Map-Reduce过程包 括:Input->Map->Patition->Reduce->Output.Pation负责把Map任务输出的中间结果 按key分发给不同的Reduce任务进行处理.Hadoop 提供了一个非常实用的partitioner类KeyFieldBasedPartitioner,通过配置相应的参数就可以使用.通过 KeyFieldBasedPartitioner可以方便地实现二次排序. 使用方法:       -partitioner o

Hadoop.2.x_高级应用_二次排序及MapReduce端join

一.对于二次排序案例部分理解 1. 分析需求(首先对第一个字段排序,然后在对第二个字段排序) 杂乱的原始数据 排序完成的数据 a,1 a,1 b,1 a,2 a,2 [排序] a,100 b,6 ===> b,-3 c,2 b,-2 b,-2 b,1 a,100 b,6 b,-3 c,-7 c,-7 c,2 2. 分析[MapRedice过程] 1> 分析数据传入通过input()传入map() 2> map()对数据进行层层过滤,以达到我们想要的数据源, 3> 过滤方法中可添加自

使用java 实现二次排序

二次排序工具类: import java.io.Serializable; import scala.math.Ordered; /** * @author 作者 E-mail: * @version 创建时间:2017年8月30日 下午3:48:11 * 类说明 */ //二次排序key public class SecondeIndexSort implements Ordered<SecondeIndexSort>, Serializable{ private static final

3.算子+PV&amp;UV+submit提交参数+资源调度和任务调度源码分析+二次排序+分组topN+SparkShell

1.补充算子 transformations ?  mapPartitionWithIndex 类似于mapPartitions,除此之外还会携带分区的索引值. ?  repartition 增加或减少分区.会产生shuffle.(多个分区分到一个分区不会产生shuffle) 多用于增多分区. 底层调用的是coalesce ?  coalesce(合并) coalesce常用来减少分区,第二个参数是减少分区的过程中是否产生shuffle. true为产生shuffle,false不产生shuff

数据结构(十二)——排序算法

数据结构(十二)--排序算法 一.排序简介 1.排序的一般定义 排序是计算机中经常进行的操作,目的在于将一组无序的数据元素调整为有序的数据元素.序列:1,20,45,5,2,12排序后:1,2,5,12,20,45 2.排序的数学定义 3.排序的稳定性 如果序列中的两个元素R[i].R[j],关键字分别为K[i].K[j],并且在排序之前R[i]排在R[j]前面,如果排序操作后,元素R[i]仍然排在R[j]前面,则排序方法是稳定的:否则排序是不稳定的. 4.排序实现的关键 比较:任意两个数据元素

Hadoop Mapreduce分区、分组、二次排序

1.MapReduce中数据流动   (1)最简单的过程:  map - reduce   (2)定制了partitioner以将map的结果送往指定reducer的过程: map - partition - reduce   (3)增加了在本地先进性一次reduce(优化)过程: map - combin(本地reduce) - partition -reduce2.Mapreduce中Partition的概念以及使用.(1)Partition的原理和作用        得到map给的记录后,

MapReduce排序之 二次排序

一:背景 Hadoop中虽然有自动排序和分组,由于自带的排序是按照Key进行排序的,有些时候,我们希望同时对Key和Value进行排序.自带的排序功能就无法满足我们了,还好Hadoop提供了一些组件可以让开发人员进行二次排序. 二:技术实现 我们先来看案例需求 #需求1: 首先按照第一列数字升序排列,当第一列数字相同时,第二列数字也升序排列(列之间用制表符\t隔开) [java] view plain copy 3   3 3   2 3   1 2   2 2   1 1   1 MapRed