使用hadoop mapreduce分析mongodb数据:(2)

在上一篇使用hadoop mapreduce分析mongodb数据:(1)中,介绍了如何使用Hadoop MapReduce连接MongoDB数据库以及如何处理数据库,本文结合一个案例来进一步说明Hadoop MapReduce处理MongoDB的细节

  • 原始数据
  • > db.stackin.find({})
    { "_id" : ObjectId("575ce909aa02c3b21f1be0bb"), "summary" : "good good day",  "url" : "url_1" }
    { "_id" : ObjectId("575ce909aa02c3b21f1be0bc"), "summary" : "hello world good world", "url" : "url_2" }
    { "_id" : ObjectId("575ce909aa02c3b21f1be0bd"), "summary" : "hello world good hello good", "url" : "url_3" }
    { "_id" : ObjectId("575ce909aa02c3b21f1be0be"), "summary" : "hello world hello",  "url" : "url_4" }

    每一个记录表示一个网页,summary对应的值是网页的文章,url对应的值是该文章的链接

  • 目标结果
  • > db.stackout.find({})
    { "_id" : "world", "data" : [  {  "url_2" : 2 },  {  "url_3" : 1 },  {  "url_4" : 1 } ], "index" : 0, "maxindex" : 3 }
    { "_id" : "good", "data" : [  {  "url_1" : 2 },  {  "url_2" : 1 },  {  "url_3" : 2 } ], "index" : 0, "maxindex" : 3 }
    { "_id" : "day", "data" : [  {  "url_1" : 1 } ], "index" : 0, "maxindex" : 1 }
    { "_id" : "hello", "data" : [  {  "url_2" : 1 },  {  "url_3" : 2 },  {  "url_4" : 2 } ], "index" : 0, "maxindex" : 3 }

    我们需要统计每个单词在每个网页中分别出现的次数,从结果可知,单词world在每个url出现的次数

  • 设计代码

     1 import java.util.*;
     2 import java.io.*;
     3
     4 import org.bson.*;
     5
     6 import com.mongodb.hadoop.MongoInputFormat;
     7 import com.mongodb.hadoop.MongoOutputFormat;
     8 import com.mongodb.hadoop.io.BSONWritable;
     9
    10 import org.apache.hadoop.conf.Configuration;
    11 import org.apache.hadoop.io.*;
    12 import org.apache.hadoop.mapreduce.*;
    13
    14
    15 public class WordCount {
    16
    17     public static class TokenizerMapper extends Mapper<Object, BSONObject, Text, BSONWritable> {
    18         //private final static
    19         private Text word = new Text();
    20
    21         public void map(Object key, BSONObject value, Context context )
    22                 throws IOException, InterruptedException {
    23             String url = value.get("url").toString();
    24             StringTokenizer itr = new StringTokenizer(value.get("summary").toString().
    25                     replaceAll("\\p{Punct}|\\d","").replaceAll("\r\n", " ").replace("\r", " ").
    26                     replace("\n", " ").toLowerCase());
    27             while (itr.hasMoreTokens()) {
    28                 word.set(itr.nextToken());
    29                 BasicBSONObject urlCounts = new BasicBSONObject();
    30                 urlCounts.put(url, 1);
    31                 context.write(word, new BSONWritable(urlCounts));
    32             }
    33         }
    34     }
    35
    36     public static class IntSumReducer extends Reducer<Text, BSONWritable, Text, BSONWritable> {
    37         //private BasicBSONObject result = new BasicBSONObject();
    38
    39         public void reduce(Text key, Iterable<BSONWritable> values, Context context)
    40             throws IOException, InterruptedException {
    41             HashMap<String, Integer> mymap = new HashMap<String, Integer>();
    42             BasicBSONObject result = new BasicBSONObject();
    43             BasicBSONObject urlcount = new BasicBSONObject();
    44             for (BSONWritable val : values) {
    45                 @SuppressWarnings("unchecked")
    46                 BSONObject temp2 = val.getDoc();
    47                 @SuppressWarnings("unchecked")
    48                 HashMap<String, Integer> temp = (HashMap<String, Integer>) val.getDoc().toMap();
    49                 for (Map.Entry<String, Integer> entry :  temp.entrySet()) {
    50                     if (mymap.containsKey(entry.getKey())) {
    51                         mymap.put(entry.getKey(), entry.getValue()+1);
    52                     }
    53                     else {
    54                         mymap.put(entry.getKey(), 1);
    55                     }
    56                 }
    57             }
    58             result.putAll(mymap);
    59             context.write(key, new BSONWritable(result));
    60         }
    61     }
    62
    63     public static void main(String[] args) throws Exception {
    64         Configuration conf = new Configuration();
    65         conf.set( "mongo.input.uri" , "mongodb://localhost/stackoverflow.stackin" );
    66         conf.set( "mongo.output.uri" , "mongodb://localhost/stackoverflow.stackout" );
    67         @SuppressWarnings("deprecation")
    68         Job job = new Job(conf, "word count");
    69         job.setJarByClass(WordCount.class);
    70         job.setMapperClass(TokenizerMapper.class);
    71         //job.setCombinerClass(IntSumReducer.class);
    72         job.setReducerClass(IntSumReducer.class);
    73         job.setMapOutputKeyClass(Text.class);
    74         job.setMapOutputValueClass(BSONWritable.class);
    75         job.setOutputKeyClass(Text.class);
    76         job.setOutputValueClass(BSONWritable.class);
    77         job.setInputFormatClass( MongoInputFormat.class );
    78         job.setOutputFormatClass( MongoOutputFormat.class );
    79         System.exit(job.waitForCompletion(true) ? 0 : 1);
    80     }
    81 }

    设计的思路是,在map部分得到一个word以及键为url值为1的Bson对象,然后写入content中。对应的,在reduce部分对传入的值进行统计。

