MapReduce分析明星微博数据

互联网时代的到来,使得名人的形象变得更加鲜活,也拉近了明星和粉丝之间的距离。歌星、影星、体育明星、作家等名人通过互联网能够轻易实现和粉丝的互动,赚钱也变得前所未有的简单。同时,互联网的飞速发展本身也造就了一批互联网明星,这些人借助新的手段,最大程度发挥了粉丝经济的能量和作用,在互联网时代赚得盆满钵满。

正是基于这样一个大背景,今天我们做一个分析明星微博数据的小项目

1、项目需求

自定义输入格式,将明星微博数据排序后按粉丝数关注数 微博数分别输出到不同文件中。

2、数据集

明星 明星微博名称 粉丝数 关注数 微博数

俞灏明 俞灏明 10591367 206 558

李敏镐 李敏镐 22898071 11 268

林心如 林心如 57488649 214 5940

黄晓明 黄晓明 22616497 506 2011

张靓颖 张靓颖 27878708 238 3846

李娜 李娜 23309493 81 631

徐小平 徐小平 11659926 1929 13795

唐嫣 唐嫣 24301532 200 2391

有斐君 有斐君 8779383 577 4251

3、分析

自定义InputFormat读取明星微博数据,通过自定义getSortedHashtableByValue方法分别对明星的fan、followers、microblogs数据进行排序,然后利用MultipleOutputs输出不同项到不同的文件中

4、实现

1、定义WeiBo实体类,实现WritableComparable接口

package com.buaa;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
* @ProjectName MicroblogStar
* @PackageName com.buaa
* @ClassName WeiBo
* @Description TODO
* @Author 刘吉超
* @Date 2016-05-07 14:54:29
*/
public class WeiBo implements WritableComparable<Object> {
    // 粉丝
    private int fan;
    // 关注
    private int followers;
    // 微博数
    private int microblogs;

    public WeiBo(){};

    public WeiBo(int fan,int followers,int microblogs){
        this.fan = fan;
        this.followers = followers;
        this.microblogs = microblogs;
    }

    public void set(int fan,int followers,int microblogs){
        this.fan = fan;
        this.followers = followers;
        this.microblogs = microblogs;
    }

    // 实现WritableComparable的readFields()方法,以便该数据能被序列化后完成网络传输或文件输入
    @Override
    public void readFields(DataInput in) throws IOException {
        fan  = in.readInt();
        followers = in.readInt();
        microblogs = in.readInt();
    }

    // 实现WritableComparable的write()方法,以便该数据能被序列化后完成网络传输或文件输出
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(fan);
        out.writeInt(followers);
        out.writeInt(microblogs);
    }

    @Override
    public int compareTo(Object o) {
        // TODO Auto-generated method stub
        return 0;
    }

    public int getFan() {
        return fan;
    }

    public void setFan(int fan) {
        this.fan = fan;
    }

    public int getFollowers() {
        return followers;
    }

    public void setFollowers(int followers) {
        this.followers = followers;
    }

    public int getMicroblogs() {
        return microblogs;
    }

    public void setMicroblogs(int microblogs) {
        this.microblogs = microblogs;
    }
}

2、自定义WeiboInputFormat,继承FileInputFormat抽象类

package com.buaa;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;

/**
* @ProjectName MicroblogStar
* @PackageName com.buaa
* @ClassName WeiboInputFormat
* @Description TODO
* @Author 刘吉超
* @Date 2016-05-07 10:23:28
*/
public class WeiboInputFormat extends FileInputFormat<Text,WeiBo>{

     @Override
     public RecordReader<Text, WeiBo> createRecordReader(InputSplit arg0, TaskAttemptContext arg1) throws IOException, InterruptedException {
          // 自定义WeiboRecordReader类,按行读取
          return new WeiboRecordReader();
     }

     public class WeiboRecordReader extends RecordReader<Text, WeiBo>{
            public LineReader in;
            // 声明key类型
            public Text lineKey = new Text();
            // 声明 value类型
            public WeiBo lineValue = new WeiBo();

