使用hadoop mapreduce分析mongodb数据:(1)

最近考虑使用hadoop mapreduce来分析mongodb上的数据,从网上找了一些demo,东拼西凑,终于运行了一个demo,下面把过程展示给大家

环境

  • ubuntu 14.04 64bit
  • hadoop 2.6.4
  • mongodb 2.4.9
  • Java 1.8
  • mongo-hadoop-core-1.5.2.jar
  • mongo-java-driver-3.0.4.jar

mongo-hadoop-core-1.5.2.jar以及mongo-java-driver-3.0.4.jar的下载和配置

  • 编译mongo-hadoop-core-1.5.2.jar
  • $ git clone https://github.com/mongodb/mongo-hadoop
    $ cd mongo-hadoop
    $ ./gradlew jar
    • 编译时间比较长,成功编译之后mongo-hadoop-core-1.5.2.jar存在的路径是core/build/libs
  • 下载mongo-java-driver-3.0.4.jar
  • http://central.maven.org/maven2/org/mongodb/mongo-java-driver/3.0.4/选择 mongo-java-driver-3.0.4.jar

数据

  • 数据样例
  • > db.in.find({})
    { "_id" : ObjectId("5758db95ab12e17a067fbb6f"), "x" : "hello world" }
    { "_id" : ObjectId("5758db95ab12e17a067fbb70"), "x" : "nice to meet you" }
    { "_id" : ObjectId("5758db95ab12e17a067fbb71"), "x" : "good to see you" }
    { "_id" : ObjectId("5758db95ab12e17a067fbb72"), "x" : "world war 2" }
    { "_id" : ObjectId("5758db95ab12e17a067fbb73"), "x" : "see you again" }
    { "_id" : ObjectId("5758db95ab12e17a067fbb74"), "x" : "bye bye" }
  • 最后的结果
  • > db.out.find({})
    { "_id" : "2", "value" : 1 }
    { "_id" : "again", "value" : 1 }
    { "_id" : "bye", "value" : 2 }
    { "_id" : "good", "value" : 1 }
    { "_id" : "hello", "value" : 1 }
    { "_id" : "meet", "value" : 1 }
    { "_id" : "nice", "value" : 1 }
    { "_id" : "see", "value" : 2 }
    { "_id" : "to", "value" : 2 }
    { "_id" : "war", "value" : 1 }
    { "_id" : "world", "value" : 2 }
    { "_id" : "you", "value" : 3 }
  • 目标是统计每个文档中出现的词频,并且把单词作为key,词频作为value存在mongodb中

