Hadoop中的MultipleOutputs实践

本例子采用hadoop1.1.2版本

采用气象数据作为处理数据

1、MultipleOutputs例子,具体解释在代码中有注释

package StationPatitioner;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.MultipleOutputs;
import org.apache.hadoop.mapred.lib.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * hadoop Version 1.1.2
 * MultipleOutputs例子
 * @author 巧克力黑
 *
 */
public class PatitionByStationUsingMultipleOutputs extends Configured implements Tool {
	enum Counter 
	{
		LINESKIP,	//出错的行
	}
	static class StationMapper extends MapReduceBase implements Mapper<LongWritable , Text, Text , Text>{
		private NcdcRecordParser parser = new NcdcRecordParser();
		@Override
		public void map(LongWritable key, Text value,
				OutputCollector<Text, Text> output, Reporter reporter)
				throws IOException {
			try {
				parser.parse(value);
				output.collect(new Text(parser.getStationid()), value);
			} catch (Exception e) {
				reporter.getCounter(Counter.LINESKIP).increment(1);	//出错令计数器+1
			}

		}
	}

	static class MultipleOutputReducer extends MapReduceBase implements Reducer<Text, Text, NullWritable, Text>{
		private MultipleOutputs multipleOutputs;
		@Override
		public void configure(JobConf jobconf) {
			multipleOutputs = new MultipleOutputs(jobconf);//初始化一个MultipleOutputs
		}

		@Override
		public void reduce(Text key, Iterator<Text> values,
				OutputCollector<NullWritable, Text> output, Reporter reporter)
				throws IOException {
			//得到OutputCollector
			OutputCollector collector = multipleOutputs.getCollector("station", key.toString().replace("-", ""), reporter);
			while(values.hasNext()){
				collector.collect(NullWritable.get(), values.next());//MultipleOutputs用OutputCollector输出数据
			}
		}

		@Override
		public void close() throws IOException {
			multipleOutputs.close();
		}
	}

	@Override
	public int run(String[] as) throws Exception {
		System.setProperty("HADOOP_USER_NAME", "root");//windows下用户与linux用户不一直,采用此方法避免报Permission相关错误
		JobConf conf = new JobConf();

		conf.setMapperClass(StationMapper.class);
		conf.setReducerClass(MultipleOutputReducer.class);
		conf.setMapOutputKeyClass(Text.class);
		conf.setOutputKeyClass(NullWritable.class);
		conf.setOutputFormat(NullOutputFormat.class);
	    FileInputFormat.setInputPaths(conf, new Path("hdfs://ubuntu:9000/sample1.txt"));//input路径
	    FileOutputFormat.setOutputPath(conf, new Path("hdfs://ubuntu:9000/temperature"));//output路径

		MultipleOutputs.addMultiNamedOutput(conf, "station", TextOutputFormat.class, NullWritable.class, Text.class);

		JobClient.runJob(conf);
		return 0;
	}

	public static void main(String[] args) throws Exception{
		int exitCode = ToolRunner.run(new PatitionByStationUsingMultipleOutputs(), args);
		System.exit(exitCode);
	}

}

2、解析气象数据的类

package StationPatitioner;

import org.apache.hadoop.io.Text;

public class NcdcRecordParser {
	private static final int MISSING_TEMPERATURE = 9999;

	private String year;
	private int airTemperature;
	private String quality;
	private String stationid;

	public void parse(String record) {
		stationid = record.substring(0, 5);
		year = record.substring(15, 19);
		String airTemperatureString;
		// Remove leading plus sign as parseInt doesn‘t like them
		if (record.charAt(87) == ‘+‘) {
			airTemperatureString = record.substring(88, 92);
		} else {
			airTemperatureString = record.substring(87, 92);
		}
		airTemperature = Integer.parseInt(airTemperatureString);
		quality = record.substring(92, 93);
	}

	public String getStationid(){
		return stationid;
	}

	public void parse(Text record) {
		parse(record.toString());
	}

	public boolean isValidTemperature() {
		return airTemperature != MISSING_TEMPERATURE && quality.matches("[01459]");
	}

	public String getYear() {
		return year;
	}

	public int getAirTemperature() {
		return airTemperature;
	}
}
时间: 2024-10-11 12:19:49

Hadoop中的MultipleOutputs实践的相关文章

Hadoop中的Combiner实践