            @Override
            public void initialize(InputSplit input, TaskAttemptContext context) throws IOException, InterruptedException {
                // 获取split
                FileSplit split = (FileSplit)input;
                // 获取配置
                Configuration job = context.getConfiguration();
                // 分片路径
                Path file = split.getPath();

                FileSystem fs = file.getFileSystem(job);
                // 打开文件
                FSDataInputStream filein = fs.open(file);

                in = new LineReader(filein,job);
            }

            @Override
            public boolean nextKeyValue() throws IOException, InterruptedException {
                // 一行数据
                Text line = new Text();

                int linesize = in.readLine(line);

                if(linesize == 0)
                    return false; 

                // 通过分隔符‘\t‘,将每行的数据解析成数组
                String[] pieces = line.toString().split("\t");

                if(pieces.length != 5){
                    throw new IOException("Invalid record received");
                } 

                int a,b,c;
                try{
                    // 粉丝
                    a = Integer.parseInt(pieces[2].trim());
                    // 关注
                    b = Integer.parseInt(pieces[3].trim());
                    // 微博数
                    c = Integer.parseInt(pieces[4].trim());
                }catch(NumberFormatException nfe){
                    throw new IOException("Error parsing floating poing value in record");
                }

                //自定义key和value值
                lineKey.set(pieces[0]);
                lineValue.set(a, b, c);

                return true;
            }

            @Override
            public void close() throws IOException {
                if(in != null){
                    in.close();
                }
            }

            @Override
            public Text getCurrentKey() throws IOException, InterruptedException {
                return lineKey;
            }

            @Override
            public WeiBo getCurrentValue() throws IOException, InterruptedException {
                return lineValue;
            }

            @Override
            public float getProgress() throws IOException, InterruptedException {
                return 0;
            }

        }
}

3、编写mr程序

package com.buaa;

import java.io.IOException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
* @ProjectName MicroblogStar
* @PackageName com.buaa
* @ClassName WeiboCount
* @Description TODO
* @Author 刘吉超
* @Date 2016-05-07 09:07:36
*/
public class WeiboCount extends Configured implements Tool {
    // tab分隔符
    private static String TAB_SEPARATOR = "\t";
    // 粉丝
    private static String FAN = "fan";
    // 关注
    private static String FOLLOWERS = "followers";
    // 微博数
    private static String MICROBLOGS = "microblogs";

    public static class WeiBoMapper extends Mapper<Text, WeiBo, Text, Text> {
        @Override
        protected void map(Text key, WeiBo value, Context context) throws IOException, InterruptedException {
            // 粉丝
            context.write(new Text(FAN), new Text(key.toString() + TAB_SEPARATOR + value.getFan()));
            // 关注
            context.write(new Text(FOLLOWERS), new Text(key.toString() + TAB_SEPARATOR + value.getFollowers()));
            // 微博数
            context.write(new Text(MICROBLOGS), new Text(key.toString() + TAB_SEPARATOR + value.getMicroblogs()));
        }
    }

    public static class WeiBoReducer extends Reducer<Text, Text, Text, IntWritable> {
        private MultipleOutputs<Text, IntWritable> mos;

        protected void setup(Context context) throws IOException, InterruptedException {
            mos = new MultipleOutputs<Text, IntWritable>(context);
        }

        protected void reduce(Text Key, Iterable<Text> Values,Context context) throws IOException, InterruptedException {
            Map<String,Integer> map = new HashMap< String,Integer>();

            for(Text value : Values){
                // value = 名称 + (粉丝数 或 关注数 或 微博数)
                String[] records = value.toString().split(TAB_SEPARATOR);
                map.put(records[0], Integer.parseInt(records[1].toString()));
            }

            // 对Map内的数据进行排序
            Map.Entry<String, Integer>[] entries = getSortedHashtableByValue(map);

            for(int i = 0; i < entries.length;i++){
                mos.write(Key.toString(),entries[i].getKey(), entries[i].getValue());
            }
        }

        protected void cleanup(Context context) throws IOException, InterruptedException {
            mos.close();
        }
    }

