2018-08-04 期 MapReduce倒排索引编程案例2(jobControll方式)

1、第一阶段MapReduce任务程序

package cn.itcast.bigdata.index;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileSplit;

/**

* 利用MapReduce实现输入多个文件中单词在每个文件中出现的次数,输出格式如下:

* hello (a.txt 2,b.txt 1,c.txt 4)

* tom (a.txt 5,b.txt 3)

* 实现方法:采用倒排索引算法并结合jobControll实现

* 本案例中所有的Mapper、Reducer、Job均采用匿名内部类实现

* @author songjq

*

*/

public class IndexStepOne {

/**

* 第一阶段Mapper处理后输出数据格式为

* <k2> <v2>

* <hello:a.txt> <1>

* <hello:a.txt> <1>

* <hello:b.txt> <1>

* @author songjq

*

*/

static class IndexStepOneMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

/**

* 格式:<hello-->a.txt,1><helle-->b.txt,1>

*/

private Text tkey = new Text();

private IntWritable tvalue = new IntWritable(1);

@Override

protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

FileSplit inputSplit = (FileSplit) context.getInputSplit();

String fileName = inputSplit.getPath().getName();

String line = value.toString();

String[] split = line.split(" ");

for (String val : split) {

tkey.set(val + "-->" + fileName);

context.write(tkey, tvalue);

}

}

}

/**

* 第一阶段Mapper输出数据格式为

* <k2> <v2>

* <hello:a.txt> <1>

* <hello:a.txt> <1>

* <hello:b.txt> <1>

* 第一阶段Reducer处理后输出到HDFS数据格式为

* <k3> <v3>

* <hello> <a.txt-->2>

* <hello> <b.txt-->1>

* @author songjq

*

*/

static class IndexStepOneReducer extends Reducer<Text, IntWritable, Text, LongWritable> {

private LongWritable tvalue = new LongWritable(0);

@Override

protected void reduce(Text key, Iterable<IntWritable> values, Context ctx)

throws IOException, InterruptedException {

long count = 0;

for(IntWritable value:values) {

count++;

}

tvalue.set(count);

ctx.write(key, tvalue);

}

}

}

2、第二阶段MapReduce任务程序

package cn.itcast.bigdata.index;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

/**

* 利用MapReduce实现输入多个文件中单词在每个文件中出现的次数,输出格式如下:

* hello (a.txt 2,b.txt 1,c.txt 4)

* tom (a.txt 5,b.txt 3)

* 实现方法:采用倒排索引算法并结合jobControll实现

* 本案例中所有的Mapper、Reducer、Job均采用匿名内部类实现

* @author songjq

*

*/

public class IndexStepTwo {

/**

* 第二阶段Mapper

* 第二阶段Mapper输入数据为第一阶段Reducer输出到HDFS的数据,格式为

* hello a.txt-->2

* hello b.txt-->1

* 通过第二阶段Mapper处理,输出数据格式为

* <k2> <v2>

* <hello> <a.txt-->2,b.txt-->1>

* @author songjq

*

*/

static class IndexStepTwoMapper extends Mapper<LongWritable, Text, Text, Text>{

private Text tkey = new Text();

private Text tvalue = new Text();

@Override

protected void map(LongWritable key, Text value, Context context)

throws IOException, InterruptedException {

String line = value.toString();

String[] split = line.split("\t");

if(split.length>1) {

String[] split2 = split[0].split("-->");

tkey.set(split2[0]);

if(split2.length>1) {

tvalue.set(split2[1]+"-->"+split[1]);

context.write(tkey, tvalue);

}

}

}

}

/**

* 第二阶段Reducer

* 通过第二阶段Reducer处理后,为最终输出结果,输出格式为

* <k4> <v4>

* <hello> <(a.txt 2,b.txt 1)>

* @author songjq

*

*/

static class IndexStepTwoReducer extends Reducer<Text, Text, Text, Text>{

private Text tval = new Text();

@Override

protected void reduce(Text key, Iterable<Text> values, Context ctx)

throws IOException, InterruptedException {

StringBuffer sb = new StringBuffer();

for(Text value:values) {

sb.append(value+" ");

}

tval.set(sb.toString());

ctx.write(key, tval);

}

}

}