Combiner作用是合并Mapper的输出,Combiner的输出作为Reducer的输入,这样可以减少map任务和reducer任务之间的数据传输. 1.在Job中设置Combiner和不设置Combiner,观察Reducer输入情况 使用如下代码设置Combiner job.setCombinerClass(MaxTemperatureReducer.class); @Override public int run(String[] args) throws Exception { Jo

王家林的云计算分布式大数据Hadoop企业级开发动手实践

一:课程简介: Hadoop是云计算分布式大数据的事实标准软件框架,Hadoop中的架构实现是整个云计算产业技术的基础,作为与Google三大核心技术DFS.MapReduce.BigTable相对的HDFS.MapReduce.和HBase也是整个Hadoop生态系统的核心的技术,本课程致力于帮您掌握这三大技术的同时掌握云计算的数据仓库挖掘技术Hive,助您在云计算技术时代自由翱翔. 二:课程特色 1,      深入浅出中动手实作: 2,      掌握Hadoop三大核心:HDFS.Map

Hadoop大数据时代:Hadoop&amp;YarnSpark企业级最佳实践 (4天)

Hadoop.Yarn.Spark是企业构建生产环境下大数据中心的关键技术,也是大数据处理的核心技术,是每个云计算大数据工程师必修课. 大数据时代的精髓技术在于Hadoop.Yarn.Spark,是大数据时代公司和个人必须掌握和使用的核心内容. Hadoop.Yarn.Spark是Yahoo!.阿里淘宝等公司公认的大数据时代的三大核心技术,是大数据处理的灵魂,是云计算大数据时代的技术命脉之所在,以Hadoop.Yarn.Spark为基石构建起来云计算大数据中心广泛运行于Yahoo!.阿里淘宝.腾

Hadoop MapReduce开发最佳实践(上篇)

body{ font-family: "Microsoft YaHei UI","Microsoft YaHei",SimSun,"Segoe UI",Tahoma,Helvetica,Sans-Serif,"Microsoft YaHei", Georgia,Helvetica,Arial,sans-serif,宋体, PMingLiU,serif; font-size: 10.5pt; line-height: 1.5;}

Hadoop MapReduce链式实践--ChainReducer

版本:CDH5.0.0,HDFS:2.3.0,Mapreduce:2.3.0,Yarn:2.3.0. 场景描述:求一组数据中按照不同类别的最大值,比如,如下的数据: data1: A,10 A,11 A,12 A,13 B,21 B,31 B,41 B,51 data2: A,20 A,21 A,22 A,23 B,201 B,301 B,401 B,501 最后输出为: A,23 B,501 假如这样的逻辑的mapreduce数据流如下: 假设C组数据比较多,同时假设集群有2个节点,那么这个任

混合云场景下容器技术在新能源功率预测产品中的最佳实践

能源互联网是物联网和"互联网+"在能源行业深度融合的产物,是中国制造2025的重要组成部分,我们现在还处于能源互联网的早期阶段.绝大部分能源行业的应用都部署在私有局域网内,并且网络结构异常复杂,这是阻碍互联网技术在能源行业落地的最大挑战. 6月28日,金风科技数据平台架构师张利出席了Rancher Labs举办的Container Day 2018容器技术大会,并做了题为<混合云场景下容器技术在新能源功率预测产品中的最佳实践>的演讲. 金风科技是中国成立最早.自主研发能力最

hadoop中Configuration类剖析

Configuration是hadoop中五大组件的公用类,所以放在了core下,org.apache.hadoop.conf.Configruration.这个类是作业的配置信息类,任何作用的配置信息必须通过Configuration传递,因为通过Configuration可以实现在多个mapper和多个reducer任务之间共享信息. 类图 说明:Configuration实现了Iterable和Writable两个接口,其中实现Iterable是为了迭代,迭代出Configuration对

Hadoop中作业(job)、任务(task)和task attempt

hadoop中,MapReduce作业(job)ID的格式为job_201412081211_0002.这表示该作业是第二个作业(作业号从0001开始),作业开始于2014年12月8号12:11. 任务(task)属于作业,通过使用"task"替换作业ID的"job"前缀,然后在后面加上一个后缀表示哪个作业中间的任务.例如:task_201412081211_0002_m_000003,表示该任务属于job_201412081211_0002作业的第三个map任务(

hadoop中Text类 与 java中String类的区别

hadoop 中 的Text类与java中的String类感觉上用法是相似的,但两者在编码格式和访问方式上还是有些差别的,要说明这个问题,首先得了解几个概念: 字符集: 是一个系统支持的所有抽象字符的集合.字符是各种文字和符号的总称,包括各国家文字.标点符号.图形符号.数字等.例如 unicode就是一个字符集,它的目标是涵盖世界上所有国家的文字和符号: 字符编码:是一套法则,使用该法则能够对自然语言的字符的一个集合(如字母表或音节表),与其他东西的一个集合(如号码或电脉冲)进行配对.即在符号集