Hadoop: Using MapReduce to Find the Maximum Value in a Massive Set of Numbers

Please credit the source when reposting: http://blog.csdn.net/l1028386804/article/details/46287805

A small custom MapReduce job is enough to pull the maximum value out of a massive set of numbers: each mapper tracks the largest value in its input split and emits it once in cleanup(), and the reducer then keeps the largest of those per-mapper maxima. Without further ado, here is the code.

1. Implementing the Mapper class

		static class MyMapper extends Mapper<LongWritable, Text, LongWritable, NullWritable>{
			// Running maximum of all values this map task has seen so far
			long max = Long.MIN_VALUE;

			@Override
			protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
				final long temp = Long.parseLong(v1.toString());
				if(temp > max){
					max = temp;
				}
			}

			// Called once after the whole input split has been processed: emit the per-mapper maximum
			@Override
			protected void cleanup(Context context) throws IOException, InterruptedException {
				context.write(new LongWritable(max), NullWritable.get());
			}
		}

2. Implementing the Reducer class

		static class MyReducer extends Reducer<LongWritable, NullWritable, LongWritable, NullWritable>{
			// Running maximum of the per-mapper maxima received by this reduce task
			long max = Long.MIN_VALUE;

			@Override
			protected void reduce(LongWritable k2, Iterable<NullWritable> v2s, Context context) throws IOException, InterruptedException {
				final long temp = k2.get();
				if(temp > max){
					max = temp;
				}
			}

			// Called once after all keys have been reduced: write the global maximum
			@Override
			protected void cleanup(Context context) throws IOException, InterruptedException {
				context.write(new LongWritable(max), NullWritable.get());
			}
		}

3. The main entry point

		public static void main(String[] args) throws Exception {
			Configuration conf = new Configuration();
			// Delete the output directory if it already exists, otherwise the job would fail on startup
			final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
			final Path outPath = new Path(OUT_PATH);
			if(fileSystem.exists(outPath)){
				fileSystem.delete(outPath, true);
			}

			final Job job = new Job(conf, TopKApp.class.getSimpleName());
			// Ship the jar that contains the mapper and reducer classes to the cluster
			job.setJarByClass(TopKApp.class);
			FileInputFormat.setInputPaths(job, INPUT_PATH);
			job.setMapperClass(MyMapper.class);
			job.setReducerClass(MyReducer.class);
			job.setOutputKeyClass(LongWritable.class);
			job.setOutputValueClass(NullWritable.class);
			FileOutputFormat.setOutputPath(job, outPath);
			job.waitForCompletion(true);
		}
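
The driver relies on the job running with a single reduce task (Hadoop's default), so that exactly one global maximum is written. If your cluster configuration defaults to more reducers, you can pin the count explicitly; this one-liner is my own addition, not part of the original driver:

		job.setNumReduceTasks(1);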

4. Complete code

package com.lyz.hadoop.suanfa;

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Find the maximum value in a massive set of numbers with a custom MapReduce job
 * @author liuyazhuang
 *
 */
public class TopKApp {
	// Location of the input file to analyze
	static final String INPUT_PATH = "hdfs://liuyazhuang:9000/input";
	// Location where the result is written
	static final String OUT_PATH = "hdfs://liuyazhuang:9000/out";

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		// Delete the output directory if it already exists, otherwise the job would fail on startup
		final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
		final Path outPath = new Path(OUT_PATH);
		if(fileSystem.exists(outPath)){
			fileSystem.delete(outPath, true);
		}

		final Job job = new Job(conf, TopKApp.class.getSimpleName());
		// Ship the jar that contains the mapper and reducer classes to the cluster
		job.setJarByClass(TopKApp.class);
		FileInputFormat.setInputPaths(job, INPUT_PATH);
		job.setMapperClass(MyMapper.class);
		job.setReducerClass(MyReducer.class);
		job.setOutputKeyClass(LongWritable.class);
		job.setOutputValueClass(NullWritable.class);
		FileOutputFormat.setOutputPath(job, outPath);
		job.waitForCompletion(true);
	}

	static class MyMapper extends Mapper<LongWritable, Text, LongWritable, NullWritable>{
		// Running maximum of all values this map task has seen so far
		long max = Long.MIN_VALUE;

		@Override
		protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
			final long temp = Long.parseLong(v1.toString());
			if(temp > max){
				max = temp;
			}
		}

		// Called once after the whole input split has been processed: emit the per-mapper maximum
		@Override
		protected void cleanup(Context context) throws IOException, InterruptedException {
			context.write(new LongWritable(max), NullWritable.get());
		}
	}

	static class MyReducer extends Reducer<LongWritable, NullWritable, LongWritable, NullWritable>{
		// Running maximum of the per-mapper maxima received by this reduce task
		long max = Long.MIN_VALUE;

		@Override
		protected void reduce(LongWritable k2, Iterable<NullWritable> v2s, Context context) throws IOException, InterruptedException {
			final long temp = k2.get();
			if(temp > max){
				max = temp;
			}
		}

		// Called once after all keys have been reduced: write the global maximum
		@Override
		protected void cleanup(Context context) throws IOException, InterruptedException {
			context.write(new LongWritable(max), NullWritable.get());
		}
	}
}

5. Construct as many test numbers as you like; I randomly generated one million Long values for testing. One possible generator is sketched below.
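
The following is a minimal sketch of one way to generate such a file locally; the class name, value count, and output file name numbers.txt are my own choices, not from the original post. The resulting file can then be uploaded to the job's input path, for example with hadoop fs -put numbers.txt /input.

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.util.Random;

/**
 * Hypothetical helper: writes one random long per line, producing a text file
 * that the mapper above can parse with Long.parseLong().
 */
public class RandomLongGenerator {
	public static void main(String[] args) throws Exception {
		final int count = 1_000_000; // one million values, as in the test described above
		final Random random = new Random();
		try (BufferedWriter writer = new BufferedWriter(new FileWriter("numbers.txt"))) {
			for (int i = 0; i < count; i++) {
				writer.write(Long.toString(random.nextLong()));
				writer.newLine();
			}
		}
	}
}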

6. Console output

7. Run result
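
To check the result yourself, read the single line that the reducer writes under the output directory, for example with hadoop fs -cat hdfs://liuyazhuang:9000/out/part-r-00000. Below is a minimal sketch of doing the same from Java; the class name and the default TextOutputFormat file name part-r-00000 are assumptions, not taken from the original post.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/**
 * Hypothetical helper: prints the global maximum written by MyReducer.
 */
public class PrintMax {
	public static void main(String[] args) throws Exception {
		final String outPath = "hdfs://liuyazhuang:9000/out";
		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(new URI(outPath), conf);
		try (FSDataInputStream in = fs.open(new Path(outPath + "/part-r-00000"));
		     BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {
			// The reducer writes exactly one line: the maximum value
			System.out.println("max = " + reader.readLine());
		}
	}
}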