总结:本案例很简单,但是需要明白Hadoop MapReduce的原理以及mongo-hadoop API中对象的使用。如果有疑问,可以在评论区提出~

时间: 2024-07-28 15:55:22

使用hadoop mapreduce分析mongodb数据:(2)的相关文章

使用hadoop mapreduce分析mongodb数据

使用hadoop mapreduce分析mongodb数据 (现在很多互联网爬虫将数据存入mongdb中,所以研究了一下,写此文档) 版权声明:本文为yunshuxueyuan原创文章.如需转载请标明出处: http://www.cnblogs.com/sxt-zkys/QQ技术交流群:299142667 一. mongdb的安装和使用 1. 官网下载mongodb-linux-x86_64-rhel70-3.2.9.tgz 2. 解压 (可以配置一下环境变量) 3. 启动服务端 ./mongo

使用hadoop mapreduce分析mongodb数据:(1)

最近考虑使用hadoop mapreduce来分析mongodb上的数据,从网上找了一些demo,东拼西凑,终于运行了一个demo,下面把过程展示给大家 环境 ubuntu 14.04 64bit hadoop 2.6.4 mongodb 2.4.9 Java 1.8 mongo-hadoop-core-1.5.2.jar mongo-java-driver-3.0.4.jar mongo-hadoop-core-1.5.2.jar以及mongo-java-driver-3.0.4.jar的下载

【Big Data - Hadoop - MapReduce】初学Hadoop之图解MapReduce与WordCount示例分析

Hadoop的框架最核心的设计就是:HDFS和MapReduce.HDFS为海量的数据提供了存储,MapReduce则为海量的数据提供了计算. HDFS是Google File System(GFS)的开源实现. MapReduce是Google MapReduce的开源实现. HDFS和MapReduce实现是完全分离的,并不是没有HDFS就不能MapReduce运算. 本文主要参考了以下三篇博客学习整理而成. 1. Hadoop示例程序WordCount详解及实例 2. hadoop 学习笔

Hadoop源代码分析(包mapreduce.lib.input)

接下来我们按照MapReduce过程中数据流动的顺序,来分解org.apache.hadoop.mapreduce.lib.*的相关内容,并介绍对应的基类的功能.首先是input部分,它实现了MapReduce的数据输入部分.类图如下: 类图的右上角是InputFormat,它描述了一个MapReduceJob的输入,通过InputFormat,Hadoop可以: l          检查MapReduce输入数据的正确性: l          将输入数据切分为逻辑块InputSplit,这

MapReduce分析明星微博数据

互联网时代的到来,使得名人的形象变得更加鲜活,也拉近了明星和粉丝之间的距离.歌星.影星.体育明星.作家等名人通过互联网能够轻易实现和粉丝的互动,赚钱也变得前所未有的简单.同时,互联网的飞速发展本身也造就了一批互联网明星,这些人借助新的手段,最大程度发挥了粉丝经济的能量和作用,在互联网时代赚得盆满钵满. 正是基于这样一个大背景,今天我们做一个分析明星微博数据的小项目 1.项目需求 自定义输入格式,将明星微博数据排序后按粉丝数关注数 微博数分别输出到不同文件中. 2.数据集 明星 明星微博名称 粉丝

使用mapReduce分析简单天气数据

做demo前需要先搭建Hadoop集群,并且有linux基础,可参考 https://www.cnblogs.com/linyufeng/p/10831240.html 1.引出问题 给一串数据,找出每年的每个月温度最高的2天.其中有可能包含着相同的数据. 1949-10-01 14:21:02 34c 1949-10-01 19:21:02 38c 1949-10-02 14:01:02 36c 1950-01-01 11:21:02 32c 1950-10-01 12:21:02 37c 1

Hadoop 中利用 mapreduce 读写 mysql 数据

Hadoop 中利用 mapreduce 读写 mysql 数据 有时候我们在项目中会遇到输入结果集很大,但是输出结果很小,比如一些 pv.uv 数据,然后为了实时查询的需求,或者一些 OLAP 的需求,我们需要 mapreduce 与 mysql 进行数据的交互,而这些特性正是 hbase 或者 hive 目前亟待改进的地方. 好了言归正传,简单的说说背景.原理以及需要注意的地方: 1.为了方便 MapReduce 直接访问关系型数据库(Mysql,Oracle),Hadoop提供了DBInp

Hadoop HDFS源码分析 关于数据块的类

Hadoop HDFS源码分析 关于数据块的类 1.BlocksMap 官方代码中的注释为: /** * This class maintains the map from a block to its metadata. * block's metadata currently includes blockCollection it belongs to and * the datanodes that store the block. */ BlocksMap数据块映射,管理名字节点上的数据

Hadoop源代码分析(包hadoop.mapred中的MapReduce接口)

前面已经完成了对org.apache.hadoop.mapreduce的分析,这个包提供了Hadoop MapReduce部分的应用API,用于用户实现自己的MapReduce应用.但这些接口是给未来的MapReduce应用的,目前MapReduce框架还是使用老系统(参考补丁HADOOP-1230).下面我们来分析org.apache.hadoop.mapred,首先还是从mapred的MapReduce框架开始分析,下面的类图(灰色部分为标记为@Deprecated的类/接口): 我们把包m