MapReduce明星搜索指数统计,找出人气王

我们继续通过项目强化掌握Combiner和Partitioner优化Hadoop性能

1、项目介绍

本项目我们使用明星搜索指数数据,分别统计出搜索指数最高的男明星和女明星。

2、数据集

3、分析

基于项目的需求,我们通过以下几步完成:

1、编写Mapper类,按需求将数据集解析为key=gender,value=name+hotIndex,然后输出。

2、编写Combiner类,合并Mapper输出结果,然后输出给Reducer。

3、编写Partitioner类,按性别,将结果指定给不同的Reduce执行。

4、编写Reducer类,分别统计出男、女明星的最高搜索指数。

5、编写run方法执行MapReduce任务

4、实现

package com.buaa;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
* @ProjectName CountStarSearchIndex
* @PackageName com.buaa
* @ClassName SearchStarIndex
* @Description 统计分别统计出男女明星最大搜索指数
* @Author 刘吉超
* @Date 2016-05-12 16:30:23
*/
public class SearchStarIndex extends Configured implements Tool {
    // 分隔符\t
    private static String TAB_SEPARATOR = "\t";
    // 男
    private static String MALE = "male";
    // 女
    private static String FEMALE = "female";

    /*
     * 解析明星数据
     */
    public static class IndexMapper extends Mapper<Object, Text, Text, Text> {
        /*
         * 每次调用map(LongWritable key, Text value, Context context)解析一行数据。
         * 每行数据存储在value参数值中。然后根据‘\t‘分隔符,解析出明星姓名,性别和搜索指数
         */
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            // 将数据解析为数组
            String[] tokens = value.toString().split(TAB_SEPARATOR);

            if(tokens != null && tokens.length >= 3){
                // 性别
                String gender = tokens[1].trim();
                // 名称、关注指数
                String nameHotIndex = tokens[0].trim() + TAB_SEPARATOR + tokens[2].trim();

                // 输出key=gender value=name+hotIndex
                context.write(new Text(gender), new Text(nameHotIndex));
            }
        }
    }

    /*
     * 根据性别对数据进行分区,将 Mapper的输出结果均匀分布在 reduce上
     */
    public static class IndexPartitioner extends Partitioner<Text, Text> {
        @Override
        public int getPartition(Text key, Text value, int numReduceTasks) {
            // 按性别分区
            String sex = key.toString();

            // 默认指定分区 0
            if(numReduceTasks == 0)
                return 0;

            // 性别为男,选择分区0
            if(MALE.equals(sex)){
                return 0;
            }else if(FEMALE.equals(sex)){ // 性别为女,选择分区1
                return 1 % numReduceTasks;
            }else // 性别未知,选择分区2
                return 2 % numReduceTasks;

        }
    }

    /*
     * 定义Combiner,对 map端的输出结果,先进行一次合并,减少数据的网络输出
     */
    public static class IndexCombiner extends Reducer<Text, Text, Text, Text> {

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)throws IOException, InterruptedException {
            int maxHotIndex = Integer.MIN_VALUE;
            String name= "";

            for (Text val : values) {
                String[] valTokens = val.toString().split(TAB_SEPARATOR);

                int hotIndex = Integer.parseInt(valTokens[1]);

                if(hotIndex > maxHotIndex){
                    name = valTokens[0];
                    maxHotIndex = hotIndex;
                }
            }

            context.write(key, new Text(name + TAB_SEPARATOR + maxHotIndex));
        }
    }

    /*
     * 统计男、女明星最高搜索指数
     */
    public static class IndexReducer extends Reducer<Text, Text, Text, Text> {
        /*
         * 调用reduce(key, Iterable< Text> values, context)方法来处理每个key和values的集合。
         * 我们在values集合中,计算出明星的最大搜索指数
         */
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)throws IOException, InterruptedException {
            int maxHotIndex = Integer.MIN_VALUE;
            String name = " ";

            // 根据key,迭代 values集合,求出最高搜索指数
            for (Text val : values) {
                String[] valTokens = val.toString().split(TAB_SEPARATOR);

                int hotIndex = Integer.parseInt(valTokens[1]);

                if (hotIndex > maxHotIndex) {
                    name = valTokens[0];
                    maxHotIndex = hotIndex;
                }
            }

            context.write(new Text(name), new Text(key + TAB_SEPARATOR + maxHotIndex));
        }
    }

    @SuppressWarnings("deprecation")
    @Override
    public int run(String[] args) throws Exception {
        // 读取配置文件
        Configuration conf = new Configuration();

        // 如果目标文件夹存在,则删除
        Path mypath = new Path(args[1]);
        FileSystem hdfs = mypath.getFileSystem(conf);
        if (hdfs.isDirectory(mypath)) {
            hdfs.delete(mypath, true);
        }

        // 新建一个任务
        Job job = new Job(conf, "searchStarIndex");
        // 主类
        job.setJarByClass(SearchStarIndex.class);

        // reduce的个数设置为2
        job.setNumReduceTasks(2);
        // 设置Partitioner类
        job.setPartitionerClass(IndexPartitioner.class);

        // Mapper
        job.setMapperClass(IndexMapper.class);
        // Reducer
        job.setReducerClass(IndexReducer.class);

        // map 输出key类型
        job.setMapOutputKeyClass(Text.class);
        // map 输出value类型
        job.setMapOutputValueClass(Text.class);

        // 设置Combiner类
        job.setCombinerClass(IndexCombiner.class);

        // 输出结果 key类型
        job.setOutputKeyClass(Text.class);
        // 输出结果 value类型
        job.setOutputValueClass(Text.class);

        // 输入路径
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // 输出路径
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 提交任务
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        String[] args0 = {
                "hdfs://ljc:9000/buaa/index/index.txt",
                "hdfs://ljc:9000/buaa/index/out/"
        };
        int ec = ToolRunner.run(new Configuration(), new SearchStarIndex(), args0);
        System.exit(ec);
    }
}

