Hadoop 2.x Advanced Applications: Secondary Sort and MapReduce Join

I. Understanding the secondary sort example

1. Requirement analysis (sort by the first field first, then by the second field)
    Unsorted raw data       Sorted result
        a,1                     a,1
        b,1                     a,2
        a,2       [sort]        a,100
        b,6        ===>         b,-3
        c,2                     b,-2
        b,-2                    b,1
        a,100                   b,6
        b,-3                    c,-7
        c,-7                    c,2
2. Analyzing the MapReduce flow
     1> The data is read in by the input stage and handed to map()
     2> map() filters the data layer by layer until we are left with the records we actually want
     3> Custom counters can be added inside the filtering logic
     4> The filtered records are written to the context and move into the shuffle phase
     5> It is fair to say that most of the shuffle happens on the map() side
     6> Within the shuffle, data first passes through the default partitioner (HashPartitioner), whose rule is
        (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; by default there is a single reduce task, so a
        single output file is produced. From my own output tests, even if you configure a custom partitioner it
        is not actually used unless you also set the number of reduce tasks (a sketch of the default rule
        follows right after this list)
     7> After partitioning, the data inside each partition is sorted by key. We can define a custom key type and
        give it its own ordering rule; for a secondary sort we build a composite key that sorts on the first
        field and, when the first fields are equal, falls back to the second field. After the sort comes the
        grouping phase, which by default also groups by key; a custom grouping rule is implemented with a
        RawComparator
     8> After grouping, the spill files are merged and merge-sorted, and the data enters reduce. The partition
        decides which reduce task a record is sent to, and the number of reduce tasks decides how many output
        files are produced, while the grouping decides which records end up in the same reduce() call. The way
        I see it, grouping is mainly a shuffle-phase optimization of the job
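   For reference, the default rule mentioned in item 6> can be written out as a small partitioner. This is only
   a sketch of what HashPartitioner effectively does for a Text/LongWritable pair, not the actual Hadoop source:

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Sketch: equivalent of the default hash-partitioning rule
public class DefaultRuleSketchPartitioner extends Partitioner<Text, LongWritable> {

	@Override
	public int getPartition(Text key, LongWritable value, int numReduceTasks) {
		// mask the sign bit so the result is never negative, then spread keys across the reduce tasks
		return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
	}
}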
 3. Analyzing the secondary sort
     1> From the data above, we can define a custom key type that holds both the first and the second field, plus
        a comparator that sorts primarily on the first field of the key. The custom type needs to implement
        WritableComparable (or implement Writable and Comparable separately); whichever is more convenient
     2> Next, the partitioner: this example produces a single sorted file, so no custom partitioner is needed,
        and as noted above a custom partitioner would not even be invoked here. A custom partitioner extends
        Partitioner; note that it is extended, and we override the partitioning rule ourselves
     3> Then the grouping: grouping is still worth having as an optimization. Our grouping rule groups on the
        first field of the custom key, and grouping is implemented with a RawComparator
     4> Finally, think about what else could be optimized: the volume of the source data, whether fields are
        always present, their lengths and types, whether to use a combiner or a custom compression codec, how
        negative values are handled, and so on. Since the comparator already orders by the second field, there
        is no need for tricks like adding or subtracting a large constant (a small configuration sketch for the
        combiner/compression options follows below)
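   As an illustration of the combiner/compression knobs above (not part of the original job), the driver could
   be tuned like this; the property names are the standard Hadoop 2.x ones, and whether a combiner is safe
   depends entirely on the reduce logic:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.mapreduce.Job;

// Sketch: optional tuning of map-output compression in the driver
public class TuningSketch {

	public static Job buildTunedJob() throws Exception {
		Configuration configuration = new Configuration();
		// compress intermediate map output to cut shuffle I/O
		configuration.setBoolean("mapreduce.map.output.compress", true);
		configuration.setClass("mapreduce.map.output.compress.codec",
				DefaultCodec.class, CompressionCodec.class);

		Job job = Job.getInstance(configuration, "ssort-tuned");
		// a combiner is only safe when the reduce logic is associative and commutative;
		// the secondary-sort reducer below just emits values, so no combiner is set here
		return job;
	}
}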
  Result walkthrough:
      source data    after map()      after shuffle          after reduce()
       a,1            a#1,1           a#1 [1,2,100]          a   1
       b,1            b#1,1           b#-3 [-3,-2,1,6]       a   2
       a,2            a#2,2           c#-7 [-7,2]            a   100
       b,6            b#6,6                                  b   -3
       c,2            c#2,2                                  b   -2
       b,-2           b#-2,-2                                b   1
       a,100          a#100,100                              b   6
       b,-3           b#-3,-3                                c   -7
       c,-7           c#-7,-7                                c   2

II. Secondary sort example code

SSortMr.java ## main class
============
package com.bigdata_senior.SSortMr;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SSortMr {

	//Mapper Class
	private static class SSortMapper
	    extends Mapper<LongWritable, Text, SecondaryWritable, LongWritable>{
		private SecondaryWritable mapOutKey = new SecondaryWritable();
		private LongWritable mapOutValue = new LongWritable();
		@Override
		public void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {

			String lineValue = value.toString();
			String [] strValue = lineValue.split(",");
			mapOutKey.set(strValue[0],Integer.valueOf(strValue[1]));
			mapOutValue.set(Integer.valueOf(strValue[1]));
			context.write(mapOutKey, mapOutValue);
			System.out.println("key-->"+mapOutKey+" value-->"+mapOutValue);
		}
	}

	//Reduce Class
	private static class SSortReduce
	    extends Reducer<SecondaryWritable, LongWritable, Text, LongWritable>{
		private Text reduceOutKey = new Text();
		@Override
		public void reduce(SecondaryWritable key, Iterable<LongWritable> values,Context context)
				throws IOException, InterruptedException {

			// grouping compares only the first field, so each reduce() call sees every value for that first
			// field; the composite key is refreshed as the iterator advances, so getSecond() tracks each value
			for(LongWritable value : values){
				reduceOutKey.set(key.getFirst()+"#"+key.getSecond());
				context.write(reduceOutKey, value);
			}
		}
	}

	//Driver
	public int run(String[] args) throws Exception {

		Configuration configuration = new Configuration();
		Job job = Job.getInstance(configuration, this.getClass().getSimpleName());
		job.setJarByClass(this.getClass());
		//job.setNumReduceTasks(3);

		//input
		Path inPath = new Path(args[0]);
		FileInputFormat.addInputPath(job,inPath);

		//output
		Path outPath = new Path(args[1]);
		FileOutputFormat.setOutputPath(job, outPath);

		//mapper
		job.setMapperClass(SSortMapper.class);
		job.setMapOutputKeyClass(SecondaryWritable.class);
		job.setMapOutputValueClass(LongWritable.class);

		//partitioner
		//job.setPartitionerClass(SecondaryPartionerCLass.class);

		//group
		job.setGroupingComparatorClass(SecondaryGroupClass.class);

		//Reduce
		job.setReducerClass(SSortReduce.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(LongWritable.class);

		//submit job
		boolean isSuccess = job.waitForCompletion(true);

		return isSuccess ? 0 : 1;
	}

	public static void main(String[] args) throws Exception {

		args = new String[]{
			"hdfs://hadoop09-linux-01.ibeifeng.com:8020/user/liuwl/tmp/sortmr/input",
			"hdfs://hadoop09-linux-01.ibeifeng.com:8020/user/liuwl/tmp/sortmr/output13"
		};
		//run job
		int status = new SSortMr().run(args);
		System.exit(status);
	}
}
SecondaryWritable.java ## custom key type
======================
package com.bigdata_senior.SSortMr;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class SecondaryWritable implements WritableComparable<SecondaryWritable> {

	private String first;
	private int second;

	public SecondaryWritable() {}

	public SecondaryWritable(String first,int second){
		this.set(first, second);
	}

	public void set(String first,int second){
		this.first = first;
		this.second = second;
	}

	public String getFirst() {
		return first;
	}

	public void setFirst(String first) {
		this.first = first;
	}

	public int getSecond() {
		return second ;
	}

	public void setSecond(int second) {
		this.second = second ;
	}

	@Override
	public void write(DataOutput out) throws IOException {

		out.writeUTF(this.first);
		out.writeInt(this.second);
	}

	@Override
	public void readFields(DataInput in) throws IOException {

		this.first = in.readUTF();
		this.second = in.readInt();
	}

	@Override
	public int compareTo(SecondaryWritable o) {

		int comp = this.first.compareTo(o.first);
		if(0 != comp){
			return comp;
		}
		return Integer.valueOf(this.second).compareTo(Integer.valueOf(o.second));
	}

	@Override
	public String toString() {
		return first + "#" + second;
	}

	@Override
	public int hashCode() {
		final int prime = 31;
		int result = 1;
		result = prime * result + ((first == null) ? 0 : first.hashCode());
		result = prime * result + second;
		return result;
	}

	@Override
	public boolean equals(Object obj) {
		if (this == obj)
			return true;
		if (obj == null)
			return false;
		if (getClass() != obj.getClass())
			return false;
		SecondaryWritable other = (SecondaryWritable) obj;
		if (first == null) {
			if (other.first != null)
				return false;
		} else if (!first.equals(other.first))
			return false;
		if (second != other.second)
			return false;
		return true;
	}
}
SecondaryPartionerCLass.java ## custom partitioner (commented out, not used)
============================
package com.bigdata_senior.SSortMr;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class SecondaryPartionerCLass extends Partitioner<SecondaryWritable, LongWritable> {

	@Override
	public int getPartition(SecondaryWritable key, LongWritable value,
			int numPartitions) {
		return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions;
	}
}
SecondaryGroupClass.java ## custom grouping comparator
========================
package com.bigdata_senior.SSortMr;

import java.util.Arrays;

import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.WritableComparator;

public class SecondaryGroupClass implements RawComparator<SecondaryWritable> {

	@Override
	public int compare(SecondaryWritable o1, SecondaryWritable o2) {
		System.out.println("o1: "+o1.toString()+" o2: "+o2.toString());
		return o1.getFirst().compareTo(o2.getFirst());
	}

	@Override
	public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
		System.out.println("b1: "+Arrays.toString(b1)+" b2: "+Arrays.toString(b2));
		// compare only the serialized first field: start at each key's offset and drop the trailing 4-byte int (the second field)
		return WritableComparator.compareBytes(b1, s1, l1 - 4, b2, s2, l2 - 4);
	}
}
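An alternative way to express the same grouping rule (not in the original code; the class name is mine) is to
extend WritableComparator and compare deserialized keys. It reads more simply, at the cost of deserializing
each pair of keys:

SecondaryGroupComparator.java ## alternative grouping comparator (sketch)
=========================================================================
package com.bigdata_senior.SSortMr;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class SecondaryGroupComparator extends WritableComparator {

	protected SecondaryGroupComparator() {
		// 'true' tells the parent class to create key instances so it can deserialize them for us
		super(SecondaryWritable.class, true);
	}

	@Override
	public int compare(WritableComparable a, WritableComparable b) {
		SecondaryWritable k1 = (SecondaryWritable) a;
		SecondaryWritable k2 = (SecondaryWritable) b;
		// keys that share the same first field fall into the same reduce() call
		return k1.getFirst().compareTo(k2.getFirst());
	}
}

It would be registered the same way, with job.setGroupingComparatorClass(SecondaryGroupComparator.class).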
Alternatively: ## this works for small data, but with large data it consumes a lot of resources
SSortMr2.java
=============
package com.bigdata_senior.SSortMr2;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SSortMr2 {

	//Mapper Class
	private static class SSortMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
		private Text mapOutKey = new Text();
		private LongWritable mapOutValue = new LongWritable();
		@Override
		public void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {

			String lineValue = value.toString();
			String [] strValue = lineValue.split(",");
			mapOutKey.set(strValue[0]);
			mapOutValue.set(Integer.valueOf(strValue[1]));
			context.write(mapOutKey, mapOutValue);
			System.out.println("key-->"+mapOutKey+" value-->"+mapOutValue);
		}
	}

	//Reduce Class
	private static class SSortReduce extends Reducer<Text, LongWritable, Text, Long>{
		@Override
		public void reduce(Text key, Iterable<LongWritable> values,Context context)
				throws IOException, InterruptedException {

			List<Long> longList = new ArrayList<Long>();
			for(LongWritable value: values){
				longList.add(value.get());
			}
			Collections.sort(longList);
			for(Long value : longList){
				System.out.println("key--> "+key+" value--> "+value);
				context.write(key, value);
			}
		}
	}

	//Driver
	public int run(String[] args) throws Exception {

		Configuration configuration = new Configuration();
		Job job = Job.getInstance(configuration, this.getClass().getSimpleName());
		job.setJarByClass(this.getClass());

		//input
		Path inPath = new Path(args[0]);
		FileInputFormat.addInputPath(job,inPath);

		//output
		Path outPath = new Path(args[1]);
		FileOutputFormat.setOutputPath(job, outPath);

		//mapper
		job.setMapperClass(SSortMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(LongWritable.class);

		//Reduce
		job.setReducerClass(SSortReduce.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Long.class);

		//submit job
		boolean isSuccess = job.waitForCompletion(true);

		return isSuccess ? 0 : 1;
	}

	public static void main(String[] args) throws Exception {

		args = new String[]{
			"hdfs://hadoop09-linux-01.ibeifeng.com:8020/user/liuwl/tmp/sortmr/input",
			"hdfs://hadoop09-linux-01.ibeifeng.com:8020/user/liuwl/tmp/sortmr/output22"
		};
		//run job
		int status = new SSortMr2().run(args);
		System.exit(status);
	}
}

III. A quick look at the MapReduce join

1. join (i.e., combining data)
2. That is, combining the data of two or more data sources into one output
3. Having learned Hive, I no longer feel the MapReduce join is that important, because doing the join in MapReduce:
    1> limits the number of tables you can join
    2> is tedious to code, with many kinds of filtering, so it is easy to miss cases
    3> is fairly heavy on resources
4. Roughly, a MapReduce join loads the two tables, tags the records with a custom data type so that the two tables
   can still be told apart once the data is mixed together, and then separates them again in reduce() to produce
   the joined output. There are of course other ways to handle a join, for example loading one table into an
   in-memory collection in setup(); see the sketch below
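For the setup() variant just mentioned, a minimal sketch could look like the following. The field layouts match
the reduce-side example below (cid,cname,phone for customers; cid plus three order fields for orders), but the
class name and the HDFS path of the small table are assumptions; a real job would normally ship the small table
through the distributed cache rather than a hard-coded path:

MapJoinMapper.java ## map-side join sketch (hypothetical, not part of the project below)
========================================================================================
package com.bigdata_senior.joinMr;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MapJoinMapper extends Mapper<LongWritable, Text, NullWritable, Text> {

	private final Map<Long, String> customers = new HashMap<Long, String>();
	private final Text outputValue = new Text();

	@Override
	protected void setup(Context context) throws IOException, InterruptedException {
		// hypothetical location of the small customer table
		Path customerPath = new Path("/user/liuwl/tmp/join/customers.csv");
		FileSystem fs = FileSystem.get(context.getConfiguration());
		BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(customerPath)));
		try {
			String line;
			while ((line = reader.readLine()) != null) {
				String[] fields = line.split(",");   // cid,cname,phone
				if (3 == fields.length) {
					customers.put(Long.valueOf(fields[0]), fields[1] + "," + fields[2]);
				}
			}
		} finally {
			reader.close();
		}
	}

	@Override
	public void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {

		String[] fields = value.toString().split(",");   // cid plus three order fields
		if (4 != fields.length) {
			return;
		}
		String customerInfo = customers.get(Long.valueOf(fields[0]));
		if (null == customerInfo) {
			return; // no matching customer: an inner join simply drops the order
		}
		outputValue.set(fields[0] + "," + customerInfo + "," + fields[1] + "," + fields[2] + "," + fields[3]);
		context.write(NullWritable.get(), outputValue);
	}
}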

IV. MapReduce join example code

JoinMr.java ## main class
===========
package com.bigdata_senior.joinMr;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class JoinMr {

	//Mapper Class
	private static class WordCountMapper extends
		Mapper<LongWritable, Text, LongWritable, JoinWritable>{

		private LongWritable mapoutputkey = new LongWritable();
		private JoinWritable mapoutputvalue = new JoinWritable();

		@Override
		protected void setup(Context context) throws IOException,
				InterruptedException {
		}

		@Override
		public void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {

			String lineValue = value.toString();
			String [] strValue = lineValue.split(",");

			int length = strValue.length;
			if(3 != length && 4 != length){
				return;
			}

			//get cid
			Long cid = Long.valueOf(strValue[0]);
			//get cname
			String cname = strValue[1];
			//set customer
			if(3 == length){
				String phone = strValue[2];
				mapoutputkey.set(cid);
				mapoutputvalue.set("customer", cname + "," + phone);
			}

			//set order
			if(4 == length){
				String price = strValue[2];
				String date = strValue[3];
				mapoutputkey.set(cid);
				mapoutputvalue.set("order", cname +","+price +","+ date);
			}
			context.write(mapoutputkey, mapoutputvalue);
		}
	}

	//Reduce Class
	private static class WordCountReduce extends
		Reducer<LongWritable, JoinWritable, NullWritable, Text>{

		private Text outputValue = new Text();
		@Override
		public void reduce(LongWritable key, Iterable<JoinWritable> values,Context context)
				throws IOException, InterruptedException {

			String customerInfo = null;
			List<String> orderList = new ArrayList<String>();
			for(JoinWritable value : values){
				if("customer".equals(value.getTag())){
					customerInfo = value.getData();
					System.out.println(customerInfo);
				}else if("order".equals(value.getTag())){
					orderList.add(value.getData());
				}
			}
			for(String order: orderList){
				outputValue.set(key.get()+","+customerInfo+","+order);
				context.write(NullWritable.get(), outputValue);
			}
		}
	}

	//Driver
	public int run(String[] args) throws Exception {

		Configuration configuration = new Configuration();
		Job job = Job.getInstance(configuration, this.getClass().getSimpleName());
		job.setJarByClass(this.getClass());

		//input
		Path inPath = new Path(args[0]);
		FileInputFormat.addInputPath(job,inPath);

		//output
		Path outPath = new Path(args[1]);
		FileOutputFormat.setOutputPath(job, outPath);

		//mapper
		job.setMapperClass(WordCountMapper.class);
		job.setMapOutputKeyClass(LongWritable.class);
		job.setMapOutputValueClass(JoinWritable.class);

		//Reduce
		job.setReducerClass(WordCountReduce.class);
		job.setOutputKeyClass(NullWritable.class);
		job.setOutputValueClass(Text.class);

		//submit job
		boolean isSuccess = job.waitForCompletion(true);

		return isSuccess ? 0 : 1;
	}

	public static void main(String[] args) throws Exception {

		args = new String[]{
			"hdfs://hadoop09-linux-01.ibeifeng.com:8020/user/liuwl/tmp/join/input",
			"hdfs://hadoop09-linux-01.ibeifeng.com:8020/user/liuwl/tmp/join/output2"
		};
		//run job
		int status = new JoinMr().run(args);
		System.exit(status);
	}
}
JoinWritable.java ## custom value type
package com.bigdata_senior.joinMr;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

public class JoinWritable implements Writable {

	private String tag;
	private String data;

	public JoinWritable(){}

	public JoinWritable(String tag,String data){
		this.set(tag, data);
	}
	public void set(String tag,String data){
		this.setTag(tag);
		this.setData(data);
	}

	public String getTag() {
		return tag;
	}

	public void setTag(String tag) {
		this.tag = tag;
	}

	public String getData() {
		return data;
	}

	public void setData(String data) {
		this.data = data;
	}

	@Override
	public void write(DataOutput out) throws IOException {

		out.writeUTF(this.getTag());
		out.writeUTF(this.getData());
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		this.setTag(in.readUTF());
		this.setData(in.readUTF());
	}

	@Override
	public int hashCode() {
		final int prime = 31;
		int result = 1;
		result = prime * result + ((data == null) ? 0 : data.hashCode());
		result = prime * result + ((tag == null) ? 0 : tag.hashCode());
		return result;
	}

	@Override
	public boolean equals(Object obj) {
		if (this == obj)
			return true;
		if (obj == null)
			return false;
		if (getClass() != obj.getClass())
			return false;
		JoinWritable other = (JoinWritable) obj;
		if (data == null) {
			if (other.data != null)
				return false;
		} else if (!data.equals(other.data))
			return false;
		if (tag == null) {
			if (other.tag != null)
				return false;
		} else if (!tag.equals(other.tag))
			return false;
		return true;
	}

	@Override
	public String toString() {
		return tag + "," +data;
	}
}