基于MongoDB分布式存储进行MapReduce并行查询

中介绍了如何基于Mongodb进行关系型数据的分布式存储,有了存储就会牵扯到查询。虽然用普通的方式也可以进行查询,但今天要介绍的是如何使用MONGODB中提供的MapReduce功能进行查询。
      有关MongoDb的MapReduce之前我写过一篇文章 Mongodb Mapreduce 初窥

今天介绍如何基于sharding机制进行mapreduce查询。在MongoDB的官方文档中,这么一句话:

Sharded Environments
      In sharded environments, data processing of map/reduce operations runs in parallel on all shards.

即: map/reduce操作会并行运行在所有的shards上。
      下面我们就用之前这篇文章中白搭建的环境来构造mapreduce查询:

首先要说的是,基于sharding的mapreduce与非sharding的数据在返回结构上有一些区别,我目前注意到的主要是不支持定制式的json格式的返回数据,也就是下面方式可能会出现问题:

return { count : total };

注意:上面的情况目前出现在了我的测试环境下,如下图:
     
         
     
     就需要改成 return count;
     
     下面是测试代码,首先是按帖子id来查询相应数量(基于分组查询实例方式):

public partial class getfile : System.Web.UI.Page
    {

public Mongo Mongo { get; set; }

public IMongoDatabase DB
        {
            get
            {
                return this.Mongo["dnt_mongodb"];
            }
        }

/// <summary>
        /// Sets up the test environment.  You can either override this OnInit to add custom initialization.
        /// </summary>
        public virtual void Init()
        {
            string ConnectionString = "Server=10.0.4.85:27017;ConnectTimeout=30000;ConnectionLifetime=300000;MinimumPoolSize=512;MaximumPoolSize=51200;Pooled=true";
            if (String.IsNullOrEmpty(ConnectionString))
                throw new ArgumentNullException("Connection string not found.");
            this.Mongo = new Mongo(ConnectionString);
            this.Mongo.Connect();         
        }
        string mapfunction = "function(){\n" +
                        "  if(this._id==‘548111‘) { emit(this._id, 1); } \n" +   
                        "};";

string reducefunction = "function(key, current ){" +
                                "   var count = 0;" +
                                "   for(var i in current) {" +
                                "       count+=current[i];" +
                                "   }" +
                                "   return count ;\n" +
                              "};";

protected void Page_Load(object sender, EventArgs e)
        {
            Init();

var mrb = DB["posts1"].MapReduce();//attach_gfstream.files
            int groupCount = 0;
            using (var mr = mrb.Map(mapfunction).Reduce(reducefunction))
            {
                foreach (Document doc in mr.Documents)
                {
                    groupCount = int.Parse(doc["value"].ToString());
                }
            }
            this.Mongo.Disconnect();
        }     
     }

下面是运行时的查询结果,如下:
     
          
     
     
     接着演示一下如何把查询到的帖子信息返回并装入list集合,这里只查询ID为548110和548111两个帖子:

string mapfunction = "function(){\n" +
                        "  if(this._id==‘548110‘|| this._id==‘548111‘) { emit(this, 1); } \n" +    
                        "};";

string reducefunction = "function(doc, current ){" +
                                "   return doc;\n" +
                               "};";
      
        protected void Page_Load(object sender, EventArgs e)
        {
            Init();

var mrb = DB["posts1"].MapReduce();//attach_gfstream.files
            List<Document> postDoc = new List<Document>();
            using (var mr = mrb.Map(mapfunction).Reduce(reducefunction))
            {
                foreach (Document doc in mr.Documents)
                {
                    postDoc.Add((Document)doc["value"]);
                }
            }
            this.Mongo.Disconnect();
        }

下面是运行时的查询结果,如下:
     
    

