Hadoop 学习自定义数据类型

(学习网易云课堂Hadoop大数据实战笔记)

序列化在分布式环境的两大作用:进程间通信,永久存储。

Writable接口, 是根据 DataInput 和 DataOutput 实现的简单、有效的序列化对象.

MR的任意Value必须实现Writable接口:

MR的key必须实现WritableComparable接口,WritableComparable继承自Writable和Comparable接口

(本节先讲自定义value值,下一节再讲自定义key值,根据key值进行自定义排序)

以一个例子说明,自定义数据类型(例子来源于学习的课程):

原始数据是由若干条下面数据组成:

数据格式及字段顺序如下:

现在要做的工作是以“手机号码”为关键字,计算同一个号码的upPackNum, downPackNum,upPayLoad,downPayLoad四个累加值。

运用MapReduce解决问题思路:

1、框架将数据分成<k1,v1>,k1是位置标记,v1表示一行数据;

2、map函数输入<k1,v1>,输入<k2,v2>,k2是选定数据的第1列(从0开始),v2是自定义的数据类型,包含第六、七、八、九列封装后的数据;

3、框架将<k2,v2>依据k2关键字进行map排序,然后进行combine过程,再进行Reduce段排序,得到<k2,list(v2...)>;

4、reduce函数处理<k2,list(v2...)>,以k2为关键字,计算list的内容。

要自定义的数据类型是Value值,因此要继承Writable接口,自定义数据类型如下

  

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class TrafficWritable implements Writable {

	long upPackNum, downPackNum,upPayLoad,downPayLoad;

	public TrafficWritable() { //这个构造函数不能省,否则报错
		super();
		// TODO Auto-generated constructor stub
	}

	public TrafficWritable(String upPackNum, String downPackNum, String upPayLoad,
			String downPayLoad) {
		super();
		this.upPackNum = Long.parseLong(upPackNum);
		this.downPackNum = Long.parseLong(downPackNum);
		this.upPayLoad = Long.parseLong(upPayLoad);
		this.downPayLoad = Long.parseLong(downPayLoad);
	}

	@Override
	public void write(DataOutput out) throws IOException { //序列化
		// TODO Auto-generated method stub
		out.writeLong(upPackNum);
		out.writeLong(downPackNum);
		out.writeLong(upPayLoad);
		out.writeLong(downPayLoad);
	}

	@Override
	public void readFields(DataInput in) throws IOException { //反序列化
		// TODO Auto-generated method stub
		this.upPackNum=in.readLong();
		this.downPackNum=in.readLong();
		this.upPayLoad=in.readLong();
		this.downPayLoad=in.readLong();
	}

	@Override
	public String toString() { //不加toStirng函数,最后输出内存的地址
		return upPackNum + "\t"+ downPackNum + "\t" + upPayLoad + "\t"
				+ downPayLoad;
	}

}

  

