Hadoop 自连接

环境:CentOS6.6  Hadoop1.2.1

样例数据:第一列是 child ,第二列是 parent ,用空格分开,要求输出 grandchild  grandparent

[[email protected] ~]$ hadoop fs -cat ./in/genealogy.txt
Tom Lucy
Tom Jack
Jone Lucy
Jone Jack
Lucy Mary
Lucy Ben
Jack Alice
Jack Jesse
Terry Alice
Terry Jesse
Philip Terry
Philip Alma
Mark Terry
Mark Alma

程序:

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class SelfJoinMapper extends Mapper<Text, Text, Text, Text> {
    @Override
    protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
        context.write(value, new Text("1" + key.toString())); //左表的 parent 做 key
        context.write(key, new Text("2" + value.toString())); //右表的 child 做 key
    }
}

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class SelfJoinReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        List<String> childList = new ArrayList<String>();
        List<String> grandList = new ArrayList<String>();
        for (Text value : values) {
            if (value.toString().startsWith("1")) {
                childList.add(value.toString().substring(1));
            } else {
                grandList.add(value.toString().substring(1));
            }
        }
        for (String child : childList) {
            for (String grand : grandList) {
                context.write(new Text(child), new Text(grand));
            }
        }
    }
}

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class SelfJoin {
    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: SelfJoin <input path> <output path>");
            System.exit(-1);
        }

        Configuration conf = new Configuration();
        conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", " "); //设置分割符
        //conf.set("mapred.jar", "./out/SelfJoin.jar");
        //conf.set("fs.default.name", "hdfs://hadoop1:9000");
        //conf.set("mapred.job.tracker", "hadoop1:9001");

        Job job = new Job(conf);
        job.setInputFormatClass(KeyValueTextInputFormat.class); //设置 InputFormat
        job.setJarByClass(SelfJoin.class);
        job.setJobName("SelfJoin");
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(SelfJoinMapper.class);
        job.setReducerClass(SelfJoinReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

查看程序输出如下:

[[email protected] ~]$ hadoop fs -cat ./out/9/part-r-00000
Tom	Alice
Tom	Jesse
Jone	Alice
Jone	Jesse
Tom	Mary
Tom	Ben
Jone	Mary
Jone	Ben
Philip	Alice
Philip	Jesse
Mark	Alice
Mark	Jesse

时间: 2024-10-09 19:04:26

Hadoop 自连接的相关文章

Hadoop阅读笔记(三)——深入MapReduce排序和单表连接

继上篇了解了使用MapReduce计算平均数以及去重后,我们再来一探MapReduce在排序以及单表关联上的处理方法.在MapReduce系列的第一篇就有说过,MapReduce不仅是一种分布式的计算方法,更是一种解决问题的新思维.新思路.将原先看似可以一条龙似的处理一刀切成两端,一端是Map.一端是Reduce,Map负责分,Reduce负责合. 1.MapReduce排序 问题模型: 给出多个数据文件输入如: sortfile1.txt 11 13 15 17 19 21 23 25 27

Spark-MapReduce编程-自连接(Scala)

关于SQL和Hadoop的实现参考这里 MapReduce编程-自连接 这里用相同的原理,使用spark实现.本人也是刚学Scala,可能写的不好,还请指正. object SelfUion { def main(args: Array[String]) { val conf = new SparkConf().setAppName("SelfUnion") val sc = new SparkContext(conf) val cpFile = sc.textFile("c

hadoop实例

一篇讲得很好的hadoop实例,非常适合初学者学习hadoop. 本文转载自:http://www.cnblogs.com/xia520pi/archive/2012/06/04/2534533.html,感谢作者虾皮的分享. 1.数据去重  "数据去重"主要是为了掌握和利用并行化思想来对数据进行有意义的筛选.统计大数据集上的数据种类个数.从网站日志中计算访问地等这些看似庞杂的任务都会涉及数据去重.下面就进入这个实例的MapReduce程序设计. 1.1 实例描述 对数据文件中的数据进

Hadoop集群(第9期)_MapReduce初级案例

1.数据去重  "数据去重"主要是为了掌握和利用并行化思想来对数据进行有意义的筛选.统计大数据集上的数据种类个数.从网站日志中计算访问地等这些看似庞杂的任务都会涉及数据去重.下面就进入这个实例的MapReduce程序设计. 1.1 实例描述 对数据文件中的数据进行去重.数据文件中的每行都是一个数据. 样例输入如下所示: 1)file1: 2012-3-1 a 2012-3-2 b 2012-3-3 c 2012-3-4 d 2012-3-5 a 2012-3-6 b 2012-3-7

Hadoop MapReduce编程学习

一直在搞spark,也没时间弄hadoop,不过Hadoop基本的编程我觉得我还是要会吧,看到一篇不错的文章,不过应该应用于hadoop2.0以前,因为代码中有  conf.set("mapred.job.tracker", "192.168.1.2:9001");新框架中已改为 Yarn-site.xml 中的 resouceManager 及 nodeManager 具体配置项,新框架中历史 job 的查询已从 Job tracker 剥离,归入单独的mapre

Hadoop集群(第9期)_MapReduce初级案例 - 虾皮 - 博客园

body{ font-family: "Microsoft YaHei UI","Microsoft YaHei",SimSun,"Segoe UI",Tahoma,Helvetica,Sans-Serif,"Microsoft YaHei", Georgia,Helvetica,Arial,sans-serif,宋体, PMingLiU,serif; font-size: 10.5pt; line-height: 1.5;}

hadoop 入门实例【转】

原文链接:http://www.cnblogs.com/xia520pi/archive/2012/06/04/2534533.html 1.数据去重  "数据去重"主要是为了掌握和利用并行化思想来对数据进行有意义的筛选.统计大数据集上的数据种类个数.从网站日志中计算访问地等这些看似庞杂的任务都会涉及数据去重.下面就进入这个实例的MapReduce程序设计. 1.1 实例描述 对数据文件中的数据进行去重.数据文件中的每行都是一个数据. 样例输入如下所示: 1)file1: 2012-3

2018-08-02 期 MapReduce实现多表查询自连接

1.员工对象EmployeeBean package cn.sjq.bigdata.mr.self.join; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.Writable; /** * 员工对象EmployeeBean * 由于该对象需要做为Mapper的输出,因此需要实现Writable接口 * @author song

Hadoop:Windows 7 32 Bit 编译与运行

所需工具 1.Windows 7 32 Bit OS(你懂的) 2.Apache Hadoop 2.2.0-bin(hadoop-2.2.0.tar.gz) 3.Apache Hadoop 2.2.0-src(hadoop-2.2.0-src.tar.gz) 3.JDK 1.7 4.Maven 3.2.1(apache-maven-3.2.1-bin.zip) 5.Protocol Buffers 2.5.0 6.Unix command-line tool Cygwin(Setup-x86.e