MapReduce表连接之半连接SemiJoin

一:背景

SemiJoin,一般称为半连接,其原理是在Map端过滤掉一些不需要join的数据,从而大大减少了reduce和Shuffle的时间,因为我们知道,如果仅仅使用Reduce端连接,那么如果一份数据,存在大量的无效数据,而这些数据在join中并不需要,但是因为没有做过预处理,所以这些数据直到真正执行reduce函数时,才被定义为无效数据,但是这个时候已经执行过了Shuffle、merge还有sort操作,所以这部分无效的数据就浪费了大量的网络IO和磁盘IO,所以在整体来讲,这是一种降低性能的表现,如果存在的无效数据越多,那么这种趋势就越明显。之所以会出现半连接,这其实是reduce端连接的一个变种,只不过是我们在Map端过滤掉了一些无效的数据,所以减少了reduce过程的Shuffle时间,所以能获取一个性能的提升。

二:技术实现

(1):利用DistributedCache将小表分发到各个节点上,在Map过程的setup()函数里,读取缓存里的文件,只将小表的连接键存储在hashSet中。

(2):在map()函数执行时,对每一条数据进行判断,如果这条数据的连接键为空或者在hashSet里不存在,那么则认为这条数据无效,使条数据也不参与reduce的过程。

注:从以上步骤就可以发现,这种做法很明显可以提升join性能,但是要注意的是小表的key如果非常大的话,可能会出现OOM的情况,这时我们就需要考虑其他的连接方式了。

测试数据如下:

/semi_jon/a.txt:

[java] view plain copy

  1. 1,三劫散仙,13575468248
  2. 2,凤舞九天,18965235874
  3. 3,忙忙碌碌,15986854789
  4. 4,少林寺方丈,15698745862

/semi_join/b.txt:

[java] view plain copy

  1. 3,A,99,2013-03-05
  2. 1,B,89,2013-02-05
  3. 2,C,69,2013-03-09
  4. 3,D,56,2013-06-07
  5. 5,E,100,2013-09-09
  6. 6,H,200,2014-01-10

#需求就是对上面两个表做半连接。

实现代码如下:

