正文开始前 ,先介绍几个概念
序列化
所谓序列化,是指将结构化对象转化为字节流,以便在网络上传输或写到磁盘进行永久存储。
反序列化
是指将字节流转回到结构化对象的逆过程
序列化在分布式数据处理的两个大领域经常出现:进程间通信和永久存储
在Hadoop中,系统中多个节点上进程间的通信是通过"远程过程调用"(remote procedure call,RPC)实现的 。RPC协议将消息序列化成二进制流后发送到远程节点,远程节点接着将二进制流反序列化为原始消息
Hadoop使用了自己写的序列化格式Writable,它格式紧凑,速度快,但是它很难用Java以外的语言进行拓展或使用,因为Writable是Hadoop的核心,大多数MapReduce程序都会为键和值使用它
简单来说,RPC协议是让程序员可以调用远程计算机进程上的代码的一套工具
打个比方 就是A通过网络调用B的某个进程方法
通信中的协议是由程序员自己规定的,比如你可以规定说当A向B发送数字1, B就打印hello hadoop, 并返回数字1给A, 如果发送数字2,B就打印hello world并发送数字2给A.
序列化------------>写 write(DataOutput out)
反序列化-------->读 readFields(DataInput in)
首先声明:我是基于Hadoop2.6.4版本
一. Hadoop内置的数据类型
BooleanWritable:标准布尔型数值
ByteWritable:单字节数值
DoubleWritable:双字节数值
FloatWritable:浮点数
IntWritable:整型数
LongWritable:长整型数
Text:使用UTF8格式存储的文本
NullWritable:当<key, value>中的key或value为空时使用
Hadoop中的数据类型都要实现Writable接口,以便用这些类型定义的数据可以被网络传输和文件存储。
二. 用户自定义数据类型的实现
1.继承接口Writable,实现其方法write()和readFields(), 以便该数据能被序列化后完成网络传输或文件输入/输出;
2.如果该数据需要作为主键key使用,或需要比较数值大小时,则需要实现WritalbeComparable接口,实现其方法write(),readFields(),CompareTo() 。
3.数据类型,必须要有一个无参的构造方法,为了方便反射,进行创建对象。
4.在自定义数据类型中,建议使用java的原生数据类型,最好不要使用Hadoop对原生类型进行封装的数据类型。比如 int x ;//IntWritable 和String s; //Text 等等
下面是一个自定义的数据类型 3D坐标轴
package com.tg.type; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.WritableComparable; public class Point3D implements WritableComparable<Point3D> { public float x, y, z; public Point3D(float fx, float fy, float fz) { this.x = fx; this.y = fy; this.z = fz; } public Point3D() { this(0.0f, 0.0f, 0.0f); } public void readFields(DataInput in) throws IOException { x = in.readFloat(); y = in.readFloat(); z = in.readFloat(); } @Override public void write(DataOutput out) throws IOException { out.writeFloat(x); out.writeFloat(y); out.writeFloat(z); } public String toString() { return "X:"+Float.toString(x) + ", " + "Y:"+Float.toString(y) + ", " + "Z:"+Float.toString(z); } public float distanceFromOrigin() { return (float) Math.sqrt( x*x + y*y +z*z); } public int compareTo(Point3D other) { return Float.compare( distanceFromOrigin(), other.distanceFromOrigin()); } public boolean equals(Object o) { if( !(o instanceof Point3D)) { return false; } Point3D other = (Point3D) o; return this.x == other.x && this.y == other.y && this.z == other.z; } /* 实现 hashCode() 方法很重要 * Hadoop的Partitioners会用到这个方法,后面再说 */ public int hashCode() { return Float.floatToIntBits(x) ^ Float.floatToIntBits(y) ^ Float.floatToIntBits(z); } }
下面讲数据输入输出格式和自定义数据输入输出格式 ,然后把上面讲过的自定义数据类型整合进去
首先看看输入文件a.txt
数据输入格式(InputFormat) 用于描述MapReduce作业的数据输入规范。MapReduce框架依靠数据输入格式完成输入规范检查(比如输入文件目录的检查)、对数据文件进行输入分块(也叫分片,InputSplit),以及提供从输入分块(分片)中将数据记录逐一读出,并转化为Map过程的输入键值对等功能
Hadoop提供了丰富的内置数据输入格式。最常用的数据输入格式包括:TextInputFormat和KeyValueInputFormat
TextInputFormat是系统默认的数据输入格式,可以将文本文件分块并逐行读入以便Map节点进行处理。读入一行时,所产生的主键Key就是当前行在整个文本文件中的字节偏移位置,而value就是该行的内容,它是系统默认的输入格式,当用户程序不设置任何数据输入格式时,系统自动使用这个数据输入格式。
比如如下文件内容
hello tanggao
hello hadoop
第一行的偏移量为0
第二行偏移量为13
KeyValueTextInputFormat是另一个常用的数据输入格式,可将一个按照<key,value>格式逐行存放的文本文件逐行读出,并自动解析生成相应的key和value
比如
姓名 汤高
年龄 20
则解析出来的
第一行键Key为姓名 值value为汤高
第二行键key为年龄 值value为20
注意和TextInputFormat不同,TextInputFormat是偏移量做键,整行内容做值
对于一个数据输入格式,都需要一个对应的RecordReader。RecordReader。主要用于将一个文件中的数据记录分拆成具体的键值对,传送给Map过程作为键值对输入参数。每一个数据输入格式都有一个默认的RecordReader。TextInputFormat的默认RecordReader是LineRecordReader,而KeyValueTextInputFormat的默认RecordReader是KeyValueLineRecordReader
当然肯定还有很多数据输入格式和对应的默认RecordReader 这里就不接受了,有需要的可以去官网看看
数据输出格式(OutputFormat)用于描述MapReduce作业的数据输出规范。MapReduce框架依靠数据输出格式完成输出规范检查(蔽日检查输出目录是否存在),以及提供作业结果数据输出等功能
Hadoop提供了丰富的内置数据输出格式。最常用的数据输出格式是TextOutputFormat,也是系统默认的数据输出格式,可以将计算结果以 key+\t+value的形式逐行输出到文本文件中。
与数据输入格式中的RecordReader类似,数据输出格式也提供一个对应的RecordWriter,以便系统明确输出结果写入到文件中的具体格式。
TextOutputFormat的默认RecordWriter是LineRecordWriter,其实际操作是将结果数据以key+\t+value的形式输出到文本文件中。
当然同样肯定还有很多数据输出格式和对应的默认RecordWriter
对于自定义数据输入格式 可以参考已有的数据输入格式,继承自它即可,只要重写GetRecordReader方法得到一个自己写的RecordReader即可
我的是仿造KeyValueTextInputFormat和它的KeyValueLineRecordReader来自定义自己的输入格式的,所以我都是自己复制了上面两个类的源码然后进行自己的改写
package com.my.input; import java.io.IOException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionCodecFactory; import org.apache.hadoop.io.compress.SplittableCompressionCodec; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; public class myInputFormat extends FileInputFormat<Text,Text> { //用来压缩的 @Override protected boolean isSplitable(JobContext context, Path file) { final CompressionCodec codec = new CompressionCodecFactory(context.getConfiguration()).getCodec(file); if (null == codec) { return true; } return codec instanceof SplittableCompressionCodec; } @Override public RecordReader<Text, Text> createRecordReader(InputSplit genericSplit, TaskAttemptContext context) throws IOException, InterruptedException { context.setStatus(genericSplit.toString()); return new MyRecordReader(context.getConfiguration()); } }
他的RecordReader
package com.my.input; import java.io.IOException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.LineRecordReader; @InterfaceAudience.Public @InterfaceStability.Stable public class MyRecordReader extends RecordReader<Text, Text> { public static final String KEY_VALUE_SEPERATOR = "mapreduce.input.mylinerecordreader.key.value.separator"; private final LineRecordReader lineRecordReader; //源码是根据\t分割 我改为了我自己的需求为=号分割 private byte separator = (byte) '='; private Text innerValue; private Text key; private Text value; public Class<Text> getKeyClass() { return Text.class; } public MyRecordReader(Configuration conf) throws IOException { lineRecordReader = new LineRecordReader(); String sepStr = conf.get(KEY_VALUE_SEPERATOR, "="); this.separator = (byte) sepStr.charAt(0); } public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException { lineRecordReader.initialize(genericSplit, context); } public static int findSeparator(byte[] utf, int start, int length, byte sep) { for (int i = start; i < (start + length); i++) { if (utf[i] == sep) { return i; } } return -1; } public static void setKeyValue(Text key, Text value, byte[] line, int lineLen, int pos) { if (pos == -1) { key.set(line, 0, lineLen); value.set(""); } else { key.set(line, 0, pos); value.set(line, pos + 1, lineLen - pos - 1); } } /** Read key/value pair in a line. */ public synchronized boolean nextKeyValue() throws IOException { byte[] line = null; int lineLen = -1; if (lineRecordReader.nextKeyValue()) { innerValue = lineRecordReader.getCurrentValue(); line = innerValue.getBytes(); lineLen = innerValue.getLength(); } else { return false; } if (line == null) return false; if (key == null) { key = new Text(); } if (value == null) { value = new Text(); } int pos = findSeparator(line, 0, lineLen, this.separator); setKeyValue(key, value, line, lineLen, pos); return true; } public Text getCurrentKey() { return key; } public Text getCurrentValue() { return value; } public float getProgress() throws IOException { return lineRecordReader.getProgress(); } public synchronized void close() throws IOException { lineRecordReader.close(); } }
对于自定义数据输出格式 可以参考已有的数据输出格式,继承自它即可,只要重写GetRecordWriter方法得到一个自己写的RecordWriter即可
一种实现:我们一般继承自FileOutputFormat来改写
因为FileOutputFormat已经帮我们实现了许多通用的功能
package com.my.input; import java.io.DataOutputStream; import java.io.IOException; import java.io.UnsupportedEncodingException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.ReflectionUtils; public class MyOutputFormat<K, V> extends FileOutputFormat<K, V> { public static String SEPERATOR = "mapreduce.output.textoutputformat.separator"; protected static class MyLineRecordWriter<K, V> extends RecordWriter<K, V> { private static final String utf8 = "UTF-8"; private static final byte[] newline; static { try { newline = "\n".getBytes(utf8); } catch (UnsupportedEncodingException uee) { throw new IllegalArgumentException("can't find " + utf8 + " encoding"); } } protected DataOutputStream out; private final byte[] keyValueSeparator; public MyLineRecordWriter(DataOutputStream out, String keyValueSeparator) { this.out = out; try { this.keyValueSeparator = keyValueSeparator.getBytes(utf8); } catch (UnsupportedEncodingException uee) { throw new IllegalArgumentException("can't find " + utf8 + " encoding"); } } //改写了源码 把\t改为了=========> public MyLineRecordWriter(DataOutputStream out) { this(out, "=========>"); } /** * Write the object to the byte stream, handling Text as a special case. * * @param o * the object to print * @throws IOException * if the write throws, we pass it on */ private void writeObject(Object o) throws IOException { if (o instanceof Text) { Text to = (Text) o; out.write(to.getBytes(), 0, to.getLength()); } else { out.write(o.toString().getBytes(utf8)); } } public synchronized void write(K key, V value) throws IOException { boolean nullKey = key == null || key instanceof NullWritable; boolean nullValue = value == null || value instanceof NullWritable; if (nullKey && nullValue) { return; } if (!nullKey) { writeObject(key); } if (!(nullKey || nullValue)) { out.write(keyValueSeparator); } if (!nullValue) { writeObject(value); } out.write(newline); } public synchronized void close(TaskAttemptContext context) throws IOException { out.close(); } } public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException { Configuration conf = job.getConfiguration(); boolean isCompressed = getCompressOutput(job); String keyValueSeparator = conf.get(SEPERATOR, "=========>"); CompressionCodec codec = null; String extension = ""; if (isCompressed) { Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job, GzipCodec.class); codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf); extension = codec.getDefaultExtension(); } Path file = getDefaultWorkFile(job, extension); FileSystem fs = file.getFileSystem(conf); if (!isCompressed) { FSDataOutputStream fileOut = fs.create(file, false); return new MyLineRecordWriter<K, V>(fileOut, keyValueSeparator); } else { FSDataOutputStream fileOut = fs.create(file, false); return new MyLineRecordWriter<K, V>(new DataOutputStream(codec.createOutputStream(fileOut)), keyValueSeparator); } } }
第二种实现:FileOutputFormat的一个重要子类就是TextOutputFormat,我们也可以继承它,然后重写getRecordWriter方法即可
package com.my.input; import java.io.DataOutputStream; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.ReflectionUtils; public class MyOutputFormat2<K, V> extends TextOutputFormat<K, V> { @Override public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException { Configuration conf = job.getConfiguration(); boolean isCompressed = getCompressOutput(job); //改写了源码 把\t改为了=========> String keyValueSeparator = conf.get(SEPERATOR, "=========>"); CompressionCodec codec = null; String extension = ""; if (isCompressed) { Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job, GzipCodec.class); codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf); extension = codec.getDefaultExtension(); } Path file = getDefaultWorkFile(job, extension); FileSystem fs = file.getFileSystem(conf); if (!isCompressed) { FSDataOutputStream fileOut = fs.create(file, false); return new LineRecordWriter<K, V>(fileOut, keyValueSeparator); } else { FSDataOutputStream fileOut = fs.create(file, false); return new LineRecordWriter<K, V>(new DataOutputStream(codec.createOutputStream(fileOut)), keyValueSeparator); } } }
最后就可以用来测试了
测试代码
package com.my.input; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; import com.tg.type.Point3D; public class Point3DDriver { /** * * @author 汤高 * Point3D为自定义数据类型 把它作为map的输出类型 * */ // Map过程 static int count=0; public static class MyMapper extends Mapper<Text, Text, Text, Point3D> { /*** * */ @Override protected void map(Text key, Text value, Mapper<Text, Text, Text, Point3D>.Context context) throws IOException, InterruptedException { count++; //这里得到的键是自定义输入格式输出的内容 本例是 One 、two、three //这里得到的值是X:1.0, Y:2.0, Z:3.0 等 //根据都好截取值里面的内容 分别设置到自定义数据类型Point3D里面去 String[] vs = value.toString().split(","); Point3D p = new Point3D(Float.parseFloat(vs[0].split(":")[1]), Float.parseFloat(vs[1].split(":")[1]), Float.parseFloat(vs[2].split(":")[1]) ); // 写出去 把自定义数据类型输出去 context.write(new Text(key), p); System.out.println("几个map==========>"+count); } } //Reduce过程 public static class MyReducer extends Reducer<Text, Point3D, Text, Point3D>{ protected void reduce(Text key, Point3D value, Reducer<Text, Point3D, Text, Point3D>.Context context) throws IOException, InterruptedException { context.write(key, value); } } public static void main(String[] args) { try { Configuration conf = new Configuration(); String[] paths = new GenericOptionsParser(conf, args).getRemainingArgs(); if (paths.length < 2) { throw new RuntimeException("usage <input> <output>"); } Job job = Job.getInstance(conf, "Point3DDriver"); job.setJarByClass(Point3DDriver.class); job.setMapperClass(MyMapper.class); job.setReducerClass(MyReducer.class); job.setInputFormatClass(myInputFormat.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Point3D.class); job.setOutputFormatClass(MyOutputFormat.class); //job.setOutputFormatClass(MyOutputFormat2.class); FileInputFormat.addInputPaths(job, paths[0]); FileOutputFormat.setOutputPath(job, new Path(paths[1] + System.currentTimeMillis()));// 整合好结果后输出的位置 System.exit(job.waitForCompletion(true) ? 0 : 1);// 执行job } catch (IOException e) { e.printStackTrace(); } catch (ClassNotFoundException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }
对于编写Map函数和Reduce函数不熟悉的朋友,可以参看我上篇博客 里面讲解了如何实现MapReduce编程
结果:
码字不易,转载请指明出自 http://blog.csdn.net/tanggao1314/article/details/51305852