MapReduce Programming Series, Part 5: Single-Table Join

1. Project name:

2. Project data:

child    parent
Tom    Lucy
Tom    Jack
Jone    Lucy
Jone    Jack
Lucy    Mary
Lucy    Ben
Jack    Alice
Jack    Jesse
Terry    Alice
Terry    Jesse
Philip    Terry
Philip    Alima
Mark    Terry
Mark    Alma

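3. Design approach:

This example clearly calls for a single-table join (a self-join): the parent column of the left table is joined with the child column of the right table, and the left and right tables are the same table. Dropping the two join columns from the join result leaves exactly what we want, the grandchild-grandparent table. To solve this with MapReduce, three things have to be worked out: how to implement the self-join, how to choose the join column, and how to tidy up the result.

Because the shuffle phase groups all values that share a key, the map output key can be set to the join column, and equal values in that column are then brought together automatically. Tying this back to the analysis above: after the map phase splits each input line into child and parent, it emits the pair once with parent as the key and child as the value (the left table), and once more with child as the key and parent as the value (the right table). To tell the two tables apart, every output value carries a tag: the character 1 at the start of the value string marks the left table, and the character 2 marks the right table. The map output therefore contains both tables, and the shuffle performs the join.

In reduce, the value list of each key holds the grandchild and grandparent relationships. The reducer parses each value list, puts the child of every left-table record into one array and the parent of every right-table record into another, and the Cartesian product of the two arrays is the final result.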
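
The idea described above can be tried out without a cluster. The following is a minimal sketch in plain Java, not the Hadoop job itself: a sorted map stands in for the shuffle, and the class and method names (SelfJoinSketch, mapAndShuffle, reduce) are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class SelfJoinSketch {
    // Simulates map + shuffle: every (child, parent) pair is emitted twice,
    // and a sorted map groups the tagged values by key.
    static Map<String, List<String>> mapAndShuffle(String[][] pairs) {
        Map<String, List<String>> groups = new TreeMap<>();
        for (String[] p : pairs) {
            String child = p[0];
            String parent = p[1];
            // Left table: key = parent, value tagged "1" carries the child.
            groups.computeIfAbsent(parent, k -> new ArrayList<>()).add("1+" + child);
            // Right table: key = child, value tagged "2" carries the parent.
            groups.computeIfAbsent(child, k -> new ArrayList<>()).add("2+" + parent);
        }
        return groups;
    }

    // Simulates reduce: for each key, cross the left-table children
    // with the right-table parents.
    static List<String> reduce(Map<String, List<String>> groups) {
        List<String> out = new ArrayList<>();
        for (List<String> values : groups.values()) {
            List<String> grandchildren = new ArrayList<>();
            List<String> grandparents = new ArrayList<>();
            for (String v : values) {
                if (v.charAt(0) == '1') {
                    grandchildren.add(v.substring(2));
                } else {
                    grandparents.add(v.substring(2));
                }
            }
            for (String gc : grandchildren) {
                for (String gp : grandparents) {
                    out.add(gc + "\t" + gp);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        String[][] pairs = {
            {"Tom", "Lucy"}, {"Tom", "Jack"}, {"Lucy", "Mary"}, {"Lucy", "Ben"}
        };
        reduce(mapAndShuffle(pairs)).forEach(System.out::println);
    }
}
```

With these four input pairs, only the key Lucy receives values from both tables, so the sketch prints the pairs Tom/Mary and Tom/Ben. Lists are used instead of fixed-size arrays so that the number of relatives per key is not capped.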

4. Program code:

Version 1 (detailed):

package com.stjoin;

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class STjoin {
    public static int time = 0;
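    // map splits each input line into child and parent, then emits the pair once
    // in its original order as the right table and once reversed as the left table.
    // Each output value must carry a tag that tells the two tables apart.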
    public static class Map extends Mapper<Object, Text, Text, Text>{
        public void map(Object key,Text value,Context context)throws IOException,InterruptedException{
            String childname = new String();
            String parentname = new String();
            String relationtype = new String();
            String line = value.toString();
            System.out.println("mapper...............");
            int i = 0;
            // find the separator (the fields are assumed to be tab-separated)
            while(line.charAt(i) != '\t'){
                i++;
            }
            String[] values = {line.substring(0, i),line.substring(i+1)};
            System.out.println("child:"+values[0]+"  parent:"+values[1]);
            if(values[0].compareTo("child") != 0){// compareTo returns 0 only for the header token "child", so the header row is skipped
                childname=values[0];
                parentname=values[1];
                // left table: key = parent
                relationtype="1";
                context.write(new Text(values[1]),new Text(relationtype+"+"+childname+"+"+parentname));
                System.out.println("key:"+values[1]+"  value: "+relationtype+"+"+childname+"+"+parentname);
                // right table: key = child
                relationtype = "2";
                context.write(new Text(values[0]), new Text(relationtype+"+"+childname+"+"+parentname));
                System.out.println("key:"+values[0]+"  value: "+relationtype+"+"+childname+"+"+parentname);
            }
        }
    }
    public static class Reduce extends Reducer<Text, Text, Text, Text>{
        public void reduce(Text key,Iterable<Text> values,Context context) throws IOException, InterruptedException{
            System.out.println("reduce.....................");
            System.out.println("key:"+key+"  values:"+values);
            // write the header row once
            if(time==0){
                context.write(new Text("grandchild"), new Text("grandparent"));
                time++;
            }
            int grandchildnum = 0;
            String grandchild[] = new String[10];
            int grandparentnum = 0;
            String grandparent[] = new String[10];

            Iterator<Text> ite = values.iterator();
            while(ite.hasNext()){
                String record = ite.next().toString();
                System.out.println("record: "+record);

                int len = record.length();
                int i = 2;
                if(len==0) continue;
                char relationtype = record.charAt(0);
                String childname = new String();
                String parentname = new String();
                // extract the child name from the record
                while(record.charAt(i)!='+'){
                    childname = childname + record.charAt(i);
                    i++;
                }
                System.out.println("childname: "+childname);
                i=i+1;
                // extract the parent name from the record
                while(i<len){
                    parentname=parentname+record.charAt(i);
                    i++;
                }
                System.out.println("parentname: "+parentname);
                // left table: store the child in the grandchild array
                if (relationtype=='1') {
                    grandchild[grandchildnum] = childname;
                    grandchildnum++;
                }
                // right table: store the parent in the grandparent array
                else{
                    grandparent[grandparentnum]=parentname;
                    grandparentnum++;
                }
            }
            // Cartesian product of the grandchild and grandparent arrays
            if(grandparentnum!=0&&grandchildnum!=0){
                for(int m = 0 ; m < grandchildnum ; m++){
                    for(int n = 0 ; n < grandparentnum; n++){
                        context.write(new Text(grandchild[m]), new Text(grandparent[n]));
                        System.out.println("grandchild: "+grandchild[m]+"  grandparent: "+grandparent[n]);
                    }
                }
            }
        }
    }

    public static void main(String [] args)throws Exception{
        Configuration conf = new Configuration();
        String otherArgs[] = new GenericOptionsParser(conf,args).getRemainingArgs();
        if(otherArgs.length != 2){
            System.err.println("Usage: STjoin <in> <out>");
            System.exit(2);
        }
        Job job = new Job(conf,"single table join");
        job.setJarByClass(STjoin.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job,new Path(otherArgs[1]));

        System.exit(job.waitForCompletion(true)? 0 : 1);
    }
}
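
The reducer above walks each record character by character to pull out the child and parent. As a hedged alternative, the same three fields can be obtained with String.split. This is a standalone sketch; the class name TaggedRecordSketch and the method parse are made up for illustration and are not part of the original program.

```java
final class TaggedRecordSketch {
    // Splits a record of the form "tag+child+parent" into exactly three fields.
    static String[] parse(String record) {
        // "\\+" escapes the plus sign, which is a regex quantifier.
        // The limit 3 keeps any further '+' inside the last field.
        String[] fields = record.split("\\+", 3);
        if (fields.length != 3) {
            throw new IllegalArgumentException("malformed record: " + record);
        }
        return fields;
    }

    public static void main(String[] args) {
        String[] f = parse("1+Tom+Lucy");
        System.out.println(f[0] + " | " + f[1] + " | " + f[2]);
    }
}
```

Rejecting malformed records explicitly avoids the StringIndexOutOfBoundsException that a manual charAt loop throws when the second '+' is missing.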

Version 2 (simplified):

package com.stjoin;

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class STjoin {
    public static int time = 0;

    public static class Map extends Mapper<Object, Text, Text, Text>{
        public void map(Object key,Text value,Context context)throws IOException,InterruptedException{
            String relationtype = new String();
            String line = value.toString();
            System.out.println("mapper...............");
            int i = 0;
            // Approach 1: scan character by character to find the split point
        /*    while(line.charAt(i) != '\t'){
                i++;
            }
            String[] values = {line.substring(0, i),line.substring(i+1)};
        */
            // Approach 2: use a StringTokenizer to extract child and parent
            String[] values = new String[10];
            StringTokenizer itr = new StringTokenizer(line);
            while(itr.hasMoreTokens()){
                values[i] = itr.nextToken();
                i = i+1;
            }

            System.out.println("child:"+values[0]+"  parent:"+values[1]);
            if(values[0].compareTo("child") != 0){// compareTo returns 0 only for the header token "child", so the header row is skipped

                relationtype="1";
                context.write(new Text(values[1]),new Text(relationtype+"+"+values[0]));
                System.out.println("key:"+values[1]+"  value: "+relationtype+"+"+values[0]);
                relationtype = "2";
                context.write(new Text(values[0]), new Text(relationtype+"+"+values[1]));
                System.out.println("key:"+values[0]+"  value: "+relationtype+"+"+values[1]);
            }
        }
    }

    public static class Reduce extends Reducer<Text, Text, Text, Text>{
        public void reduce(Text key,Iterable<Text> values,Context context) throws IOException, InterruptedException{
                System.out.println("reduce.....................");
                System.out.println("key:"+key+"  values:"+values);
                if(time==0){
                    context.write(new Text("grandchild"), new Text("grandparent"));
                    time++;
                }
                int grandchildnum = 0;
                String grandchild[] = new String[10];
                int grandparentnum = 0;
                String grandparent[] = new String[10];

                String name = new String();
                // Approach 1: use an iterator
        //        Iterator ite = values.iterator();
        //        while(ite.hasNext()){

                // Approach 2: use a for-each loop
                for(Text val : values){
                //    String record = ite.next().toString();
                    String record = val.toString();
                    System.out.println("record: "+record);

                    int i = 2;
                    char relationtype = record.charAt(0);
                    name = record.substring(i);

                    System.out.println("name: "+name);

                    if (relationtype=='1') {
                        grandchild[grandchildnum] = name;
                        grandchildnum++;
                    }
                    else{
                        grandparent[grandparentnum]=name;
                        grandparentnum++;
                    }
                }
                // Approach 3: scan character by character with charAt(), as in the detailed version
                if(grandparentnum!=0&&grandchildnum!=0){
                    for(int m = 0 ; m < grandchildnum ; m++){
                        for(int n = 0 ; n < grandparentnum; n++){
                            context.write(new Text(grandchild[m]), new Text(grandparent[n]));
                            System.out.println("grandchild: "+grandchild[m]+"  grandparent: "+grandparent[n]);
                        }
                    }
                }
        }
    }
    public static void main(String [] args)throws Exception{
        Configuration conf = new Configuration();
        String otherArgs[] = new GenericOptionsParser(conf,args).getRemainingArgs();
        if(otherArgs.length != 2){
            System.err.println("Usage: STjoin <in> <out>");
            System.exit(2);
        }
        Job job = new Job(conf,"single table join");
        job.setJarByClass(STjoin.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job,new Path(otherArgs[1]));

        System.exit(job.waitForCompletion(true)? 0 : 1);
    }
}
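
The mapper in version 2 copies tokens into a fixed array of ten slots and assumes every line has two fields. A more defensive way to split a line could look like the sketch below. It is plain Java with no Hadoop dependency, and the names LineParseSketch and parse are hypothetical; it illustrates the parsing step only and is not a drop-in replacement for the mapper.

```java
import java.util.Optional;

final class LineParseSketch {
    // Returns {child, parent}, or an empty Optional for the header row
    // and for lines that do not contain exactly two fields.
    static Optional<String[]> parse(String line) {
        // "\\s+" matches any run of spaces or tabs between the two names.
        String[] tokens = line.trim().split("\\s+");
        if (tokens.length != 2 || tokens[0].equals("child")) {
            return Optional.empty();
        }
        return Optional.of(tokens);
    }

    public static void main(String[] args) {
        parse("Tom\tLucy").ifPresent(p ->
            System.out.println("child=" + p[0] + " parent=" + p[1]));
        System.out.println(parse("child\tparent").isPresent());
    }
}
```

Returning an empty Optional lets the caller skip blank or malformed lines instead of failing on values[1] being null.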

5. Run log:

1) Output of the detailed version:

14/09/22 20:31:48 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/09/22 20:31:48 WARN mapred.JobClient: No job jar file set.  User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
14/09/22 20:31:48 INFO input.FileInputFormat: Total input paths to process : 1
14/09/22 20:31:48 WARN snappy.LoadSnappy: Snappy native library not loaded
14/09/22 20:31:48 INFO mapred.JobClient: Running job: job_local_0001
14/09/22 20:31:48 INFO util.ProcessTree: setsid exited with exit code 0
14/09/22 20:31:48 INFO mapred.Task:  Using ResourceCalculatorPlugin : [email protected]
14/09/22 20:31:48 INFO mapred.MapTask: io.sort.mb = 100
14/09/22 20:31:48 INFO mapred.MapTask: data buffer = 79691776/99614720
14/09/22 20:31:48 INFO mapred.MapTask: record buffer = 262144/327680
mapper...............
child:child  parent:parent
mapper...............
child:Tom  parent:Lucy
key:Lucy  value: 1+Tom+Lucy
key:Tom  value: 2+Tom+Lucy
mapper...............
child:Tom  parent:Jack
key:Jack  value: 1+Tom+Jack
key:Tom  value: 2+Tom+Jack
mapper...............
child:Jone  parent:Lucy
key:Lucy  value: 1+Jone+Lucy
key:Jone  value: 2+Jone+Lucy
mapper...............
child:Jone  parent:Jack
key:Jack  value: 1+Jone+Jack
key:Jone  value: 2+Jone+Jack
mapper...............
child:Lucy  parent:Mary
key:Mary  value: 1+Lucy+Mary
key:Lucy  value: 2+Lucy+Mary
mapper...............
child:Lucy  parent:Ben
key:Ben  value: 1+Lucy+Ben
key:Lucy  value: 2+Lucy+Ben
mapper...............
child:Jack  parent:Alice
key:Alice  value: 1+Jack+Alice
14/09/22 20:31:49 INFO mapred.MapTask: Starting flush of map output
key:Jack  value: 2+Jack+Alice
mapper...............
child:Jack  parent:Jesse
key:Jesse  value: 1+Jack+Jesse
key:Jack  value: 2+Jack+Jesse
mapper...............
child:Terry  parent:Alice
key:Alice  value: 1+Terry+Alice
key:Terry  value: 2+Terry+Alice
mapper...............
child:Terry  parent:Jesse
key:Jesse  value: 1+Terry+Jesse
key:Terry  value: 2+Terry+Jesse
mapper...............
child:Philip  parent:Terry
key:Terry  value: 1+Philip+Terry
key:Philip  value: 2+Philip+Terry
mapper...............
child:Philip  parent:Alima
key:Alima  value: 1+Philip+Alima
key:Philip  value: 2+Philip+Alima
mapper...............
child:Mark  parent:Terry
key:Terry  value: 1+Mark+Terry
key:Mark  value: 2+Mark+Terry
mapper...............
child:Mark  parent:Alma
key:Alma  value: 1+Mark+Alma
key:Mark  value: 2+Mark+Alma
14/09/22 20:31:49 INFO mapred.MapTask: Finished spill 0
14/09/22 20:31:49 INFO mapred.Task: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
14/09/22 20:31:49 INFO mapred.JobClient:  map 0% reduce 0%
14/09/22 20:31:51 INFO mapred.LocalJobRunner: 
14/09/22 20:31:51 INFO mapred.Task: Task 'attempt_local_0001_m_000000_0' done.
14/09/22 20:31:51 INFO mapred.Task:  Using ResourceCalculatorPlugin : [email protected]
14/09/22 20:31:51 INFO mapred.LocalJobRunner: 
14/09/22 20:31:51 INFO mapred.Merger: Merging 1 sorted segments
14/09/22 20:31:51 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 564 bytes
14/09/22 20:31:51 INFO mapred.LocalJobRunner: 
reduce.....................
key:Alice  values:[email protected]
record: 1+Jack+Alice
childname: Jack
parentname: Alice
record: 1+Terry+Alice
childname: Terry
parentname: Alice
reduce.....................
key:Alima  values:[email protected]
record: 1+Philip+Alima
childname: Philip
parentname: Alima
reduce.....................
key:Alma  values:[email protected]
record: 1+Mark+Alma
childname: Mark
parentname: Alma
reduce.....................
key:Ben  values:[email protected]
record: 1+Lucy+Ben
childname: Lucy
parentname: Ben
reduce.....................
key:Jack  values:[email protected]
record: 2+Jack+Alice
childname: Jack
parentname: Alice
record: 2+Jack+Jesse
childname: Jack
parentname: Jesse
record: 1+Tom+Jack
childname: Tom
parentname: Jack
record: 1+Jone+Jack
childname: Jone
parentname: Jack
grandchild: Tom  grandparent: Alice
grandchild: Tom  grandparent: Jesse
grandchild: Jone  grandparent: Alice
grandchild: Jone  grandparent: Jesse
reduce.....................
key:Jesse  values:[email protected]
record: 1+Jack+Jesse
childname: Jack
parentname: Jesse
record: 1+Terry+Jesse
childname: Terry
parentname: Jesse
reduce.....................
key:Jone  values:[email protected]
record: 2+Jone+Lucy
childname: Jone
parentname: Lucy
record: 2+Jone+Jack
childname: Jone
parentname: Jack
reduce.....................
key:Lucy  values:[email protected]
record: 1+Tom+Lucy
childname: Tom
parentname: Lucy
record: 1+Jone+Lucy
childname: Jone
parentname: Lucy
record: 2+Lucy+Mary
childname: Lucy
parentname: Mary
record: 2+Lucy+Ben
childname: Lucy
parentname: Ben
grandchild: Tom  grandparent: Mary
grandchild: Tom  grandparent: Ben
grandchild: Jone  grandparent: Mary
grandchild: Jone  grandparent: Ben
reduce.....................
key:Mark  values:[email protected]
record: 2+Mark+Terry
childname: Mark
parentname: Terry
record: 2+Mark+Alma
childname: Mark
parentname: Alma
reduce.....................
key:Mary  values:[email protected]
record: 1+Lucy+Mary
childname: Lucy
parentname: Mary
reduce.....................
key:Philip  values:[email protected]
record: 2+Philip+Terry
childname: Philip
parentname: Terry
record: 2+Philip+Alima
childname: Philip
parentname: Alima
reduce.....................
key:Terry  values:[email protected]
record: 2+Terry+Alice
childname: Terry
parentname: Alice
record: 2+Terry+Jesse
childname: Terry
parentname: Jesse
record: 1+Philip+Terry
childname: Philip
parentname: Terry
record: 1+Mark+Terry
childname: Mark
parentname: Terry
grandchild: Philip  grandparent: Alice
grandchild: Philip  grandparent: Jesse
grandchild: Mark  grandparent: Alice
grandchild: Mark  grandparent: Jesse
reduce.....................
key:Tom  values:[email protected]
record: 2+Tom+Jack
childname: Tom
parentname: Jack
record: 2+Tom+Lucy
childname: Tom
parentname: Lucy
14/09/22 20:31:52 INFO mapred.Task: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
14/09/22 20:31:52 INFO mapred.LocalJobRunner: 
14/09/22 20:31:52 INFO mapred.Task: Task attempt_local_0001_r_000000_0 is allowed to commit now
14/09/22 20:31:52 INFO output.FileOutputCommitter: Saved output of task 'attempt_local_0001_r_000000_0' to hdfs://localhost:9000/user/hadoop/stjoin_output07
14/09/22 20:31:52 INFO mapred.JobClient:  map 100% reduce 0%
14/09/22 20:31:54 INFO mapred.LocalJobRunner: reduce > reduce
14/09/22 20:31:54 INFO mapred.Task: Task 'attempt_local_0001_r_000000_0' done.
14/09/22 20:31:55 INFO mapred.JobClient:  map 100% reduce 100%
14/09/22 20:31:55 INFO mapred.JobClient: Job complete: job_local_0001
14/09/22 20:31:55 INFO mapred.JobClient: Counters: 22
14/09/22 20:31:55 INFO mapred.JobClient:   Map-Reduce Framework
14/09/22 20:31:55 INFO mapred.JobClient:     Spilled Records=56
14/09/22 20:31:55 INFO mapred.JobClient:     Map output materialized bytes=568
14/09/22 20:31:55 INFO mapred.JobClient:     Reduce input records=28
14/09/22 20:31:55 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=0
14/09/22 20:31:55 INFO mapred.JobClient:     Map input records=15
14/09/22 20:31:55 INFO mapred.JobClient:     SPLIT_RAW_BYTES=117
14/09/22 20:31:55 INFO mapred.JobClient:     Map output bytes=506
14/09/22 20:31:55 INFO mapred.JobClient:     Reduce shuffle bytes=0
14/09/22 20:31:55 INFO mapred.JobClient:     Physical memory (bytes) snapshot=0
14/09/22 20:31:55 INFO mapred.JobClient:     Reduce input groups=13
14/09/22 20:31:55 INFO mapred.JobClient:     Combine output records=0
14/09/22 20:31:55 INFO mapred.JobClient:     Reduce output records=13
14/09/22 20:31:55 INFO mapred.JobClient:     Map output records=28
14/09/22 20:31:55 INFO mapred.JobClient:     Combine input records=0
14/09/22 20:31:55 INFO mapred.JobClient:     CPU time spent (ms)=0
14/09/22 20:31:55 INFO mapred.JobClient:     Total committed heap usage (bytes)=408420352
14/09/22 20:31:55 INFO mapred.JobClient:   File Input Format Counters 
14/09/22 20:31:55 INFO mapred.JobClient:     Bytes Read=163
14/09/22 20:31:55 INFO mapred.JobClient:   FileSystemCounters
14/09/22 20:31:55 INFO mapred.JobClient:     HDFS_BYTES_READ=326
14/09/22 20:31:55 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=81802
14/09/22 20:31:55 INFO mapred.JobClient:     FILE_BYTES_READ=912
14/09/22 20:31:55 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=149
14/09/22 20:31:55 INFO mapred.JobClient:   File Output Format Counters 
14/09/22 20:31:55 INFO mapred.JobClient:     Bytes Written=149

2) Output of the simplified version:

14/09/22 20:26:02 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/09/22 20:26:02 WARN mapred.JobClient: No job jar file set.  User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
14/09/22 20:26:02 INFO input.FileInputFormat: Total input paths to process : 1
14/09/22 20:26:02 WARN snappy.LoadSnappy: Snappy native library not loaded
14/09/22 20:26:03 INFO mapred.JobClient: Running job: job_local_0001
14/09/22 20:26:03 INFO util.ProcessTree: setsid exited with exit code 0
14/09/22 20:26:03 INFO mapred.Task:  Using ResourceCalculatorPlugin : [email protected]
14/09/22 20:26:03 INFO mapred.MapTask: io.sort.mb = 100
14/09/22 20:26:03 INFO mapred.MapTask: data buffer = 79691776/99614720
14/09/22 20:26:03 INFO mapred.MapTask: record buffer = 262144/327680
mapper...............
child:child  parent:parent
mapper...............
child:Tom  parent:Lucy
key:Lucy  value: 1+Tom
key:Tom  value: 2+Lucy
mapper...............
child:Tom  parent:Jack
key:Jack  value: 1+Tom
key:Tom  value: 2+Jack
mapper...............
child:Jone  parent:Lucy
key:Lucy  value: 1+Jone
key:Jone  value: 2+Lucy
mapper...............
child:Jone  parent:Jack
key:Jack  value: 1+Jone
key:Jone  value: 2+Jack
mapper...............
child:Lucy  parent:Mary
key:Mary  value: 1+Lucy
key:Lucy  value: 2+Mary
mapper...............
child:Lucy  parent:Ben
key:Ben  value: 1+Lucy
key:Lucy  value: 2+Ben
mapper...............
child:Jack  parent:Alice
key:Alice  value: 1+Jack
key:Jack  value: 2+Alice
mapper...............
child:Jack  parent:Jesse
key:Jesse  value: 1+Jack
key:Jack  value: 2+Jesse
mapper...............
child:Terry  parent:Alice
key:Alice  value: 1+Terry
key:Terry  value: 2+Alice
mapper...............
child:Terry  parent:Jesse
key:Jesse  value: 1+Terry
key:Terry  value: 2+Jesse
mapper...............
child:Philip  parent:Terry
key:Terry  value: 1+Philip
key:Philip  value: 2+Terry
mapper...............
child:Philip  parent:Alima
key:Alima  value: 1+Philip
key:Philip  value: 2+Alima
mapper...............
child:Mark  parent:Terry
key:Terry  value: 1+Mark
key:Mark  value: 2+Terry
mapper...............
child:Mark  parent:Alma
key:Alma  value: 1+Mark
key:Mark  value: 2+Alma
14/09/22 20:26:03 INFO mapred.MapTask: Starting flush of map output
14/09/22 20:26:03 INFO mapred.MapTask: Finished spill 0
14/09/22 20:26:03 INFO mapred.Task: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
14/09/22 20:26:04 INFO mapred.JobClient:  map 0% reduce 0%
14/09/22 20:26:06 INFO mapred.LocalJobRunner: 
14/09/22 20:26:06 INFO mapred.Task: Task 'attempt_local_0001_m_000000_0' done.
14/09/22 20:26:06 INFO mapred.Task:  Using ResourceCalculatorPlugin : [email protected]
14/09/22 20:26:06 INFO mapred.LocalJobRunner: 
14/09/22 20:26:06 INFO mapred.Merger: Merging 1 sorted segments
14/09/22 20:26:06 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 414 bytes
14/09/22 20:26:06 INFO mapred.LocalJobRunner: 
reduce.....................
key:Alice  values:[email protected]
record: 1+Jack
name: Jack
record: 1+Terry
name: Terry
reduce.....................
key:Alima  values:[email protected]
record: 1+Philip
name: Philip
reduce.....................
key:Alma  values:[email protected]
record: 1+Mark
name: Mark
reduce.....................
key:Ben  values:[email protected]
record: 1+Lucy
name: Lucy
reduce.....................
key:Jack  values:[email protected]
record: 2+Alice
name: Alice
record: 2+Jesse
name: Jesse
record: 1+Tom
name: Tom
record: 1+Jone
name: Jone
grandchild: Tom  grandparent: Alice
grandchild: Tom  grandparent: Jesse
grandchild: Jone  grandparent: Alice
grandchild: Jone  grandparent: Jesse
reduce.....................
key:Jesse  values:[email protected]
record: 1+Jack
name: Jack
record: 1+Terry
name: Terry
reduce.....................
key:Jone  values:[email protected]
record: 2+Lucy
name: Lucy
record: 2+Jack
name: Jack
reduce.....................
key:Lucy  values:[email protected]
record: 1+Tom
name: Tom
record: 1+Jone
name: Jone
record: 2+Mary
name: Mary
record: 2+Ben
name: Ben
grandchild: Tom  grandparent: Mary
grandchild: Tom  grandparent: Ben
grandchild: Jone  grandparent: Mary
grandchild: Jone  grandparent: Ben
reduce.....................
key:Mark  values:[email protected]
record: 2+Terry
name: Terry
record: 2+Alma
name: Alma
reduce.....................
key:Mary  values:[email protected]
record: 1+Lucy
name: Lucy
reduce.....................
key:Philip  values:[email protected]
record: 2+Terry
name: Terry
record: 2+Alima
name: Alima
reduce.....................
key:Terry  values:[email protected]
record: 2+Alice
name: Alice
record: 2+Jesse
name: Jesse
record: 1+Philip
name: Philip
record: 1+Mark
name: Mark
grandchild: Philip  grandparent: Alice
grandchild: Philip  grandparent: Jesse
grandchild: Mark  grandparent: Alice
grandchild: Mark  grandparent: Jesse
reduce.....................
key:Tom  values:[email protected]
record: 2+Jack
name: Jack
record: 2+Lucy
name: Lucy
14/09/22 20:26:06 INFO mapred.Task: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
14/09/22 20:26:06 INFO mapred.LocalJobRunner: 
14/09/22 20:26:06 INFO mapred.Task: Task attempt_local_0001_r_000000_0 is allowed to commit now
14/09/22 20:26:06 INFO output.FileOutputCommitter: Saved output of task 'attempt_local_0001_r_000000_0' to hdfs://localhost:9000/user/hadoop/stjoin_output06
14/09/22 20:26:07 INFO mapred.JobClient:  map 100% reduce 0%
14/09/22 20:26:09 INFO mapred.LocalJobRunner: reduce > reduce
14/09/22 20:26:09 INFO mapred.Task: Task 'attempt_local_0001_r_000000_0' done.
14/09/22 20:26:10 INFO mapred.JobClient:  map 100% reduce 100%
14/09/22 20:26:10 INFO mapred.JobClient: Job complete: job_local_0001
14/09/22 20:26:10 INFO mapred.JobClient: Counters: 22
14/09/22 20:26:10 INFO mapred.JobClient:   Map-Reduce Framework
14/09/22 20:26:10 INFO mapred.JobClient:     Spilled Records=56
14/09/22 20:26:10 INFO mapred.JobClient:     Map output materialized bytes=418
14/09/22 20:26:10 INFO mapred.JobClient:     Reduce input records=28
14/09/22 20:26:10 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=0
14/09/22 20:26:10 INFO mapred.JobClient:     Map input records=15
14/09/22 20:26:10 INFO mapred.JobClient:     SPLIT_RAW_BYTES=117
14/09/22 20:26:10 INFO mapred.JobClient:     Map output bytes=356
14/09/22 20:26:10 INFO mapred.JobClient:     Reduce shuffle bytes=0
14/09/22 20:26:10 INFO mapred.JobClient:     Physical memory (bytes) snapshot=0
14/09/22 20:26:10 INFO mapred.JobClient:     Reduce input groups=13
14/09/22 20:26:10 INFO mapred.JobClient:     Combine output records=0
14/09/22 20:26:10 INFO mapred.JobClient:     Reduce output records=13
14/09/22 20:26:10 INFO mapred.JobClient:     Map output records=28
14/09/22 20:26:10 INFO mapred.JobClient:     Combine input records=0
14/09/22 20:26:10 INFO mapred.JobClient:     CPU time spent (ms)=0
14/09/22 20:26:10 INFO mapred.JobClient:     Total committed heap usage (bytes)=406847488
14/09/22 20:26:10 INFO mapred.JobClient:   File Input Format Counters 
14/09/22 20:26:10 INFO mapred.JobClient:     Bytes Read=163
14/09/22 20:26:10 INFO mapred.JobClient:   FileSystemCounters
14/09/22 20:26:10 INFO mapred.JobClient:     HDFS_BYTES_READ=326
14/09/22 20:26:10 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=81502
14/09/22 20:26:10 INFO mapred.JobClient:     FILE_BYTES_READ=762
14/09/22 20:26:10 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=149
14/09/22 20:26:10 INFO mapred.JobClient:   File Output Format Counters 
14/09/22 20:26:10 INFO mapred.JobClient:     Bytes Written=149

6. Output:

grandchild    grandparent
Tom    Alice
Tom    Jesse
Jone    Alice
Jone    Jesse
Tom    Mary
Tom    Ben
Jone    Mary
Jone    Ben
Philip    Alice
Philip    Jesse
Mark    Alice
Mark    Jesse

Time: 2024-11-03 21:36:43
