reduce端的join算法:
例子:
商品表数据 product:
pid
p0001,小米5,1000,2000
p0002,锤子T1,1000,3000
订单表数据 order:
pid
1001,20150710,p0001,2
1002,20150710,p0002,3
1002,20150710,p0003,3
MapReduce 可以实现 SQL 语句的功能,例如:select ... from product p left join order o on p.pid = o.pid
思路:将关联的条件(pid)作为 map 输出的 key,这样两张表中 pid 相同的记录会被分到同一组、进入同一次 reduce 调用,再在 reduce 端把它们拼接起来。
缺点:这种方式中,join的操作是在reduce阶段完成,reduce端的处理压力太大,map节点的运算负载则很低,资源利用率不高,且在reduce阶段极易产生数据倾斜。
替代解决方案: map端join实现方式。
代码:
ReduceJoinMain:
package cn.itcast.demo4.reduceJoin; import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner; public class ReduceJoinMain extends Configured implements Tool{ @Override public int run(String[] args) throws Exception { Job job = Job.getInstance(this.getConf(), ReduceJoinMain.class.getSimpleName());// job.setJarByClass(ReduceJoinMain.class); job.setInputFormatClass(TextInputFormat.class); TextInputFormat.addInputPath(job,new Path("file:///D:\\Study\\BigData\\heima\\stage2\\4、大数据离线第四天\\map端join\\input")); job.setMapperClass(ReduceJoinMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setReducerClass(ReduceJoinReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.setOutputFormatClass(TextOutputFormat.class); TextOutputFormat.setOutputPath(job,new Path("file:///D:\\Study\\BigData\\heima\\stage2\\4、大数据离线第四天\\map端join\\reduce_join_output")); boolean b = job.waitForCompletion(true); return b?0:1; } public static void main(String[] args) throws Exception { int run = ToolRunner.run(new Configuration(), new ReduceJoinMain(), args); System.exit(run); } }
ReduceJoinMapper:
package cn.itcast.demo4.reduceJoin;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Map side of the reduce-side join: tags every input line with its product id so that
 * product and order records sharing a pid reach the same reduce call.
 *
 * <p>Lines starting with {@code "p"} are treated as product records (pid is field 0);
 * all other lines are treated as order records (pid is field 2). Fields are comma-separated.
 */
public class ReduceJoinMapper extends Mapper<LongWritable, Text, Text, Text> {

    /** Index of the pid column in a product record. */
    private static final int PRODUCT_PID_INDEX = 0;

    /** Index of the pid column in an order record. */
    private static final int ORDER_PID_INDEX = 2;

    /** Reused output key, avoiding one allocation per input record. */
    private final Text joinKey = new Text();

    /**
     * Emits {@code (pid, originalLine)} for one input line.
     *
     * <p>Lines that do not contain the expected pid column (for example blank lines or
     * truncated order records) are skipped and counted instead of failing the task.
     *
     * @param key     position of the line in the input (unused)
     * @param value   the raw input line
     * @param context task context used to emit the keyed record
     * @throws IOException          if writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String[] fields = line.split(",");

        int pidIndex = line.startsWith("p") ? PRODUCT_PID_INDEX : ORDER_PID_INDEX;
        if (fields.length <= pidIndex) {
            // No pid column to join on: skip the record but make the skip visible in job counters.
            context.getCounter("ReduceJoin", "MALFORMED_RECORDS").increment(1);
            return;
        }

        joinKey.set(fields[pidIndex]);
        context.write(joinKey, value);
    }
}
ReduceJoinReducer:
package cn.itcast.demo4.reduceJoin;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Reduce side of the join: concatenates all records that share one pid into a single output line.
 *
 * <p>Records starting with {@code "p"} are treated as product records; everything else is
 * treated as an order record. The output value is every order record followed by a tab,
 * then the product record(s) appended without a separator.
 */
public class ReduceJoinReducer extends Reducer<Text, Text, Text, Text> {

    /**
     * Joins the grouped records for one pid and writes exactly one output record.
     *
     * <p>A record is written for every key, including keys that have only order records or
     * only product records. Empty values are ignored.
     *
     * @param key     the pid shared by all {@code values}
     * @param values  raw product and order lines grouped under {@code key}
     * @param context task context used to emit the joined record
     * @throws IOException          if writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // StringBuilder keeps the concatenation linear in the number of grouped records.
        StringBuilder orderPart = new StringBuilder();
        StringBuilder productPart = new StringBuilder();

        for (Text record : values) {
            String line = record.toString();
            if (line.isEmpty()) {
                continue;
            }
            if (line.startsWith("p")) {
                productPart.append(line);
            } else {
                orderPart.append(line).append('\t');
            }
        }

        // Orders first, product last — same layout as before.
        orderPart.append(productPart);
        context.write(key, new Text(orderPart.toString()));
    }
}
原文地址:https://www.cnblogs.com/mediocreWorld/p/11028591.html
时间: 2024-10-12 07:29:45