mapreduce求前k个最大值(topk 问题)

需要先统计词频,再进行排序

----------词频统计---------

package TopK;
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

 /**
  * 统计词频
  * @author mlj
  */
 public class WordCount {

     /**
     * 读取单词
      */
    public static class Map extends Mapper<Object,Text,Text,IntWritable>{

       IntWritable count = new IntWritable(1);

       @Override
       protected void map(Object key, Text value, Context context)
               throws IOException, InterruptedException {
            StringTokenizer st = new StringTokenizer(value.toString());
             while(st.hasMoreTokens()){
               String word = st.nextToken().replaceAll("\"", "").replace("‘", "").replace(".", "");
                context.write(new Text(word), count);
            }
       }

     }

    /**
     * 统计词频
     */
    public static class Reduce extends Reducer<Text,IntWritable,Text,IntWritable>{

       @SuppressWarnings("unused")
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values,Context context)
                throws IOException, InterruptedException {
           int count = 0;
            for (IntWritable intWritable : values) {
                count ++;
           }
             context.write(key,new IntWritable(count));
        }

     }

    @SuppressWarnings("deprecation")
    public static boolean run(String in,String out) throws IOException, ClassNotFoundException, InterruptedException{

         Configuration conf = new Configuration();

       Job job = new Job(conf,"WordCount");
        job.setJarByClass(WordCount.class);
         job.setMapperClass(Map.class);
       job.setReducerClass(Reduce.class);

        // 设置Map输出类型
        job.setMapOutputKeyClass(Text.class);
         job.setMapOutputValueClass(IntWritable.class);

        // 设置Reduce输出类型
       job.setOutputKeyClass(Text.class);
         job.setOutputValueClass(IntWritable.class);

      // 设置输入和输出目录
        FileInputFormat.addInputPath(job, new Path(in));
        FileOutputFormat.setOutputPath(job, new Path(out));

         return job.waitForCompletion(true);
   }

 }

  ----------排序---------

