自定义排序及Hadoop序列化

自定义排序

将两列数据进行排序,第一列按照升序排列,当第一列相同时,第二列升序排列。

在map和reduce阶段进行排序时,比较的是k2。v2是不参与排序比较的。如果要想让v2也进行排序,需要把k2和v2组装成新的类,作为k2,才能参与比较。

  1 package sort;
  2
  3 import java.io.DataInput;
  4 import java.io.DataOutput;
  5 import java.io.IOException;
  6 import java.net.URI;
  7
  8 import org.apache.hadoop.conf.Configuration;
  9 import org.apache.hadoop.fs.FileSystem;
 10 import org.apache.hadoop.fs.Path;
 11 import org.apache.hadoop.io.LongWritable;
 12 import org.apache.hadoop.io.Text;
 13 import org.apache.hadoop.io.WritableComparable;
 14 import org.apache.hadoop.mapreduce.Job;
 15 import org.apache.hadoop.mapreduce.Mapper;
 16 import org.apache.hadoop.mapreduce.Reducer;
 17 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 18 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 19 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 20 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 21 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
 22
 23 public class SortApp {
 24     static final String INPUT_PATH = "hdfs://chaoren:9000/input";
 25     static final String OUT_PATH = "hdfs://chaoren:9000/out";
 26
 27     public static void main(String[] args) throws Exception {
 28         final Configuration configuration = new Configuration();
 29
 30         final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH),
 31                 configuration);
 32         if (fileSystem.exists(new Path(OUT_PATH))) {
 33             fileSystem.delete(new Path(OUT_PATH), true);
 34         }
 35
 36         final Job job = new Job(configuration, SortApp.class.getSimpleName());
 37
 38         // 1.1 指定输入文件路径
 39         FileInputFormat.setInputPaths(job, INPUT_PATH);
 40         // 指定哪个类用来格式化输入文件
 41         job.setInputFormatClass(TextInputFormat.class);
 42
 43         // 1.2指定自定义的Mapper类
 44         job.setMapperClass(MyMapper.class);
 45         // 指定输出<k2,v2>的类型
 46         job.setMapOutputKeyClass(NewK2.class);
 47         job.setMapOutputValueClass(LongWritable.class);
 48
 49         // 1.3 指定分区类
 50         job.setPartitionerClass(HashPartitioner.class);
 51         job.setNumReduceTasks(1);
 52
 53         // 1.4 TODO 排序、分区
 54
 55         // 1.5 TODO (可选)合并
 56
 57         // 2.2 指定自定义的reduce类
 58         job.setReducerClass(MyReducer.class);
 59         // 指定输出<k3,v3>的类型
 60         job.setOutputKeyClass(LongWritable.class);
 61         job.setOutputValueClass(LongWritable.class);
 62
 63         // 2.3 指定输出到哪里
 64         FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
 65         // 设定输出文件的格式化类
 66         job.setOutputFormatClass(TextOutputFormat.class);
 67
 68         // 把代码提交给JobTracker执行
 69         job.waitForCompletion(true);
 70     }
 71
 72     static class MyMapper extends
 73             Mapper<LongWritable, Text, NewK2, LongWritable> {
 74         protected void map(
 75                 LongWritable key,
 76                 Text value,
 77                 org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, NewK2, LongWritable>.Context context)
 78                 throws java.io.IOException, InterruptedException {
 79             final String[] splited = value.toString().split("\t");
 80             final NewK2 k2 = new NewK2(Long.parseLong(splited[0]),
 81                     Long.parseLong(splited[1]));
 82             final LongWritable v2 = new LongWritable(Long.parseLong(splited[1]));
 83             context.write(k2, v2);
 84         };
 85     }
 86
 87     static class MyReducer extends
 88             Reducer<NewK2, LongWritable, LongWritable, LongWritable> {
 89         protected void reduce(
 90                 NewK2 k2,
 91                 java.lang.Iterable<LongWritable> v2s,
 92                 org.apache.hadoop.mapreduce.Reducer<NewK2, LongWritable, LongWritable, LongWritable>.Context context)
 93                 throws java.io.IOException, InterruptedException {
 94             context.write(new LongWritable(k2.first), new LongWritable(
 95                     k2.second));
 96         };
 97     }
 98
 99     /**
100      * 问:为什么实现该类? 答:因为原来的v2不能参与排序,把原来的k2和v2封装到一个类中,作为新的k2
101      *
102      */
103     // WritableComparable:Hadoop的序列化
104     static class NewK2 implements WritableComparable<NewK2> {
105         Long first;
106         Long second;
107
108         public NewK2() {
109         }
110
111         public NewK2(long first, long second) {
112             this.first = first;
113             this.second = second;
114         }
115
116         public void readFields(DataInput in) throws IOException {
117             this.first = in.readLong();
118             this.second = in.readLong();
119         }
120
121         public void write(DataOutput out) throws IOException {
122             out.writeLong(first);
123             out.writeLong(second);
124         }
125
126         /**
127          * 当k2进行排序时,会调用该方法. 当第一列不同时,升序;当第一列相同时,第二列升序
128          */
129         public int compareTo(NewK2 o) {
130             final long minus = this.first - o.first;
131             if (minus != 0) {
132                 return (int) minus;
133             }
134             return (int) (this.second - o.second);
135         }
136
137         @Override
138         public int hashCode() {
139             return this.first.hashCode() + this.second.hashCode();
140         }
141
142         @Override
143         public boolean equals(Object obj) {
144             if (!(obj instanceof NewK2)) {
145                 return false;
146             }
147             NewK2 oK2 = (NewK2) obj;
148             return (this.first == oK2.first) && (this.second == oK2.second);
149         }
150     }
151
152 }

