Hadoop中WritableComparable 和 comparator

1.WritableComparable

查看HadoopAPI,如图所示:

WritableComparable继承自Writable和java.lang.Comparable接口,是一个Writable也是一个Comparable,也就是说,既可以序列化,也可以比较!

再看看它的实现类,发现BooleanWritable, BytesWritable, ByteWritable, DoubleWritable, FloatWritable, IntWritable, LongWritable, MD5Hash, NullWritable, Record, RecordTypeInfo, Text, VIntWritable, VLongWritable都实现了WritableComparable类!

WritableComparable的实现类之间相互来比较,在Map/Reduce中,任何用作键来使用的类都应该实现WritableComparable接口!

Example:

 1 package cn.roboson.writable;
 2
 3 import java.io.DataInput;
 4 import java.io.DataOutput;
 5 import java.io.IOException;
 6
 7 import org.apache.hadoop.io.WritableComparable;
 8
 9 /**
10  * 1.自定义一个类,继承WritableComparable
11  * 2.发现有三个未实现的方法,两个是Writable接口的(序列化),一个是Comparable接口的(用来比较)
12  * 3.自定义比较,这里以counter来作为比较
13  * @author roboson
14  *
15  */
16 public class MyWritableComparable implements WritableComparable<MyWritableComparable>{
17
18     private int counter;
19     private long timestamp;
20     public MyWritableComparable() {
21         // TODO Auto-generated constructor stub
22     }
23
24     public MyWritableComparable(int counter,long timestamp) {
25         // TODO Auto-generated constructor stub
26         this.counter = counter;
27         this.timestamp = timestamp;
28     }
29
30     @Override
31     public void readFields(DataInput in) throws IOException {
32         // TODO Auto-generated method stub
33
34         //将输入流中的字节流数据转化为结构化数据
35         counter = in.readInt();
36         timestamp = in.readLong();
37     }
38
39     @Override
40     public void write(DataOutput out) throws IOException {
41         // TODO Auto-generated method stub
42
43         //讲结构化数据写入输出流
44         out.writeInt(counter);
45         out.writeLong(timestamp);
46     }
47
48     @Override
49     public int compareTo(MyWritableComparable other) {
50         // TODO Auto-generated method stub
51         int thisValue = this.counter;
52         int otherValue = other.counter;
53         return (thisValue < otherValue ? -1 : (thisValue == otherValue ? 0 : 1));
54     }
55
56     public int getCounter() {
57         return counter;
58     }
59
60     public void setCounter(int counter) {
61         this.counter = counter;
62     }
63
64     public long getTimestamp() {
65         return timestamp;
66     }
67
68     public void setTimestamp(long timestamp) {
69         this.timestamp = timestamp;
70     }
71
72
73     public static void main(String[] args) {
74         MyWritableComparable comparable = new MyWritableComparable(3,4);
75         MyWritableComparable otherComparable = new MyWritableComparable(4, 5);
76         int value = comparable.compareTo(otherComparable);
77         if(value==-1){
78             System.out.println("comparable<otherComparable");
79         }else if(value==0){
80             System.out.println("comparable=otherComparable");
81         }else{
82             System.out.println("comparable>otherComparable");
83         }
84     }
85 }

运行结果:

2.RawComparator

对于MapReduce来说,因为中间有个基于键的排序阶段,所以类型的比较是非常重要的。Hadoop中提供了原生的比较接口RawComparator,该接口继承子Java Comparator接口。RawComparator接口允许其实现直接比较数据流中的记录,无需先把数据流饭序列化为对象,这样便避免了新建对象的额外开销。

 1 package org.apache.hadoop.io;
 2
 3 import java.util.Comparator;
 4
 5 public interface RawComparator<T> extends Comparator<T>{
 6
 7     //自己的方法
 8     public int compare(byte[] b1, int s1, int l1, byte[] b2,int s2, int l2);
 9
10     //继承自Comparator的方法
11     @Override
12     public int compare(T o1, T o2);
13
14     @Override
15     public boolean equals(Object obj);
16 }

查看HadoopAPI:

该类并非被多数的衍生类所实现,其具体的子类为WritableComparator,多数情况下是作为实现Writable接口的类的内置类,提供序列化字节的比较。如下图说所示:BooleanWritable, BytesWritable, ByteWritable, org.apache.hadoop.io.serializer.DeserializerComparator, DoubleWritable, FloatWritable, IntWritable, JavaSerializationComparator, LongWritable, LongWritable, MD5Hash, NullWritable, RecordComparator, Text, UTF8,都实现了RawComparator,作为其内部类。

而WritableComparator则是其的具体子类。

3.WritableComparator