最后实现map函数和Reduce函数如下,基本框架和wordCount相同:

  

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TrafficCount {
	/**
	 * @author nwpulisz
	 * @date 2016.3.31
	 */
	static final String INPUT_PATH="hdfs://192.168.255.132:9000/input";
	static final String OUTPUT_PATH="hdfs://192.168.255.132:9000/output";

	public static void main(String[] args) throws Throwable {
		// TODO Auto-generated method stub
		Configuration conf = new Configuration();
		Path outPut_path= new Path(OUTPUT_PATH);
		Job job = new Job(conf, "TrafficCount");

		//如果输出路径是存在的,则提前删除输出路径
		FileSystem fileSystem = FileSystem.get(new URI(OUTPUT_PATH), conf);
		if(fileSystem.exists(outPut_path))
		{
			fileSystem.delete(outPut_path,true);
		}

		FileInputFormat.setInputPaths(job, INPUT_PATH);
		FileOutputFormat.setOutputPath(job, outPut_path);

		job.setMapperClass(MyMapper.class);
		job.setReducerClass(MyReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(TrafficWritable.class);
		job.waitForCompletion(true);
	}

	static class MyMapper extends Mapper<LongWritable, Text, Text, TrafficWritable>{

		protected void map(LongWritable k1, Text v1,
                Context context) throws IOException, InterruptedException {
				String[] splits = v1.toString().split("\t");
				Text k2 = new Text(splits[1]);
				TrafficWritable v2 = new TrafficWritable(splits[6], splits[7],
						splits[8], splits[9]);
				context.write(k2, v2);

		}

	}

	static class MyReducer extends Reducer<Text, TrafficWritable, Text, TrafficWritable>{

		protected void reduce(Text k2, Iterable<TrafficWritable> v2s, Context context
                ) throws IOException, InterruptedException {

			long upPackNum=0L, downPackNum=0L,upPayLoad=0L,downPayLoad=0L;
			for(TrafficWritable traffic: v2s) {
					upPackNum += traffic.upPackNum;
					downPackNum += traffic.downPackNum;
					upPayLoad += traffic.upPayLoad;
					downPayLoad += traffic.downPayLoad;
			}
			context.write(k2,new TrafficWritable(upPackNum+"",downPackNum+"",upPayLoad+"",
					downPayLoad+""));
			}
	}
}

  

最终输出结果如下:

附实验数据下载地址:https://yunpan.cn/cqcEy6QSzUEs7  访问密码 2fb1。数据来源:网易云课堂hadoop大数据实战

来自为知笔记(Wiz)

时间: 2024-11-05 16:55:08

Hadoop 学习自定义数据类型的相关文章

hadoop 学习自定义排序

(网易云课程hadoop大数据实战学习笔记) 自定义排序,是基于k2的排序,设现有以下一组数据,分别表示矩形的长和宽,先按照面积的升序进行排序. 99 66 78 11 54 现在需要重新定义数据类型,MR的key值必须继承WritableComparable接口,因此定义RectangleWritable数据类型如下: import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import

hadoop 学习自定义分区

(网易云课程hadoop大数据实战学习笔记) 如图所示:有三个ReducerTask,因此处理完成之后的数据存储在三个文件中: 默认情况下,numReduceTasks的数量为1,前面做的实验中,输出数据都是在一个文件中.通过自定义myPatitioner类,可以把ruduce处理后的数据分类汇总,这里MyPartitioner是Partitioner的基类,如果需要定制partitioner也需要继承该类.HashPartitioner是mapreduce的默认partitioner.计算方法

结合手机上网流量业务来说明Hadoop中的自定义数据类型(序列化、反序列化机制)

大家都知道,Hadoop中为Key的数据类型必须实现WritableComparable接口,而Value的数据类型只需要实现Writable接口即可:能做Key的一定可以做Value,能做Value的未必能做Key.但是具体应该怎么应用呢?--本篇文章将结合手机上网流量业务进行分析. 先介绍一下业务场景:统计每个用户的上行流量和,下行流量和,以及总流量和. 本次描述所用数据: 日志格式描述: 日志flowdata.txt中的具体数据: 接下来贴出详细代码,代码中含有详细注释,从代码中可以看出,

Hadoop学习笔记—9.Partitioner与自定义Partitioner

一.初步探索Partitioner 1.1 再次回顾Map阶段五大步凑 在第四篇博文<初始MapReduce>中,我们认识了MapReduce的八大步凑,其中在Map阶段总共五个步凑,如下图所示: 其中,step1.3就是一个分区操作.通过前面的学习我们知道Mapper最终处理的键值对<key, value>,是需要送到Reducer去合并的,合并的时候,有相同key的键/值对会送到同一个Reducer节点中进行归并.哪个key到哪个Reducer的分配过程,是由Partition

Hadoop学习笔记—5.自定义类型处理手机上网日志

一.测试数据:手机上网日志 1.1 关于这个日志 假设我们如下一个日志文件,这个文件的内容是来自某个电信运营商的手机上网日志,文件的内容已经经过了优化,格式比较规整,便于学习研究. 该文件的内容如下(这里我只截取了三行): 1363157993044 18211575961 94-71-AC-CD-E6-18:CMCC-EASY 120.196.100.99 iface.qiyi.com 视频网站 15 12 1527 2106 200 1363157995033 15920133257 5C-

干货--Hadoop自定义数据类型和自定义输入输出格式整合项目案例

正文开始前 ,先介绍几个概念 序列化 所谓序列化,是指将结构化对象转化为字节流,以便在网络上传输或写到磁盘进行永久存储. 反序列化 是指将字节流转回到结构化对象的逆过程 序列化在分布式数据处理的两个大领域经常出现:进程间通信和永久存储 在Hadoop中,系统中多个节点上进程间的通信是通过"远程过程调用"(remote procedure call,RPC)实现的 .RPC协议将消息序列化成二进制流后发送到远程节点,远程节点接着将二进制流反序列化为原始消息 Hadoop使用了自己写的序列

hadoop学习;自定义Input/OutputFormat;类引用mapreduce.mapper;三种模式

hadoop分割与读取输入文件的方式被定义在InputFormat接口的一个实现中,TextInputFormat是默认的实现,当你想要一次获取一行内容作为输入数据时又没有确定的键,从TextInputFormat返回的键为每行的字节偏移量,但目前没看到用过 以前在mapper中曾使用LongWritable(键)和Text(值),在TextInputFormat中,因为键是字节偏移量,可以是LongWritable类型,而当使用KeyValueTextInputFormat时,第一个分隔符前后

Hadoop学习笔记—7.计数器与自定义计数器

一.Hadoop中的计数器 计数器:计数器是用来记录job的执行进度和状态的.它的作用可以理解为日志.我们通常可以在程序的某个位置插入计数器,用来记录数据或者进度的变化情况,它比日志更便利进行分析. 例如,我们有一个文件,其中包含如下内容: hello you hello me 它被WordCount程序执行后显示如下日志: 在上图所示中,计数器有19个,分为四个组:File Output Format Counters.FileSystemCounters.File Input Format

Hadoop学习笔记(7) ——高级编程

Hadoop学习笔记(7) ——高级编程 从前面的学习中,我们了解到了MapReduce整个过程需要经过以下几个步骤: 1.输入(input):将输入数据分成一个个split,并将split进一步拆成<key, value>. 2.映射(map):根据输入的<key, value>进生处理, 3.合并(combiner):合并中间相两同的key值. 4.分区(Partition):将<key, value>分成N分,分别送到下一环节. 5.化简(Reduce):将中间结