MapReduce 中的两表 join 实例(二)

package com.baidu.uilt;
import java.io.*;

import org.apache.hadoop.io.*;

public class TextPair implements WritableComparable<TextPair> {

  private Text first;
  private Text second;
  
  public TextPair() {
    set(new Text(), new Text());
  }
  
  public TextPair(String first, String second) {
    set(new Text(first), new Text(second));
  }
  
  public TextPair(Text first, Text second) {
    set(first, second);
  }
  
  public void set(Text first, Text second) {
    this.first = first;
    this.second = second;
  }
  
  public Text getFirst() {
    return first;
  }

  public Text getSecond() {
    return second;
  }

  @Override
  public void write(DataOutput out) throws IOException {
    first.write(out);
    second.write(out);
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    first.readFields(in);
    second.readFields(in);
  }
  
  @Override
  public int hashCode() {
    return first.hashCode() * 163 + second.hashCode();
  }
  
  @Override
  public boolean equals(Object o) {
    if (o instanceof TextPair) {
      TextPair tp = (TextPair) o;
      return first.equals(tp.first) && second.equals(tp.second);
    }
    return false;
  }

  @Override
  public String toString() {
    return first + "\t" + second;
  }
  
  @Override
  public int compareTo(TextPair tp) {
    int cmp = first.compareTo(tp.first);
    if (cmp != 0) {
      return cmp;
    }
    return second.compareTo(tp.second);
  }

  public static class Comparator extends WritableComparator {
    
    private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator();
    
    public Comparator() {
      super(TextPair.class);
    }

    @Override
    public int compare(byte[] b1, int s1, int l1,
                       byte[] b2, int s2, int l2) {
      
      try {
        int firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1);
        int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2);
        int cmp = TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2);
        if (cmp != 0) {
          return cmp;
        }
        return TEXT_COMPARATOR.compare(b1, s1 + firstL1, l1 - firstL1,
                                       b2, s2 + firstL2, l2 - firstL2);
      } catch (IOException e) {
        throw new IllegalArgumentException(e);
      }
    }
  }

  static {
    WritableComparator.define(TextPair.class, new Comparator());
  }
  public static class FirstComparator extends WritableComparator {
    
    private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator();
    
    public FirstComparator() {
      super(TextPair.class);
    }

    @Override
    public int compare(byte[] b1, int s1, int l1,
                       byte[] b2, int s2, int l2) {
      
      try {
        int firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1);
        int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2);
        return TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2);
      } catch (IOException e) {
        throw new IllegalArgumentException(e);
      }
    }
    
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
      if (a instanceof TextPair && b instanceof TextPair) {
        return ((TextPair) a).first.compareTo(((TextPair) b).first);
      }
      return super.compare(a, b);
    }
  }

}
package com.baidu.loan;
/***
 * 
 * /home/users/ouerqiang/hadoop/hadoop-client-palo/hadoop/bin/hadoop jar LoanIdeaInfoText.jar  com.baidu.loan.LoanIdeainfoJoinIterialByDAILI6  /test/fbiz/loan/ideainfo/LoanIdeainfoByDAILIUnitID_0928  /test/fbiz/loan/ideainfo/LoanIterialByDAI_0928  /test/fbiz/loan/ideainfo/LoanIdeainfoJoinIterialByDAILI6_1_0928
 * 
 * **/
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Partitioner;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.lib.MultipleInputs;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

import com.baidu.uilt.TextPair;

public class LoanIdeainfoJoinIterialByDAILI6 extends Configured implements Tool {

	public static class JoinUnitMapper extends MapReduceBase implements
			Mapper<LongWritable, Text, TextPair, Text> {

		public void map(LongWritable key, Text value,
				OutputCollector<TextPair, Text> output, Reporter reporter)
				throws IOException {
			String gbkStr = value.toString();
			if (gbkStr.split("\t").length < 2 && gbkStr.split(",").length == 4) {
				String[] strs = gbkStr.split(",");
				output.collect(new TextPair(strs[0], "0"), value);
			}

		}
	}

	public static class JoinIterialMapper extends MapReduceBase implements
			Mapper<LongWritable, Text, TextPair, Text> {

		public void map(LongWritable key, Text value,
				OutputCollector<TextPair, Text> output, Reporter reporter)
				throws IOException {
			String gbkStr = value.toString();
			if (gbkStr.split("\t").length > 4) {// LoanIterial
				String[] strs = gbkStr.split("\t");
				output.collect(new TextPair(strs[0], "1"), value);
			}
		}
	}

	public static class JoinReducer extends MapReduceBase implements
			Reducer<TextPair, Text, Text, Text> {

		public void reduce(TextPair key, Iterator<Text> values,
				OutputCollector<Text, Text> output, Reporter reporter)
				throws IOException {

			Text stationName = new Text(values.next());
			while (values.hasNext()) {
				Text record = values.next();
				Text outValue = new Text(stationName.toString() + "\t"
						+ record.toString());
				output.collect(stationName, record);
				//output.collect(key.getFirst(), outValue);
			}
		}
	}

	public static class KeyPartitioner implements Partitioner<TextPair, Text> {
	    @Override
	    public void configure(JobConf job) {}
	    
	    @Override
	    public int getPartition(TextPair key, Text value, int numPartitions) {
	      return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions;
	    }
	  }