3、利用jobControll来实现依赖任务的提交

package cn.itcast.bigdata.index;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapred.jobcontrol.JobControl;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import cn.itcast.bigdata.index.IndexStepOne.IndexStepOneMapper;

import cn.itcast.bigdata.index.IndexStepOne.IndexStepOneReducer;

import cn.itcast.bigdata.index.IndexStepTwo.IndexStepTwoMapper;

import cn.itcast.bigdata.index.IndexStepTwo.IndexStepTwoReducer;

/**

* 简单的job串联可以使用jobControll来实现 更复杂的job的调度可以考虑用shell脚本来写,或者干脆用现成的任务调度工具oozie来做

* 这里使用简单的jobControll来实现两个阶段MapReduce任务依赖提交处理

* 由于第二阶段的Mapper输入需要依赖第一阶段Reducer的输出,因此可以利用jobControll来实现第二阶段Mapper的等待,直到

* 第一阶段Reducer输出后,第二阶段的job才开始提交处理

* 核心方法:

* controlledJob2.addDependingJob(controlledJob1);

* @author songjq

*

*/

public class OnceSubmitClient {

public static void main(String[] args) throws Exception {

// 构造第一阶段的基本job对象job1

Configuration conf1 = new Configuration();

Job job1 = Job.getInstance(conf1, "inexStepOne");

job1.setJarByClass(OnceSubmitClient.class);

job1.setMapperClass(IndexStepOneMapper.class);

job1.setReducerClass(IndexStepOneReducer.class);

job1.setMapOutputKeyClass(Text.class);

job1.setMapOutputValueClass(IntWritable.class);

job1.setOutputKeyClass(Text.class);

job1.setOutputValueClass(LongWritable.class);

FileInputFormat.setInputPaths(job1, new Path(args[0]));

FileOutputFormat.setOutputPath(job1, new Path(args[1]));

// 构造第二阶段的基本job对象job2

Configuration conf2 = new Configuration();

Job job2 = Job.getInstance(conf2, "inexStepTwo");

job2.setJarByClass(OnceSubmitClient.class);

job2.setMapperClass(IndexStepTwoMapper.class);

job2.setReducerClass(IndexStepTwoReducer.class);

job2.setMapOutputKeyClass(Text.class);

job2.setMapOutputValueClass(Text.class);

job2.setOutputKeyClass(Text.class);

job2.setOutputValueClass(Text.class);

// 第二个job的输出是第一个job的输入

FileInputFormat.setInputPaths(job2, new Path(args[1]));

FileOutputFormat.setOutputPath(job2, new Path(args[2]));

// ControlledJob是基本的job的封装

ControlledJob controlledJob1 = new ControlledJob(conf1);

// 将job1封装到controlledJob1中去

controlledJob1.setJob(job1);

ControlledJob controlledJob2 = new ControlledJob(conf2);

// 将job2封装到controlledJob2中去

controlledJob2.setJob(job2);

// 先构造一个job控制器

JobControl jobControl = new JobControl("index");

// 指定两个job之间的依赖关系

controlledJob2.addDependingJob(controlledJob1);

// 向job控制器中添加job

jobControl.addJob(controlledJob1);

jobControl.addJob(controlledJob2);

// 创建一个线程去启动jobControl

Thread thread = new Thread(jobControl);

thread.start();

// 如果job没有运行完,主线程就等等

while (!jobControl.allFinished()) {

thread.sleep(500);

}

int succeedSize = jobControl.getSuccessfulJobList().size();

//0正常退出 1异常退出

System.exit(succeedSize == 2 ? 0 : 1);

}

}

原文地址:http://blog.51cto.com/2951890/2154751

时间: 2024-11-10 08:24:46

2018-08-04 期 MapReduce倒排索引编程案例2(jobControll方式)的相关文章

2018-08-03 期 MapReduce倒排索引编程案例1(Combiner方式)

package cn.sjq.bigdata.inverted.index; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.map

