spark自定义分区器实现

在spark中,框架默认使用的事hashPartitioner分区器进行对rdd分区,但是实际生产中,往往使用spark自带的分区器会产生数据倾斜等原因,这个时候就需要我们自定义分区,按照我们指定的字段进行分区。具体的流程步骤如下:

1、创建一个自定义的分区类,并继承Partitioner,注意这个partitioner是spark的partitioner

2、重写partitioner中的方法

  override def numPartitions: Int = ???   override def getPartition(key: Any): Int = ???

代码实现:测试数据集:
cookieid,createtime,pv
cookie1,2015-04-10,1
cookie1,2015-04-11,5
cookie1,2015-04-12,7
cookie1,2015-04-13,3
cookie1,2015-04-14,2
cookie1,2015-04-15,4
cookie1,2015-04-16,4
cookie2,2015-04-10,2
cookie2,2015-04-11,3
cookie2,2015-04-12,5
cookie2,2015-04-13,6
cookie2,2015-04-14,3
cookie2,2015-04-15,9
cookie2,2015-04-16,7

  指定按照第一个字段进行分区

步骤1:
package _core.sourceCodeLearning.partitioner

import org.apache.spark.Partitioner
import scala.collection.mutable.HashMap

/**
  * Author Mr. Guo
  * Create 2019/6/23 - 12:19
  */
class UDFPartitioner(args: Array[String]) extends Partitioner {

  private val partitionMap: HashMap[String, Int] = new HashMap[String, Int]()
  var parId = 0
  for (arg <- args) {
    if (!partitionMap.contains(arg)) {
      partitionMap(arg) = parId
      parId += 1
    }
  }

  override def numPartitions: Int = partitionMap.valuesIterator.length

  override def getPartition(key: Any): Int = {
    val keys: String = key.asInstanceOf[String]
    val sub = keys
    partitionMap(sub)
  }
}

  步骤2:

主类测试:

package _core.sourceCodeLearning.partitioner

import org.apache.spark.{SparkConf, TaskContext}
import org.apache.spark.sql.SparkSession

/**
  * Author Mr. Guo
  * Create 2019/6/23 - 12:21
  */
object UDFPartitionerMain {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName(this.getClass.getSimpleName)
    val ssc = SparkSession
      .builder()
      .config(conf)
      .getOrCreate()
    val sc = ssc.sparkContext
    sc.setLogLevel("WARN")

    val rdd = ssc.sparkContext.textFile("file:///E:\\TestFile\\analyfuncdata.txt")
    val transform = rdd.filter(_.split(",").length == 3).map(x => {
      val arr = x.split(",")
      (arr(0), (arr(1), arr(2)))
    })
    val keys: Array[String] = transform.map(_._1).collect()
    val partiion = transform.partitionBy(new UDFPartitioner(keys))
    partiion.foreachPartition(iter => {
      println(s"**********分区号:${TaskContext.getPartitionId()}***************")
      iter.foreach(r => {
        println(s"分区:${TaskContext.getPartitionId()}###" + r._1 + "\t" + r._2 + "::" + r._2._1)
      })
    })
    ssc.stop()
  }
}

  运行结果:

这样就是按照第一个字段进行了分区,当然在分区器的中,对于key是可以根据自己的需求随意的处理,比如添加随机数等等

原文地址:https://www.cnblogs.com/Gxiaobai/p/11073381.html

时间: 2024-10-09 09:40:18

spark自定义分区器实现的相关文章

spark自定义分区器

1.spark中默认的分区器: Spark目前支持Hash分区和Range分区,用户也可以自定义分区,Hash分区为当前的默认分区,Spark中分区器直接决定了RDD中分区的个数.RDD中每条数据经过Shuffle过程属于哪个分区和Reduce的个数. 只有Key-Value类型的RDD才有分区器的,非Key-Value类型的RDD分区器的值是None 每个RDD的分区ID范围:0~numPartitions-1,决定这个值是属于那个分区的2. 参考博客:https://www.jianshu.

