Spark自定义排序与分区

Spark自定义排序与分区

前言:

随着信息时代的不断发展,数据成了时代主题,今天的我们徜徉在数据的海洋中;由于数据的爆炸式增长,各种数据计算引擎如雨后春笋般冲击着这个时代。作为时下最主流的计算引擎之一 Spark也是从各方面向时代展示自己的强大能力。Spark无论是在数据处理还是数据分析、挖掘方面都展现出了强大的主导能力。其分布式计算能力受到越来越多的青睐。本文将介绍spark的排序以及分区。

一、Spark自定义排序

在spark中定义了封装了很多高级的api,在我们的日常开发中使用这些api能获得不少的便利。但是有的时候这些默认的规则并不足以实现我们的目的,这时候需要我们了解其底层原理,编写一套适合我们需求的处理逻辑。下面通过代码简单介绍一下spark如何自定义排序。

import org.apache.spark.rdd.RDD

import org.apache.spark.{SparkConf, SparkContext}

object CustomSort1 {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName("CustomSort1").setMaster("local[*]")

    val sc = new SparkContext(conf)

    //排序规则:首先按照颜值的降序,如果颜值相等,再按照年龄的升序

    val users= Array("laoduan 30 99", "laozhao 29 9999", "laozhang 28 98", "laoyang 28 99")

    //将Driver端的数据并行化变成RDD

    val lines: RDD[String] = sc.parallelize(users)

    //切分整理数据

    val userRDD: RDD[User] = lines.map(line => {

      val fields = line.split(" ")

      val name = fields(0)

      val age = fields(1).toInt

      val fv = fields(2).toInt

      //(name, age, fv)

      new User(name, age, fv)

    })

    //不满足要求

    //tpRDD.sortBy(tp => tp._3, false)

    //将RDD里面装的User类型的数据进行排序

    val sorted: RDD[User] = userRDD.sortBy(u => u)

    val r = sorted.collect()

    println(r.toBuffer)

    sc.stop()

  }

}

class User(val name: String, val age: Int, val fv: Int) extends Ordered[User] with Serializable {

  override def compare(that: User): Int = {

    if(this.fv == that.fv) {

      this.age - that.age

    } else {

      -(this.fv - that.fv)

    }

  }

  override def toString: String = s"name: $name, age: $age, fv: $fv"

}

对于自定义排序有多种方式实现:

1、User类继承Ordered使User类变成可排序的类。在spark中由于我们虽然测试是在本地测试,但是他会模拟集群模式,所以我们自定义的object在运行时会shuffle有网络传输会涉及序列化的问题。所以需要同时继承Serializable。

2、使用case class样例类:

 case class Man(age: Int, fv: Int) extends Ordered[Man] {}

不需要继承序列化类,case class默认已经实现序列化。

3、定义样例类隐式排序规则

 object SortRules {

  implicit object OrderingUser extends Ordering[User] {

    override def compare(x: User, y: User): Int = {

      if(x.fv == y.fv) {

        x.age - y.age

      } else {

        y.fv - x.fv

      }

    }

  }

}

主程序代码:

//切分整理数据

    val tpRDD: RDD[(String, Int, Int)] = lines.map(line => {

      val fields = line.split(" ")

      val name = fields(0)

      val age = fields(1).toInt

      val fv = fields(2).toInt

      (name, age, fv)

    })

    //排序(传入了一个排序规则,不会改变数据的格式,只会改变顺序)

    import SortRules.OrderingUser

    val sorted: RDD[(String, Int, Int)] = tpRDD.sortBy(tp => User(tp._2, tp._3))

4、某些特殊数据类型不需要自定义,使用原生api更方便。

//充分利用元组的比较规则,元组的比较规则:先比第一,相等再比第二个

val sorted: RDD[(String, Int, Int)] = tpRDD.sortBy(tp => (-tp._3, tp._2))

5、将排序规则添加到隐士转换中

  //Ordering[(Int, Int)]最终比较的规则格式

    //on[(String, Int, Int)]未比较之前的数据格式

    //(t =>(-t._3, t._2))怎样将规则转换成想要比较的格式

    implicit val rules = Ordering[(Int, Int)].on[(String, Int, Int)](t =>(-t._3, t._2))

