剖析美国平均气温项目,掌握MapReduce编程

数据集导入HDFS

通过命令行访问刚刚上传至HDFS的数据集

[[email protected] hadoop-2.6.0]$ bin/hdfs dfs -ls /weather/

  

MapReduce程序编译及运行:

第一步:在 Map 阶段,提取气象站和气温数据

public static class TemperatureMapper extends Mapper< LongWritable, Text, Text, IntWritable> {
		/**
		 * @function Mapper 解析气象站数据
		 * @input key=偏移量  value=气象站数据
		 * @output key=weatherStationId value=temperature
		 */
		public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

			String line = value.toString(); //读取每行数据
			int temperature = Integer.parseInt(line.substring(14, 19).trim());//气温

			if (temperature != -9999) { //过滤无效数据
				FileSplit fileSplit = (FileSplit) context.getInputSplit();
				String weatherStationId = fileSplit.getPath().getName().substring(5, 10);//通过文件名称提取气象站id
				context.write(new Text(weatherStationId), new IntWritable(temperature));
			}
		}
	}

第二步:在 Reduce 阶段,统计每个气象站的平均气温

/**
	 *
	 * @function Reducer 统计美国各个州的平均气温
	 * @input key=weatherStationId  value=temperature
	 * @output key=weatherStationId value=average(temperature)
	 */
	public static class TemperatureReducer extends Reducer< Text, IntWritable, Text, IntWritable> {
		private IntWritable result = new IntWritable();

		public void reduce(Text key, Iterable< IntWritable> values,Context context) throws IOException, InterruptedException {
			int sum = 0;
			int count = 0;
			for (IntWritable val : values) {
				sum += val.get();
				count++;
			}
			result.set(sum / count);
			context.write(key, result);
		}
	}

第三步:对代码进行单元测试及debug调试。

Mapper单元测试

Mapper 的逻辑就是从读取的气象站数据中,提取气温值。比如读取一行"1985 07 31 02   200    94 10137   220    26     1     0 -9999"气象数据,提取第14位到19位之间的字符即为气温值200。

/**
 * Mapper 端的单元测试
 */
@SuppressWarnings("all")
public class TemperatureMapperTest {
	private Mapper mapper;//定义一个Mapper对象
	private MapDriver driver;//定义一个MapDriver 对象

	@Before
	public void init() {
		mapper = new Temperature.TemperatureMapper();//实例化一个Temperature中的TemperatureMapper对象
		driver = new MapDriver(mapper);//实例化MapDriver对象
	}

	@Test
	public void test() throws IOException {
		//输入一行测试数据
		String line = "1985 07 31 02   200    94 10137   220    26     1     0 -9999";
		driver.withInput(new LongWritable(), new Text(line))//跟TemperatureMapper输入类型一致
				.withOutput(new Text("weatherStationId"), new IntWritable(200))//跟TemperatureMapper输出类型一致
				.runTest();
	}
}

Reduce单元测试

Reduce 函数的逻辑就是把key相同的 value 值相加然后取平均值,Reducer 单元测试

/**
 * Reducer 单元测试
 */
@SuppressWarnings("all")
public class TemperatureReduceTest {
	private Reducer reducer;//定义一个Reducer对象
	private ReduceDriver driver;//定义一个ReduceDriver对象

	@Before
	public void init() {
		reducer = new Temperature.TemperatureReducer();//实例化一个Temperature中的TemperatureReducer对象
		driver = new ReduceDriver(reducer);//实例化ReduceDriver对象
	}

	@Test
	public void test() throws IOException {
		String key = "weatherStationId";//声明一个key值
		List values = new ArrayList();
		values.add(new IntWritable(200));//添加第一个value值
		values.add(new IntWritable(100));//添加第二个value值
		driver.withInput(new Text("weatherStationId"), values)//跟TemperatureReducer输入类型一致
			  .withOutput(new Text("weatherStationId"), new IntWritable(150))//跟TemperatureReducer输出类型一致
			  .runTest();
	}
}

Mapper 和 Reducer 集成起来测试

/**
 * Mapper 和 Reducer 集成起来测试
 */
@SuppressWarnings("all")
public class TemperatureTest {
	private Mapper mapper;//定义一个Mapper对象
	private Reducer reducer;//定义一个Reducer对象
	private MapReduceDriver driver;//定义一个MapReduceDriver 对象

	@Before
	public void init() {
		mapper = new Temperature.TemperatureMapper();//实例化一个Temperature中的TemperatureMapper对象
		reducer = new Temperature.TemperatureReducer();//实例化一个Temperature中的TemperatureReducer对象
		driver = new MapReduceDriver(mapper, reducer);//实例化MapReduceDriver对象
	}

	@Test
	public void test() throws RuntimeException, IOException {
		//输入两行行测试数据
		String line = "1985 07 31 02   200    94 10137   220    26     1     0 -9999";
		String line2 = "1985 07 31 11   100    56 -9999    50     5 -9999     0 -9999";
		driver.withInput(new LongWritable(), new Text(line))//跟TemperatureMapper输入类型一致
			  .withInput(new LongWritable(), new Text(line2))
			  .withOutput(new Text("weatherStationId"), new IntWritable(150))//跟TemperatureReducer输出类型一致
			  .runTest();
	}
}