Hadoop mapreduce代码

  • Mapreduce 代码

     1 import java.util.*;
     2 import java.io.*;
     3
     4 import org.bson.*;
     5
     6 import com.mongodb.hadoop.MongoInputFormat;
     7 import com.mongodb.hadoop.MongoOutputFormat;
     8
     9 import org.apache.hadoop.conf.Configuration;
    10 import org.apache.hadoop.io.*;
    11 import org.apache.hadoop.mapreduce.*;
    12
    13
    14 public class WordCount {
    15     public static class TokenizerMapper extends Mapper<Object, BSONObject, Text, IntWritable> {
    16         private final static IntWritable one = new IntWritable(1);
    17         private Text word = new Text();
    18         public void map(Object key, BSONObject value, Context context )
    19                 throws IOException, InterruptedException {
    20             System.out.println( "key: " + key );
    21             System.out.println( "value: " + value );
    22             StringTokenizer itr = new StringTokenizer(value.get( "x" ).toString());
    23             while (itr.hasMoreTokens()) {
    24                 word.set(itr.nextToken());
    25                 context.write(word, one);
    26             }
    27         }
    28     }
    29     public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
    30         private IntWritable result = new IntWritable();
    31         public void reduce(Text key, Iterable<IntWritable> values, Context context )
    32             throws IOException, InterruptedException {
    33             int sum = 0;
    34             for (IntWritable val : values) {
    35                 sum += val.get();
    36             }
    37             result.set(sum);
    38             context.write(key, result);
    39         }
    40     }
    41     public static void main(String[] args) throws Exception {
    42         Configuration conf = new Configuration();
    43         conf.set( "mongo.input.uri" , "mongodb://localhost/testmr.in" );
    44         conf.set( "mongo.output.uri" , "mongodb://localhost/testmr.out" );
    45         @SuppressWarnings("deprecation")
    46         Job job = new Job(conf, "word count");
    47         job.setJarByClass(WordCount.class);
    48         job.setMapperClass(TokenizerMapper.class);
    49         job.setCombinerClass(IntSumReducer.class);
    50         job.setReducerClass(IntSumReducer.class);
    51         job.setOutputKeyClass(Text.class);
    52         job.setOutputValueClass(IntWritable.class);
    53         job.setInputFormatClass( MongoInputFormat.class );
    54         job.setOutputFormatClass( MongoOutputFormat.class );
    55         System.exit(job.waitForCompletion(true) ? 0 : 1);
    56     }
    57 }
    • 注意:设置mongo.input.uri和mongo.output.uri

      1 conf.set( "mongo.input.uri" , "mongodb://localhost/testmr.in" );
      2 conf.set( "mongo.output.uri" , "mongodb://localhost/testmr.out" );
  • 编译
    • 编译

      $ hadoop com.sun.tools.javac.Main WordCount.java -Xlint:deprecation
    • 编译jar包

      $ jar cf wc.jar WordCount*.class
  • 运行
    • 启动hadoop,运行mapreduce代码必须启动hadoop

      $ start-all.sh
    • 运行程序
    • $ hadoop jar  wc.jar WordCount
  • 查看结果
  • $ mongo
    MongoDB shell version: 2.4.9
    connecting to: test
    > use testmr;
    switched to db testmr
    > db.out.find({})
    { "_id" : "2", "value" : 1 }
    { "_id" : "again", "value" : 1 }
    { "_id" : "bye", "value" : 2 }
    { "_id" : "good", "value" : 1 }
    { "_id" : "hello", "value" : 1 }
    { "_id" : "meet", "value" : 1 }
    { "_id" : "nice", "value" : 1 }
    { "_id" : "see", "value" : 2 }
    { "_id" : "to", "value" : 2 }
    { "_id" : "war", "value" : 1 }
    { "_id" : "world", "value" : 2 }
    { "_id" : "you", "value" : 3 }
    > 

以上是一个简单的例子,接下来我要用hadoop mapreduce处理mongodb中的更加复杂的数据。敬请期待,如果有疑问,请在留言区提出 ^_^

参考资料以及文档

  1. The elephant in the room mongo db + hadoop
  2. http://chenhua-1984.iteye.com/blog/2162576
  3. http://api.mongodb.com/java/2.12/com/mongodb/MongoURI.html
  4. http://stackoverflow.com/questions/27020075/mongo-hadoop-connector-issue

如果The elephant in the room mongo db +

时间: 2024-10-15 16:24:39

使用hadoop mapreduce分析mongodb数据:(1)的相关文章

使用hadoop mapreduce分析mongodb数据

使用hadoop mapreduce分析mongodb数据 (现在很多互联网爬虫将数据存入mongdb中,所以研究了一下,写此文档) 版权声明:本文为yunshuxueyuan原创文章.如需转载请标明出处: http://www.cnblogs.com/sxt-zkys/QQ技术交流群:299142667 一. mongdb的安装和使用 1. 官网下载mongodb-linux-x86_64-rhel70-3.2.9.tgz 2. 解压 (可以配置一下环境变量) 3. 启动服务端 ./mongo

使用hadoop mapreduce分析mongodb数据:(2)