5、运行效果

如果,您认为阅读这篇博客让您有些收获,不妨点击一下右下角的【推荐】。
如果,您希望更容易地发现我的新博客,不妨点击一下左下角的【关注我】。
如果,您对我的博客所讲述的内容有兴趣,请继续关注我的后续博客,我是【刘超★ljc】。

本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。

实现代码及数据:下载

时间: 2024-12-12 08:42:25

MapReduce明星搜索指数统计,找出人气王的相关文章

明星搜索指数统计,找出人气王

项目介绍 本项目我们使用明星搜索指数数据,分别统计出搜索指数最高的男明星和女明星. 数据集 明星搜索指数数据集,如下图所示. 思路分析 基于项目的需求,我们通过以下几步完成: 1.编写 Mapper类,按需求将数据集解析为 key=gender,value=name+hotIndex,然后输出. 2.编写 Combiner 类,合并 Mapper 输出结果,然后输出给 Reducer. 3.编写 Partitioner 类,按性别,将结果指定给不同的 Reduce 执行. 4.编写 Reduce

[ jquery 过滤器 find(expr|obj|ele) ] 此方法用于在选择器的基础之上搜索所有与指定表达式匹配的元素,这个函数是找出正在处理的元素的后代元素的好方法

此方法用于在选择器的基础之上搜索所有与指定表达式匹配的元素,这个函数是找出正在处理的元素的后代元素的好方法 此方法可以算作是与children()方法相对应的一种方法... 实例: <html lang='zh-cn'> <head> <title>Insert you title</title> <meta http-equiv='description' content='this is my page'> <meta http-equ

Python list去重及找出,统计重复项

http://bbs.chinaunix.net/thread-1680208-1-1.html 如何找出 python list 中有重复的项 http://www.cnblogs.com/feisky/archive/2012/12/06/2805251.html 比较容易记忆的是用内置的setl1 = ['b','c','d','b','c','a','a']l2 = list(set(l1))print l2 还有一种据说速度更快的,没测试过两者的速度差别l1 = ['b','c','d