第四步:将项目编译和打包为Tempearture.jar,使用客户端将 Tempearture.jar上传至hadoop的/home/hadoop/Temp目录下。

第五步:使用cd /home/hadoop/Temp 切换到当前目录,通过hadoop jar Temperature.jar com.hadoop.base.Temperature /weather/ /weather/out/命令行执行任务。

第六步:任务的最终结果输出到 HDFS ,使用hadoop fs -cat /weather/out/part-r-00000命令查看结果。

原文地址:https://www.cnblogs.com/zhoupp/p/10924946.html

时间: 2024-10-22 11:43:01

剖析美国平均气温项目,掌握MapReduce编程的相关文章

剖析美国平均气温项目

第一步:数据集导入HDFS 使用命令行访问刚刚上传至HDFS的数据集 第一步:在 Map 阶段,提取气象站和气温数据. public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); //读取每行数据 int temperature = Integer.parseInt(line.sub

Hadoop MapReduce编程 API入门系列之挖掘气象数据版本2(九)

下面,是版本1. Hadoop MapReduce编程 API入门系列之挖掘气象数据版本1(一) 这篇博文,包括了,实际生产开发非常重要的,单元测试和调试代码.这里不多赘述,直接送上代码. MRUnit 框架 MRUnit是Cloudera公司专为Hadoop MapReduce写的单元测试框架,API非常简洁实用.MRUnit针对不同测试对象使用不同的Driver: MapDriver:针对单独的Map测试  ReduceDriver:针对单独的Reduce测试    MapReduceDri

MapReduce编程模型及其在Hadoop上的实现

转自:https://www.zybuluo.com/frank-shaw/note/206604 MapReduce基本过程 关于MapReduce中数据流的传输过程,下图是一个经典演示:  关于上图,可以做出以下逐步分析: 输入数据(待处理)首先会被切割分片,每一个分片都会复制多份到HDFS中.上图默认的是分片已经存在于HDFS中. Hadoop会在存储有输入数据分片(HDFS中的数据)的节点上运行map任务,可以获得最佳性能(数据TaskTracker优化,节省带宽). 在运行完map任务

Hadoop 实践(二) Mapreduce 编程

Mapreduce 编程,本文以WordCount  为例:实现文件字符统计 在eclipse 里面搭建一个java项目,引入hadoop lib目录下的jar,和 hadoop主目录下的jar. 新建WordCount 类: package org.scf.wordcount; import java.io.IOException; import java.util.*; import org.apache.hadoop.fs.Path; import org.apache.hadoop.co

MapReduce编程实战之“高级特性”

本篇介绍MapReduce的一些高级特性,如计数器.数据集的排序和连接.计数器是一种收集作业统计信息的有效手段,排序是MapReduce的核心技术,MapReduce也能够执行大型数据集间的""连接(join)操作. 计数器 计数器是一种收集作业统计信息的有效手段,用于质量控制或应用级统计.计数器还可用于辅助诊断系统故障.对于大型分布式系统来说,获取计数器比分析日志文件容易的多. 示例一:气温缺失及不规则数据计数器 import java.io.IOException; import

MapReduce编程实例5

前提准备: 1.hadoop安装运行正常.Hadoop安装配置请参考:Ubuntu下 Hadoop 1.2.1 配置安装 2.集成开发环境正常.集成开发环境配置请参考 :Ubuntu 搭建Hadoop源码阅读环境 MapReduce编程实例: MapReduce编程实例(一),详细介绍在集成环境中运行第一个MapReduce程序 WordCount及代码分析 MapReduce编程实例(二),计算学生平均成绩 MapReduce编程实例(三),数据去重 MapReduce编程实例(四),排序 M

Hadoop MapReduce编程 API入门系列之压缩和计数器(三十)

不多说,直接上代码. Hadoop MapReduce编程 API入门系列之小文件合并(二十九) 生成的结果,作为输入源. 代码 package zhouls.bigdata.myMapReduce.ParseTVDataCompressAndCounter; import java.net.URI; import java.util.List;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Co

Python计算国际交换站单日及每月平均气温

数据来源:rp5网站 下载数据样式:选择UTF-8编码.下载后解压得到 xxx.csv 文件 计算目标:包含每个月的平均气温和当月每一天的平均气温,按月命名并存储在相应文件中. 数据格式: 注意: 每日平均气温包含时次:02,08,14,20.初始数据包含时次中05,11,17,23时没有用处. 可能文件中某些日期有缺失的时次,如果有就不计算当天均温.计算月均温时总日数不包括缺失的这天. 代码: 1 #!/usr/bin/env python3 2 def calcTemp(t_file): 3

stl源码剖析学习笔记(二)traits编程技法简明例程

解释说明 traits侯捷老师的翻译是萃取.其目的就是在编译期进行模板调用的类型识别,从而做一些事情. 最突出的例子,我觉得不是<STL源码剖析>中"迭代器概念与traits编程技法"这一章的说明,而是stl算法中copy的实现.代码在stl源码的stl_algobase.h中. copy的最终实现,大致分为两类,一类是直接整块内存的memmove操作,另一类是一个个对象赋值.其中涉及has_trivial_assignment_operator的类型推断. 如果has_t