Much like a chain of Linux pipes, the output of one Mapper is fed directly into the next Mapper, forming a pipeline. Consider the following scenario: in the Map phase the data is processed by mapper1 and then mapper2; in the Reduce phase, after sort and shuffle, it is handed to the reducer. The reducer's output is not written straight to HDFS, however; it is passed to a third mapper, mapper3, and only mapper3's results are finally written to the HDFS output directory.
Note: in a chained MapReduce job there can be any number of Mappers, both before the Reducer (added with ChainMapper) and after it (added with ChainReducer.addMapper), but at most one Reducer. The complete driver below shows this wiring, and a short trace of the sample data through each stage follows the listing.
package chain;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.VLongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
import org.apache.hadoop.mapreduce.lib.chain.ChainReducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Chain {

    /**
     * Sample input (item  amount), GBK-encoded:
     *   手机   5000
     *   电脑   2000
     *   衣服   300
     *   鞋子   1200
     *   裙子   434
     *   手套   12
     *   图书   12510
     *   小商品 5
     *   小商品 3
     *   订餐   2
     *
     * Requirements:
     *   1. Mapper1 drops records with an amount greater than 10000
     *   2. Mapper2 drops records with an amount in the 100-10000 range
     *   3. The Reducer groups the remaining records by item and sums the amounts
     *   4. Mapper3, running after the Reducer, drops item names of three or more characters
     *
     * Expected output:
     *   手套 12
     *   订餐 2
     *
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(Chain.class);

        /**
         * Configure Mapper1.
         * Note the constructor argument new Configuration(false): it creates an
         * empty configuration that does not load the default resources.
         */
        Configuration map1Conf = new Configuration(false);
        ChainMapper.addMapper(job,       // the job being configured
                Mapper1.class,           // mapper class to add to the chain
                LongWritable.class,      // input key type of this mapper
                Text.class,              // input value type of this mapper
                Text.class,              // output key type of this mapper
                VLongWritable.class,     // output value type of this mapper
                map1Conf);               // configuration for this mapper

        // Configure Mapper2
        ChainMapper.addMapper(job, Mapper2.class,
                Text.class, VLongWritable.class,
                Text.class, VLongWritable.class,
                new Configuration(false));

        /**
         * Configure the Reducer.
         * Note that setReducer() is used here.
         */
        ChainReducer.setReducer(job, Reducer_Only.class,
                Text.class, VLongWritable.class,
                Text.class, VLongWritable.class,
                new Configuration(false));

        // Configure Mapper3, which runs after the Reducer
        ChainReducer.addMapper(job, Mapper3.class,
                Text.class, VLongWritable.class,
                Text.class, VLongWritable.class,
                new Configuration(false));

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }

    // Mapper1: drop records whose amount is greater than 10000
    public static class Mapper1 extends Mapper<LongWritable, Text, Text, VLongWritable> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            /**
             * Hadoop's default text input/output formats only handle UTF-8, so to
             * avoid garbled output for GBK-encoded Chinese input:
             * 1. take the raw bytes of the Text value, and
             * 2. decode them with String(byte[] bytes, int offset, int length, String charsetName),
             *    which builds a new String by decoding the byte sub-array with the given charset.
             */
            String line = new String(value.getBytes(), 0, value.getLength(), "GBK");
            String[] splited = line.split(" ");
            // Keep only records with an amount below 10000
            if (Long.parseLong(splited[1]) < 10000L) {
                context.write(new Text(splited[0]), new VLongWritable(Long.parseLong(splited[1])));
            }
        }
    }

    // Mapper2: drop records whose amount falls in the 100-10000 range
    public static class Mapper2 extends Mapper<Text, VLongWritable, Text, VLongWritable> {

        @Override
        protected void map(Text key, VLongWritable value, Context context)
                throws IOException, InterruptedException {
            // Keep only records with an amount below 100
            if (value.get() < 100L) {
                context.write(key, value);
            }
        }
    }

    // Reducer: sum the amounts for each item
    public static class Reducer_Only extends Reducer<Text, VLongWritable, Text, VLongWritable> {

        @Override
        protected void reduce(Text key, Iterable<VLongWritable> v2s, Context context)
                throws IOException, InterruptedException {
            long sumLong = 0L;
            for (VLongWritable vLongWritable : v2s) {
                sumLong += vLongWritable.get();
            }
            // Emit the total once per key, after all values have been accumulated
            context.write(key, new VLongWritable(sumLong));
        }
    }

    // Mapper3: runs after the Reducer and keeps only item names shorter than three characters
    public static class Mapper3 extends Mapper<Text, VLongWritable, Text, VLongWritable> {

        @Override
        protected void map(Text key, VLongWritable value, Context context)
                throws IOException, InterruptedException {
            // The key was already decoded to a proper String in Mapper1, and Text
            // stores it as UTF-8, so toString() gives the correct character count
            // (decoding the key's bytes as GBK here would garble it).
            String line = key.toString();
            // Keep only item names shorter than three characters
            if (line.length() < 3) {
                context.write(key, value);
            }
        }
    }
}
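To make the chain concrete, here is how the sample records from the comment above flow through the four stages; the intermediate steps are derived from the filter conditions and are not shown in the original listing:

After Mapper1 (drop amounts > 10000): 图书 12510 is removed; all other records pass through.
After Mapper2 (keep amounts < 100): 手套 12, 小商品 5, 小商品 3, 订餐 2 remain.
After the Reducer (sum per item): 手套 12, 小商品 8, 订餐 2.
After Mapper3 (keep names shorter than three characters): 手套 12, 订餐 2, which matches the expected output written to HDFS.

The encoding detail behind Mapper1 and Mapper3 is easy to get wrong, so here is a minimal standalone sketch of it. The class name GbkDemo is mine, it only needs hadoop-common on the classpath, and it is an illustration of Text's behavior rather than part of the original job:

import org.apache.hadoop.io.Text;

public class GbkDemo {
    public static void main(String[] args) throws Exception {
        // A GBK-encoded line, as TextInputFormat would hand it to Mapper1:
        // Text keeps the raw bytes, so we decode them explicitly as GBK.
        byte[] gbkBytes = "手套 12".getBytes("GBK");
        Text value = new Text();
        value.set(gbkBytes);
        String line = new String(value.getBytes(), 0, value.getLength(), "GBK");
        System.out.println(line); // 手套 12

        // Once a String is stored in a Text, it is re-encoded as UTF-8, so later
        // stages should call toString() instead of decoding the bytes as GBK.
        Text key = new Text("手套");
        System.out.println(key.toString().length()); // 2
        System.out.println(new String(key.getBytes(), 0, key.getLength(), "GBK").length()); // garbled, not 2
    }
}

Once packaged into a jar, a driver like this is normally submitted with the standard hadoop jar command, e.g. hadoop jar chain.jar chain.Chain followed by the input and output directories; the jar name here is a placeholder rather than one given in the original post.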