val sorted: RDD[(String, Int, Int)] = tpRDD.sortBy(tp => tp)

二、Spark自定义分区器

1、combineByKey

在reduceByKey、groupByKey等算子都基于combineByKey算子实现。这是一个底层的算子,可以自定义一些规则,比较灵活。

Rdd.combineByKey(x=>x,(m:Int,n:Int)=>m+n,(a:Int,B:Int)=>a+b,new HashPartition(2),true,null)

参数解释:

(1)、相同key的value放入一个分区

(2)、局部聚合

(3)、全局聚合

(4)、分区数(可以设置分区数)

(5)、是否进行map端局部聚合

(6)、序列化参数

conbineByKey是一个较为底层的api,一般情况下可能不会用到它,但是当一些高级api满足不了我们的需求的时候它给我们提供了解决便利。

2、自定义分区器

在spark计算中不可避免的会涉及到shuffle,数据会根据不同的规则有分区器分发到不同的分区中。所以分区器决定了上游的数据发送到哪个下游。以不同专业学生数据计算不同专业的学生成绩。分组取topN :

(1)、自定义分区器

//自定义分区器:majors:专业集合

class MajorParitioner(majors: Array[String]) extends Partitioner {

  //相当于主构造器(new的时候回执行一次)

  //用于存放规则的一个map

  val rules = new mutable.HashMap[String, Int]()

  var i = 0

  for(major<- majors) {

    //rules(major) = i

    rules.put(major, i)

    i += 1

  }

  //返回分区的数量(下一个RDD有多少分区)

  override def numPartitions: Int = majors.length

  //根据传入的key计算分区标号

  //key是一个元组(String, String)

  override def getPartition(key: Any): Int = {

    //获取key

    val major= key.asInstanceOf[(String, String)]._1

    //根据规则计算分区编号

    rules(major)

  }

}

(2)、使用自定义分区器

//调用自定义的分区器,并且按照指定的分区器进行分区

    val majorPatitioner = new MajorParitioner(subjects);

    //partitionBy按照指定的分区规则进行分区

    //调用partitionBy时RDD的Key是(String, String)

    val partitioned: RDD[((String, String), Int)] = reduced.partitionBy(majorPatitioner )

    //如果一次拿出一个分区(可以操作一个分区中的数据了)

    val sorted: RDD[((String, String), Int)] = partitioned.mapPartitions(it => {

      //将迭代器转换成list,然后排序,在转换成迭代器返回

      it.toList.sortBy(_._2).reverse.take(topN).iterator

    })

    //

val r: Array[((String, String), Int)] = sorted.collect()

通过这样自定义分区器后,数据通过shuffle之后每个分区的数据就是一个专业的学生数据,对这个分区的数据排序后取出前N个就是所需结果了。但是这个程序中还是会出现一个问题,当数据量太大的时候可能会导致内存溢出的情况,因为我们是将数据放到了list中进行排序,而list是存放于内存中。所以会导致内存溢出。那么怎么才能避免这个情况呢。我们可以在mapPartitions内部定义一个集合,不加载所有数据。,每次将这个集合排序后最小的值移除,通过多次循环后最终集合中剩下的就是需要的结果。

三、总结

无论是排序还是分区,在spark中都封装了高级的api共我们使用,但是他不会适用于所有情况,只会适用与部分情况,而通过对这些api的底层实现了解,通过自定义规则可以编辑一套适合于我们需求的程序。这样一来可以大大提高效率。没有什么能适配万物,随机应变才是取胜之道。

原文地址:https://www.cnblogs.com/lsbigdata/p/10933494.html

时间: 2024-08-06 04:49:47

Spark自定义排序与分区的相关文章

MapReduce 学习4 ---- 自定义分区、自定义排序、自定义组分