在上一篇使用hadoop mapreduce分析mongodb数据:(1)中,介绍了如何使用Hadoop MapReduce连接MongoDB数据库以及如何处理数据库,本文结合一个案例来进一步说明Hadoop MapReduce处理MongoDB的细节 原始数据 > db.stackin.find({}) { "_id" : ObjectId("575ce909aa02c3b21f1be0bb"), "summary" : "go

【Big Data - Hadoop - MapReduce】初学Hadoop之图解MapReduce与WordCount示例分析

Hadoop的框架最核心的设计就是:HDFS和MapReduce.HDFS为海量的数据提供了存储,MapReduce则为海量的数据提供了计算. HDFS是Google File System(GFS)的开源实现. MapReduce是Google MapReduce的开源实现. HDFS和MapReduce实现是完全分离的,并不是没有HDFS就不能MapReduce运算. 本文主要参考了以下三篇博客学习整理而成. 1. Hadoop示例程序WordCount详解及实例 2. hadoop 学习笔

Hadoop源代码分析(包mapreduce.lib.input)

接下来我们按照MapReduce过程中数据流动的顺序,来分解org.apache.hadoop.mapreduce.lib.*的相关内容,并介绍对应的基类的功能.首先是input部分,它实现了MapReduce的数据输入部分.类图如下: 类图的右上角是InputFormat,它描述了一个MapReduceJob的输入,通过InputFormat,Hadoop可以: l          检查MapReduce输入数据的正确性: l          将输入数据切分为逻辑块InputSplit,这

MapReduce分析明星微博数据

互联网时代的到来,使得名人的形象变得更加鲜活,也拉近了明星和粉丝之间的距离.歌星.影星.体育明星.作家等名人通过互联网能够轻易实现和粉丝的互动,赚钱也变得前所未有的简单.同时,互联网的飞速发展本身也造就了一批互联网明星,这些人借助新的手段,最大程度发挥了粉丝经济的能量和作用,在互联网时代赚得盆满钵满. 正是基于这样一个大背景,今天我们做一个分析明星微博数据的小项目 1.项目需求 自定义输入格式,将明星微博数据排序后按粉丝数关注数 微博数分别输出到不同文件中. 2.数据集 明星 明星微博名称 粉丝

使用mapReduce分析简单天气数据

做demo前需要先搭建Hadoop集群,并且有linux基础,可参考 https://www.cnblogs.com/linyufeng/p/10831240.html 1.引出问题 给一串数据,找出每年的每个月温度最高的2天.其中有可能包含着相同的数据. 1949-10-01 14:21:02 34c 1949-10-01 19:21:02 38c 1949-10-02 14:01:02 36c 1950-01-01 11:21:02 32c 1950-10-01 12:21:02 37c 1

Hadoop 中利用 mapreduce 读写 mysql 数据

Hadoop 中利用 mapreduce 读写 mysql 数据 有时候我们在项目中会遇到输入结果集很大,但是输出结果很小,比如一些 pv.uv 数据,然后为了实时查询的需求,或者一些 OLAP 的需求,我们需要 mapreduce 与 mysql 进行数据的交互,而这些特性正是 hbase 或者 hive 目前亟待改进的地方. 好了言归正传,简单的说说背景.原理以及需要注意的地方: 1.为了方便 MapReduce 直接访问关系型数据库(Mysql,Oracle),Hadoop提供了DBInp

Hadoop HDFS源码分析 关于数据块的类

Hadoop HDFS源码分析 关于数据块的类 1.BlocksMap 官方代码中的注释为: /** * This class maintains the map from a block to its metadata. * block's metadata currently includes blockCollection it belongs to and * the datanodes that store the block. */ BlocksMap数据块映射,管理名字节点上的数据

Hadoop源代码分析(包hadoop.mapred中的MapReduce接口)

前面已经完成了对org.apache.hadoop.mapreduce的分析,这个包提供了Hadoop MapReduce部分的应用API,用于用户实现自己的MapReduce应用.但这些接口是给未来的MapReduce应用的,目前MapReduce框架还是使用老系统(参考补丁HADOOP-1230).下面我们来分析org.apache.hadoop.mapred,首先还是从mapred的MapReduce框架开始分析,下面的类图(灰色部分为标记为@Deprecated的类/接口): 我们把包m