SPARK之分区器

Spark目前支持Hash分区和Range分区,用户也可以自定义分区,Hash分区为当前的默认分区,Spark中分区器直接决定了RDD中分区的个数.RDD中每条数据经过Shuffle过程属于哪个分区和Reduce的个数 只有Key-Value类型的RDD才有分区器的,非Key-Value类型的RDD分区器的值是None 每个RDD的分区ID范围:0~numPartitions-1,决定这个值是属于那个分区的 原文地址:https://www.cnblogs.com/xiangyuguan/p/1

Spark自定义分区(Partitioner)

我们都知道Spark内部提供了HashPartitioner和RangePartitioner两种分区策略,这两种分区策略在很多情况下都适合我们的场景.但是有些情况下,Spark内部不能符合咱们的需求,这时候我们就可以自定义分区策略.为此,Spark提供了相应的接口,我们只需要扩展Partitioner抽象类,然后实现里面的三个方法: 01 package org.apache.spark 02 03 /** 04 * An object that defines how the element

spark自定义分区及示例代码

有时自己的业务需要自己实现spark的分区函数 以下代码是实现一个自定义spark分区的demo 实现的功能是根据key值的最后一位数字,写到不同的文件 例如: 10写入到part-00000 11写入到part-00001 . . . 19写入到part-00009 给读者提供一个自定义分区的思路 import org.apache.spark.{Partitioner, SparkContext, SparkConf} //自定义分区类,需继承Partitioner类 class Usrid

kafka 自定义分区器

package cn.xiaojf.kafka.producer; import org.apache.kafka.clients.producer.Partitioner; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.utils.Utils; import java.util.List; import ja

Spark自定义排序与分区

Spark自定义排序与分区 前言: 随着信息时代的不断发展,数据成了时代主题,今天的我们徜徉在数据的海洋中:由于数据的爆炸式增长,各种数据计算引擎如雨后春笋般冲击着这个时代.作为时下最主流的计算引擎之一 Spark也是从各方面向时代展示自己的强大能力.Spark无论是在数据处理还是数据分析.挖掘方面都展现出了强大的主导能力.其分布式计算能力受到越来越多的青睐.本文将介绍spark的排序以及分区. 一.Spark自定义排序 在spark中定义了封装了很多高级的api,在我们的日常开发中使用这些ap

Apache Flink流分区器剖析

这篇文章介绍Flink的分区器,在流进行转换操作后,Flink通过分区器来精确得控制数据流向. StreamPartitioner StreamPartitioner是Flink流分区器的基类,它只定义了一个抽象方法: public abstract StreamPartitioner<T> copy(); 但这个方法并不是各个分区器之间互相区别的地方,定义不同的分区器的核心在于--各个分区器需要实现channel选择的接口方法: int[] selectChannels(T record,

Kafka 分区分配计算(分区器 Partitions )

KafkaProducer在调用send方法发送消息至broker的过程中,首先是经过拦截器Inteceptors处理,然后是经过序列化Serializer处理,之后就到了Partitions阶段,即分区分配计算阶段.在某些应用场景下,业务逻辑需要控制每条消息落到合适的分区中,有些情形下则只要根据默认的分配规则即可.在KafkaProducer计算分配时,首先根据的是ProducerRecord中的partition字段指定的序号计算分区.读者有可能刚睡醒,看到这个ProducerRecord似

大数据入门第二十二天——spark(三)自定义分区、排序与查找

一.自定义分区 1.概述 默认的是Hash的分区策略,这点和Hadoop是类似的,具体的分区介绍,参见:https://blog.csdn.net/high2011/article/details/68491115 2.实现 package cn.itcast.spark.day3 import java.net.URL import org.apache.spark.{HashPartitioner, Partitioner, SparkConf, SparkContext} import s