Hadoop序列化

序列化概念:

  序列化:把结构化对象转化为字节流。

  反序列化:是序列化的逆过程。即把字节流转回结构化对象。

Hadoop序列化的特点:

  1、紧凑:高效使用存储空间。

  2、快速:读写数据的额外开销小。

  3、可扩展:可透明的读取老格式的数据。

  4、互操作:支持多语言的交互。

Hadoop的序列化格式:Writable

Hadoop序列化的作用:

  序列化在分布式环境的两大作用:进程间通信,永久存储。

  Hadoop节点间通信:

  

Writable接口

  Writable接口,是根据DataInput和DataOutput实现的简单、有效的序列化对象。

  MR的任意key和value必须实现Writable接口。

  MR的任意key必须实现WritableComparable接口。

自定义Writable类(上面代码中有)

  实现Writable:

        1、write是把每个对象序列化到输出流。

          2、readFields是把输入流字节反序列化。

  实现WritableComparable:

        Java值对象的比较:一般需要重写toString(),hashCode(),equals()方法。

 

时间: 2024-10-12 22:04:45

自定义排序及Hadoop序列化的相关文章

Hadoop之--&gt;自定义排序

data: 3 33 23 12 22 11 1 --------------------- 需求: 1 12 12 23 13 23 3 package sort; import java.io.IOException; import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path

Hadoop学习之路(7)MapReduce自定义排序

本文测试文本: tom 20 8000 nancy 22 8000 ketty 22 9000 stone 19 10000 green 19 11000 white 39 29000 socrates 30 40000    MapReduce中,根据key进行分区.排序.分组MapReduce会按照基本类型对应的key进行排序,如int类型的IntWritable,long类型的LongWritable,Text类型,默认升序排序   为什么要自定义排序规则?现有需求,需要自定义key类型,

hadoop 学习自定义排序

(网易云课程hadoop大数据实战学习笔记) 自定义排序,是基于k2的排序,设现有以下一组数据,分别表示矩形的长和宽,先按照面积的升序进行排序. 99 66 78 11 54 现在需要重新定义数据类型,MR的key值必须继承WritableComparable接口,因此定义RectangleWritable数据类型如下: import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import

Hadoop序列化机制及实例

序列化 1.什么是序列化? 将结构化对象转换成字节流以便于进行网络传输或写入持久存储的过程. 2.什么是反序列化? 将字节流转换为一系列结构化对象的过程. 序列化用途: 1.作为一种持久化格式. 2.作为一种通信的数据格式. 3.作为一种数据拷贝.克隆机制. Java序列化和反序列化 1.创建一个对象实现了Serializable 2.序列化:ObjectOutputStream.writeObject(序列化对象) 反序列化:ObjectInputStream.readObject()返回序列

Java 和 Hadoop 序列化机制浅讲

1.序列化 序列化 (Serialization)将对象的状态信息转换为可以存储或传输的形式的过程(字节流).在序列化期间,对象将其当前状态写入到临时或持久性存储区.以后,可以通过从存储区中读取或反序列化对象的状态,重新创建该对象. 通常来说有三个用途: 持久化:对象可以被存储到磁盘上 通信:对象可以通过网络进行传输 拷贝.克隆:可以通过将某一对象序列化到内存的缓冲区,然后通过反序列化生成该对象的一个深拷贝(破解单例模式的一种方法) 2.Java序列化机制 在Java中要实现序列化,只需要实现S

大数据-Hadoop生态(12)-Hadoop序列化和源码追踪

1.什么是序列化 2.为什么要序列化 3.为什么不用Java的序列化 4.自定义bean对象实现序列化接口(Writable) 在企业开发中往往常用的基本序列化类型不能满足所有需求,比如在Hadoop框架内部传递一个bean对象,那么该对象就需要实现序列化接口. 具体实现bean对象序列化步骤如下7步: 1) 必须实现Writable接口 2) 反序列话时,需要反射调用无参构造方法,所以必须要有无参构造方法 3) 重写序列化方法write() 4) 重写反序列化方法readFields() 5)

Hadoop序列化与Writable源码分析

序列化的概念     1.序列化(Serialization)是指把结构化对象转化为字节流.     2.反序列化(Deserialization)是序列化的逆过程,即把字节流转回结构化对象 Hadoop序列化的特点     1.序列化格式特点         ——紧凑:高效使用 存储空间         ——快速:读写数据的额外开销小         ——可扩展:可透明地读取老格式的数据         ——互操作:支持多语言的交互注:hadoop1.x的序列化仅满足了紧凑和快速的特点. 2.

一步一步跟我学习lucene(13)---lucene搜索之自定义排序的实现原理和编写自己的自定义排序工具

自定义排序说明 我们在做lucene搜索的时候,可能会需要排序功能,虽然lucene内置了多种类型的排序,但是如果在需要先进行某些值的运算然后在排序的时候就有点显得无能为力了: 要做自定义查询,我们就要研究lucene已经实现的排序功能,lucene的所有排序都是要继承FieldComparator,然后重写内部实现,这里以IntComparator为例子来查看其实现: IntComparator相关实现 其类的声明为 public static class IntComparator exte

Hadoop序列化与Writable接口(一)

Hadoop序列化与Writable接口(一) 序列化 序列化(serialization)是指将结构化的对象转化为字节流,以便在网络上传输或者写入到硬盘进行永久存储:相对的反序列化(deserialization)是指将字节流转回到结构化对象的过程. 在分布式系统中进程将对象序列化为字节流,通过网络传输到另一进程,另一进程接收到字节流,通过反序列化转回到结构化对象,以达到进程间通信.在Hadoop中,Mapper,Combiner,Reducer等阶段之间的通信都需要使用序列化与反序列化技术.