package TopK;
  import java.io.IOException;
  import java.util.Comparator;
  import java.util.Map.Entry;
  import java.util.Set;
  import java.util.StringTokenizer;
  import java.util.TreeMap;
  import java.util.regex.Pattern;

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Job;
  import org.apache.hadoop.mapreduce.Mapper;
  import org.apache.hadoop.mapreduce.Reducer;
  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
  import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

 /**
 * 以单词出现的频率排序*/
  public class Sort {

     /**
      * 读取单词(词频 word)
      *
     * @author mlj
     *
       */
     public static class Map extends Mapper<Object, Text, IntWritable, Text> {

        // 输出key 词频
         IntWritable outKey = new IntWritable();
         Text outValue = new Text();

       @Override
       protected void map(Object key, Text value, Context context)
               throws IOException, InterruptedException {

            StringTokenizer st = new StringTokenizer(value.toString());
            while (st.hasMoreTokens()) {
                String element = st.nextToken();
               if (Pattern.matches("\\d+", element)) {
                   outKey.set(Integer.parseInt(element));
               } else {
                   outValue.set(element);
                }
             }

           context.write(outKey, outValue);
          }

    }

    /**
     * 根据词频排序
    */
    public static class Reduce extends
             Reducer<IntWritable, Text, Text, IntWritable> {

        private static MultipleOutputs<Text, IntWritable> mos = null;

        //要获得前K个频率最高的词
        private static final int k = 10;

      //用TreeMap存储可以利用它的排序功能
        //这里用 MyInt 因为TreeMap是对key排序,且不能唯一,而词频可能相同,要以词频为Key就必需对它封装
          private static TreeMap<MyInt, String> tm = new TreeMap<MyInt, String>(new Comparator<MyInt>(){
           /**
            * 默认是从小到大的顺序排的,现在修改为从大到小
            * @param o1
           * @param o2
             * @return
        */
             @Override
          public int compare(MyInt o1, MyInt o2) {
                return o2.compareTo(o1);
           }

       }) ;

      /*
         * 以词频为Key是要用到reduce的排序功能
         */
         @Override
         protected void reduce(IntWritable key, Iterable<Text> values,
                 Context context) throws IOException, InterruptedException {
            for (Text text : values) {
                context.write(text, key);
               tm.put(new MyInt(key.get()),text.toString());

                 //TreeMap以对内部数据进行了排序,最后一个必定是最小的
               if(tm.size() > k){
                   tm.remove(tm.lastKey());
                }

            }
         }

        @Override
         protected void cleanup(Context context)
                throws IOException, InterruptedException {
             String path = context.getConfiguration().get("topKout");
            mos = new MultipleOutputs<Text, IntWritable>(context);
            Set<Entry<MyInt, String>> set = tm.entrySet();
           for (Entry<MyInt, String> entry : set) {
                 mos.write("topKMOS", new Text(entry.getValue()), new IntWritable(entry.getKey().getValue()), path);
            }
             mos.close();
        }

     }

     @SuppressWarnings("deprecation")
     public static void run(String in, String out,String topKout) throws IOException,
             ClassNotFoundException, InterruptedException {

         Path outPath = new Path(out);

        Configuration conf = new Configuration();

         //前K个词要输出到哪个目录
        conf.set("topKout",topKout);

        Job job = new Job(conf, "Sort");
        job.setJarByClass(Sort.class);
        job.setMapperClass(Map.class);
         job.setReducerClass(Reduce.class);

        // 设置Map输出类型
         job.setMapOutputKeyClass(IntWritable.class);
         job.setMapOutputValueClass(Text.class);

       // 设置Reduce输出类型
         job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        //设置MultipleOutputs的输出格式
        //这里利用MultipleOutputs进行对文件输出
         MultipleOutputs.addNamedOutput(job,"topKMOS",TextOutputFormat.class,Text.class,Text.class);

       // 设置输入和输出目录
        FileInputFormat.addInputPath(job, new Path(in));
        FileOutputFormat.setOutputPath(job, outPath);
        job.waitForCompletion(true);

    }

}

  ---------自定义int---------

package TopK;

public class MyInt implements Comparable<MyInt>{
    private Integer value;

     public MyInt(Integer value){
        this.value = value;
    }

    public int getValue() {
       return value;
    }

     public void setValue(int value) {
       this.value = value;
   }

   @Override
    public int compareTo(MyInt o) {
     return value.compareTo(o.getValue());
   }

}

  ------------------驱动--------------

package TopK;
  import java.io.IOException;

/**
 *
  * @author mlj
  */
 public class TopK {
    public static void main(String args[]) throws ClassNotFoundException, IOException, InterruptedException{

       //要统计字数,排序的文字
       String in = "hdfs://localhost:9000/input/1.text";

       //统计字数后的结果
        String wordCout = "hdfs://mlj:9000/out/wordCout";

       //对统计完后的结果再排序后的内容
        String sort = "hdfs://mlj:9000/out/sort";

         //前K条
        String topK = "hdfs://mlj:9000/out/topK";

       //如果统计字数的job完成后就开始排序
        if(WordCount.run(in, wordCout)){
          Sort.run(wordCout, sort,topK);
      }

  }
 }

  

时间: 2024-10-18 20:30:13

mapreduce求前k个最大值(topk 问题)的相关文章

算法导论学习之线性时间求第k小元素+堆思想求前k大元素

对于曾经,假设要我求第k小元素.或者是求前k大元素,我可能会将元素先排序,然后就直接求出来了,可是如今有了更好的思路. 一.线性时间内求第k小元素 这个算法又是一个基于分治思想的算法. 其详细的分治思路例如以下: 1.分解:将A[p,r]分解成A[p,q-1]和A[q+1,r]两部分.使得A[p,q-1]都小于A[q],A[q+1,r]都不小于A[q]; 2.求解:假设A[q]恰好是第k小元素直接返回,假设第k小元素落在前半区间就到A[p,q-1]递归查找.否则到A[q+1,r]中递归查找. 3

