Hadoop MapReduce Template
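
This post collects a reusable skeleton for a Hadoop MapReduce word-count job: a Mapper, a Reducer, and a Driver that wires them into a Job and submits it.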

Mapper

package com.scb.jason.mapper;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.StringTokenizer;

/**
 * Created by Administrator on 2017/7/23.
 */
// Step 1: Map Class
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    // Reuse the output key/value objects instead of allocating new ones per record
    private Text mapOutputKey = new Text();
    private final static IntWritable mapOutputValue = new IntWritable(1);

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Called once per map task before any map() calls; put one-time initialization here
        super.setup(context);
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Split the input line into words and emit (word, 1) for each token
        String lineValue = value.toString();
        StringTokenizer stringTokenizer = new StringTokenizer(lineValue);
        while (stringTokenizer.hasMoreTokens()) {
            String wordValue = stringTokenizer.nextToken();
            mapOutputKey.set(wordValue);
            context.write(mapOutputKey, mapOutputValue);
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // Called once per map task after all map() calls; release resources here
        super.cleanup(context);
    }

    @Override
    public void run(Context context) throws IOException, InterruptedException {
        // Drives setup() -> map() per record -> cleanup(); override only for custom control flow
        super.run(context);
    }
}
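
As an illustration, an input line such as "hello world hello" makes this mapper emit the pairs (hello, 1), (world, 1), (hello, 1); the framework then groups the pairs by key during the shuffle before they reach the reducer.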


Reducer

package com.scb.jason.reducer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Created by Administrator on 2017/7/23.
 */
// Step 2: Reduce Class
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    // Reuse the output value object instead of allocating one per key
    private IntWritable outputValue = new IntWritable();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Called once per reduce task before any reduce() calls
        super.setup(context);
    }

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Sum all counts for this word and emit (word, total)
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        outputValue.set(sum);
        context.write(key, outputValue);
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // Called once per reduce task after all reduce() calls
        super.cleanup(context);
    }

    @Override
    public void run(Context context) throws IOException, InterruptedException {
        // Drives setup() -> reduce() per key -> cleanup(); override only for custom control flow
        super.run(context);
    }
}
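
Continuing the example above, after the shuffle the reducer receives hello -> [1, 1] and world -> [1], and writes (hello, 2) and (world, 1) to the output.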


Driver

package com.scb.jason.driver;

import com.scb.jason.mapper.WordCountMapper;
import com.scb.jason.reducer.WordCountReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

/**
 * Created by Administrator on 2017/7/17.
 */
public class WordCount extends Configured implements Tool {

    // Step 3: Driver
    public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Use the configuration managed by ToolRunner so generic -D options are honored
        Configuration configuration = this.getConf();
        FileSystem fs = FileSystem.get(configuration);

        Job job = Job.getInstance(configuration, this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());

        // Input path comes from the first command-line argument
        Path input = new Path(args[0]);
        FileInputFormat.addInputPath(job, input);

        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Delete the output path if it already exists, otherwise the job refuses to start
        Path outPath = new Path(args[1]);
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }
        FileOutputFormat.setOutputPath(job, outPath);

        boolean isSuccess = job.waitForCompletion(true);
        // Return 0 on success so the process exit code follows the usual convention
        return isSuccess ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new WordCount(), args);
        System.exit(exitCode);
    }

}
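
To try the template, package the three classes into a jar and submit it with the hadoop command. The jar name and HDFS paths below are placeholders (and the example assumes the jar manifest sets no Main-Class, so the driver class is named explicitly); adjust them to your environment:

    hadoop jar wordcount.jar com.scb.jason.driver.WordCount /user/hadoop/input /user/hadoop/output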