	@Override
	public int run(String[] args) throws Exception {
		if (args.length != 3) {
		      return -1;
		    }
		    
		    JobConf conf = new JobConf(getConf(), getClass());
		    conf.setJobName("Join record with station name");
		    
		    String strPathUnit =args[0];
		    String strPathIterial =args[1];
		    Path outputPath= new Path(args[2]);
		    
		    MultipleInputs.addInputPath(conf, new Path(strPathUnit),
		        TextInputFormat.class, JoinUnitMapper.class);
		    MultipleInputs.addInputPath(conf, new Path(strPathIterial),
		        TextInputFormat.class, JoinIterialMapper.class);
		    FileOutputFormat.setOutputPath(conf, outputPath);

		    conf.setPartitionerClass(KeyPartitioner.class);
		    conf.setOutputValueGroupingComparator(TextPair.FirstComparator.class);
		    
		    conf.setMapOutputKeyClass(TextPair.class);
		    
		    conf.setReducerClass(JoinReducer.class);

		    conf.setOutputKeyClass(Text.class);
		    
		    JobClient.runJob(conf);
		    return 0;
	}

	public static void main(String[] args) throws Exception {
		int exitCode = ToolRunner.run(new LoanIdeainfoJoinIterialByDAILI6(), args);
	    System.exit(exitCode);
	}

}
时间: 2024-09-30 15:13:42

MapReduce 中的两表 join 实例(二)的相关文章

MapReduce 中的两表 join 实例

1. 概述 在传统数据库(如:MYSQL)中,JOIN操作是非常常见且非常耗时的.而在HADOOP中进行JOIN操作,同样常见且耗时,由于Hadoop的独特设计思想,当进行JOIN操作时,有一些特殊的技巧. 2. 常见的join方法介绍 假设要进行join的数据分别来自File1和File2. reduce side join是一种最简单的join方式,其主要思想如下: 在map阶段,map函数同时读取两个文件File1和File2,为了区分两种来源的key/value数据对,对每条数据打一个标

(转)MapReduce 中的两表 join 几种方案简介

1. 概述 在传统数据库(如:MYSQL)中,JOIN操作是非常常见且非常耗时的.而在HADOOP中进行JOIN操作,同样常见且耗时,由于Hadoop的独特设计思想,当进行JOIN操作时,有一些特殊的技巧. 本文首先介绍了Hadoop上通常的JOIN实现方法,然后给出了几种针对不同输入数据集的优化方法. 2. 常见的join方法介绍 假设要进行join的数据分别来自File1和File2. 2.1 reduce side join reduce side join是一种最简单的join方式,其主

(转)MapReduce中的两表join几种方案简介

转自:http://blog.csdn.net/leoleocmm/article/details/8602081 1. 概述 在传统数据库(如:MYSQL)中,JOIN操作是非常常见且非常耗时的.而在HADOOP中进行JOIN操作,同样常见且耗时,由于Hadoop的独特设计思想,当进行JOIN操作时,有一些特殊的技巧. 本文首先介绍了Hadoop上通常的JOIN实现方法,然后给出了几种针对不同输入数据集的优化方法. 2. 常见的join方法介绍 假设要进行join的数据分别来自File1和Fi

一个Activity中使用两个layout实例

package com.sbs.aas2l; import android.app.Activity; import android.os.Bundle; import android.view.View; import android.view.View.OnClickListener; import android.widget.AdapterView; import android.widget.ArrayAdapter; import android.widget.Button; imp

WPF中的多进程(Threading)处理实例(二)

原文:WPF中的多进程(Threading)处理实例(二) 1 //错误的处理 2 private void cmdBreakRules_Click(object sender, RoutedEventArgs e) 3 { 4 Thread thread = new Thread(UpdateTextWrong); 5 thread.Start(); 6 } 7 8 private void UpdateTextWrong() 9 { 10 txt.Text = "Here is some n

Hadoop中两表JOIN的处理方法(转)

1. 概述 在传统数据库(如:MYSQL)中,JOIN操作是非常常见且非常耗时的.而在HADOOP中进行JOIN操作,同样常见且耗时,由于Hadoop的独特设计思想,当进行JOIN操作时,有一些特殊的技巧. 本文首先介绍了Hadoop上通常的JOIN实现方法,然后给出了几种针对不同输入数据集的优化方法. 2. 常见的join方法介绍 假设要进行join的数据分别来自File1和File2. 2.1 reduce side join reduce side join是一种最简单的join方式,其主

MapReduce实现两表join

一.方法介绍 假设要进行join的数据分别来自File1和File2. 参考:https://blog.csdn.net/yimingsilence/article/details/70242604 1.1 reduce side join reduce side join是一种最简单的join方式,其主要思想如下:在map阶段,map函数同时读取两个文件File1和File2,为了区分两种来源的key/value数据对,对每条数据打一个标签(tag),比如:tag=0表示来自文件File1,t

Hadoop中两表JOIN的处理方法

http://dongxicheng.org/mapreduce/hadoop-join-two-tables/ http://dongxicheng.org/mapreduce/run-hadoop-job-problems/ http://dongxicheng.org/mapreduce/hdfs-small-files-solution/

java中注解的使用与实例 (二)

java 注解,从名字上看是注释,解释.但功能却不仅仅是注释那么简单.注解(Annotation) 为我们在代码中添加信息提供了一种形式化的方法,是我们可以在稍后 某个时刻方便地使用这些数据(通过 解析注解 来使用这些数据),常见的作用有以下几种: 生成文档.这是最常见的,也是java 最早提供的注解.常用的有@see @param @return 等 跟踪代码依赖性,实现替代配置文件功能.比较常见的是spring 2.5 开始的基于注解配置.作用就是减少配置.现在的框架基本都使用了这种配置来减