hadoop 多个maper处理

package com.smilezl.learn.CalWord;

import java.io.IOException;

import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;

import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import org.apache.hadoop.util.GenericOptionsParser;

public class MultipleMapper {

public static class DeptMapper extends Mapper<Object, Text, Text, Text> {

@Override

protected void map(Object key, Text value, Context context)

throws IOException, InterruptedException {

StringTokenizer tokenizer = new StringTokenizer(value.toString());

String tmp = "";

while (tokenizer.hasMoreTokens()) {

tmp += tokenizer.nextToken().toString();

}

context.write(new Text("dept"), new Text(tmp));

}

}

public static class EmpMapper extends Mapper<Object, Text, Text, Text> {

@Override

protected void map(Object key, Text value, Context context)

throws IOException, InterruptedException {

StringTokenizer tokenizer = new StringTokenizer(value.toString());

String tmp = "";

while (tokenizer.hasMoreTokens()) {

tmp += tokenizer.nextToken().toString();

}

context.write(new Text("dept"), new Text(tmp));

}

}

public static class MulReducer extends Reducer<Object, Text, Text, Text> {

@Override

protected void reduce(Object key, Iterable<Text> value, Context context)

throws IOException, InterruptedException {

StringTokenizer tokenizer = new StringTokenizer(value.toString());

String tmp = "";

while (tokenizer.hasMoreTokens()) {

tmp += tokenizer.nextToken().toString() + "__";

}

context.write(new Text("emp"), new Text(tmp));

}

}

public static void main(String[] args) throws Exception {

Configuration conf = new Configuration();

String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

if (otherArgs.length != 3) {

System.out.println(otherArgs.length);

System.out.println("Usage: MultipleMapper <in> <in> <out>");

System.exit(2);

}

Job job = new Job(conf, "MultipleMapper");

job.setJarByClass(MultipleMapper.class);

MultipleInputs.addInputPath(job, new Path(otherArgs[0]), TextInputFormat.class, DeptMapper.class);

MultipleInputs.addInputPath(job, new Path(otherArgs[1]), TextInputFormat.class, EmpMapper.class);

FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));

job.setCombinerClass(MulReducer.class);

job.setReducerClass(MulReducer.class);

job.setOutputFormatClass(TextOutputFormat.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(Text.class);

System.exit(job.waitForCompletion(true) ? 0 : 1);

}

}

hadoop 多个maper处理

时间: 2024-08-02 11:03:22

hadoop 多个maper处理的相关文章

实验二-3 Hadoop&amp;Paoding 中文词频统计

  参考教程 在Hadoop上使用庖丁解牛(较复杂,并未采用,可以之后试试) http://zhaolinjnu.blog.sohu.com/264905210.html Lucene3.3.Lucene3.4中文分词——庖丁解牛分词实例(屈:注意版本) http://www.360doc.com/content/13/0217/13/11619026_266124504.shtml 庖丁分词在hadoop上运行时的配置问题(采纳了一半,没有按照其所写配置dic属性文件) http://f.da

python 实现Hadoop的partitioner和二次排序

我们知道,一个典型的Map-Reduce过程包 括:Input->Map->Patition->Reduce->Output.Pation负责把Map任务输出的中间结果 按key分发给不同的Reduce任务进行处理.Hadoop 提供了一个非常实用的partitioner类KeyFieldBasedPartitioner,通过配置相应的参数就可以使用.通过 KeyFieldBasedPartitioner可以方便地实现二次排序. 使用方法:       -partitioner o

Hadoop的二次排序

Hadoop的二次排序 2013-01-08 14:46:53 分类: HADOOP hadoop的使用中,一般只关注运行结果.对于mapper和reducer之间的处理逻辑往往不care.比如key-value对到达reducer的先后顺序等 目前接触到的运用场景有: 1.根据用户操作时间来整理事件链,在网站分析里比较常用.需要按时间先后顺序来处理,如果过亿的访问操作全在reducer里来排序,对计算能力和内存都是一个挑战. 2.海量数据处理中,求去重distinct这种操作,往往需要先缓存很

Hadoop单点部署与案例开发(微博用户数据分析)

一.环境搭建 1.Hadoop运行环境搭建 1.1 安装虚拟机 (1)下载并安装VMware虚拟机软件. (2)创建虚拟机,实验环境虚拟机配置如下图所示. (3)安装Ubuntu系统,安装结果如下图所示. 1.2  配置JDK环境 下载并安装JDK,安装结束后需对java环境进行配置,配置成功结果如下所示. 2.Hadoop安装和部署 (1)创建Hadoop安装文件夹,并切到到此路径下. (2)从 hadoop.apache.org 下载Hadoop 安装文件,并复制文件到安装Hadoop的文件

Hadoop MapReduce计算框架

1.MapReduce理论 1.1.MapReduce是什么? MapReduce用于处理海量数据的分布式计算框架,是Hadoop生态中的核心之一(MapReduce用于计算海量数据,HDFS用于存储海量数据):MapReduce是谷歌公司在研究如何处理海量数据所提出的一种面向大规模数据处理的并行计算模型和方法. 1.2.MapReduce概述 MapReduce是一个计算框架,用于对大数据进行处理,它的主要思想就是"分而治之":整个MapReduce计算过程可以分为Map(映射)阶段

hadoop基本组件原理小总结

Hadoop基础知识小总结  这是本人(学生党)在学习hadoop半个学期后根据教科书后习题做的一个小总结,如有发现错误还请各位海涵并指出,我会及时改过来的,谢谢! 目录 Hadoop基础知识小总结... 1 第一章... 2 1.简述hadoop平台的发展过程... 2 2.简述Hasoop名称和及技术来源.... 3 3.简述Hadoop的体系架构.... 3 4.简述MapReduce的体系架构.... 3 5.简述HDFS和MapReduce在Hadoop中的角色.... 4 第二章..

Hadoop学习之路(5)Mapreduce程序完成wordcount

程序使用的测试文本数据: Dear River Dear River Bear Spark Car Dear Car Bear Car Dear Car River Car Spark Spark Dear Spark 1编写主要类 (1)Maper类 首先是自定义的Maper类代码 public class WordCountMap extends Mapper<LongWritable, Text, Text, IntWritable> { public void map(LongWrit

Hadoop学习之路(6)MapReduce自定义分区实现

MapReduce自带的分区器是HashPartitioner原理:先对map输出的key求hash值,再模上reduce task个数,根据结果,决定此输出kv对,被匹配的reduce任务取走.自定义分分区需要继承Partitioner,复写getpariton()方法自定义分区类:注意:map的输出是<K,V>键值对其中int partitionIndex = dict.get(text.toString()),partitionIndex是获取K的值 附:被计算的的文本 Dear Dea

Hadoop:Windows 7 32 Bit 编译与运行

所需工具 1.Windows 7 32 Bit OS(你懂的) 2.Apache Hadoop 2.2.0-bin(hadoop-2.2.0.tar.gz) 3.Apache Hadoop 2.2.0-src(hadoop-2.2.0-src.tar.gz) 3.JDK 1.7 4.Maven 3.2.1(apache-maven-3.2.1-bin.zip) 5.Protocol Buffers 2.5.0 6.Unix command-line tool Cygwin(Setup-x86.e