    @SuppressWarnings("deprecation")
    @Override
    public int run(String[] args) throws Exception {
        // 配置文件对象
        Configuration conf = new Configuration();

        // 判断路径是否存在,如果存在,则删除
        Path mypath = new Path(args[1]);
        FileSystem hdfs = mypath.getFileSystem(conf);
        if (hdfs.isDirectory(mypath)) {
            hdfs.delete(mypath, true);
        }

        // 构造任务
        Job job = new Job(conf, "weibo");
        // 主类
        job.setJarByClass(WeiboCount.class);

        // Mapper
        job.setMapperClass(WeiBoMapper.class);
        // Mapper key输出类型
        job.setMapOutputKeyClass(Text.class);
        // Mapper value输出类型
        job.setMapOutputValueClass(Text.class);

        // Reducer
        job.setReducerClass(WeiBoReducer.class);
        // Reducer key输出类型
        job.setOutputKeyClass(Text.class);
        // Reducer value输出类型
        job.setOutputValueClass(IntWritable.class);

        // 输入路径
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // 输出路径
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 自定义输入格式
        job.setInputFormatClass(WeiboInputFormat.class) ;
        //自定义文件输出类别
        MultipleOutputs.addNamedOutput(job, FAN, TextOutputFormat.class, Text.class, IntWritable.class);
        MultipleOutputs.addNamedOutput(job, FOLLOWERS, TextOutputFormat.class, Text.class, IntWritable.class);
        MultipleOutputs.addNamedOutput(job, MICROBLOGS, TextOutputFormat.class, Text.class, IntWritable.class);

        // 去掉job设置outputFormatClass,改为通过LazyOutputFormat设置
        LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class); 

         //提交任务
        return job.waitForCompletion(true)?0:1;
    }

    // 对Map内的数据进行排序(只适合小数据量)
    @SuppressWarnings("unchecked")
    public static Entry<String, Integer>[] getSortedHashtableByValue(Map<String, Integer> h) {
        Entry<String, Integer>[] entries = (Entry<String, Integer>[]) h.entrySet().toArray(new Entry[0]);
        // 排序
        Arrays.sort(entries, new Comparator<Entry<String, Integer>>() {
            public int compare(Entry<String, Integer> entry1, Entry<String, Integer> entry2) {
                return entry2.getValue().compareTo(entry1.getValue());
            }
        });
        return entries;
    }

    public static void main(String[] args) throws Exception {
        String[] args0 = {
                "hdfs://ljc:9000/buaa/microblog/weibo.txt",
                "hdfs://ljc:9000/buaa/microblog/out/"
        };
        int ec = ToolRunner.run(new Configuration(), new WeiboCount(), args0);
        System.exit(ec);
    }
}

5、运行结果

如果,您认为阅读这篇博客让您有些收获,不妨点击一下右下角的【推荐】。
如果,您希望更容易地发现我的新博客,不妨点击一下左下角的【关注我】。
如果,您对我的博客所讲述的内容有兴趣,请继续关注我的后续博客,我是【刘超★ljc】。

本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。

实现代码及数据:下载

时间: 2024-07-28 15:55:28

MapReduce分析明星微博数据的相关文章

使用mapReduce分析简单天气数据

做demo前需要先搭建Hadoop集群,并且有linux基础,可参考 https://www.cnblogs.com/linyufeng/p/10831240.html 1.引出问题 给一串数据,找出每年的每个月温度最高的2天.其中有可能包含着相同的数据. 1949-10-01 14:21:02 34c 1949-10-01 19:21:02 38c 1949-10-02 14:01:02 36c 1950-01-01 11:21:02 32c 1950-10-01 12:21:02 37c 1

使用hadoop mapreduce分析mongodb数据

使用hadoop mapreduce分析mongodb数据 (现在很多互联网爬虫将数据存入mongdb中,所以研究了一下,写此文档) 版权声明:本文为yunshuxueyuan原创文章.如需转载请标明出处: http://www.cnblogs.com/sxt-zkys/QQ技术交流群:299142667 一. mongdb的安装和使用 1. 官网下载mongodb-linux-x86_64-rhel70-3.2.9.tgz 2. 解压 (可以配置一下环境变量) 3. 启动服务端 ./mongo

