spark集群中的节点可以只处理自身独立数据库里的数据,然后汇总吗?修改
我将spark搭建在两台机器上,其中一台既是master又是slave,另一台是slave,两台机器上均装有独立的mongodb数据库。我是否可以让它们只统计自身数据库的内容,然后将结果汇总到一台服务器上的数据库里?目前我的代码如下,但是最终只统计了master里的数据,另一个worker没有统计上。
val config = new Configuration()
//以下代码表示只统计本机数据库上的数据,猜测问题可能出在这里
config.set("mongo.input.uri", "mongodb://127.0.0.1:27017/local.test")
//统计结果输出到服务器上
config.set("mongo.output.uri", "mongodb://103.25.23.80:60013/test_hao.result")
val mongoRDD = sc.newAPIHadoopRDD(config, classOf[com.mongodb.hadoop.MongoInputFormat], classOf[Object], classOf[BSONObject])
// Input contains tuples of (ObjectId, BSONObject)
val countsRDD = mongoRDD.flatMap(arg => {
var str = arg._2.get("type").toString
str = str.toLowerCase().replaceAll("[.,!?\n]", " ")
str.split(" ")
})
.map(word => (word, 1))
.reduceByKey((a, b) => a + b)
// Output contains tuples of (null, BSONObject) - ObjectId will be generated by Mongo driver if null
val saveRDD = countsRDD.map((tuple) => {
var bson = new BasicBSONObject()
bson.put("word", tuple._1)
bson.put("count", tuple._2.toString() )
(null, bson)
})
// Only MongoOutputFormat and config are relevant
saveRDD.saveAsNewAPIHadoopFile("file:///bogus", classOf[Any], classOf[Any], classOf[com.mongodb.hadoop.MongoOutputFormat[Any, Any]], config)
自问自答。原因可能是这样:
val mongoRDD = sc.newAPIHadoopRDD(config, classOf[com.mongodb.hadoop.MongoInputFormat], classOf[Object], classOf[BSONObject])
这行代码表示这是由driver读取数据库,然后将符合条件的数据载入RDD,由于之前设置了是将127.0.0.1作为输入,也就是从driver的mongodb上读取数据。由于driver就在master上,所以读取的数据也自然就是master上的数据了。
怎么掌握HA下的Spark集群工作原理?
默认情况下,Standalone的Spark集群是Master-Slaves架构的集群模式,由一台master来调度资源,这就和大部分的Master-Slaves结构集群一样,存在着Master单点故障的问题。如何解决这个单点故障的问题呢?Spark提供了两种方案:基于文件系统的单点恢复(Single-Node Recovery with Local File system)和基于zookeeper的Standby Masters(Standby Masters with ZooKeeper)。其中ZooKeeper是生产环境下的最佳选择。
ZooKeeper提供了一个Leader Election机制,利用这个机制你可以在集群中开启多个master并使它们都注册到ZooKeeper实例,ZooKeeper会管理使其中只有一个是Active的,其他的都是Standby的,Active状态的master可以提供服务,standby状态的则不可以。ZooKeeper保存了集群的状态信息,该信息包括所有的Worker,Driver 和Application。当Active的Master出现故障时,ZooKeeper会从其他standby的master中选举出一台,然后该新选举出来的master会恢复挂掉了的master的状态信息,之后该Master就可以正常提供调度服务。整个恢复过程只需要1到2分钟。需要注意的是,在这1到2分钟内,只会影响新程序的提交,那些在master崩溃时已经运行在集群中的程序并不会受影响。
下面我们就实战如何配置ZooKeeper下的spark HA。
注:
DT_大数据梦工厂(IMF传奇行动绝密课程)有所有大数据实战资料
更多私密内容,请关注微信公众号:DT_Spark
如果您对大数据Spark感兴趣,可以免费听由王家林老师每天晚上20:00开设的Spark永久免费公开课,地址YY房间号:68917580
spark集群搭建:https://zhuanlan.zhihu.com/p/20819281 http://www.cnblogs.com/shishanyuan/p/4699644.htmlhttp://www.cnblogs.com/kinglau/p/3794433.htmlhttp://www.cnblogs.com/tec-vegetables/p/3778358.htmlhttp://blog.csdn.net/laoyi_grace/article/details/6254743http://www.cnblogs.com/tina-smile/p/5045927.htmlhttp://edu.51cto.com/lesson/id-31786.html
没有hdfs,spark不能集群方式跑是吧?修改
可以读本地文件,但是只能spark本地模式运行计算这个从本地读的数据,是吧?修改
一个计算服务 HDFS只是输入的一种 本地文件、队列、Netcat 都可以作为输入 咋就不能跑了
另外 区分下YARN和HDFS YARN也不是必须依赖HDFS的
kafka,redis,mysql,都可以作为数据源。只要你能拉到数,管从哪拉呢?
可以跑。
配置你每个节点上都存一份,或者放在 NFS 里。
数据你不从 HDFS 取就行了。
可以集群方式运行。但是要求所有节点都能用同样的路径访问到数据文件。解决方案一是在每个节点上的同样路径放上同样的数据文件。解决方案二是采用网络共享目录和文件方式。比如linux的nfs方式共享。这两种方法都比较麻烦,因此,还是用hdfs吧。
spark-standalone模式可以不启动hdfs跑吗