1. map任务处理 1.3 对输出的key.value进行分区. 分区的目的指的是把相同分类的<k,v>交给同一个reducer任务处理. public static class MyPartitioner<Text, LongWritable> extends Partitioner<Text, LongWritable>{ static HashMap<String,Integer> map = null; static{ map = new Hash

Spark自定义分区(Partitioner)

我们都知道Spark内部提供了HashPartitioner和RangePartitioner两种分区策略,这两种分区策略在很多情况下都适合我们的场景.但是有些情况下,Spark内部不能符合咱们的需求,这时候我们就可以自定义分区策略.为此,Spark提供了相应的接口,我们只需要扩展Partitioner抽象类,然后实现里面的三个方法: 01 package org.apache.spark 02 03 /** 04 * An object that defines how the element

spark自定义分区器实现

在spark中,框架默认使用的事hashPartitioner分区器进行对rdd分区,但是实际生产中,往往使用spark自带的分区器会产生数据倾斜等原因,这个时候就需要我们自定义分区,按照我们指定的字段进行分区.具体的流程步骤如下: 1.创建一个自定义的分区类,并继承Partitioner,注意这个partitioner是spark的partitioner 2.重写partitioner中的方法 override def numPartitions: Int = ??? override def

spark自定义分区器

1.spark中默认的分区器: Spark目前支持Hash分区和Range分区,用户也可以自定义分区,Hash分区为当前的默认分区,Spark中分区器直接决定了RDD中分区的个数.RDD中每条数据经过Shuffle过程属于哪个分区和Reduce的个数. 只有Key-Value类型的RDD才有分区器的,非Key-Value类型的RDD分区器的值是None 每个RDD的分区ID范围:0~numPartitions-1,决定这个值是属于那个分区的2. 参考博客:https://www.jianshu.

hadoop提交作业自定义排序和分组

现有数据如下: 3 3 3 2 3 1 2 2 2 1 1 1 要求为: 先按第一列从小到大排序,如果第一列相同,按第二列从小到大排序 如果是hadoop默认的排序方式,只能比较key,也就是第一列,而value是无法参与排序的 这时候就需要用到自定义的排序规则 解决思路: 自定义数据类型,将原本的key和value都包装进去 将这个数据类型当做key,这样就比较key的时候就可以包含第一列和第二列的值了 自定义数据类型NewK2如下: //要实现自定义的排序规则必须实现WritableComp

第19课:Spark高级排序彻底解密

本节课内容: 1.基础排序算法实战 2.二次排序算法实战 3.更高级别排序算法 4.排序算法内幕解密 排序在Spark运用程序中使用的比较多,且维度也不一样,如二次排序,三次排序等,在机器学习算法中经常碰到,所以非常重要,必须掌握! 所谓二次排序,就是根据两列值进行排序,如下测试数据: 2 3 4 1 3 2 4 3 8 7 2 1 经过二次排序后的结果(升序): 2 1 2 3 3 2 4 1 4 3 8 7 在编写二次排序代码前,先简单的写下单个key排序的代码: val conf = ne

Spark高级排序彻底解密(DT大数据梦工厂)

内容: 1.基础排序算法实战: 2.二次排序算法实战: 3.更高局级别排序算法: 4.排序算法内幕解密: 为啥讲排序?因为在应用的时候都有排序要求. 海量数据经常排序之后要我们想要的内容. ==========基础排序算法============ scala> sc.setLogLevel("WARN") scala> val x = sc.textFile("/historyserverforSpark/README.md", 3).flatMap(_

Hadoop读书笔记(十二)MapReduce自定义排序

Hadoop读书笔记系列文章:http://blog.csdn.net/caicongyang/article/category/2166855 1.说明: 对给出的两列数据首先按照第一列升序排列,当第一列相同时,第二列升序排列 数据格式: 3 3 3 2 3 1 2 2 2 1 1 1 2.代码 SortApp.java package sort; import java.io.DataInput; import java.io.DataOutput; import java.io.IOExc

自定义排序及Hadoop序列化

自定义排序 将两列数据进行排序,第一列按照升序排列,当第一列相同时,第二列升序排列. 在map和reduce阶段进行排序时,比较的是k2.v2是不参与排序比较的.如果要想让v2也进行排序,需要把k2和v2组装成新的类,作为k2,才能参与比较. 1 package sort; 2 3 import java.io.DataInput; 4 import java.io.DataOutput; 5 import java.io.IOException; 6 import java.net.URI;