Spark下的PageRank实现

val sc = new SparkContext(...)
val links = sc.parallelize(Array(('A',Array('D')),('B',Array('A')),('C',Array('A','B')),('D',Array('A','C'))),2).map(x => (x._1,x._2)).cache()

var ranks = sc.parallelize(Array(('A',1.0),('B',1.0),('C',1.0),('D',1.0)),2)

val iterations_num = 50

for(i <- 1 to iterations_num){
	val contribs = links.join(ranks,2).flatMap{
		case(url,(links,rank)) => links.map(dest => (dest,rank/links,size))
	}

	ranks = contribs.reduceByKey(_ + _,2).mapValues(0.15 + 0.85 * _)
}

ranks.saveAsTextFile(...)

时间: 2024-11-08 11:06:00

Spark下的PageRank实现的相关文章

Spark下的FP-Growth和Apriori(频繁项集挖掘并行化算法)

频繁项集挖掘是一个关联式规则挖掘问题.关联挖掘是数据挖掘中研究最早也是最活跃的领域,其中频繁模式的挖掘是关联挖掘的核心和基础,是产生关联规则挖掘的基础.频繁项集最经典的应用就是超市的购物篮分析. 首先要理解频繁项集中的以下概念. 频繁项:在多个集合中,频繁出现的元素项. 频繁项集:在一系列集合中每项都含有某些相同的元素,这些元素形成一个子集,满足一定阀值就是频繁项集. K项集:K个频繁项组成的一个集合. 支持度:包含频繁项集(F)的集合的数目. 可信度:频繁项与某项的并集的支持度与频繁项集支持度

Spark下生成测试数据,并在Spark环境下使用BulkProcessor将测试数据入库到ES6.4.2

Spark下生成2000w测试数据(每条记录150列) 使用spark生成大量数据过程中遇到问题,如果sc.parallelize(fukeData, 64);的记录数特别大比如500w,1000w时,会特别慢,而且会抛出内存溢出over head错误.解决方案,一次生成的数据量不高于100w,多次调用,这样下来一共生成2000w耗时十几分钟. 如果环境允许你可以在本地生成测试数据,然后上传到hdfs供spark测试. import java.io.BufferedWriter; import

PageRank在Hadoop和spark下的实现以及对比

关于PageRank的地位,不必多说. 主要思想:对于每个网页,用户都有可能点击网页上的某个链接,例如 A:B,C,D B:A,D C:AD:B,C 由这个我们可以得到网页的转移矩阵 A    B    C    D A  0    1/2  1    0 B 1/3   0    0    0 C 1/3  1/2  0    0 D 1/3  0     0    1/2 Aij表示网页j到网页i的转移概率.假设起始状态每个用户对ABCD四个网站的点击概率相同都是0.25,那么各个网站第一次

关于Spark下的standalone模式的搭建

1.介绍standalone Standalone模式是Spark自身管理资源的一个模式,类似Yarn Yarn的结构: ResourceManager: 负责集群资源的管理 NodeManager:负责当前机器的资源管理 CPU&内存 Spark的Standalone的结构: Master: 负责集群资源管理 Worker: 负责当前机器的资源管理 CPU&内存 2.在local得基础上搭建standalone 3.修改env.sh 4.修改slave 5.启动

Spark下的RHive的连接

Sys.setenv(HADOOP_CONF_DIR='/etc/hadoop/conf.cloudera.hdfs')Sys.setenv(YARN_CONF_DIR='/etc/hadoop/conf.cloudera.yarn')library(SparkR, lib.loc = "/opt/modules/spark/R/lib")library(RHive) sc <- sparkR.init(sparkHome = "/opt/modules/spark&q

使用Spark下的corr计算皮尔森相似度Pearson时,报错Can only zip RDDs with same number of elements in each partition....

package com.huawei.bigdata.spark.examples import org.apache.spark.mllib.stat.Statistics import org.apache.spark.sql.types.DoubleType import org.apache.spark.{SparkConf, SparkContext} /** * Created by wulei on 2017/8/3. */ object PointCorrPredict { de

spark下dataframe转为rdd格式

dataframe可以实现很多操作,但是存储到本地的时候,只能存 parquest格式 需要存储源格式,需要转换为rdd类型 将dataframe中的每一行都map成有逗号相连的string,就变为了一个rdd

在Ubuntu下搭建Spark群集

在前一篇文章中,我们已经搭建好了Hadoop的群集,接下来,我们就是需要基于这个Hadoop群集,搭建Spark的群集.由于前面已经做了大量的工作,所以接下来搭建Spark会简单很多. 首先打开三个虚拟机,现在我们需要安装Scala,因为Spark是基于Scala开发的,所以需要安装Scala.在Ubuntu下安装Scala很简单,我们只需要运行 sudo apt-get install scala 就可以安装Scala了. 安装完成后运行scala -version可以看到安装的Scala的版

简单记录下spark环境搭建时的几个小问题

折腾了好几天,终于可以在CenOS下用eclipse写spark程序了 当前环境是CentOS6.5,jdk7,scala2.10.4,spark0.9.1 前期因为对linux不熟悉的原因花了不少时间查阅命令.折腾了不少时间,终于把jdk8,scala2.11.0环境变量配置好了.在此环境下可以正常编译scala程序,但是在spark下运行sbt/sbt assembly时报了错误:rt.jar is broken.上google查了不少资料,大概是因为jdk8和jdk7之间的差异造成的,所以