上面的map/reduce方法还有许多写法,如果大家感兴趣可以看一下如下这些链接:     
     http://cookbook.mongodb.org/patterns/unique_items_map_reduce/
     http://www.mongodb.org/display/DOCS/MapReduce
     
     以及之前我写的这篇文章:http://www.cnblogs.com/daizhj/archive/2010/06/10/1755761.html
     
     
     当然在mongos进行map/reduce运算时,会生成一些临时文件,如下图:
  
     
     我猜这些临时文件可能会对再次查询系统时的性能有一些提升(但目前未观察到)。
     
     当然对于mongodb的gridfs系统(可使用它搭建分布式文件存储系统,我之前在这篇文章中已介绍过,我也做了测试,但遗憾的是并未成功,它经常会报一些错误,比如:

Thu Sep 09 12:09:29   Assertion failure _grab client\parallel.cpp 461

看来mapreduce程序链接到mongodb上时,会产生一些问题,但不知道是不是其自身稳定性的原因,还是我的机器环境设置问题(内存或配置的64位系统mongos与32位的client连接问题)。
     
     好了,今天的文章就先到这里了。

时间: 2024-10-09 21:03:05

基于MongoDB分布式存储进行MapReduce并行查询的相关文章

基于Solr的HBase多条件查询测试

转自:http://www.cnblogs.com/chenz/articles/3229997.html 背景: 某电信项目中采用HBase来存储用户终端明细数据,供前台页面即时查询.HBase无可置疑拥有其优势,但其本身只对rowkey支持毫秒级的快速检索,对于多字段的组合查询却无能为力.针对HBase的多条件查询也有多种方案,但是这些方案要么太复杂,要么效率太低,本文只对基于Solr的HBase多条件查询方案进行测试和验证. 原理: 基于Solr的HBase多条件查询原理很简单,将HBas

【MongoDB】MongoDB数据库之MapReduce编程模型

刚开始阅读<Mongodb入门手册>时候看到mapreduce,当时感觉好难,就直接忽略了.现在重新看到这部分知识的时候,痛下决心学习这块知识. 一.概念说明 MongoDB的MapReduce相当于Mysql中"group by",在mongodb上使用mapreduce执行并行数据统计很容易:使用MapReduce要实现两个函数: map 和 reduce. map函数调用emit(key,value)遍历collection中所有的记录,将key和value传递给Re

基于mongodb的地理检索实现

使用mongoDB不是很多,记得以前做"家长助手"的时候,使用过一点.只是在去年做"派单系统"的时候,又再一次使用mongoDB. 在这里先简单介绍一下派单系统,派单系统在云足疗(O2O,上门足疗)里一个专门负责订单派送,提高订单完成效率的一个系统.主要是当一个来了之后,会根据订单的服务项目.服务时间和服务地点,快速找到最合适最优秀的技师,返回给用户.由于上门足疗特殊行业的要求,不能给订单指定技师直接下单.而是将筛选的一些优秀的技师返回给用户,让用户自己去选择指派给

基于Solr的HBase多条件查询

基于Solr的HBase多条件查询——转载: 背景: 某电信项目中采用HBase来存储用户终端明细数据,供前台页面即时查询.HBase无可置疑拥有其优势,但其本身只对rowkey支持毫秒级的快速检索,对于多字段的组合查询却无能为力. 针对HBase的多条件查询也有多种方案,但是这些方案要么太复杂,要么效率太低,本文只对基于Solr的HBase多条件查询方案进行测试和验证. 原理: 基于Solr的HBase多条件查询原理很简单,将HBase表中涉及条件过滤的字段和rowkey在Solr中建立索引,

MongoShake——基于MongoDB的跨数据中心的数据复制平台

摘要: MongoShake是基于MongoDB的通用型平台服务,作为数据连通的桥梁,打通各个闭环节点的通道.通过MongoShake的订阅消费,可以灵活对接以适应不同场景,例如日志订阅.数据中心同步.监控审计等.其中,集群数据同步作为核心应用场景,能够灵活实现灾备和多活的业务场景. 背景 在当前的数据库系统生态中,大部分系统都支持多个节点实例间的数据同步机制,如Mysql Master/Slave主从同步,Redis AOF主从同步等,MongoDB更是支持3节点及以上的副本集同步,上述机制很

基于MongoDB.Driver的扩展

由于MongoDB.Driver中的Find方法也支持表达式写法,结合[通用查询设计思想]这篇文章中的查询思想,个人基于MongoDB扩展了一些常用的方法. 首先我们从常用的查询开始,由于MongoDB.Driver支持类似于AutoMapper返回的指定属性(Project<TDto>方法),所以这里都是基于泛型的扩展 /// <summary> /// 同步查询指定条件的数据,并且返回指定类型TDto /// </summary> /// <typeparam

mongodb aggregate and mapReduce

Aggregate MongoDB中聚合(aggregate)主要用于处理数据(诸如统计平均值,求和等),并返回计算后的数据结果.有点类似sql语句中的 count(*) 语法如下: db.collection.aggregate() db.collection.aggregate(pipeline,options) db.runCommand({ aggregate: "<collection>", pipeline: [ <stage>, <...&g

基于py3和pymysql的数据库查询,查询某几列的数据

#python3 #xiaodeng #基于py3和pymysql的数据库查询,查询某几列的数据 import pymysql conn=pymysql.connect(....) cur=conn.cursor() cur.execute("select name,age from nlist") data=cur.fethall() for name,age in data: print name,age conn.close() cur.close()

MongoDB之DBref(关联插入,查询,删除) 实例深入

MongoDB之DBref(关联插入,查询,删除) 实例深入 如图所示,A,B,C三个Collection互相关联. 其中的数字为document的value值. 关于DBref的入门可以看http://blog.csdn.net/crazyjixiang/article/details/6616678这篇文章. 我们先建立A collection. Cpp代码 > var a={value:"1"} > var b={value:"2"} > v