Hadoop: MapReduce2多个job串行处理

复杂的MapReduce处理中,往往需要将复杂的处理过程,分解成多个简单的Job来执行,第1个Job的输出做为第2个Job的输入,相互之间有一定依赖关系。以上一篇中的求平均数为例,可以分解成三个步骤:

1. 求Sum

2. 求Count

3. 计算平均数

每1个步骤看成一个Job,其中Job3必须等待Job1、Job2完成,并将Job1、Job2的输出结果做为输入,下面的代码演示了如何将这3个Job串起来

  1 package yjmyzz.mr.job.link;
  2
  3 import org.apache.hadoop.conf.Configuration;
  4 import org.apache.hadoop.fs.Path;
  5 import org.apache.hadoop.io.DoubleWritable;
  6 import org.apache.hadoop.io.LongWritable;
  7 import org.apache.hadoop.io.Text;
  8 import org.apache.hadoop.mapreduce.Job;
  9 import org.apache.hadoop.mapreduce.Mapper;
 10 import org.apache.hadoop.mapreduce.Reducer;
 11 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 12 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 13 import yjmyzz.util.HDFSUtil;
 14
 15 import java.io.IOException;
 16
 17
 18 public class Avg2 {
 19
 20     private static final Text TEXT_SUM = new Text("SUM");
 21     private static final Text TEXT_COUNT = new Text("COUNT");
 22     private static final Text TEXT_AVG = new Text("AVG");
 23
 24     //计算Sum
 25     public static class SumMapper
 26             extends Mapper<LongWritable, Text, Text, LongWritable> {
 27
 28         public long sum = 0;
 29
 30         public void map(LongWritable key, Text value, Context context)
 31                 throws IOException, InterruptedException {
 32             sum += Long.parseLong(value.toString());
 33         }
 34
 35         protected void cleanup(Context context) throws IOException, InterruptedException {
 36             context.write(TEXT_SUM, new LongWritable(sum));
 37         }
 38
 39     }
 40
 41     public static class SumReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
 42
 43         public long sum = 0;
 44
 45         public void reduce(Text key, Iterable<LongWritable> values, Context context)
 46                 throws IOException, InterruptedException {
 47             for (LongWritable v : values) {
 48                 sum += v.get();
 49             }
 50             context.write(TEXT_SUM, new LongWritable(sum));
 51         }
 52
 53     }
 54
 55     //计算Count
 56     public static class CountMapper
 57             extends Mapper<LongWritable, Text, Text, LongWritable> {
 58
 59         public long count = 0;
 60
 61         public void map(LongWritable key, Text value, Context context)
 62                 throws IOException, InterruptedException {
 63             count += 1;
 64         }
 65
 66         protected void cleanup(Context context) throws IOException, InterruptedException {
 67             context.write(TEXT_COUNT, new LongWritable(count));
 68         }
 69
 70     }
 71
 72     public static class CountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
 73
 74         public long count = 0;
 75
 76         public void reduce(Text key, Iterable<LongWritable> values, Context context)
 77                 throws IOException, InterruptedException {
 78             for (LongWritable v : values) {
 79                 count += v.get();
 80             }
 81             context.write(TEXT_COUNT, new LongWritable(count));
 82         }
 83
 84     }
 85
 86     //计算Avg
 87     public static class AvgMapper
 88             extends Mapper<LongWritable, Text, LongWritable, LongWritable> {
 89
 90         public long count = 0;
 91         public long sum = 0;
 92
 93         public void map(LongWritable key, Text value, Context context)
 94                 throws IOException, InterruptedException {
 95             String[] v = value.toString().split("\t");
 96             if (v[0].equals("COUNT")) {
 97                 count = Long.parseLong(v[1]);
 98             } else if (v[0].equals("SUM")) {
 99                 sum = Long.parseLong(v[1]);
100             }
101         }
102
103         protected void cleanup(Context context) throws IOException, InterruptedException {
104             context.write(new LongWritable(sum), new LongWritable(count));
105         }
106
107     }
108
109
110     public static class AvgReducer extends Reducer<LongWritable, LongWritable, Text, DoubleWritable> {
111
112         public long sum = 0;
113         public long count = 0;
114
115         public void reduce(LongWritable key, Iterable<LongWritable> values, Context context)
116                 throws IOException, InterruptedException {
117             sum += key.get();
118             for (LongWritable v : values) {
119                 count += v.get();
120             }
121         }
122
123         protected void cleanup(Context context) throws IOException, InterruptedException {
124             context.write(TEXT_AVG, new DoubleWritable(new Double(sum) / count));
125         }
126
127     }
128
129
130     public static void main(String[] args) throws Exception {
131
132         Configuration conf = new Configuration();
133
134         String inputPath = "/input/duplicate.txt";
135         String maxOutputPath = "/output/max/";
136         String countOutputPath = "/output/count/";
137         String avgOutputPath = "/output/avg/";
138
139         //删除输出目录(可选,省得多次运行时,总是报OUTPUT目录已存在)
140         HDFSUtil.deleteFile(conf, maxOutputPath);
141         HDFSUtil.deleteFile(conf, countOutputPath);
142         HDFSUtil.deleteFile(conf, avgOutputPath);
143
144         Job job1 = Job.getInstance(conf, "Sum");
145         job1.setJarByClass(Avg2.class);
146         job1.setMapperClass(SumMapper.class);
147         job1.setCombinerClass(SumReducer.class);
148         job1.setReducerClass(SumReducer.class);
149         job1.setOutputKeyClass(Text.class);
150         job1.setOutputValueClass(LongWritable.class);
151         FileInputFormat.addInputPath(job1, new Path(inputPath));
152         FileOutputFormat.setOutputPath(job1, new Path(maxOutputPath));
153
154
155         Job job2 = Job.getInstance(conf, "Count");
156         job2.setJarByClass(Avg2.class);
157         job2.setMapperClass(CountMapper.class);
158         job2.setCombinerClass(CountReducer.class);
159         job2.setReducerClass(CountReducer.class);
160         job2.setOutputKeyClass(Text.class);
161         job2.setOutputValueClass(LongWritable.class);
162         FileInputFormat.addInputPath(job2, new Path(inputPath));
163         FileOutputFormat.setOutputPath(job2, new Path(countOutputPath));
164
165
166         Job job3 = Job.getInstance(conf, "Average");
167         job3.setJarByClass(Avg2.class);
168         job3.setMapperClass(AvgMapper.class);
169         job3.setReducerClass(AvgReducer.class);
170         job3.setMapOutputKeyClass(LongWritable.class);
171         job3.setMapOutputValueClass(LongWritable.class);
172         job3.setOutputKeyClass(Text.class);
173         job3.setOutputValueClass(DoubleWritable.class);
174
175         //将job1及job2的输出为做job3的输入
176         FileInputFormat.addInputPath(job3, new Path(maxOutputPath));
177         FileInputFormat.addInputPath(job3, new Path(countOutputPath));
178         FileOutputFormat.setOutputPath(job3, new Path(avgOutputPath));
179
180         //提交job1及job2,并等待完成
181         if (job1.waitForCompletion(true) && job2.waitForCompletion(true)) {
182             System.exit(job3.waitForCompletion(true) ? 0 : 1);
183         }
184
185     }
186
187
188 }