在《Hadoop权威指南》中,说到这儿,很模糊,只说WritableComparator是对继承自WritableComparable类的RawCompartor类的一个通用实现。让人看着很迷惑,这句话什么意思呢?

首先、在第二个小标题RawComparator中,我门都知道WritableComparator实现了RawComparator这个接口,也就是说,WritableComparator是RawComparator的实现。

其次、是对继承自WritableComparable类的RawComparator的一个通用实现。那么继承自WritableComparable类的RawComparator都有哪些呢?也就是说那些类,继承自WritableComparator,并且实现了RawComparator?在第二个小标题RawComparator中有也都说明清楚了,上面的红色部分!同理,实现了WritableComparable类的在第一个小标题WritableComparable中也有说明,红色部分字体!也就谁说WritableComparator是对BooleanWritable.Comparator, BytesWritable.Comparator, ByteWritable.Comparator, DoubleWritable.Comparator, FloatWritable.Comparator, IntWritable.Comparator, LongWritable.Comparator, MD5Hash.Comparator, NullWritable.Comparator, RecordComparator, Text.Comparator, UTF8.Comparator这些类的一个通用实现!这句话就引出了WritableComparator的两个功能:第一,它提供了对原始compare()方法的一个默认实现。该方法能够饭序列化将流中进行比较的对象,并调用对象的compara()方法。第二,它充当的是RawComparator实例的工厂(已注册Writable的实现)。例如,为了获得IntWratable的comparator,我们直接如下调用:

RawComparator<IntWritable> comparator = WritableComparator.get(IntWratable.class);

再来看看WritableComparator这个类是如何定义的,如下图所示:

WritableComparator类类似于一个注册表,里面记录了所有Comparator类的集合。Comparators成员用一张Hash表记录Key=Class,value=WritableComprator的注册信息.这就是它能够充当RawComparator实例工厂的原因!因为它本省的实现中有意个HashMap集合,HashMap<Class,WritableComparator>根据对应的Class,就能返回一个响应的WritableComparator!

Example:

 1 package cn.roboson.writable;
 2
 3 import java.io.ByteArrayInputStream;
 4 import java.io.ByteArrayOutputStream;
 5 import java.io.DataInputStream;
 6 import java.io.DataOutputStream;
 7 import java.io.IOException;
 8
 9 import org.apache.hadoop.io.IntWritable;
10 import org.apache.hadoop.io.RawComparator;
11 import org.apache.hadoop.io.Writable;
12 import org.apache.hadoop.io.WritableComparator;
13
14 /**
15  * 1.通过WritableComparator获得IntWritable类的RawComparator实例
16  * 2.通过两种方式来比较
17  * @author roboson
18  *
19  */
20
21 public class ComparableFinish {
22
23     public static void main(String[] args) throws IOException {
24
25         //创建两个IntWritable来比较
26         IntWritable writable1 = new IntWritable(163);
27         IntWritable writable2 = new IntWritable(165);
28
29         //获得IntWritable的RawComparator实例
30         RawComparator<IntWritable> intRawComparator = WritableComparator.get(IntWritable.class);
31
32         //直接比较对象
33         int value1 =intRawComparator.compare(writable1, writable2);
34
35         if(value1==-1){
36             System.out.println("writable1<writable2");
37         }else if(value1==0){
38             System.out.println("writable1=writable2");
39         }else{
40             System.out.println("writable1>writable2");
41         }
42
43         //序列化两个对象,获得其字节流
44         byte[] byte1 = serizlize(writable1);
45         byte[] byte2 = serizlize(writable2);
46
47         //直接通过字符流比较大小
48         int value2 = intRawComparator.compare(byte1, 0, 4, byte2, 0, 4);
49         if(value2==-1){
50             System.out.println("writable1<writable2");
51         }else if(value2==0){
52             System.out.println("writable1=writable2");
53         }else{
54             System.out.println("writable1>writable2");
55         }
56     }
57
58     public static byte[] serizlize(Writable writable) throws IOException{
59
60         //创建一个输出字节流对象
61         ByteArrayOutputStream out = new ByteArrayOutputStream();
62         DataOutputStream dataout = new DataOutputStream(out);
63
64         //将结构化数据的对象writable写入到输出字节流。
65         writable.write(dataout);
66         return out.toByteArray();
67     }
68
69     public static byte[] deserizlize(Writable writable,byte[] bytes) throws IOException{
70
71         //创建一个输入字节流对象,将字节数组中的数据,写入到输入流中
72         ByteArrayInputStream in = new ByteArrayInputStream(bytes);
73         DataInputStream datain = new DataInputStream(in);
74
75         //将输入流中的字节流数据反序列化
76         writable.readFields(datain);
77         return bytes;
78
79     }
80 }

运行结果:

