TF-IDF with Hadoop MapReduce

package com.jumei.robot.mapreduce.tfidf;

import java.io.IOException;
import java.util.Collection;
import java.util.Comparator;
import java.util.Map.Entry;
import java.util.Set;
import java.util.StringTokenizer;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

import com.jumei.robot.common.beans.Word;
import com.jumei.robot.preprocess.IFilterStopwordService;
import com.jumei.robot.preprocess.IWordSegService;

/**
 * <pre>
 * MapReduce implementation of the TF-IDF algorithm, split into three jobs:
 * job 1: for each document, count how many times each word occurs in it (n) and the total number of words in the document (N)
 * job 2: count the number of documents that contain each word (d) and, given the total number of documents (D), compute the tf-idf value
 * job 3: sort the output of job 2 and emit the top N words with the highest tf-idf values
 * Formulas:
 * tf = n / N
 * idf = Math.log(D / d)
 * tf-idf = tf * idf
 * A standalone sketch of these formulas with sample numbers follows the class.
 * </pre>
 * @author deyin
 *
 */
public class TfIdfMapReduce {

  private static Configuration conf;

  public static void main(String[] args) throws Exception {
    conf = new Configuration();
    if (args.length < 4) {
      System.err.println("invalid arguments, usage: hadoop jar tfidf.jar com.jumei.robot.mapreduce.tfidf.TfIdfMapReduce <hdfs input folder> <hdfs output folder> <number of documents> <topN>");
      return;
    }

    String pathin = args[0];
    String pathout = args[1];
    int nrOfDocuments = Integer.parseInt(args[2]);
    int topN = Integer.parseInt(args[3]);

    System.out.println("==========================================");
    System.out.println("pathin: " + pathin);
    System.out.println("pathout: " + pathout);
    System.out.println("nrOfDocuments: " + nrOfDocuments);
    System.out.println("topN: " + topN);
    System.out.println("==========================================");

    FileSystem fs = FileSystem.get(conf);

    if (!fs.exists(new Path(pathout))) {
      fs.mkdirs(new Path(pathout));
    }

    Path firstJobOut  = new Path(pathout, "job1_output");
    Path secondJobOut = new Path(pathout, "job2_output");
    Path thirdJobOut = new Path(pathout, "job3_output");
    // remove previous output directories if they exist
    fs.delete(firstJobOut, true);
    fs.delete(secondJobOut, true);
    fs.delete(thirdJobOut, true);

    // Run job 1
    runFirstJob(new Path(pathin), firstJobOut, nrOfDocuments);

    // Run job 2
    runSecondJob(firstJobOut, secondJobOut, nrOfDocuments); // job 1's output is job 2's input

    // Run job 3
    runThirdJob(secondJobOut, thirdJobOut, topN); // job 2's output is job 3's input

  }

  private static int runFirstJob(Path pathin, Path pathout, final int reduceTaskSize) throws Exception {
    String jobName = "tfidf_first_job";
    System.out.println("==================" + jobName + "=======================");

    Job job = new Job(conf, jobName);
    job.setJarByClass(TfIdfMapReduce.class);

    job.setMapperClass(FirstMapReduce.Mapper.class);
    job.setCombinerClass(FirstMapReduce.Combiner.class);
    job.setReducerClass(FirstMapReduce.Reducer.class);

    job.setNumReduceTasks(reduceTaskSize);
    // custom partitioner
    job.setPartitionerClass(FirstMapReduce.Partitioner.class);

    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);

    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    FileInputFormat.addInputPath(job, pathin);
    FileOutputFormat.setOutputPath(job, pathout);

    boolean success = job.waitForCompletion(true);
    return success ? 0 : -1;
  }

  private static int runSecondJob(Path pathin, Path pathout, final int nrOfDocuments) throws Exception {
    String jobName = "tfidf_second_job";
    System.out.println("==================" + jobName + "=======================");

    conf.setInt("nrOfDocuments", nrOfDocuments);
    Job job = new Job(conf, jobName);

    job.setJarByClass(TfIdfMapReduce.class);

    job.setMapperClass(SecondMapReduce.Mapper.class);
    job.setCombinerClass(SecondMapReduce.Combiner.class);
    job.setReducerClass(SecondMapReduce.Reducer.class);

    job.setInputFormatClass(KeyValueTextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);

    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    FileInputFormat.addInputPath(job, pathin);
    FileOutputFormat.setOutputPath(job, pathout);

    boolean success = job.waitForCompletion(true);
    return success ? 0 : -1;
  }

  private static int runThirdJob(Path pathin, Path pathout, final int topN) throws Exception {
    String jobName = "tfidf_third_job";
    System.out.println("==================" + jobName + "=======================");

    conf.setInt("topN", topN);
    conf.set("topN_out", new Path(pathin.getParent(), "" + topN).getName());
    Job job = new Job(conf, jobName);

    job.setJarByClass(TfIdfMapReduce.class);

    job.setMapperClass(ThirdMapReduce.Mapper.class);
    job.setReducerClass(ThirdMapReduce.Reducer.class);

    job.setInputFormatClass(KeyValueTextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);

    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    MultipleOutputs.addNamedOutput(job, "top" + topN, TextOutputFormat.class, Text.class, Text.class);

    FileInputFormat.addInputPath(job, pathin);
    FileOutputFormat.setOutputPath(job, pathout);

    boolean success = job.waitForCompletion(true);
    return success ? 0 : -1;
  }

  static class FirstMapReduce {

    // word segmentation service
    static IWordSegService wordSegService;

    // stop-word filtering service
    static IFilterStopwordService filterStopwordService;

    static {
      init();
    }

    static void init() {
      ApplicationContext ctx = new ClassPathXmlApplicationContext("classpath*:spring/robot-preprocess.xml");
      wordSegService = (IWordSegService) ctx.getBean("wordSegService");
      filterStopwordService = (IFilterStopwordService) ctx.getBean("filterStopwordService");
    }

    static class Mapper extends org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, Text, Text> {

      static final Text one = new Text("1");

      String filename = "";

      long totalWordCount = 0; // total number of words in the current document

      @Override
      protected void setup(Context context) throws IOException, InterruptedException {
        System.out.println("=================" + context.getJobName() + " map================");
      }

      @Override
      protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Segment the line into words and output each word
        // Input (LineNr, Line in document)
        // Output (filename;word, 1)
        String line = value.toString();
        if (line.trim().isEmpty() || line.startsWith("#")) { // ignore empty or comment line
          return;
        }
        FileSplit split = (FileSplit) context.getInputSplit();
        filename = split.getPath().toString();
        // segment the line into words
        Collection<Word> words = wordSegService.segment(line);
        // filter out stop words
        filterStopwordService.filter(words);
        for (Word word : words) {
          String outputKey = filename + ";" + word.getName();
          //System.out.println("<" + outputKey + ", " + one.toString() + ">");
          context.write(new Text(outputKey), one);
          ++totalWordCount;
        } // end for
      } // end map

      @Override
      protected void cleanup(Context context) throws IOException, InterruptedException {
        context.write(new Text(filename + ";" + "!"), new Text("" + totalWordCount)); // write the document's total word count; '!' has a smaller ASCII code than any letter, so this record sorts first
      }

    } // end class Mapper

    static class Partitioner extends org.apache.hadoop.mapreduce.Partitioner<Text, Text> {
      @Override
      public int getPartition(Text key, Text value, int numPartitions) {
        // partition by filename
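        // all records of the same document (including its "filename;!" total-count record) must land in the same reduce task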
        StringTokenizer tokenizer = new StringTokenizer(key.toString(), ";");
        String filename = tokenizer.nextToken();
        int hashCode = new Text(filename).hashCode();
        return Math.abs((hashCode * 127) % numPartitions);
      }
    } // end class Partitioner

    static class Combiner extends org.apache.hadoop.mapreduce.Reducer<Text, Text, Text, Text> {

      @Override
      protected void setup(Context context) throws IOException, InterruptedException {
        System.out.println("=================" + context.getJobName() + " combiner================");
      }

      long totalWordCount = 0;
      @Override
      protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // Calculate word count of each document and total word count
        // Input  (filename;word, 1)
        // Output (word;filename, n;N)
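        // The "filename;!" key sorts before every "filename;word" key, so totalWordCount is already set when that file's word records are combined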
        StringTokenizer tokenizer = new StringTokenizer(key.toString(), ";");
        String filename = tokenizer.nextToken();
        String word = tokenizer.nextToken();
        if(word.endsWith("!")) {
          for (Text value : values) {
            totalWordCount = Long.parseLong(value.toString());
            System.out.println("File " + filename + " total word count " + totalWordCount);
            return;
          }
        }
        long wordCount = 0;
        for(Text value: values) {
          wordCount += Integer.parseInt(value.toString());
        }
        String outputKey = word + ";" + filename;
        String outputValue = wordCount + ";" + totalWordCount;
        //System.out.println("<" + outputKey + ", " + outputValue + ">");
        context.write(new Text(outputKey), new Text(outputValue));
      } // end reduce
    } // end class Combiner

    static class Reducer extends org.apache.hadoop.mapreduce.Reducer<Text, Text, Text, Text> {

      @Override
      protected void setup(Context context)
          throws IOException, InterruptedException {
        System.out.println("=================" + context.getJobName() + " reducer================");
      }

      @Override
      protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        for (Text value : values) {
          //System.out.println("<" + key.toString() + ", " + value.toString() + ">");
          context.write(key, value);
        }
      }
    } // end class Reducer
  } // end class FirstMapReduce

  static class SecondMapReduce {

    static class Mapper extends org.apache.hadoop.mapreduce.Mapper<Text, Text, Text, Text> {

      static Text one = new Text("1");

      @Override
      protected void setup(Context context)
          throws IOException, InterruptedException {
        System.out.println("=================" + context.getJobName() + " map================");
      }

      @Override
      protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
        // Word occur in document
        // Input  (word;filename, n;N)
        // Output (word;filename;n;N, 1)
        String outputKey = key.toString() + ";" + value.toString();
        String outputValue = one.toString();
        //System.out.println("<" + outputKey + ", " + outputValue  + ">");
        context.write(new Text(outputKey), one);
      }
    } // end class Mapper

    static class Combiner extends org.apache.hadoop.mapreduce.Reducer<Text, Text, Text, Text> {

      int D = 1;

      @Override
      protected void setup(Context context) throws IOException, InterruptedException {
        D = context.getConfiguration().getInt("nrOfDocuments", 0);
        System.out.println("=================" + context.getJobName() + " combiner================");
      }

      @Override
      protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // Calculate word contains document count
        // Input  (word;filename;n;N, 1)
        // Output (word;filename;n;N, d;D)
        int d = 0; // number of documents containing this word
        for (Text value : values) {
          d += Integer.parseInt(value.toString());
        }
        String outputKey = key.toString();
        String outputValue = d + ";" + D;
        //System.out.println("<" + outputKey + ", " + outputValue  + ">");
        context.write(key, new Text(outputValue));
      } // end reduce
    } // end class Combiner

    static class Reducer extends org.apache.hadoop.mapreduce.Reducer<Text, Text, Text, Text> {

      @Override
      protected void setup(Context context) throws IOException, InterruptedException {
        System.out.println("=================" + context.getJobName() + " reducer================");
      }

      @Override
      protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // Calculate tf-idf
        // Input  (word;filename;n;N, d;D)
        // Output (word;filename, tf-idf)
        StringTokenizer keyTokenizer = new StringTokenizer(key.toString(), ";");
        String word = keyTokenizer.nextToken();
        String filename = keyTokenizer.nextToken();
        long n = Long.parseLong(keyTokenizer.nextToken()); // occurrences of the word in the document
        long N = Long.parseLong(keyTokenizer.nextToken()); // total number of words in the document
        StringTokenizer valueTokenizer = new StringTokenizer(values.iterator().next().toString(), ";");
        int d = Integer.parseInt(valueTokenizer.nextToken()); // number of documents containing the word
        int D = Integer.parseInt(valueTokenizer.nextToken()); // total number of documents
        double tf = n / 1.0d / N;
        double idf = Math.log(D / 1.0d / d);
        double tfidf = tf * idf;
        String outputKey = word + ";" + filename;
        String outputValue = "" + tfidf;
        //System.out.println("<" + outputKey + ", " + outputValue  + ">");
        context.write(new Text(outputKey), new Text(outputValue));
      } // end reduce

      @Override
      protected void cleanup(Context context) throws IOException, InterruptedException {
        super.cleanup(context);
      }
    } // end Reducer
  } // end class SecondMapReduce

  static class ThirdMapReduce {

    static class Pair implements Comparable<Pair>{
      final String key;
      final Double value;
      public Pair(String key, Double value) {
        this.key = key;
        this.value = value;
      }

      public int compareTo(Pair o) {
        int value = o.value.compareTo(this.value);
        if(value != 0) {
          return value;
        }
        return o.key.compareTo(this.key);
      }

      @Override
      public String toString() {
        return key;
      }
    }

    static class Mapper extends org.apache.hadoop.mapreduce.Mapper<Text, Text, Text, Text> {

      static TreeMap<Pair, String> treemap = new TreeMap<Pair, String>(new Comparator<Pair>() {
        public int compare(Pair o1, Pair o2) {
          return o1.compareTo(o2);
        }
      });

      int topN;

      @Override
      protected void setup(Context context) throws IOException, InterruptedException {
        topN = context.getConfiguration().getInt("topN", 100); // default 100
        System.out.println("=================" + context.getJobName() + " map================");
      }

      @Override
      protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
        // Input  (word;filename, tf-idf)
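        // keep only the topN entries seen so far; Pair orders entries by descending tf-idf, so lastKey() is the current smallest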
        treemap.put(new Pair(key.toString(), Double.parseDouble(value.toString())), value.toString());
        if(treemap.size() > topN) {
          treemap.remove(treemap.lastKey());
        }
      } // end map

      @Override
      protected void cleanup(Context context) throws IOException, InterruptedException {
        Set<Entry<Pair,String>> entrySet = treemap.entrySet();
        for (Entry<Pair, String> entry : entrySet) {
          String outputKey = entry.getKey().toString();
          String outputValue = entry.getValue();
          //System.out.println("<" + outputKey + ", " + outputValue  + ">");
          context.write(new Text(outputKey), new Text(outputValue));
        }
      }
    } // end class mapper

    static class Reducer extends org.apache.hadoop.mapreduce.Reducer<Text, Text, Text, Text> {

      int topN;

      static TreeMap<Pair, String> treemap = new TreeMap<Pair, String>(new Comparator<Pair>() {
        public int compare(Pair o1, Pair o2) {
          return o1.compareTo(o2);
        }
      });

      @Override
      protected void setup(Context context) throws IOException, InterruptedException {
        topN = context.getConfiguration().getInt("topN", 100); // default 100
        System.out.println("=================" + context.getJobName() + " reduce================");
      }

      @Override
      protected void reduce(Text key, Iterable<Text> values,Context context) throws IOException, InterruptedException {
        // Input (word;filename, tf-idf)
        Text value = values.iterator().next();
        treemap.put(new Pair(key.toString(), Double.parseDouble(value.toString())), value.toString());
        if(treemap.size() > topN) {
          treemap.remove(treemap.lastKey());
        }
      } // end reduce

      @Override
      protected void cleanup(Context context) throws IOException, InterruptedException {
        // emit the top N entries with the highest tf-idf
        String path = context.getConfiguration().get("topN_out");
        MultipleOutputs<Text, Text> output = null;
        try {
          output = new MultipleOutputs<Text, Text>(context);
          Set<Entry<Pair, String>> entrySet = treemap.entrySet();
          System.out.println("================TF-IDF top " + topN + "==================");
          for (Entry<Pair, String> entry : entrySet) {
            String key = entry.getKey().toString();
            String value = entry.getValue();
            output.write("top" + topN, key, value, path);
            System.out.println("<" + key + ", " + value  + ">");
          }
        } finally {
          if (output != null) {
            output.close();
          }
        }
      }
    } // end class Reducer
  }

}
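
To make the Javadoc formulas concrete, here is a minimal standalone sketch, independent of Hadoop; the class name TfIdfSketch and the sample counts (n = 3, N = 100, d = 2, D = 10) are hypothetical. It performs the same arithmetic as the second job's reducer:

public class TfIdfSketch {
  public static void main(String[] args) {
    long n = 3;   // occurrences of the word in one document
    long N = 100; // total number of words in that document
    int d = 2;    // number of documents containing the word
    int D = 10;   // total number of documents
    double tf = n / 1.0d / N;            // 0.03
    double idf = Math.log(D / 1.0d / d); // ln(5) ≈ 1.61
    double tfidf = tf * idf;             // ≈ 0.048
    System.out.println("tf=" + tf + ", idf=" + idf + ", tf-idf=" + tfidf);
  }
}

Submission follows the usage string printed by main, for example: hadoop jar tfidf.jar com.jumei.robot.mapreduce.tfidf.TfIdfMapReduce /tfidf/input /tfidf/output 10 100 (the HDFS paths and the two numbers are placeholders for the document count and topN).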