MapReduce编程实例5

前提准备:

1.hadoop安装运行正常。Hadoop安装配置请参考:Ubuntu下 Hadoop 1.2.1 配置安装

2.集成开发环境正常。集成开发环境配置请参考 :Ubuntu 搭建Hadoop源码阅读环境

MapReduce编程实例:

MapReduce编程实例(一),详细介绍在集成环境中运行第一个MapReduce程序 WordCount及代码分析

MapReduce编程实例(二),计算学生平均成绩

MapReduce编程实例(三),数据去重

MapReduce编程实例(四),排序

MapReduce编程实例(五),MapReduce实现单表关联

MapReduce编程实例(六),MapReduce实现多表关联

单表关联:

描述:

单表的自连接求解问题。如下表,根据child-parent表列出grandchild-grandparent表的值。

child parent
Tom Lucy
Tom Jim
Lucy David
Lucy Lili
Jim Lilei
Jim SuSan
Lily Green
Lily Bians
Green Well
Green MillShell
Havid James
James LiT
Richard Cheng
Cheng LiHua

问题分析:

显然需要分解为左右两张表来进行自连接,而左右两张表其实都是child-parent表,通过parent字段做key值进行连接。结合MapReduce的特性,MapReduce会在shuffle过程把相同的key放在一起传到Reduce进行处理。OK,这下有思路了,将左表的parent作为key输出,将右表的child做为key输出,这样shuffle之后很自然的,左右就连接在一起了,有木有!然后通过对左右表进行求迪卡尔积便得到所需的数据。

[java] view plain copy

  1. package com.t.hadoop;
  2. import java.io.IOException;
  3. import java.util.Iterator;
  4. import org.apache.hadoop.conf.Configuration;
  5. import org.apache.hadoop.fs.Path;
  6. import org.apache.hadoop.io.Text;
  7. import org.apache.hadoop.mapreduce.Job;
  8. import org.apache.hadoop.mapreduce.Mapper;
  9. import org.apache.hadoop.mapreduce.Reducer;
  10. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  11. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  12. import org.apache.hadoop.util.GenericOptionsParser;
  13. /**
  14. * 单表关联
  15. * @author daT [email protected]
  16. *
  17. */
  18. public class STJoin {
  19. public static int time = 0;
  20. public static class STJoinMapper extends Mapper<Object, Text, Text, Text>{
  21. @Override
  22. protected void map(Object key, Text value, Context context)
  23. throws IOException, InterruptedException {
  24. String childName = new String();
  25. String parentName = new String();
  26. String relation = new String();
  27. String line = value.toString();
  28. int i =0;
  29. while(line.charAt(i)!=‘ ‘){
  30. i++;
  31. }
  32. String[] values = {line.substring(0,i),line.substring(i+1)};
  33. if(values[0].compareTo("child") != 0){
  34. childName = values[0];
  35. parentName = values[1];
  36. relation = "1";//左右表分区标志
  37. context.write(new Text(parentName),new Text(relation+"+"+childName));//左表
  38. relation = "2";
  39. context.write(new Text(childName), new Text(relation+"+"+parentName));//右表
  40. }
  41. }
  42. }
  43. public static class STJoinReduce extends Reducer<Text, Text, Text, Text>{
  44. @Override
  45. protected void reduce(Text key, Iterable<Text> values,Context context)
  46. throws IOException, InterruptedException {
  47. if(time ==0){//输出表头
  48. context.write(new Text("grandChild"), new Text("grandParent"));
  49. time ++;
  50. }
  51. int grandChildNum = 0;
  52. String[] grandChild = new String[10];
  53. int grandParentNum = 0;
  54. String[] grandParent = new String[10];
  55. Iterator<Text> ite = values.iterator();
  56. while(ite.hasNext()){
  57. String record = ite.next().toString();
  58. int len = record.length();
  59. int i = 2;
  60. if(len ==0)  continue;
  61. char relation = record.charAt(0);
  62. if(relation == ‘1‘){//是左表拿child
  63. String childName = new String();
  64. while(i < len){//解析name
  65. childName = childName + record.charAt(i);
  66. i++;
  67. }
  68. grandChild[grandChildNum] = childName;
  69. grandChildNum++;
  70. }else{//是右表拿parent
  71. String parentName = new String();
  72. while(i < len){//解析name
  73. parentName = parentName + record.charAt(i);
  74. i++;
  75. }
  76. grandParent[grandParentNum] = parentName;
  77. grandParentNum++;
  78. }
  79. }
  80. //左右两表求迪卡尔积
  81. if(grandChildNum!=0&&grandParentNum!=0){
  82. for(int m=0;m<grandChildNum;m++){
  83. for(int n=0;n<grandParentNum;n++){
  84. System.out.println("grandChild "+grandChild[m] +" grandParent "+ grandParent[n]);
  85. context.write(new Text(grandChild[m]),new Text(grandParent[n]));
  86. }
  87. }
  88. }
  89. }
  90. }
  91. public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException{
  92. Configuration conf = new Configuration();
  93. String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
  94. if(otherArgs.length<2){
  95. System.out.println("parameter error");
  96. System.exit(2);
  97. }
  98. Job job = new Job(conf);
  99. job.setJarByClass(STJoin.class);
  100. job.setMapperClass(STJoinMapper.class);
  101. job.setReducerClass(STJoinReduce.class);
  102. job.setOutputKeyClass(Text.class);
  103. job.setOutputValueClass(Text.class);
  104. FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
  105. FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
  106. System.exit(job.waitForCompletion(true)?0:1);
  107. }
  108. }

