spark/hadoop整合mongodb

MongoDB是一个文档型数据库,它可以方便的应用于大多数语言,其次是实现是C++,根据相关人员的测试证明mongodb的查询性能要好于现在市面上好多nosql数据库,相关的测试连接如下:

http://www.kuqin.com/shuoit/20140928/342398.html

下面简单介绍mongodb:

一、Mongodb特性

1、模式自由,支持动态查询、完全索引,可轻易查询文档中内嵌的对象及数组。

2、面向集合存储,易存储对象类型的数据,包括文档内包括文档内嵌对象及数组。

3、高效的数据存储,支持二进制数据及大型对象

4、支持复制和故障恢复:提供了主-从、主-主模式的数据复制及服务器之间的数据复制

5、自动分片以支持云级别的伸缩性,支持水平的数据库集群,可动态添加额外的服务器。

二、使用场景

1、适合作为信息基础设施的持久化缓存层

2、适合实时的插入,更新与查询,并具备应用程序实时数据存储所需的复制及高度伸缩性。

3、Mongodb的BSON数据格式非常适合文档化格式的存储及查询

4、适合由数十或数百台服务器组成的数据库。因为Mongodb已经包含了对MapReduce引擎的内置支持

三、不适合的场景

1、要求高度事务性的系统

2、传统的商业智能应用

3、复杂的跨文档(表)级联查询。

现在面临着大数据时代的挑战,下面开始讲述spark计算框架整合mongodb的使用。

先讲述spark读取mongodb:本人常用有两个方法,分别开始介绍:

spark整合mongodb之从mongodb读取:

 //方案1
val mongoConfig = new Configuration( )
mongoConfig.set("mongo.input.uri", "mongodb://master:20000,slave1:20000,slave2:20000/yang.relation2")
     mongoConfig.set( "mongo.input.split_size", "32" )//输入的大小
     mongoConfig.set( "mongo.input.split.read_shard_chunks", "true" )//读取分片
     mongoConfig.set( "mongo.input.fields{\"srcid\":\"1\",\"dstid\":\"1\"}" ) //读取的时候只读取自己需要的列 1表示读出,0表示不需要类似mongodb里面的projecttion
mongoConfig.set( "mongo.input.query","{\"dstid\":{\"$gt\":\"0\"}}" )
val readfile = sc.newAPIHadoopRDD( mongoConfig, classOf[ MongoInputFormat ], classOf[ Object ],classOf[ BSONObject ] )
readfile.count( )

//方案2
 val sqlContex = new SQLContext( sc )
 val builder = MongodbConfigBuilder(Map(Host -> Host -> List("master:27017","slave1:27017","slave2:27017"), Database -> "graphdb",
  Collection -> "mongo", SamplingRatio -> 1.0, WriteConcern -> MongodbWriteConcern.Normal ) )
 val mconf = builder.build( )
 val readfile2 = sqlContex.fromMongoDB( mconf )
readfile2.count()

spark整合mongodb之写入mongodb:

方案1:

val mongoConfig = new Configuration()
    mongoConfig.set("mongo.auth.uri","mongodb://"+ userName +":"+ pwd+"@"+hosts+"/admin")
    mongoConfig.set("mongo.output.uri","mongodb://"+ hosts + "/GRAPHDB.DB_GRAPH")
    saveRdd.saveAsNewAPIHadoopFile("", classOf[Object], classOf[BSONObject],
      classOf[MongoOutputFormat[Object, BSONObject]], mongoConfig)

方案2:

import MongodbConfig._
import com.mongodb.casbah.{WriteConcern => MongodbWriteConcern, MongoClient}
import com.stratio.provider.mongodb._
 val sqlContext = new SQLContext( sc )
 val property = Array("id","name","age","sex","info")
 val dataFrame = sqlContext.createDataFrame(  data ).toDF( property:_*)
 val builder = MongodbConfigBuilder(Map(Host -> List("master:27017","slave1:27017","slave2:27017"), Database -> "test",
      Collection -> "test", SamplingRatio -> 1.0, WriteConcern -> MongodbWriteConcern.Normal))
 val mongoConf = builder.build()
 val dataFrame: DataFrame = sqlcontex.createDataFrame( rdd )
 dataFrame.saveToMongodb(mongoConf,true)
方案3:
利用rdd的foreachPartition在每个paritition建立连接,导入数据,此时如果分区输比较多,分配给spark的cpu核数比较多的话,会出现很多问题,比如:在查看mongodb日志的时候,mongos进程有时候会挂掉,是因为mongodb在分配读写锁的时候出现了问题,而且还会出现OOM(无法创建本地线程,这一点本小白正在解决)。一定要在里面创建连接哟,否则会出现序列化问题。

