一、方法介绍
假设要进行join的数据分别来自File1和File2.
参考:https://blog.csdn.net/yimingsilence/article/details/70242604
1.1 reduce side join
reduce side join是一种最简单的join方式,其主要思想如下:
在map阶段,map函数同时读取两个文件File1和File2,为了区分两种来源的key/value数据对,对每条数据打一个标签(tag),比如:tag=0表示来自文件File1,tag=2表示来自文件File2。即:map阶段的主要任务是对不同文件中的数据打标签。
在reduce阶段,reduce函数获取key相同的来自File1和File2文件的value list, 然后对于同一个key,对File1和File2中的数据进行join(笛卡尔乘积)。即:reduce阶段进行实际的连接操作。
1.2 map side join
之所以存在reduce side join,是因为在map阶段不能获取所有需要的join字段,即:同一个key对应的字段可能位于不同map中。Reduce side join是非常低效的,因为shuffle阶段要进行大量的数据传输。
Map side join是针对以下场景进行的优化:两个待连接表中,有一个表非常大,而另一个表非常小,以至于小表可以直接存放到内存中。这样,我们可以将小表复制多份,让每个map task内存中存在一份(比如存放到hash table中),然后只扫描大表:对于大表中的每一条记录key/value,在hash table中查找是否有相同的key的记录,如果有,则连接后输出即可。
为了支持文件的复制,Hadoop提供了一个类DistributedCache,使用该类的方法如下:
(1)用户使用静态方法DistributedCache.addCacheFile()指定要复制的文件,它的参数是文件的URI(如果是HDFS上的文件,可以这样:hdfs://namenode:9000/home/XXX/file,其中9000是自己配置的NameNode端口号)。JobTracker在作业启动之前会获取这个URI列表,并将相应的文件拷贝到各个TaskTracker的本地磁盘上。(2)用户使用DistributedCache.getLocalCacheFiles()方法获取文件目录,并使用标准的文件读写API读取相应的文件。
1.3 SemiJoin
SemiJoin,也叫半连接,是从分布式数据库中借鉴过来的方法。它的产生动机是:对于reduce side join,跨机器的数据传输量非常大,这成了join操作的一个瓶颈,如果能够在map端过滤掉不会参加join操作的数据,则可以大大节省网络IO。
实现方法很简单:选取一个小表,假设是File1,将其参与join的key抽取出来,保存到文件File3中,File3文件一般很小,可以放到内存中。在map阶段,使用DistributedCache将File3复制到各个TaskTracker上,然后将File2中不在File3中的key对应的记录过滤掉,剩下的reduce阶段的工作与reduce side join相同。
1.4 reduce side join + BloomFilter
在某些情况下,SemiJoin抽取出来的小表的key集合在内存中仍然存放不下,这时候可以使用BloomFiler以节省空间。
BloomFilter最常见的作用是:判断某个元素是否在一个集合里面。它最重要的两个方法是:add() 和contains()。最大的特点是不会存在false negative,即:如果contains()返回false,则该元素一定不在集合中,但会存在一定的true negative,即:如果contains()返回true,则该元素可能在集合中。
因而可将小表中的key保存到BloomFilter中,在map阶段过滤大表,可能有一些不在小表中的记录没有过滤掉(但是在小表中的记录一定不会过滤掉),这没关系,只不过增加了少量的网络IO而已。
二、二次排序
在Hadoop中,默认情况下是按照key进行排序,如果要按照value进行排序怎么办?即:对于同一个key,reduce函数接收到的value list是按照value排序的。这种应用需求在join操作中很常见,比如,希望相同的key中,小表对应的value排在前面。
有两种方法进行二次排序,分别为:buffer and in memory sort和 value-to-key conversion。
对于buffer and in memory sort,主要思想是:在reduce()函数中,将某个key对应的所有value保存下来,然后进行排序。 这种方法最大的缺点是:可能会造成out of memory。
对于value-to-key conversion,主要思想是:将key和部分value拼接成一个组合key(实现WritableComparable接口或者调用setSortComparatorClass函数),这样reduce获取的结果便是先按key排序,后按value排序的结果,需要注意的是,用户需要自己实现Paritioner,以便只按照key进行数据划分。Hadoop显式的支持二次排序,在Configuration类中有个setGroupingComparatorClass()方法,可用于设置排序group的key值,
三、reduce端join实现
1. 数据准备
首先是准备好数据。这个倒已经是一个熟练的过程,所要做的是把示例数据准备好,记住路径和字段分隔符。
准备好下面两张表:
(1)m_ys_lab_jointest_a(以下简称表A)
建表语句为:
create table if not exists m_ys_lab_jointest_a ( id bigint, name string ) row format delimited fields terminated by ‘9‘ lines terminated by ‘10‘ stored as textfile;
数据
id name
1 北京
2 天津
3 河北
4 山西
5 内蒙古
6 辽宁
7 吉林
8 黑龙江
(2)m_ys_lab_jointest_b(以下简称表B)
建表语句为:
create table if not exists m_ys_lab_jointest_b ( id bigint, statyear bigint, num bigint ) row format delimited fields terminated by ‘9‘ lines terminated by ‘10‘ stored as textfile;
数据
id statyear num
1 2010 1962
1 2011 2019
2 2010 1299
2 2011 1355
4 2010 3574
4 2011 3593
9 2010 2303
9 2011 2347
我们的目的是,以id为key做join操作,得到以下表:
m_ys_lab_jointest_ab:
id name statyear num
1 北京 2011 2019
1 北京 2010 1962
2 天津 2011 1355
2 天津 2010 1299
4 山西 2011 3593
4 山西 2010 3574
2. 计算模型
整个计算过程是:
(1)在map阶段,把所有记录标记成<key, value>的形式,其中key是id,value则根据来源不同取不同的形式:来源于表A的记录,value的值为"a#"+name;来源于表B的记录,value的值为"b#"+score。
(2)在reduce阶段,先把每个key下的value列表拆分为分别来自表A和表B的两部分,分别放入两个向量中。然后遍历两个向量做笛卡尔积,形成一条条最终结果。
3. 代码:
import java.io.IOException; import java.util.HashMap; import java.util.Iterator; import java.util.Vector; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.RecordWriter; import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; /** * MapReduce实现Join操作 */ public class MapRedJoin { public static final String DELIMITER = "\u0009"; // 字段分隔符 // map过程 public static class MapClass extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> { public void configure(JobConf job) { super.configure(job); } public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException, ClassCastException { // 获取输入文件的全路径和名称 String filePath = ((FileSplit)reporter.getInputSplit()).getPath().toString(); // 获取记录字符串 String line = value.toString(); // 抛弃空记录 if (line == null || line.equals("")) return; // 处理来自表A的记录 if (filePath.contains("m_ys_lab_jointest_a")) { String[] values = line.split(DELIMITER); // 按分隔符分割出字段 if (values.length < 2) return; String id = values[0]; // id String name = values[1]; // name output.collect(new Text(id), new Text("a#"+name)); } // 处理来自表B的记录 else if (filePath.contains("m_ys_lab_jointest_b")) { String[] values = line.split(DELIMITER); // 按分隔符分割出字段 if (values.length < 3) return; String id = values[0]; // id String statyear = values[1]; // statyear String num = values[2]; //num output.collect(new Text(id), new Text("b#"+statyear+DELIMITER+num)); } } } // reduce过程 public static class Reduce extends MapReduceBase implements Reducer<Text, Text, Text, Text> { public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException { Vector<String> vecA = new Vector<String>(); // 存放来自表A的值 Vector<String> vecB = new Vector<String>(); // 存放来自表B的值 while (values.hasNext()) { String value = values.next().toString(); if (value.startsWith("a#")) { vecA.add(value.substring(2)); } else if (value.startsWith("b#")) { vecB.add(value.substring(2)); } } int sizeA = vecA.size(); int sizeB = vecB.size(); // 遍历两个向量 int i, j; for (i = 0; i < sizeA; i ++) { for (j = 0; j < sizeB; j ++) { output.collect(key, new Text(vecA.get(i) + DELIMITER +vecB.get(j))); } } } } protected void configJob(JobConf conf) { conf.setMapOutputKeyClass(Text.class); conf.setMapOutputValueClass(Text.class); conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(Text.class); conf.setOutputFormat(ReportOutFormat.class); } }
四、Map端join
使用场景:一张表十分小、一张表很大。
用法:在提交作业的时候先将小表文件放到该作业的DistributedCache中,然后从DistributeCache中取出该小表进行join key / value解释分割放到内存中(可以放到Hash Map等等容器中)。然后扫描大表,看大表中的每条记录的join key /value值是否能够在内存中找到相同join key的记录,如果有则直接输出结果。
1. 数据
Map side join中的left outer join
左连接,两个文件分别代表2个表,连接字段table1的id字段和table2的cityID字段
table1(左表):tb_dim_city(id int,name string,orderid int,city_code,is_show),
假设tb_dim_city文件记录数很少,tb_dim_city.dat文件内容,分隔符为"|":
id name orderid city_code is_show
0 其他 9999 9999 0
1 长春 1 901 1
2 吉林 2 902 1
3 四平 3 903 1
4 松原 4 904 1
5 通化 5 905 1
6 辽源 6 906 1
7 白城 7 907 1
8 白山 8 908 1
9 延吉 9 909 1
table2(右表):tb_user_profiles(userID int,userName string,network string,double flow,cityID int)
tb_user_profiles.dat文件内容,分隔符为"|":
userID network flow cityID
1 2G 123 1
2 3G 333 2
3 3G 555 1
4 2G 777 3
5 3G 666 4
结果:
1 长春 1 901 1 1 2G 123
1 长春 1 901 1 3 3G 555
2 吉林 2 902 1 2 3G 333
3 四平 3 903 1 4 2G 777
4 松原 4 904 1 5 3G 666
2. 代码:
package com.mr.mapSideJoin;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MapSideJoinMain extends Configured implements Tool{ private static final Logger logger = LoggerFactory.getLogger(MapSideJoinMain.class); public static class LeftOutJoinMapper extends Mapper { private HashMap city_info = new HashMap(); private Text outPutKey = new Text(); private Text outPutValue = new Text(); private String mapInputStr = null; private String mapInputSpit[] = null; private String city_secondPart = null; /** * 此方法在每个task开始之前执行,这里主要用作从DistributedCache * 中取到tb_dim_city文件,并将里边记录取出放到内存中。 */ @Override protected void setup(Context context) throws IOException, InterruptedException { BufferedReader br = null; //获得当前作业的DistributedCache相关文件 Path[] distributePaths = DistributedCache.getLocalCacheFiles(context.getConfiguration()); String cityInfo = null; for(Path p : distributePaths){ if(p.toString().endsWith("tb_dim_city.dat")){ //读缓存文件,并放到mem中 br = new BufferedReader(new FileReader(p.toString())); while(null!=(cityInfo=br.readLine())){ String[] cityPart = cityInfo.split("\\|",5); if(cityPart.length ==5){ city_info.put(cityPart[0], cityPart[1]+"\t"+cityPart[2]+"\t"+cityPart[3]+"\t"+cityPart[4]); } } } } } /** * Map端的实现相当简单,直接判断tb_user_profiles.dat中的 * cityID是否存在我的map中就ok了,这样就可以实现Map Join了 */ @Override protected void map(Object key, Text value, Context context) throws IOException, InterruptedException { //排掉空行 if(value == null || value.toString().equals("")){ return; } mapInputStr = value.toString(); mapInputSpit = mapInputStr.split("\\|",4); //过滤非法记录 if(mapInputSpit.length != 4){ return; } //判断链接字段是否在map中存在 city_secondPart = city_info.get(mapInputSpit[3]); if(city_secondPart != null){ this.outPutKey.set(mapInputSpit[3]); this.outPutValue.set(city_secondPart+"\t"+mapInputSpit[0]+"\t"+mapInputSpit[1]+"\t"+mapInputSpit[2]); context.write(outPutKey, outPutValue); } } } @Override public int run(String[] args) throws Exception { Configuration conf=getConf(); //获得配置文件对象 DistributedCache.addCacheFile(new Path(args[1]).toUri(), conf);//为该job添加缓存文件 Job job=new Job(conf,"MapJoinMR"); job.setNumReduceTasks(0); FileInputFormat.addInputPath(job, new Path(args[0])); //设置map输入文件路径 FileOutputFormat.setOutputPath(job, new Path(args[2])); //设置reduce输出文件路径 job.setJarByClass(MapSideJoinMain.class); job.setMapperClass(LeftOutJoinMapper.class); job.setInputFormatClass(TextInputFormat.class); //设置文件输入格式 job.setOutputFormatClass(TextOutputFormat.class);//使用默认的output格式 //设置map的输出key和value类型 job.setMapOutputKeyClass(Text.class); //设置reduce的输出key和value类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.waitForCompletion(true); return job.isSuccessful()?0:1; } public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { try { int returnCode = ToolRunner.run(new MapSideJoinMain(),args); System.exit(returnCode); } catch (Exception e) { // TODO Auto-generated catch block logger.error(e.getMessage()); } } }
DistributedCache是分布式缓存的一种实现,它在整个MapReduce框架中起着相当重要的作用,他可以支撑我们写一些相当复杂高效的分布式程序。说回到这里,JobTracker在作业启动之前会获取到DistributedCache的资源uri列表,并将对应的文件分发到各个涉及到该作业的任务的TaskTracker上。另外还有一种比较变态的Map Join方式,就是结合HBase来做Map Join操作。这种方式完全可以突破内存的控制,使你毫无忌惮的使用Map Join,而且效率也非常不错。
五、SemiJoin
SemiJoin就是所谓的半连接,其实仔细一看就是reduce join的一个变种,就是在map端过滤掉一些数据,在网络中只传输参与连接的数据不参与连接的数据不必在网络中进行传输,从而减少了shuffle的网络传输量,使整体效率得到提高,其他思想和reduce join是一模一样的。说得更加接地气一点就是将小表中参与join的key单独抽出来通过DistributedCach分发到相关节点,然后将其取出放到内存中(可以放到HashSet中),在map阶段扫描连接表,将join key不在内存HashSet中的记录过滤掉,让那些参与join的记录通过shuffle传输到reduce端进行join操作,其他的和reduce join都是一样的。
1. 数据
reudce side join中的left outer join
左连接,两个文件分别代表2个表,连接字段table1的id字段和table2的cityID字段
table1(左表):tb_dim_city(id int,name string,orderid int,city_code,is_show)
tb_dim_city.dat文件内容,分隔符为"|":
id name orderid city_code is_show
0 其他 9999 9999 0
1 长春 1 901 1
2 吉林 2 902 1
3 四平 3 903 1
4 松原 4 904 1
5 通化 5 905 1
6 辽源 6 906 1
7 白城 7 907 1
8 白山 8 908 1
9 延吉 9 909 1
----------------------------------
table2(右表):tb_user_profiles(userID int,userName string,network string,double flow,cityID int)
tb_user_profiles.dat文件内容,分隔符为"|":
userID network flow cityID
1 2G 123 1
2 3G 333 2
3 3G 555 1
4 2G 777 3
5 3G 666 4
------------------------------
joinKey.dat内容:
city_code
1
2
3
4
------------------------------
结果:
1 长春 1 901 1 1 2G 123
1 长春 1 901 1 3 3G 555
2 吉林 2 902 1 2 3G 333
3 四平 3 903 1 4 2G 777
4 松原 4 904 1 5 3G 666
2. 代码
import java.io.BufferedReader; import java.io.FileReader; import java.io.IOException; import java.util.ArrayList; import java.util.HashSet; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.filecache.DistributedCache; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class SemiJoin extends Configured implements Tool{ private static final Logger logger = LoggerFactory.getLogger(SemiJoin.class); public static class SemiJoinMapper extends Mapper { private CombineValues combineValues = new CombineValues(); private HashSet joinKeySet = new HashSet(); private Text flag = new Text(); private Text joinKey = new Text(); private Text secondPart = new Text(); /** * 将参加join的key从DistributedCache取出放到内存中,以便在map端将要参加join的key过滤出来。b */ @Override protected void setup(Context context) throws IOException, InterruptedException { BufferedReader br = null; //获得当前作业的DistributedCache相关文件 Path[] distributePaths = DistributedCache.getLocalCacheFiles(context.getConfiguration()); String joinKeyStr = null; for(Path p : distributePaths){ if(p.toString().endsWith("joinKey.dat")){ //读缓存文件,并放到mem中 br = new BufferedReader(new FileReader(p.toString())); while(null!=(joinKeyStr=br.readLine())){ joinKeySet.add(joinKeyStr); } } } } @Override protected void map(Object key, Text value, Context context) throws IOException, InterruptedException { //获得文件输入路径 String pathName = ((FileSplit) context.getInputSplit()).getPath().toString(); //数据来自tb_dim_city.dat文件,标志即为"0" if(pathName.endsWith("tb_dim_city.dat")){ String[] valueItems = value.toString().split("\\|"); //过滤格式错误的记录 if(valueItems.length != 5){ return; } //过滤掉不需要参加join的记录 if(joinKeySet.contains(valueItems[0])){ flag.set("0"); joinKey.set(valueItems[0]); secondPart.set(valueItems[1]+"\t"+valueItems[2]+"\t"+valueItems[3]+"\t"+valueItems[4]); combineValues.setFlag(flag); combineValues.setJoinKey(joinKey); combineValues.setSecondPart(secondPart); context.write(combineValues.getJoinKey(), combineValues); }else{ return ; } }//数据来自于tb_user_profiles.dat,标志即为"1" else if(pathName.endsWith("tb_user_profiles.dat")){ String[] valueItems = value.toString().split("\\|"); //过滤格式错误的记录 if(valueItems.length != 4){ return; } //过滤掉不需要参加join的记录 if(joinKeySet.contains(valueItems[3])){ flag.set("1"); joinKey.set(valueItems[3]); secondPart.set(valueItems[0]+"\t"+valueItems[1]+"\t"+valueItems[2]); combineValues.setFlag(flag); combineValues.setJoinKey(joinKey); combineValues.setSecondPart(secondPart); context.write(combineValues.getJoinKey(), combineValues); }else{ return ; } } } } public static class SemiJoinReducer extends Reducer { //存储一个分组中的左表信息 private ArrayList leftTable = new ArrayList(); //存储一个分组中的右表信息 private ArrayList rightTable = new ArrayList(); private Text secondPar = null; private Text output = new Text(); /** * 一个分组调用一次reduce函数 */ @Override protected void reduce(Text key, Iterable value, Context context) throws IOException, InterruptedException { leftTable.clear(); rightTable.clear(); /** * 将分组中的元素按照文件分别进行存放 * 这种方法要注意的问题: * 如果一个分组内的元素太多的话,可能会导致在reduce阶段出现OOM, * 在处理分布式问题之前最好先了解数据的分布情况,根据不同的分布采取最 * 适当的处理方法,这样可以有效的防止导致OOM和数据过度倾斜问题。 */ for(CombineValues cv : value){ secondPar = new Text(cv.getSecondPart().toString()); //左表tb_dim_city if("0".equals(cv.getFlag().toString().trim())){ leftTable.add(secondPar); } //右表tb_user_profiles else if("1".equals(cv.getFlag().toString().trim())){ rightTable.add(secondPar); } } logger.info("tb_dim_city:"+leftTable.toString()); logger.info("tb_user_profiles:"+rightTable.toString()); for(Text leftPart : leftTable){ for(Text rightPart : rightTable){ output.set(leftPart+ "\t" + rightPart); context.write(key, output); } } } } @Override public int run(String[] args) throws Exception { Configuration conf=getConf(); //获得配置文件对象 DistributedCache.addCacheFile(new Path(args[2]).toUri(), conf); Job job=new Job(conf,"LeftOutJoinMR"); job.setJarByClass(SemiJoin.class); FileInputFormat.addInputPath(job, new Path(args[0])); //设置map输入文件路径 FileOutputFormat.setOutputPath(job, new Path(args[1])); //设置reduce输出文件路径 job.setMapperClass(SemiJoinMapper.class); job.setReducerClass(SemiJoinReducer.class); job.setInputFormatClass(TextInputFormat.class); //设置文件输入格式 job.setOutputFormatClass(TextOutputFormat.class);//使用默认的output格式 //设置map的输出key和value类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(CombineValues.class); //设置reduce的输出key和value类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.waitForCompletion(true); return job.isSuccessful()?0:1; } public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { try { int returnCode = ToolRunner.run(new SemiJoin(),args); System.exit(returnCode); } catch (Exception e) { logger.error(e.getMessage()); } } }
blog介绍了三种join方式。这三种join方式适用于不同的场景,其处理效率上的相差还是蛮大的,其中主要导致因素是网络传输。Map join效率最高,其次是SemiJoin,最低的是reduce join。另外,写分布式大数据处理程序的时最好要对整体要处理的数据分布情况作一个了解,这可以提高我们代码的效率,使数据的倾斜度降到最低,使我们的代码倾向性更好。
原文地址:https://www.cnblogs.com/duaner92/p/9596770.html