6041 I Curse Myself(点双联通加集合合并求前K大) 2017多校第一场

题意: 给出一个仙人掌图,然后求他的前K小生成树. 思路: 先给出官方题解 由于图是一个仙人掌,所以显然对于图上的每一个环都需要从环上取出一条边删掉.所以问题就变为有 M 个集合,每个集合里面都有一堆数字,要从每个集合中选择一个恰好一个数加起来.求所有的这样的和中,前 K 大的是哪些.这就是一个经典问题了. 点双联通就不说了 都一眼能看出来做法就是缩点之后每个环每次取一个,然后找最大的k个所以这道题的难点就在这里,做法当然是不知道啦,看了题解和博客才懂的.以前做过两个集合合并的,这个是k个合并,

序列合并求前K小项 POJ2442

1 #include <iostream> 2 #include <cstring> 3 #include <cstdio> 4 #include <algorithm> 5 #include <queue> 6 7 using namespace std; 8 9 priority_queue<int>pq; 10 int an[3100]; 11 int bn[3100]; 12 13 int main() 14 { 15 int

[csu/coj 1080]划分树求区间前k大数和

题意:从某个区间内最多选择k个数,使得和最大 思路:首先题目给定的数有负数,如果区间前k大出现负数,那么负数不选和更大,于是对于所有最优选择,负数不会出现,所以用0取代负数,问题便转化为区间的前k大数和. 划分树: [1  6  3  8  5  4  7  2] [6  8  5  7][1  3  4  2] [8  7][6  5][3  4][1  2] [8][7][6][5][4][3][2][1] 把快排的结果从上至下依次放入线段树,就构成了划分树,划分的意思就是选定一个数,把原序

前K个高频元素

数据结构--堆 Heap是一种数据结构具有以下的特点: 1)完全二叉树: 2)heap中存储的值是偏序: Min-heap: 父节点的值小于或等于子节点的值: Max-heap: 父节点的值大于或等于子节点的值: 1.堆的存储: 一般都用数组来表示堆,i结点的父结点下标就为(i–1)/2.它的左右子结点下标分别为2 * i + 1和2 * i + 2.如第0个结点左右子结点下标分别为1和2 2.堆的操作:insert 插入一个元素:新元素被加入到heap的末尾,然后更新树以恢复堆的次序. 每次插

HDU5102(树的前k路径+队列)

The K-th Distance Time Limit: 8000/4000 MS (Java/Others)    Memory Limit: 65536/65536 K (Java/Others) Total Submission(s): 332    Accepted Submission(s): 90 Problem Description Given a tree, which has n node in total. Define the distance between two

求f(k)=k^k(k=1...n)的前n项和

求f(k)=k^k(k=1...n)的前n项和. 程序实现: #define _CRT_SECURE_NO_WARNINGS #include<stdio.h> long long My_Mul_Sum(int *n)//封装了一个求k^k的前n项和的函数 { int k = 1; long long sum = 0;//定义为long long是为了防止数据较大,容易溢出 for (k = 1; k <= n; k++) { int count = 0, mul = 1;//count

light_oj 1282 求n^k的前几位数和后几位数

light_oj 1282 求n^k的前几位数和后几位数 E - Leading and Trailing Time Limit:2000MS     Memory Limit:32768KB     64bit IO Format:%lld & %llu Submit Status Practice LightOJ 1282 Description You are given two integers: n and k, your task is to find the most signif

Uva 11029 Leading and Trailing (求n^k前3位和后3位)

题意:给你 n 和 k ,让你求 n^k 的前三位和后三位 思路:后三位很简单,直接快速幂就好,重点在于如何求前三位,注意前导0 资料:求n^k的前m位 博客连接地址 代码: #include <iostream> #include <cmath> #include <cstdio> #include <algorithm> #define ll long long using namespace std; ll qmod(ll a,ll b,ll mod)