十二道MR习题 - 4 - TopN问题

题目:

有一个很大的文件,这文件中的内容全部都是数字,要求尝试从这个文件中找出最大的10个数字。

分析:

看起来像是一个比较简单的问题。不用大数据框架的话,也能比较轻易的实现:就是逐个读取文件中的每个数字,放到一个大顶堆结构中;将大顶堆放满以后,每读取一个数字就将之和大顶堆中的最小值进行比较,如果其大于这个最小值的话,就将其放入堆中,并将堆中的最小值删除;这样读取到最后,堆中剩下来的内容就是top 10了。

用MapReduce实现的话也说不上困难:我们只使用Map任务读取文件,而reduce中输出的内容就是一个有序的结果集,那么后十位自然就是Top10了。这方案虽说可行,但绝说不上是好的方案。

换个思路:map任务中先完成一轮过滤(没必要多添一重Combiner),先取出每个Map中的top10来,而后在reduce中再进行一轮筛选,从所有map的top10中再选出个top10来。这样处理效率应该会高一些。

看看实现过程:

package com.zhyea.dev;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.TreeSet;

public class TopN {

    private static final Logger logger = LoggerFactory.getLogger(TopN.class);

    public static class SplitterMapper extends Mapper<Object, Text, IntWritable, NullWritable> {

        private static final IntWritable intWritable = new IntWritable();

        private static final TreeSet<Integer> set = new TreeSet<>();

        @Override
        public void map(Object key, Text value, Context context) {
            int num = Integer.valueOf(value.toString());

            if (set.size() < 10) {
                set.add(num);
                return;
            }

            if (num > set.first()) {
                set.add(num);
                set.pollFirst();
            }
        }