新手C#string类常用函数的学习2018.08.04

ToLower()用于将字符串变为小写,注意字符串的不可变特性,需要重新赋值给另一个字符串变量. s = s.ToLower();//字符串具有不可变性,转换后需要重新赋值,不可仅有s.ToLower(); 这可以使用户的输入不区分大小写,例如验证码. ToUpper()用于将字符串全部变为大写,与上面类似. Trim()可以用于去掉两边的空格. string s1 = " a b c "; s1 = s1.Trim();//用于去除字符串两边的空格 Console.WriteLine

MapReduce 单词统计案例编程

MapReduce 单词统计案例编程 一.在Linux环境安装Eclipse软件 1.   解压tar包 下载安装包eclipse-jee-kepler-SR1-linux-gtk-x86_64.tar.gz到/opt/software目录下. 解压到/opt/tools目录下: [[email protected] tools]$ tar -zxf /opt/sofeware/eclipse-jee-kepler-SR1-linux-gtk-x86_64.tar.gz -C /opt/tool

Hadoop集群(第9期)_MapReduce初级案例 - 虾皮 - 博客园

body{ font-family: "Microsoft YaHei UI","Microsoft YaHei",SimSun,"Segoe UI",Tahoma,Helvetica,Sans-Serif,"Microsoft YaHei", Georgia,Helvetica,Arial,sans-serif,宋体, PMingLiU,serif; font-size: 10.5pt; line-height: 1.5;}

2014/08/04 – Backbonejs

[来自: Backbone.js 开发秘笈 第1章] 各种模型实际上是通过扩展其基类 Backbone.Model 实现的.同理,定义的集合是靠扩展其基类 Backbone.Collection 而实现的. 控制器的功能被分散实现在 Backbone.Router 和 Backbone.View 当中. 路由器负责处理 URL 的变化,并且委派一个视图来继续处理应用.路由器(异步)获取模型后,随即触发一个视图的更新操作. 视图负责监听 DOM 事件.它要么对模型进行更新,要么通过路由器转移到应用

hadoop笔记之MapReduce的应用案例(利用MapReduce进行排序)

MapReduce的应用案例(利用MapReduce进行排序) MapReduce的应用案例(利用MapReduce进行排序) 思路: Reduce之后直接进行结果合并 具体样例: 程序名:Sort.java import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; impo

面向对象编程案例02--显示地调用父类的__init__()

# -*- coding: utf-8 -*- #python 27 #xiaodeng #面向对象编程案例02--显示地调用父类的__init__() ''' 继承是面向对象的重要特征之一,继承是2个类或多个类之间的父子关系,子类继承父类的所有共有实例变量和方法. 继承实现了代码的重用,减少代码的编写量 python在类名后用圆括号来表示继承关系,括号中的类表示父类 如果父类有init方法,则子类会显示地调用其父类的init方法,不需要重写定义.如果子类需要拓展父类的init方法,则可以父类的

2014.08.04,读书,《Matlab概率与数理统计分析》-第1章 MATLAB的数据基础

第1章 MATLAB数据基础 虽然一直间或使用MATLAB,但从来没有系统的学习过,现在开始也不晚.先对几个重点或者平时忽略的要点做下笔记. %后的所有文字为注释,多条命令可以放在一行,但要用逗号或分号隔开,命令后的逗号表示显示结果,分号表示禁止显示结果. 符号…表示语句的余下部分将出现在下一行,但不能出现在变量名或运算符之间. M文件又称Script文件,具有全局性,文件中的所有变量在整个工作环境中有效. 命令: cumsum(x,dim),求累积和,matlab中cumsum函数通常用于计算

Bootstrap 3.2.0 源码试读 2014/08/04

第一部分 normalize.css 用于解决不同浏览器下显示不一致的问题 8至12行 html {   font-family: sans-serif;    /* 设置默认字体为 sans-serif */   -webkit-text-size-adjust: 100%;    /* 手机等设备转屏时,字体大小随着自动调整 */       -ms-text-size-adjust: 100%;    /* 但是如果禁用了缩放功能,则此设置无效 */ } text-size-adjust由