hadoo整合mongodb更新:

val mongoConfig = new Configuration()

mongoConfig.set(“mongo.output.uri”,”mongodb://master:27017/db.table”)

saveRdd.saveAsNewAPIHadoopFile(“”, classOf[ Object ], classOf[ MongoUpdateWritable ],

classOf[ MongoOutputFormat[ Object,MongoUpdateWritable ] ],mongoConfig ).

更新的时候可以结合mongodb的数值修改器使用。以后有时间了给大家分享数据修改器的使用。本人小白一枚,如果有问题,希望大家给予指出,小杨在这里拜谢各位大神了。

时间: 2024-10-08 19:32:28

spark/hadoop整合mongodb的相关文章

Spark Streaming整合Kafka

0)摘要 主要介绍了Spark Streaming整合Kafka,两种整合方式:Receiver-based和Direct方式.这里使用的是Kafka broker version 0.8.2.1,官方文档地址:(http://spark.apache.org/docs/2.2.0/streaming-kafka-0-8-integration.html). 1)Kafka准备 启动zookeeper ./zkServer.sh start 启动kafka ./kafka-server-star

Spark Streaming整合Flume

1 目的 Spark Streaming整合Flume.参考官方整合文档(http://spark.apache.org/docs/2.2.0/streaming-flume-integration.html) 2 整合方式一:基于推 2.1 基本要求 flume和spark一个work节点要在同一台机器上,flume会在本机器上通过配置的端口推送数据 streaming应用必须先启动,receive必须要先监听推送数据的端口后,flume才能推送数据 添加如下依赖 groupId = org.

spring项目整合mongodb进行开发

spring项目整合mongodb进行开发: MongoDB的性能指标: 100个并发,插入550万条记录的平均吞吐量:大约4100条/秒 MONGODB实际上是一个内存数据库,先将数据保存到内存,然后再写入磁盘中 1.官网下载mongodb. https://www.mongodb.org/downloads 2.redhat上安装好mongodb 3.

SpringMVC整合MongoDB

首先,在pom文件中新增spring-data-mongodb的依赖: <dependency> <groupId>org.springframework.data</groupId> <artifactId>spring-data-mongodb</artifactId> <version>1.8.1.RELEASE</version></dependency>然后,新建spring-mongo.xml &l

spring MVC 整合mongodb

Spring Mongodb 目录 1 SPRING整合MONGODB 1 1.1 环境准备 1 1.2 包依赖 1 1.3 配置 2 2 案列 5 2.1 SPRING MVC整合MONGODB代码案例 5 1 Spring整合Mongodb 1.1 环境准备 1. mongodb官网 http://www.mongodb.org/,下载mongodb安装包和mongodb的java驱动包. mongodb安装包(下载地址http://www.mongodb.org/downloads).Mo

springmvc与hadoop整合时jackson包冲突

因项目需要,将springmvc和hbase整合,启动tomcat时报错如下: SEVERE: Exception sending context initialized event to listener instance of class org.springframework.web.context.ContextLoaderListenerorg.springframework.beans.factory.BeanCreationException: Error creating bea

Spark+hadoop+mllib及相关概念与操作笔记

Spark+hadoop+mllib及相关概念与操作笔记 作者: lw 版本: 0.1 时间: 2016-07-18 1.调研相关注意事项 a) 理解调研 调研的意义在于了解当前情况,挖掘潜在的问题,解决存在的疑问,并得到相应的方案. b) 调研流程 首先明确和梳理现有的疑问是什么,要通过调研解决什么问题,然后再去做调研,发现问题,再解决问题. c) 调研成果 最终需要得到结论与方案,以及详尽的论证理由,让别人信服. d) 书写格式 版本与作者以及时间可以以表格的形式,整齐明了. 结论简洁明了,

hadoop整合到web工程发布到tomcat报错

我是用的maven,至少要移出如下的jar包: <!-- hadoop相关包 --> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-core</artifactId> <version>1.0.4</version> <exclusions> <exclusion> <artifactId&

Spark 系列(十五)—— Spark Streaming 整合 Flume

一.简介 Apache Flume 是一个分布式,高可用的数据收集系统,可以从不同的数据源收集数据,经过聚合后发送到分布式计算框架或者存储系统中.Spark Straming 提供了以下两种方式用于 Flume 的整合. 二.推送式方法 在推送式方法 (Flume-style Push-based Approach) 中,Spark Streaming 程序需要对某台服务器的某个端口进行监听,Flume 通过 avro Sink 将数据源源不断推送到该端口.这里以监听日志文件为例,具体整合方式如