Dijkstra 算法,用于对有权图进行搜索,找出图中两点的最短距离

Dijkstra 算法,用于对有权图进行搜索,找出图中两点的最短距离,既不是DFS搜索,也不是BFS搜索. 把Dijkstra 算法应用于无权图,或者所有边的权都相等的图,Dijkstra 算法等同于BFS搜索. http://www.cnblogs.com/biyeymyhjob/archive/2012/07/31/2615833.html 2.算法描述 1)算法思想:设G=(V,E)是一个带权有向图,把图中顶点集合V分成两组,第一组为已求出最短路径的顶点集合(用S表示,初始时S中只有一个源

C语言:对传入sp的字符进行统计,三组两个相连字母“ea”&quot;ou&quot;&quot;iu&quot;出现的次数,并将统计结果存入ct所指的数组中。-在数组中找出最小值,并与第一个元素交换位置。

//对传入sp的字符进行统计,三组两个相连字母“ea”"ou""iu"出现的次数,并将统计结果存入ct所指的数组中. 1 #include <stdio.h> 2 #include <string.h> 3 #pragma warning (disable:4996) 4 void fun(char*sp ,int *ct) 5 { 6 int a=0, b=0, c=0; 7 while (*sp != '\0') 8 { 9 if (*s

怎样从10亿查询词找出出现频率最高的10个

转自:http://dongxicheng.org/big-data/select-ten-from-billions/ 1. 问题描述 在大规模数据处理中,常遇到的一类问题是,在海量数据中找出出现频率最高的前K个数,或者从海量数据中找出最大的前K个数,这类问题通常称为“top K”问题,如:在搜索引擎中,统计搜索最热门的10个查询词:在歌曲库中统计下载率最高的前10首歌等等. 2. 当前解决方案 针对top k类问题,通常比较好的方案是[分治+trie树/hash+小顶堆],即先将数据集按照h

关于使用一条SQL语句 找出同时符合多个tag条件的记录集合算法

表结构 Tag Table:{tag_id, tag_name}  #标签表 News Table:{news_id, title,......}  #新闻表 NewsTags Table:{tag_id, news_id}  #新闻的标签关系表 解释: 一条新闻,有多个tag标签,例如: 新闻a{Tag1,Tag2, Tag3, Tag4} 新闻b{Tag1,Tag6, Tag7, Tag8} 新闻c{Tag8,Tag9, Tag10, Tag1} 新闻...{Tag..., .....} 搜

水贴王之续,找出数组里出现频率最高的元素

找出数组里出现频率最高的元素 个人信息:就读于燕大本科软件工程专业 目前大三; 本人博客:google搜索"cqs_2012"即可; 个人爱好:酷爱数据结构和算法,希望将来从事算法工作为人民作出自己的贡献; 博客内容:水贴王问题之续 博客时间:2014-5-12; 编程语言:Java ; 编程坏境:Windows 7 专业版 x64; 编程工具:jdk,eclipse x64; 制图工具:office 2007 powerpoint; 硬件信息:7G-3 笔记本; 真言: 痛苦的活着比

面试题-10亿个数中找出最大的10000个数(top K问题)

一个较好的方法:先拿出10000个建立小根堆,对于剩下的元素,如果大于堆顶元素的值,删除堆顶元素,再进行插入操作,否则直接跳过,这样知道所有元素遍历完,堆中的10000个就是最大的10000个.时间复杂度: m + (n-1)logm = O(nlogm) 优化的方法:可以把所有10亿个数据分组存放,比如分别放在1000个文件中(如果是字符串hash(x)%M).对每个文件,建立大小为10000的小根堆,然后按有序数组的合并合并起来,取出最大的10000个即是答案. top K问题 在大规模数据