        @Override
        public void cleanup(Context context) {
            for (Integer i : set) {
                intWritable.set(i);
                try {
                    context.write(intWritable, NullWritable.get());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static class IntegrateReducer extends Reducer<IntWritable, NullWritable, IntWritable, NullWritable> {

        private static final IntWritable intWritable = new IntWritable();
        private static final TreeSet<Integer> set = new TreeSet<>();

        @Override
        public void reduce(IntWritable key, Iterable<NullWritable> values, Context context) {
            try {
                int num = key.get();
                if (set.size() < 10) {
                    set.add(num);
                    return;
                }

                if (num > set.first()) {
                    set.add(num);
                    set.pollFirst();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        @Override
        public void cleanup(Context context) {
            for (Integer i : set) {
                intWritable.set(i);
                try {
                    context.write(intWritable, NullWritable.get());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf, "top-n");
        job.setJarByClass(TopN.class);

        job.setMapperClass(SplitterMapper.class);
        job.setReducerClass(IntegrateReducer.class);

        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

}

程序里在map或reduce方法中没有做任何输出,只是实现了比较逻辑,真正的输出是在cleanup方法中完成的。

用spark实现的话可以先做全排序,然后排重,take前N个记录就可以了。当然也可以按照上面的思路来做实现,下面的代码就是按照我们前面的思路来做的实现:

package com.zhyea.dev

import java.util

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.spark.{SparkConf, SparkContext}

import collection.JavaConversions.asScalaIterator

object TopTen {

  def main(args: Array[String]): Unit = {
    val inputPath = args(0)
    val outputPath = args(1)
    val conf = new SparkConf().setAppName("Top Ten")
    val sc = new SparkContext(conf)
    val data = sc.hadoopFile[LongWritable, Text, TextInputFormat](inputPath)
    data.mapPartitions[Long](findTopTen)
      .repartition(1)
      .distinct()
      .sortBy(_.toLong, false)
      .mapPartitions(itr => itr.slice(0, 10))
      .saveAsTextFile(outputPath)

    def findTopTen(itr: Iterator[(LongWritable, Text)]) = {
      val set = new util.TreeSet[Long]()
      itr.foreach(p => {
        val v = p._2.toString.toLong
        if (set.size <= 10) {
          set.add(v)
        } else if (v > set.first) {
          set.pollFirst()
          set.add(v)
        }
      })
      set.iterator
    }

  }

}

############################

时间: 2024-10-11 10:54:58

十二道MR习题 - 4 - TopN问题的相关文章

十二道MR习题 - 2 - 多文件保存

题目: 需要将MR的执行结果保存到3个文件中,该怎么做. 又是一个送分题. 对于Hadoop的MapReduce来说只需要设置一下reduce任务的数量即可.MR的Job默认reduce数量是1,需要调用job的setNumReduceTasks()方法来调整reduce任务的数量. 对于spark来说,可以调用coalesce方法或repartition方法来调整分区的数量,这样也可以调整最终结果输出文件的数量.关于coalesce方法和分区的关系这里不展开了,随便搜搜就能找到了. #####

十二道MR习题 &ndash; 1 &ndash; 排序

题目: 一个文件,大小约为100G.文件的每一行都是一个数字,要求对文件中的所有数字进行排序. 对于这个题目,了解过Hadoop的同学可以笑而不语了.即使用spark实现也是非常简单的事情. 先说下如何用Hadoop实现.实际上也没什么好说的:Map任务逐行读入数字,而后在Reduce中输出就可以了,简单粗暴到令人发指. 看下代码好了: package com.zhyea.dev; import org.apache.hadoop.conf.Configuration; import org.a

十二道MR习题 - 3 - 交集并集差集

题目 有两个文件A和B,两个文件中都有几百万行数字,现在需要找出A文件和B文件中数字集合的交集.并集.以及A对B的差集. 简单说一下思路: 这个问题关键在于key和value的设计.这里我将文件中的数字设置为key,将文件名称设置为value.这样在reduce阶段很容易就能找出A.B两个文件中数字的交并差集了. 并集就是reduce阶段能输出的全部记录:交集则需要做下过滤,即一个记录中的value需要同时有A.B两个文件的名称:差集则是文件名称集合中只包含A或B的记录. 看下用MapReduc

C primer plus 第五版十二章习题

看完C prime plus(第五版)第十二章,随带完成了后面的习题. 1.不使用全局变量,重写程序清单12.4的程序. 先贴出12.4的程序,方便对照: 1 /* global.c --- 使用外部变量 */ 2 #include <stdio.h> 3 int units = 0; //一个外部变量 4 void critic(void); 5 int main(void) 6 { 7 extern int units; 8 9 printf ("How many pounds

C和指针 第十四章 习题

14.1 打印函数 #include <stdio.h> void print_ledger_long(){ printf("function print_ledger_long\n"); } void print_ledger_detailed(){ printf("function print_ledger_detailed\n"); } void print_ledger_default(){ printf("function print

C和指针 第十六章 习题

16.8 计算平均年龄 #include <stdlib.h> #include <stdio.h> #define MAX_LEN 512 int main() { int age; int totalAge; float avgAge; int peopleNum; FILE *file; char info[MAX_LEN]; char *infoPtr; file = fopen("D:/family.txt", "r"); //按行

小甲鱼第十五课后习题--016列表

1.列表.元组和字符串的共同点: 1)都可以通过索引得到每一个元素 2)默认索引值总是从0开始 3)可以通过分片的方法得到一个范围内的元素的集合 4)由很多共同的操作符(重复操作符,拼接操作符,成员关系操作符) 2.迭代: 重复反馈过程的活动,其目的通常是为了接近并达到所需的目标成果,每一次对过程的重复我们称之为迭代.每一次迭代的结果作为下一次迭代的初始值. 3.一些重要的BIF: 1)list():把一个可迭代的对象转换为列表 2)tuple([iterable]):把一个可迭代的对象转化为元

网络操作系统第十二、十三章习题

第十二章习题 1.简述FTP的连接模式. 答:FTP的连接模式有RORT和PASV两种,其中RORT是主动模式,PASV是被动模式,这里说的主动和被动都是相对与服务器而言的.如果是主动模式,数据端口为20,如果是被动模式,则由服务器端和客户端协商而定. 2.简述FTP的传输模式. 答:FTP的传输模式包括ASCII传输模式和二进制模式.ASCII传输模式适合用于文本传输,二进制传输模式适合用于非文本传输. 3.如何在Windows系统中配置FTP服务器? 答:在安装服务器时,在"FTP站点&qu

中国历史上十大伪君子

首先说明评选中国历史上的十大伪君子的标准:第一,他们是伪君子,不是大恶人,所以凶狠好杀的秦始皇.楚霸王等人都不算;第二,他们都是隐蔽得很好的伪君子,到现在还受到称赞,至少还存在争议,象王莽那种生前就被揭穿的,虽然也是伪君子,但属於不高明的伪君子,所以也没有列入.这十大伪君子按照时间排序. 1.舜: 舜本来出生贫寒,他的父亲是个盲乐师,知子没如父,瞎眼的老音乐家对自己儿子的底细知道得一清二楚,几次想杀了他,可都被舜逃脱了. 后来舜得到了尧的信赖而步步高升,等到掌握了大权之后,就把尧的儿子丹朱杀死,