输入文本在上一篇可以找到,上面这段代码的主要思路:

1. Sum和Count均采用相同的输入/input/duplicate.txt,然后将各自的处理结果分别输出到/output/max/及/output/count/下

2. Avg从/output/max及/output/count获取结果做为输入,然后根据Key值不同,拿到sum和count的值,最终计算并输出到/output/avg/下

时间: 2024-10-09 08:29:39

Hadoop: MapReduce2多个job串行处理的相关文章

串行化--深度复制

package day; import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; import java.io.IOException; import org.junit.jupiter.api.Test; /** * DataInputStream+DataOutputStream * @author ASUS * */ public class DataIOStreamDemo { /** * 写入 * @

PHP中的抽象类与抽象方法/静态属性和静态方法/PHP中的单利模式(单态模式)/串行化与反串行化(序列化与反序列化)/约束类型/魔术方法小结

  前  言  OOP  学习了好久的PHP,今天来总结一下PHP中的抽象类与抽象方法/静态属性和静态方法/PHP中的单利模式(单态模式)/串行化与反串行化(序列化与反序列化). 1  PHP中的抽象类与抽象方法 1.什么是抽象方法?              没有方法体 {} 的方法,必须使用abstract 关键字修饰.这样的方,我们叫做抽象方法.                    abstract function say(); //    抽象方法 2.什么是抽象类?        

IOS多线程知识总结/队列概念/GCD/串行/并行/同步/异步

进程:正在进行中的程序被称为进程,负责程序运行的内存分配;每一个进程都有自己独立的虚拟内存空间: 线程:线程是进程中一个独立的执行路径(控制单元);一个进程中至少包含一条线程,即主线程. 队列:dispatch_queue_t,一种先进先出的数据结构,线程的创建和回收不需要程序员操作,由队列负责. 串行队列:队列中的任务只会顺序执行(类似跑步) dispatch_queue_t q = dispatch_queue_create(“....”, dispatch_queue_serial); 并

串行工作模式之同步移位寄存器的输入输出方式

主要用于扩展并行输入或输出口.数据有RXD(P3.0)引脚输入或输出,同步移位脉冲由TXD(P3.1)引脚输出.发送和接受均为8位数据.低位在前,高位在后. //串行口工作模式0 #include <reg52.h> #define uchar unsigned char #define uint unsigned int void delayms(uint xms) { uint i,j; for(i=xms;i>0;i--) for(j=110;j>0;j--); } void

Node.js 实现串行化流程控制

为了演示如何实现串行流程控制,我们准备做个小程序,让它从一个随机选择的RSS预定源中获取一片标题和URL,并显示出来. RSS预定源列表放在rss_feeds.txt文件中,内容如下: http://feed.cnblogs.com/blog/u/376823/rss http://lambda-the-ultimate.org/rss.xml 运行程序前我们需要安装两个模块:request模块是个经过简化的HTTP客户端,你可以用它获取RSS数据.htmlparser模块能把原始的RSS数据转

【iOS面试系列-2】多线程中同步、异步和串行、并行之间的逻辑关系(必考,必须掌握)

一.同步.异步和串行.并行 任务串行执行就是每次只有一个任务被执行,任务并发执行就是在同一时间可以有多个任务被执行. 一个同步函数只在完成了它预定的任务后才返回.一个异步函数,刚好相反,会立即返回,预定的任务会完成但不会等它完成.因此,一个异步函数不会阻塞当前线程去执行下一个函数. (来源:http://www.cocoachina.com/industry/20140428/8248.html) 队列分为串行和并行 任务的执行分为同步和异步 -------  队列只是负责任务的调度,而不负责任

SPI、I2C、UART三种串行总线协议的区别和SPI接口介绍(转)

SPI.I2C.UART三种串行总线协议的区别 第一个区别当然是名字: SPI(Serial Peripheral Interface:串行外设接口); I2C(INTER IC BUS) UART(Universal Asynchronous Receiver Transmitter:通用异步收发器) 第二,区别在电气信号线上: SPI总线由三条信号线组成:串行时钟(SCLK).串行数据输出(SDO).串行数据输入(SDI).SPI总线可以实现多个SPI设备互相连接.提供SPI串行时钟的SPI

对象的序列化(串行化)分析(一)

对象的序列化(串行化)序列化概念:(1)对象的寿命通常随着生成该对象的程序的终止而终止.有时候,可能需要将对象的状态保存下 来,在需要时再将对象恢复.我们把对象的这种能记录自己的状态以便将来再生的能力.叫作对象的持续性(persistence).对象通过写出描述自己状 态的数值来记录自己 ,这个过程叫对象的串行化(Serialization-连续) .(2)一个对象随着创建而存在,随着程序结束而结束.那 如果我要保存一个对象的状态呢?Java序列化能够将对象的状态写入byte流存储起来,也从其他

linux总结应用之三 安装和配置串行,并行链路

 (一)远程站的设置: 最简单的做法是在远程的机器专为拨号连接建立PPP 登陆项 : ppp:off:700:700:ppp acount:/home/ppp:home/ppp/ppplogin 为账号建立起始目录; # mkdir   /home/ppp #  chown  ppp. /home/ppp 注意在新加的行中,将下列程序作为登陆的shell: /home/ppp/ppplogin 实际上,它不是shell程序.而是在远程机上用来启动pppd守护程序的script. 它的典型形式如下