大数据学习之提交job流程,排序11

1实现接口->WritableCompareable

排序操作在hadoop中属于默认的行为。默认按照字典殊勋排序。

排序的分类:

1)部分排序

2)全排序

3)辅助排序

4)二次排序

3 案例: 在流量汇总输出文件里的数据  进行分区,每个分区中的数据进行排序

数据预览,这里只是进行了流量的汇总,没有进行分区和排序

1:编写flowbean

实现WritableCompareable完成序列化并且完成排序

package it.dawn.YARNPra.基本用法.排序;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * @author Dawn
 * @date 2019年5月7日09:04:04
 * @version 1.0
 * 直接继承 WritableComparable,
 */
public class FlowBean implements WritableComparable<FlowBean>{

	private long upFlow;
	private long dfFlow;
	private long flowSum;

	//无参构造
	public FlowBean() {}
	//有参构造
	public FlowBean(long upFlow,long dfFlow) {
		this.upFlow=upFlow;
		this.dfFlow=dfFlow;
		this.flowSum=upFlow+dfFlow;
	}

	public long getUpFlow() {
		return upFlow;
	}

	public void setUpFlow(long upFlow) {
		this.upFlow = upFlow;
	}

	public long getDfFlow() {
		return dfFlow;
	}

	public void setDfFlow(long dfFlow) {
		this.dfFlow = dfFlow;
	}

	public long getFlowSum() {
		return flowSum;
	}

	public void setFlowSum(long flowSum) {
		this.flowSum = flowSum;
	}

	@Override
	public void write(DataOutput out) throws IOException {
		// TODO Auto-generated method stub
		out.writeLong(upFlow);
		out.writeLong(dfFlow);
		out.writeLong(flowSum);
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		// TODO Auto-generated method stub
		upFlow=in.readLong();
		dfFlow=in.readLong();
		flowSum=in.readLong();
	}

	@Override
	public String toString() {
		return upFlow+"\t"+dfFlow+"\t"+flowSum;
	}

	//排序
	@Override
	public int compareTo(FlowBean o) {
		//倒序
		return this.flowSum>o.getFlowSum()? -1 : 1;
	}

}

2:编写Mapper类

package it.dawn.YARNPra.基本用法.排序;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * @author Dawn
 * @date 2019年5月7日09:24:06
 * @version 1.0
 *
 * 输入?
 * 13480253104	120	1320	1440
 * 输出?
 * <key2                ,   v2>
 * <流量上行+\t+流量下行,手机号码>
 */
public class FlowSortMapper extends Mapper<LongWritable, Text, FlowBean, Text>{

	@Override
	protected void map(LongWritable key, Text value,Context context)
			throws IOException, InterruptedException {
		//1:读数据
		String line=value.toString();

		//2:切割
		String[] fields=line.split("\t");

		//3:取出指定字段
		long upFlow=Long.parseLong(fields[1]);
		long dfFlow=Long.parseLong(fields[2]);

		//4:输出到reduce阶段
		context.write(new FlowBean(upFlow, dfFlow), new Text(fields[0]));
	}

}

  

3:编写Reducer类

package it.dawn.YARNPra.基本用法.排序;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FlowSortReducer extends Reducer<FlowBean, Text, Text, FlowBean>{

	@Override
	protected void reduce(FlowBean k3, Iterable<Text> v3, Context context)
			throws IOException, InterruptedException {

		//直接输出
		context.write(v3.iterator().next(), k3);
	}

}

  

4:编写Partitioner类

package it.dawn.YARNPra.基本用法.排序;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class FlowSortPartitioner extends Partitioner<FlowBean, Text>{

	@Override
	public int getPartition(FlowBean key, Text value, int numPartitions) {

		//1: 获取手机前3个数字
		String phoneThree=value.toString().substring(0, 3);

		//2:定义分区号
		int partitioner=4;

		if("135".equals(phoneThree)) {
			return 0;
		}else if("137".equals(phoneThree)) {
			return 1;
		}else if("138".equals(phoneThree)) {
			return 2;
		}else if("139".equals(phoneThree)) {
			return 3;
		}

		return partitioner;
	}

}

  

5:编写driver类

package it.dawn.YARNPra.基本用法.排序;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * @author Dawn
 * @date 2019年5月7日09:22:12
 * @version 1.0
 * 需求?
 * 将数据进行分区,并在每个分区中进行排序
 */
public class FlowSortDriver {

	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		//1:添加配置
		Configuration conf=new Configuration();
		Job job=Job.getInstance(conf);

		//2:设置主类
		job.setJarByClass(FlowSortDriver.class);

		//3:设置Mapper和Reduce类
		job.setMapperClass(FlowSortMapper.class);
		job.setReducerClass(FlowSortReducer.class);

		//4:设置Map输出类
		job.setMapOutputKeyClass(FlowBean.class);
		job.setMapOutputValueClass(Text.class);

		//5:设置Reduce输出类
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(FlowBean.class);

		//添加自定义分区
		job.setPartitionerClass(FlowSortPartitioner.class);
		job.setNumReduceTasks(5);

		//6:设置输入输出路径
		FileInputFormat.setInputPaths(job, new Path("f:/temp/流量统计结果/out1/part-r-00000"));
		FileOutputFormat.setOutputPath(job, new Path("f:/temp/流量统计结果/out2"));

		//7提交任务
		boolean rs=job.waitForCompletion(true);
		System.out.println(rs ? "success" : "fail");
	}

}

  

查看最终的输出结果:

  

原文地址:https://www.cnblogs.com/hidamowang/p/10828686.html

时间: 2024-10-27 18:53:57

大数据学习之提交job流程,排序11的相关文章