关于序列化方面的知识,可以参考我的博客《Hadoop序列化》地址如下:

http://www.cnblogs.com/robert-blue/p/4157768.html

参考博文:

http://blog.csdn.net/keda8997110/article/details/8518255

http://www.360doc.com/content/12/0827/09/9318309_232551844.shtml

时间: 2025-01-11 19:06:39

Hadoop中WritableComparable 和 comparator的相关文章

Hadoop中Comparator原理

在前面的博文<Hadoop中WritableComparable 和 comparator>中,对于WritableComparator说的不够细致,下面说说具体的实现原理! 1.WritableComparator主要提供了两个功能: 提供了对原始compara()方法的一个默认实现,默认实现是先反序列化成对象,在对对象进行比较 1 public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { 2 3 t

Hadoop中Writable类之四

1.定制Writable类型 Hadoop中有一套Writable实现,例如:IntWritable.Text等,但是,有时候可能并不能满足自己的需求,这个时候,就需要自己定制Writable类型. 定制分以下几步: 需要实现WritableComparable接口,因为Writable常常作为健值对出现,而在MapReduce中,中间有个排序很重要,因此,Hadoop中就让Writable实现了WritableComparable 需要实现WritableComparable的write().

java.lang.Comparable, java.util.Compartor区别以及Hadoop中关于自定义类型中的compare方法

public interface Comparable<T> { public int compareTo(T o); } 规定了对象内部比较的方法 public interface Comparator<T> { int compare(T o1, T o2); boolean equals(Object obj); } 定义外部比较器的基本方法,其中equals是用来确定两个比较器是否相等. 关于对象内部比较和外部比较这两个接口的区别和使用场景如下: 个人总结: Compara

hadoop中实现定制Writable类

Hadoop中有一套Writable实现可以满足大部分需求,但是在有些情况下,我们需要根据自己的需要构造一个新的实现,有了定制的Writable,我们就可以完全控制二进制表示和排序顺序. 为了演示如何新建一个定制的writable类型,我们需要写一个表示一对字符串的实现: blic class TextPair implements WritableComparable<TextPair> { private Text first; private Text second; public Te

hadoop中compare函数

在看hadoop  的二次排序的时候,改写了下, 加了第三个参数,  本来以为是在 public int compareTo(IntPair o) { System.out.println("-----------compareTo"); if (first != o.first) { return first < o.first ? -1 : 1; } else if (second != o.second) { return second < o.second ? -1

结合手机上网流量业务来说明Hadoop中的自定义数据类型(序列化、反序列化机制)

大家都知道,Hadoop中为Key的数据类型必须实现WritableComparable接口,而Value的数据类型只需要实现Writable接口即可:能做Key的一定可以做Value,能做Value的未必能做Key.但是具体应该怎么应用呢?--本篇文章将结合手机上网流量业务进行分析. 先介绍一下业务场景:统计每个用户的上行流量和,下行流量和,以及总流量和. 本次描述所用数据: 日志格式描述: 日志flowdata.txt中的具体数据: 接下来贴出详细代码,代码中含有详细注释,从代码中可以看出,

Hadoop中两表JOIN的处理方法(转)

1. 概述 在传统数据库(如:MYSQL)中,JOIN操作是非常常见且非常耗时的.而在HADOOP中进行JOIN操作,同样常见且耗时,由于Hadoop的独特设计思想,当进行JOIN操作时,有一些特殊的技巧. 本文首先介绍了Hadoop上通常的JOIN实现方法,然后给出了几种针对不同输入数据集的优化方法. 2. 常见的join方法介绍 假设要进行join的数据分别来自File1和File2. 2.1 reduce side join reduce side join是一种最简单的join方式,其主

hadoop 中对Vlong 和 Vint的压缩方法

hadoop 中对java的基本类型进行了writeable的封装,并且所有这些writeable都是继承自WritableComparable的,都是可比较的:并且,它们都有对应的get() 和 set()方法, 其中对整型(int 和 long)进行编码的时候,有固定长度格式(intWritable和LongWritable)和可变长度格式(VIntWritable 和 VLongWritable),其中VIntWritable和VLongWritable的编码规则是一样的, 所以VIntW

结合手机上网流量业务来说明Hadoop中的二次排序机制,分区机制

本篇博客将结合手机上网流量业务来详细介绍Hadoop的二次排序机制.分区机制,先介绍一下业务场景: 先介绍一下业务场景:统计每个用户的上行流量和,下行流量和,以及总流量和. 本次描述所用数据: 日志格式描述: 日志flowdata.txt中的具体数据: 首先我们先通过mapreduce程序实现上面的业务逻辑: 代码实现: package FlowSum; import java.io.DataInput; import java.io.DataOutput; import java.io.IOE