传入参数:

hdfs://localhost:9000/user/dat/stjon_input hdfs://localhost:9000/user/dat/stjon_output

输出结果:

grandChild grandParent
Richard LiHua
Lily Well
Lily MillShell
Havid LiT
Tom Lilei
Tom SuSan
Tom Lili
Tom David

OK~!欢迎同学们多多交流~~

时间: 2024-08-30 01:36:38

MapReduce编程实例5的相关文章

Hadoop 综合揭秘——MapReduce 编程实例(详细介绍 Combine、Partitioner、WritableComparable、WritableComparator 使用方式)

前言 本文主要介绍 MapReduce 的原理及开发,讲解如何利用 Combine.Partitioner.WritableComparator等组件对数据进行排序筛选聚合分组的功能.由于文章是针对开发人员所编写的,在阅读本文前,文章假设读者已经对Hadoop的工作原理.安装过程有一定的了解,因此对Hadoop的安装就不多作说明.请确保源代码运行在Hadoop 2.x以上版本,并以伪分布形式安装以方便进行调试(单机版会对 Partitioner 功能进行限制).文章主要利用例子介绍如何利用 Ma

MapReduce编程实例

<机器学习实战> 1top K 问题. 在搜索中,我们常常需要搜索最近热门的K个搜索词,这是典型的top k问题.就可以分解成两个mapreduce.先完成统计词频,然后找出词频最高的的查询词.第一个作业是典型的WordCOUNT,第一个作业是用MAp函数.第二个任务是汇总每个map任务得到的查询词的前提,并输出频率最高的前k个查询词. 2k均值聚类算法 这是一个基于距离的距离算法.它采用距离作为相似性的评价指标,因为两个对象距离越近,他们相似度就越大.该算法可以抽奖成为:给定正整数k和n的对

MapReduce编程实战之“高级特性”

本篇介绍MapReduce的一些高级特性,如计数器.数据集的排序和连接.计数器是一种收集作业统计信息的有效手段,排序是MapReduce的核心技术,MapReduce也能够执行大型数据集间的""连接(join)操作. 计数器 计数器是一种收集作业统计信息的有效手段,用于质量控制或应用级统计.计数器还可用于辅助诊断系统故障.对于大型分布式系统来说,获取计数器比分析日志文件容易的多. 示例一:气温缺失及不规则数据计数器 import java.io.IOException; import

Hadoop MapReduce编程 API入门系列之挖掘气象数据版本2(九)

下面,是版本1. Hadoop MapReduce编程 API入门系列之挖掘气象数据版本1(一) 这篇博文,包括了,实际生产开发非常重要的,单元测试和调试代码.这里不多赘述,直接送上代码. MRUnit 框架 MRUnit是Cloudera公司专为Hadoop MapReduce写的单元测试框架,API非常简洁实用.MRUnit针对不同测试对象使用不同的Driver: MapDriver:针对单独的Map测试  ReduceDriver:针对单独的Reduce测试    MapReduceDri

MapReduce编程模型及其在Hadoop上的实现

转自:https://www.zybuluo.com/frank-shaw/note/206604 MapReduce基本过程 关于MapReduce中数据流的传输过程,下图是一个经典演示:  关于上图,可以做出以下逐步分析: 输入数据(待处理)首先会被切割分片,每一个分片都会复制多份到HDFS中.上图默认的是分片已经存在于HDFS中. Hadoop会在存储有输入数据分片(HDFS中的数据)的节点上运行map任务,可以获得最佳性能(数据TaskTracker优化,节省带宽). 在运行完map任务

暴力破解MD5的实现(MapReduce编程)

本文主要介绍MapReduce编程模型的原理和基于Hadoop的MD5暴力破解思路. 一.MapReduce的基本原理 Hadoop作为一个分布式架构的实现方案,它的核心思想包括以下几个方面:HDFS文件系统,MapReduce的编程模型以及RPC框架.无论是怎样的架构,一个系统的关键无非是存储结构和业务逻辑.HDFS分布式文件系统是整个Hadoop的基础.在HDFS文件系统之中,大文件被分割成很多的数据块,每一块都有可能分布在集群的不同节点中.也就是说在HDFS文件系统中,文件的情况是这样的:

MapReduce编程实践

一.MapReduce编程思想 学些MapRedcue主要是学习它的编程思想,在MR的编程模型中,主要思想是把对数据的运算流程分成map和reduce两个阶段: Map阶段:读取原始数据,形成key-value数据(map方法) Reduce阶段:把map阶段的key-value数据按照相同的key进行分组聚合(reduce方法) 它其实是一种数据逻辑运算模型,对于这样的运算模型,有一些成熟的具体软件实现,比如hadoop中的mapreduce框架.spark等,例如在hadoop的mr框架中,

Hadoop 实践(二) Mapreduce 编程

Mapreduce 编程,本文以WordCount  为例:实现文件字符统计 在eclipse 里面搭建一个java项目,引入hadoop lib目录下的jar,和 hadoop主目录下的jar. 新建WordCount 类: package org.scf.wordcount; import java.io.IOException; import java.util.*; import org.apache.hadoop.fs.Path; import org.apache.hadoop.co

Hadoop MapReduce编程 API入门系列之压缩和计数器(三十)

不多说,直接上代码. Hadoop MapReduce编程 API入门系列之小文件合并(二十九) 生成的结果,作为输入源. 代码 package zhouls.bigdata.myMapReduce.ParseTVDataCompressAndCounter; import java.net.URI; import java.util.List;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Co