大数据学习之提交job流程,分区和合并11

一:合并(mapTask的合并) 使用合并的注意事项: (1)合并是一种特殊的Reducer (2)合并是在Mapper端执行一次合并,用于减少Mapper输出到Reducer的数据量,可以提高效率. (3)举例:以WordCount为例 (4)注意:一定要谨慎使用Combiner,有些不能使用:求平均值 有Combiner,或者没有Combiner,都不能改变Map和Reduce对应数据的类型 原理图: 1:maptask并行度与决定机制 2 maptask工作机制 3:代码改写:使用上回的W

大数据学习的流程方案

大数据成为了当下发展的一种趋势,很多人去追求大数据的学习,但是苦于无从下手,今天编者根据自己的经验系统总结一下大数据学习的方略: 第一步:感性认识,找准思路 (1)看一些大数据发展及应用,了解市场形势 (2)阅读大数据相关书籍,了解知识架构 对上面基本知识有一个了解过程之后,明确自己的思路,就可以进入下一步学习; 第二步:理论学习,扎实基础 大数据平台学习路径:   预备课程 1. 大数据平台Linux基础 2. 大数据平台Java基础 3. 大数据平台Python基础   基础课程 1.  大

大数据学习路线整理

一.大数据技术基础 1.linux操作基础 linux系统简介与安装    linux常用命令–文件操作    linux常用命令–用户管理与权限    linux常用命令–系统管理    linux常用命令–免密登陆配置与网络管理    linux上常用软件安装    linux本地yum源配置及yum软件安装    linux防火墙配置    linux高级文本处理命令cut.sed.awk    linux定时任务crontab 2.shell编程 shell编程–基本语法    shel

2019大数据学习方向【最新分享】

一.大数据运维之Linux基础打好Linux基础,以便更好地学习Hadoop,hbase,NoSQL,Spark,Storm,docker,openstack等.因为企业中的项目基本上都是使用Linux环境下搭建或部署的. 1)Linux系统概述2)系统安装及相关配置?3)Linux网络基础?4)OpenSSH实现网络安全连接?5)vi文本编辑器 6)用户和用户组管理7)磁盘管理?8)Linux文件和目录管理?9)Linux终端常用命令?10)linux系统监测与维护 二.大数据开发核心技术 -

大数据学习方向,从入门到精通

推荐一个大数据学习群 119599574晚上20:10都有一节[免费的]大数据直播课程,专注大数据分析方法,大数据编程,大数据仓库,大数据案例,人工智能,数据挖掘都是纯干货分享,你愿意来学习吗 很多初学者在萌生向大数据方向发展的想法之后,不免产生一些疑问,应该怎样入门?应该学习哪些技术?学习路线又是什么? 所有萌生入行的想法与想要学习Java的同学的初衷是一样的.岗位非常火,就业薪资比较高,,前景非常可观.基本都是这个原因而向往大数据,但是对大数据却不甚了解. 如果你想学习,那么首先你需要学会编

大数据学习路线图 让你精准掌握大数据技术学习?

大数据指不用随机分析法这样捷径,而采用所有数据进行分析处理的方法.互联网时代每个企业每天都要产生庞大的数据,对数据进行储存,对有效的数据进行挖掘分析并应用需要依赖于大数据开发,大数据开发课程采用真实商业数据源并融合云计算+机器学习,让学员有实力入职一线互联网企业. 今天小编的技术分享详细学习大数据的精准路线图,学好大数据就还得靠专业的工具. 大数据学习QQ群:119599574 阶段一. Java语言基础 Java开发介绍.熟悉Eclipse开发工具.Java语言基础.Java流程控制.Java

大数据学习之小白如何学大数据?(详细篇)

大数据这个话题热度一直高居不下,不仅是国家政策的扶持,也是科技顺应时代的发展.想要学习大数据,我们该怎么做呢?大数据学习路线是什么?先带大家了解一下大数据的特征以及发展方向. 大数据的三个发展方向,平台搭建/优化/运维/监控.大数据开发/设计/架构.数据分析/挖掘. 先说一下大数据的4V特征: 数据量大,TB->PB 数据类型繁多,结构化.非结构化文本.日志.视频.图片.地理位置等; 商业价值高,但是这种价值需要在海量数据之上,通过数据分析与机器学习更快速的挖掘出来; 处理时效性高,海量数据的处

大数据学习路线指导,告诉你如何学习大数据

大数据指不用随机分析法这样捷径,而采用所有数据进行分析处理的方法.互联网时代每个企业每天都要产生庞大的数据,对数据进行储存,对有效的数据进行挖掘分析并应用需要依赖于大数据开发,大数据开发课程采用真实商业数据源并融合云计算+机器学习,让学员有实力入职一线互联网企业. 今天小编的技术分享详细学习大数据的精准路线图, ? ? 阶段一. Java语言基础 Java开发介绍.熟悉Eclipse开发工具.Java语言基础.Java流程控制.Java字符串.Java数组与类和对象.数字处理类与核心技术.I/O

大数据学习路线 让你精准掌握大数据技术学习

大数据指不用随机分析法这样捷径,而采用所有数据进行分析处理的方法.互联网时代每个企业每天都要产生庞大的数据,对数据进行储存,对有效的数据进行挖掘分析并应用需要依赖于大数据开发,大数据开发课程采用真实商业数据源并融合云计算+机器学习,让学员有实力入职一线互联网企业. 今天小编的技术分享详细学习大数据的精准路线图,学好大数据就还得靠专业的工具. 阶段一. Java语言基础 Java开发介绍.熟悉Eclipse开发工具.Java语言基础.Java流程控制.Java字符串.Java数组与类和对象.数字处