2020不平凡的90天,Python分析三个月微博热搜数据带你回顾

前言 文的文字及图片来源于网络,仅供学习.交流使用,不具有任何商业用途,版权归原作者所有,如有问题请及时联系我们以作处理. 作者:刘早起早起 PS:如有需要Python学习资料的小伙伴可以加点击下方链接自行获取http://t.cn/A6Zvjdun 北京时间4月3日凌晨,全球新冠病毒感染人数突破100万,死亡人数超过5万.而这一切都在2020年刚开始的三个月内发生.可能你觉得这三个月很快,有些事情已经逐渐忘记,而互联网的记忆不会消失,数据也会说话.因此作者抓取了2020年1月1日至4月2日的每

使用hadoop mapreduce分析mongodb数据:(2)

在上一篇使用hadoop mapreduce分析mongodb数据:(1)中,介绍了如何使用Hadoop MapReduce连接MongoDB数据库以及如何处理数据库,本文结合一个案例来进一步说明Hadoop MapReduce处理MongoDB的细节 原始数据 > db.stackin.find({}) { "_id" : ObjectId("575ce909aa02c3b21f1be0bb"), "summary" : "go

用Map-Reduce的思维处理数据

在很多人的眼里,Map-Reduce等于Hadoop,没有Hadoop谈Map-Reduce犹如自上谈兵,实则不然,Map-Reduce是一种计算模型,只是非常适合在并行的环境下运行,Hadoop是Map-Reduce的一种实现,没有Hadoop照样可以跑Map-Reduce程序.python就内置有map()和reduce方法(虽然与hadoop的map-reduce有区别). 这篇文章主要介绍如何用python在linux的管道进行map-reduce编程,本文写的所有map-reduce程

PHP 基于laravel框架获取微博数据之二 用户数据的使用

开始抓取微博数据的时候,只是想获得一条热门微博下的所有评论,因为里面有不少图片广告,所以想试试能不能分析出热门微博评论里的异常用户. 使用PHP的Laravel框架后,通过队列.命令等各种功能,最后构架了一套完整的微博用户数据抓取平台,经过一段时间的运行积累了大量数据,那么使用这些数据能做什么呢? 微博数据分析很早就有人在做了,网上采集分析工具貌似有很多,搜索一下想找一些微博数据分析的具体方案.世事变幻,发现很多几年前的微博数据分析平台都不能用了,可能微博数据分析和微博一样在商业上还是没有什么更

PHP 基于laravel框架获取微博数据之一 模拟新浪微博登录

参考资料:http://www.csuldw.com/2016/11/10/2016-11-10-simulate-sina-login/http://blog.csdn.net/fly_leopard/article/details/51148904http://www.tuicool.com/articles/uIJzYff http://blog.csdn.net/u010029983/article/details/46364113等 模拟新浪微博登录是抓取新浪数据的基础,网上的参考资料

MapReduce分析流量汇总

一.MapReduce编程规范 一.MapReduce编程规范 用户编写mr程序主要分为三个部分:Mapper,Reducer,Driver 1.Mapper阶段 (1)用户自定义Mapper类 要继承父类Mapper (2)Mapper的输入数据的kv对形式(kv类型可以自定义) (3)Mapper的map方法的重写(加入业务逻辑) (4)Mapper的数据输出kv对的形式(kv类型可以自定义) (5)map()方法(maptask进程)对每个<k,v>调用一次 2.Reducer阶段 (1

基于微博数据用 Python 打造一颗“心”

一年一度的虐狗节刚过去不久,朋友圈各种晒,晒自拍,晒娃,晒美食,秀恩爱的.程序员在晒什么,程序员在加班.但是礼物还是少不了的,送什么好?作为程序员,我准备了一份特别的礼物,用以往发的微博数据打造一颗“爱心”,我想她一定会感动得哭了吧.哈哈 准备工作 有了想法之后就开始行动了,自然最先想到的就是用 Python 了,大体思路就是把微博数据爬下来,数据经过清洗加工后再进行分词处理,处理后的数据交给词云工具,配合科学计算工具和绘图工具制作成图像出来,涉及到的工具包有: requests 用于网络请求爬