[java] view plain copy

  1. public class SemiJoin {
  2. // 定义输入路径
  3. private static  String INPUT_PATH1 = "";
  4. private static  String INPUT_PATH2 = "";
  5. // 定义输出路径
  6. private static  String OUT_PATH = "";
  7. public static void main(String[] args) {
  8. try {
  9. // 创建配置信息
  10. Configuration conf = new Configuration();
  11. // 获取命令行的参数
  12. String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
  13. // 当参数违法时,中断程序
  14. if (otherArgs.length != 3) {
  15. System.err.println("Usage:Semi_join<in1> <in2> <out>");
  16. System.exit(1);
  17. }
  18. // 给路径赋值
  19. INPUT_PATH1 = otherArgs[0];
  20. INPUT_PATH2 = otherArgs[1];
  21. OUT_PATH = otherArgs[2];
  22. // 把小表添加到共享Cache里
  23. DistributedCache.addCacheFile(new URI(INPUT_PATH1), conf);
  24. // 创建文件系统
  25. FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf);
  26. // 如果输出目录存在,我们就删除
  27. if (fileSystem.exists(new Path(OUT_PATH))) {
  28. fileSystem.delete(new Path(OUT_PATH), true);
  29. }
  30. // 创建任务
  31. Job job = new Job(conf, SemiJoin.class.getName());
  32. // 设置成jar包
  33. job.setJarByClass(SemiJoin.class);
  34. //1.1 设置输入目录和设置输入数据格式化的类
  35. FileInputFormat.setInputPaths(job, INPUT_PATH2);
  36. job.setInputFormatClass(TextInputFormat.class);
  37. //1.2设置自定义Mapper类和设置map函数输出数据的key和value的类型
  38. job.setMapperClass(SemiJoinMapper.class);
  39. job.setMapOutputKeyClass(Text.class);
  40. job.setMapOutputValueClass(CombineEntity.class);
  41. //1.3 设置分区和reduce数量(reduce的数量,和分区的数量对应,因为分区为一个,所以reduce的数量也是一个)
  42. job.setPartitionerClass(HashPartitioner.class);
  43. job.setNumReduceTasks(1);
  44. //1.4 排序
  45. //1.5 归约
  46. //2.1 Shuffle把数据从Map端拷贝到Reduce端。
  47. //2.2 指定Reducer类和输出key和value的类型
  48. job.setReducerClass(SemiJoinReducer.class);
  49. job.setOutputKeyClass(Text.class);
  50. job.setOutputValueClass(Text.class);
  51. //2.3 指定输出的路径和设置输出的格式化类
  52. FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
  53. job.setOutputFormatClass(TextOutputFormat.class);
  54. // 提交作业 退出
  55. System.exit(job.waitForCompletion(true) ? 0 : 1);
  56. } catch (Exception e) {
  57. e.printStackTrace();
  58. }
  59. }
  60. /**
  61. * 自定义Mapper函数
  62. *
  63. * @author 廖*民 time : 2015年1月21日下午8:40:43
  64. * @version
  65. */
  66. public static class SemiJoinMapper extends Mapper<LongWritable, Text, Text, CombineEntity> {
  67. // 创建相关对象
  68. private CombineEntity combine = new CombineEntity();
  69. private Text flag = new Text();
  70. private Text joinKey = new Text();
  71. private Text secondPart = new Text();
  72. // 存储小表的key
  73. private HashSet<String> joinKeySet = new HashSet<String>();
  74. @Override
  75. protected void setup(Mapper<LongWritable, Text, Text, CombineEntity>.Context context) throws IOException, InterruptedException {
  76. // 读取文件流
  77. BufferedReader br = null;
  78. String temp = "";
  79. // 获取DistributedCached里面的共享文件
  80. Path[] paths = DistributedCache.getLocalCacheFiles(context.getConfiguration());
  81. System.out.println("=================================>"+paths.length);
  82. // 遍历path数组
  83. for (Path path : paths) {
  84. if (path.getName().endsWith("a.txt")) {
  85. // 创建读取文件流
  86. br = new BufferedReader(new FileReader(path.toString()));
  87. // 读取数据
  88. while ((temp = br.readLine()) != null) {
  89. // 按","切割
  90. String[] splits = temp.split(",");
  91. // 将key加入小表中
  92. joinKeySet.add(splits[0]);
  93. }
  94. }
  95. }
  96. }
  97. protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, CombineEntity>.Context context) throws IOException,
  98. InterruptedException {
  99. // 获取文件输入路径
  100. String pathName = ((FileSplit) (context.getInputSplit())).getPath().toString();
  101. System.out.println("Map中获取的路径没有a.txt吧?"+pathName);
  102. if (pathName.endsWith("a.txt")) {
  103. String[] valuesTemps = value.toString().split(",");
  104. System.out.println("进入a.txt==================>a中的字符串:"+value.toString());
  105. // 在这里过滤必须要连接的字符
  106. if (joinKeySet.contains(valuesTemps[0])) {
  107. // 设置标志位
  108. flag.set("0");
  109. // 设置连接键
  110. joinKey.set(valuesTemps[0]);
  111. // 设置第二部分
  112. secondPart.set(valuesTemps[1] + "\t" + valuesTemps[1]);
  113. // 封装实体
  114. combine.setFlag(flag);
  115. combine.setJoinKey(joinKey);
  116. combine.setSecondPart(secondPart);
  117. // 写出去
  118. context.write(combine.getJoinKey(), combine);
  119. } else {
  120. System.out.println("a.txt里");
  121. System.out.println("小表中没有此记录");
  122. for (String v : valuesTemps) {
  123. System.out.println(v + " ");
  124. }
  125. return;
  126. }
  127. } else if (pathName.endsWith("b.txt")) {
  128. System.out.println("进入b.txt==================>b中的字符串:"+value.toString());
  129. // 切割
  130. String[] valueItems = value.toString().split(",");
  131. // 判断是否在集合中
  132. if (joinKeySet.contains(valueItems[0])) {
  133. // 设置标志位
  134. flag.set("1");
  135. // 设置连接键
  136. joinKey.set(valueItems[0]);
  137. // 设置第二部分数据,注意:不同文件的列数不一样
  138. secondPart.set(valueItems[1] + "\t" + valueItems[2] + "\t" + valueItems[3]);
  139. // 封装实体
  140. combine.setFlag(flag);
  141. combine.setJoinKey(joinKey);
  142. combine.setSecondPart(secondPart);
  143. // 写出去
  144. context.write(combine.getJoinKey(), combine);
  145. } else {
  146. System.out.println("b.txt里");
  147. System.out.println("小表中没有此记录");
  148. for (String v : valueItems) {
  149. System.out.println(v + " ");
  150. }
  151. return;
  152. }
  153. }
  154. }
  155. }
  156. /**
  157. * 自定义Reducer函数
  158. *
  159. * @author 廖*民 time : 2015年1月21日下午8:41:01
  160. * @version
  161. */
  162. public static class SemiJoinReducer extends Reducer<Text, CombineEntity, Text, Text> {
  163. // 存储一个分组中左表信息
  164. private List<Text> leftTable = new ArrayList<Text>();
  165. // 存储一个分组中右表数据
  166. private List<Text> rightTable = new ArrayList<Text>();
  167. private Text secondPart = null;
  168. private Text outPut = new Text();
  169. // 一个分组调用一次reduce()函数
  170. protected void reduce(Text key, Iterable<CombineEntity> values, Reducer<Text, CombineEntity, Text, Text>.Context context) throws IOException,
  171. InterruptedException {
  172. // 清空分组数据
  173. leftTable.clear();
  174. rightTable.clear();
  175. // 将不同文件的数据,分别放在不同的集合中;注意数据过大时,会出现OOM
  176. for (CombineEntity val : values) {
  177. this.secondPart = new Text(val.getSecondPart().toString());
  178. System.out.println("传到reduce中的secondPart部分:" + this.secondPart);
  179. System.out.println("难道A表中就没有数据:" + val.getFlag().toString().trim().equals("0"));
  180. // 左表
  181. if (val.getFlag().toString().trim().equals("0")) {
  182. leftTable.add(secondPart);
  183. } else if (val.getFlag().toString().trim().equals("1")) {
  184. rightTable.add(secondPart);
  185. }
  186. }
  187. for (Text val : leftTable){
  188. System.out.println("A 表中的数据为:" + val);
  189. }
  190. for (Text val : rightTable){
  191. System.out.println("B 表中的数据为:" + val);
  192. }
  193. // 做笛卡尔积输出我们想要的连接数据
  194. for (Text left : leftTable) {
  195. for (Text right : rightTable) {
  196. outPut.set(left + "\t" + right);
  197. // 将数据写出
  198. context.write(key, outPut);
  199. }
  200. }
  201. }
  202. }
  203. }
  204. /**
  205. * 自定义实体
  206. *
  207. * @author 廖*民 time : 2015年1月21日下午8:41:18
  208. * @version
  209. */
  210. class CombineEntity implements WritableComparable<CombineEntity> {
  211. // 连接key
  212. private Text joinKey;
  213. // 文件来源标志
  214. private Text flag;
  215. // 除了键外的其他部分的数据
  216. private Text secondPart;
  217. // 无参构造函数
  218. public CombineEntity() {
  219. this.joinKey = new Text();
  220. this.flag = new Text();
  221. this.secondPart = new Text();
  222. }
  223. // 有参构造函数
  224. public CombineEntity(Text joinKey, Text flag, Text secondPart) {
  225. this.joinKey = joinKey;
  226. this.flag = flag;
  227. this.secondPart = secondPart;
  228. }
  229. public Text getJoinKey() {
  230. return joinKey;
  231. }
  232. public void setJoinKey(Text joinKey) {
  233. this.joinKey = joinKey;
  234. }
  235. public Text getFlag() {
  236. return flag;
  237. }
  238. public void setFlag(Text flag) {
  239. this.flag = flag;
  240. }
  241. public Text getSecondPart() {
  242. return secondPart;
  243. }
  244. public void setSecondPart(Text secondPart) {
  245. this.secondPart = secondPart;
  246. }
  247. public void write(DataOutput out) throws IOException {
  248. this.joinKey.write(out);
  249. this.flag.write(out);
  250. this.secondPart.write(out);
  251. }
  252. public void readFields(DataInput in) throws IOException {
  253. this.joinKey.readFields(in);
  254. this.flag.readFields(in);
  255. this.secondPart.readFields(in);
  256. }
  257. public int compareTo(CombineEntity o) {
  258. return this.joinKey.compareTo(o.joinKey);
  259. }
  260. }

