Spark自定义分区(Partitioner)

我们都知道Spark内部提供了HashPartitionerRangePartitioner两种分区策略,这两种分区策略在很多情况下都适合我们的场景。但是有些情况下,Spark内部不能符合咱们的需求,这时候我们就可以自定义分区策略。为此,Spark提供了相应的接口,我们只需要扩展Partitioner抽象类,然后实现里面的三个方法:

01
    package org.apache.spark
02

03
    /**
04
     * An object that defines how the elements in a key-value pair RDD are partitioned by key.
05
     * Maps each key to a partition ID, from 0 to `numPartitions - 1`.
06
     */
07
    abstract class Partitioner extends Serializable {
08
      def numPartitions: Int
09
      def getPartition(key: Any): Int
10
    }

def numPartitions: Int:这个方法需要返回你想要创建分区的个数;

def getPartition(key: Any): Int:这个函数需要对输入的key做计算,然后返回该key的分区ID,范围一定是0到numPartitions-1

equals():这个是Java标准的判断相等的函数,之所以要求用户实现这个函数是因为Spark内部会比较两个RDD的分区是否一样。

  假如我们想把来自同一个域名的URL放到一台节点上,比如:http://www.iteblog.comhttp://www.iteblog.com/archives/1368,如果你使用HashPartitioner,这两个URL的Hash值可能不一样,这就使得这两个URL被放到不同的节点上。所以这种情况下我们就需要自定义我们的分区策略,可以如下实现:

01
    package com.iteblog.utils
02

03
    import org.apache.spark.Partitioner
04

05
    /**
06
     * User: 过往记忆
07
     * Date: 2015-05-21
08
     * Time: 下午23:34
09
     * bolg: http://www.iteblog.com
10
     * 本文地址:http://www.iteblog.com/archives/1368
11
     * 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
12
     * 过往记忆博客微信公共帐号:iteblog_hadoop
13
     */
14

15
    class IteblogPartitioner(numParts: Int) extends Partitioner {
16
      override def numPartitions: Int = numParts
17

18
      override def getPartition(key: Any): Int = {
19
        val domain = new java.net.URL(key.toString).getHost()
20
        val code = (domain.hashCode % numPartitions)
21
        if (code < 0) {
22
          code + numPartitions
23
        } else {
24
          code
25
        }
26
      }
27

28
      override def equals(other: Any): Boolean = other match {
29
        case iteblog: IteblogPartitioner =>
30
          iteblog.numPartitions == numPartitions
31
        case _ =>
32
          false
33
      }
34

35
      override def hashCode: Int = numPartitions
36
    }

因为hashCode值可能为负数,所以我们需要对他进行处理。然后我们就可以在partitionBy()方法里面
使用我们的分区:

1
    iteblog.partitionBy(new IteblogPartitioner(20))

  类似的,在Java中定义自己的分区策略和Scala类似,只需要继承org.apache.spark.Partitioner,并实现其中的方法即可。

  在Python中,你不需要扩展Partitioner类,我们只需要对iteblog.partitionBy()加上一个额外的hash函数,如下:

1
    import urlparse
2

3
    def iteblog_domain(url):
4
      return hash(urlparse.urlparse(url).netloc)
5

6
    iteblog.partitionBy(20, iteblog_domain)
时间: 2024-10-29 20:42:30

Spark自定义分区(Partitioner)的相关文章

spark自定义分区及示例代码

有时自己的业务需要自己实现spark的分区函数 以下代码是实现一个自定义spark分区的demo 实现的功能是根据key值的最后一位数字,写到不同的文件 例如: 10写入到part-00000 11写入到part-00001 . . . 19写入到part-00009 给读者提供一个自定义分区的思路 import org.apache.spark.{Partitioner, SparkContext, SparkConf} //自定义分区类,需继承Partitioner类 class Usrid

spark自定义分区器实现

在spark中,框架默认使用的事hashPartitioner分区器进行对rdd分区,但是实际生产中,往往使用spark自带的分区器会产生数据倾斜等原因,这个时候就需要我们自定义分区,按照我们指定的字段进行分区.具体的流程步骤如下: 1.创建一个自定义的分区类,并继承Partitioner,注意这个partitioner是spark的partitioner 2.重写partitioner中的方法 override def numPartitions: Int = ??? override def

Hadoop自定义分区Partitioner

一:背景 为了使得MapReduce计算后的结果显示更加人性化,Hadoop提供了分区的功能,可以使得MapReduce计算结果输出到不同的分区中,方便查看.Hadoop提供的Partitioner组件可以让Map对Key进行分区,从而可以根据不同key来分发到不同的reduce中去处理,我们可以自定义key的分发规则,如数据文件包含不同的省份,而输出的要求是每个省份对应一个文件. 二:技术实现 自定义分区很简单,我们只需要继承抽象类Partitioner,实现自定义的getPartitione

spark自定义分区器

1.spark中默认的分区器: Spark目前支持Hash分区和Range分区,用户也可以自定义分区,Hash分区为当前的默认分区,Spark中分区器直接决定了RDD中分区的个数.RDD中每条数据经过Shuffle过程属于哪个分区和Reduce的个数. 只有Key-Value类型的RDD才有分区器的,非Key-Value类型的RDD分区器的值是None 每个RDD的分区ID范围:0~numPartitions-1,决定这个值是属于那个分区的2. 参考博客:https://www.jianshu.

Spark自定义排序与分区

Spark自定义排序与分区 前言: 随着信息时代的不断发展,数据成了时代主题,今天的我们徜徉在数据的海洋中:由于数据的爆炸式增长,各种数据计算引擎如雨后春笋般冲击着这个时代.作为时下最主流的计算引擎之一 Spark也是从各方面向时代展示自己的强大能力.Spark无论是在数据处理还是数据分析.挖掘方面都展现出了强大的主导能力.其分布式计算能力受到越来越多的青睐.本文将介绍spark的排序以及分区. 一.Spark自定义排序 在spark中定义了封装了很多高级的api,在我们的日常开发中使用这些ap

大数据入门第二十二天——spark(三)自定义分区、排序与查找

一.自定义分区 1.概述 默认的是Hash的分区策略,这点和Hadoop是类似的,具体的分区介绍,参见:https://blog.csdn.net/high2011/article/details/68491115 2.实现 package cn.itcast.spark.day3 import java.net.URL import org.apache.spark.{HashPartitioner, Partitioner, SparkConf, SparkContext} import s

Spark 学习(四)RDD自定义分区和缓存

一,简介 二,自定义分区规则 2.1 普通的分组TopN实现 2.2 自定义分区规则TopN实现 三,RDD的缓存 3.1 RDD缓存简介 3.2 RDD缓存方式 正文 一,简介 在之前的文章中,我们知道RDD的有一个特征:就是一组分片(Partition),即数据集的基本组成单位.对于RDD来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度.用户可以在创建RDD时指定RDD的分片个数,如果没有指定,那么就会采用默认值.默认值就是程序所分配到的CPU Core的数目.这个分配的规则我们是

hadoop编程小技巧(3)---自定义分区类Partitioner

Hadoop代码测试环境:Hadoop2.4 原理:在Hadoop的MapReduce过程中,Mapper读取处理完成数据后,会把数据发送到Partitioner,由Partitioner来决定每条记录应该送往哪个reducer节点,默认使用的是HashPartitioner,其核心代码如下: /** Use {@link Object#hashCode()} to partition. */ public int getPartition(K2 key, V2 value, int numRe

kafka 自定义分区器

package cn.xiaojf.kafka.producer; import org.apache.kafka.clients.producer.Partitioner; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.utils.Utils; import java.util.List; import ja