打成jar包,运行命令如下:

[java] view plain copy

  1. hadoop jar join.jar /semi_join/a.txt /semi_join/* /out

注:a.txt是要加入到内存的表,/semi_join/*是要进入map()函数进行比对的目录,/out是输出目录。

程序运行的结果为:

时间: 2025-01-15 23:44:27

MapReduce表连接之半连接SemiJoin的相关文章

MapReduce表连接操作之Reduce端join

一:背景 Reduce端连接比Map端连接更为普遍,因为输入的数据不需要特定的结构,但是效率比较低,因为所有数据都必须经过Shuffle过程. 二:技术实现 基本思路 (1):Map端读取所有的文件,并在输出的内容里加上标示,代表数据是从哪个文件里来的. (2):在reduce处理函数中,按照标识对数据进行处理. (3):然后根据Key去join来求出结果直接输出. 数据准备 准备好下面两张表: (1):tb_a(以下简称表A) [java] view plain copy id  name 1

MapReduce 表连接

题目描述: 根据给定的关系 child parent Tom Lucy Tom Jack Jone Lucy Jone Jack Lucy Mary Lucy Ben Jack Alice Jack Jesse Terry Alice Terry Jesse Philip Terry Philip Alma Mark Terry Mark Alma 打印出grandchild和grandparents.例如Lucy是Tom 的母亲,而Mary是lucy的目前,那么mary就是tom的外婆 思路:

Hadoop阅读笔记(三)——深入MapReduce排序和单表连接

继上篇了解了使用MapReduce计算平均数以及去重后,我们再来一探MapReduce在排序以及单表关联上的处理方法.在MapReduce系列的第一篇就有说过,MapReduce不仅是一种分布式的计算方法,更是一种解决问题的新思维.新思路.将原先看似可以一条龙似的处理一刀切成两端,一端是Map.一端是Reduce,Map负责分,Reduce负责合. 1.MapReduce排序 问题模型: 给出多个数据文件输入如: sortfile1.txt 11 13 15 17 19 21 23 25 27

MapReduce 多表连接

题目描述: 现在有两个文件,1为存放公司名字和城市ID,2为存放城市ID和城市名 表一: factoryname,addressed Beijing Red Star,1 Shenzhen Thunder,3 Guangzhou Honda,2 Beijing Rising,1 Guangzhou Development Bank,2 Tencent,3 Back of Beijing,1 表2: 1,Beijing 2,Guangzhou 3,Shenzhen 4,Xian 现在要求输出公司名

Hadoop 多表连接

环境:CentOS6.6  Hadoop1.2.1 样例数据: [[email protected] ~]$ hadoop fs -cat ./in/7/dept.txt 10 ACCOUNTING NEW YORK 20 RESEARCH DALLAS 30 SALES CHICAGO 40 OPERATIONS BOSTON [[email protected] ~]$ hadoop fs -cat ./in/7/emp.txt 7369 SMITH CLERK 7902 17-12月-80

表连接方式

--表连接方式1.Hash join:优化器使用两个表中较小的表(或数据源)利用连接键(HASH KEY)在内存中建立散列表(HASH表),然后扫描较大的表并探测散列表,找出与散列表匹配的行.如果hash表太大则无法在内存中完全放入,这时候优化器就分成不同区,把不能放入内存的分区放入到磁盘临时段,此时有较大的临时段来提高i/o性能.默认值指定方式:USE_HASH(table_name1 table_name2) 2.Nested loops:工作方式是从一张表(驱动表outer table,结

Hadoop 学习之单表连接

我在学习hadoop, 在看 陆嘉恒编著的hadoop实战,其中有单表连接的程序,我现在整理一下思路.这个问题是课本上的例子. 给出 child-parent 表, 要求输出 grandchild-grandparent 表 样例输入: child parent Tom Lucy Tom Jack Jone Lucy Jone Jack Lucy Mary Lucy Ben Jack Alice Jack Jesee Terry Alice Terry Jesee Philip Terry Ph

SQL Server三种表连接原理

http://msdn.microsoft.com/zh-cn/library/dn144699.aspx 简介 在SQL Server中,我们所常见的表与表之间的Inner Join,Outer Join都会被执行引擎根据所选的列,数据上是否有索引,所选数据的选择性转化为Loop Join,Merge Join,Hash Join这三种物理连接中的一种.理解这三种物理连接是理解在表连接时解决性能问题的基础,下面我来对这三种连接的原理,适用场景进行描述. 嵌套循环连接(Nested Loop J

每天一点数据库之-----Day 9 表连接

每天一点数据库之-----Day 9 表连接 ----转载请注明出处:coder-pig 本节引言: 前面我们学习的都是针对一个表来进行操作的,上一节虽然学了UNION这个可以操作多个表 的关键字,但是又有两个限制(查询字段数目与数据类型要相同),本节就来学习通过表连接 来操作多个表!而表连接又有四种: 内连接,外连接,交叉连接与自连接,那么接下来开始本节学习! 数据准备: 在开始学习前,我们先准备一些数据,建三个表:T_Stu,T_Class,T_Dorm 建T_Stu表: CREATE TA