由于本人文字表达能力不足,还是多多以代码形式表述,首先展示测试代码,然后解释:
package com.txq.spark.test import org.apache.spark.graphx.util.GraphGeneratorsimport org.apache.spark.graphx._import org.apache.spark.rdd.RDDimport org.apache.spark.{SparkConf, SparkContext, SparkException, graphx} import scala.reflect.ClassTag /** * spark GraphX 测试 * @authorTongXueQiang */object test { System.setProperty("hadoop.home.dir","D://hadoop-2.6.2"); val conf = new SparkConf().setMaster("local").setAppName("testRDDMethod"); val sc = new SparkContext(conf); def main(args: Array[String]): Unit = { /* val rdd = sc.textFile("hdfs://spark:9000/user/spark/data/SogouQ.sample");//搜狗搜索日志解析 val rdd1 = rdd.map(_.split("\t")).map(line=>line(3)).map(_.split(" ")); println("共有"+rdd1.count+"行"); val rdd2 = rdd1.filter(_(0).toInt == 1).filter(_(1).toInt == 1); println("搜索结果和点击率均排第一的共有"+rdd2.count+"行"); val users:RDD[(VertexId,(String,String))] = sc.parallelize(Array((3L,("rxin","student")),(7L,("jgonzal","postdoc")),(5L,("franklin","prof")),(2L,("istoica","prof")))); val relationships:RDD[Edge[String]] = sc.parallelize(Array(Edge(3L,7L,"collab"),Edge(5L,3L,"advisor"),Edge(2L,5L,"colleague"),Edge(5L,7L,"pi"))); val defaultUser = ("jone","Missing"); val graph = Graph(users,relationships,defaultUser); val result = graph.vertices.filter{case(id,(name,pos)) => pos == "prof"}.count(); println("职位名称为prof的个数有:" + result + "个"); println(graph.edges.filter(e => e.srcId > e.dstId).count()); graph.triplets.collect().foreach(println) graph.edges.collect().foreach(println)*/ /* val graph:Graph[Double,Int] = GraphGenerators.logNormalGraph(sc,numVertices = 100).mapVertices((id,_) => id.toDouble) println(graph); println(graph.vertices)*/ /* val oderFollowers:VertexRDD[(Int,Double)] = graph.mapReduceTriplets[(Int,Double)]( triplet =>{ if(triplet.srcAttr > triplet.dstAttr){ Iterator((triplet.dstId,(1,triplet.srcAttr))); } else { Iterator.empty } }, (a,b) =>(a._1 + b._1,a._2 + b._2) ) val avgAgeOfolderFollower:VertexRDD[Double] = oderFollowers.mapValues((id,value) => { value match{ case (count,totalAge) => totalAge / count } }) avgAgeOfolderFollower.collect().foreach(println)*/ //收集邻居节点,后面有自定义方法 //collectNeighborIds(EdgeDirection.In,graph).foreach(line => {print(line._1+":"); for (elem <- line._2) {print(elem + " ")};println;}); //以Google的网页链接文件(后面由下载地址)为例,演示pregel方法,找出从v0网站出发,得到经过的步数最少的链接网站,类似于附近地图最短路径算法 val graph:Graph[Double,Double] = GraphLoader.edgeListFile(sc,"hdfs://spark/user/spark/data/web-Google.txt",numEdgePartitions = 4).mapVertices((id,_) => id.toDouble).mapEdges(edge => edge.attr.toDouble); val sourceId:VertexId = 0;//定义源网页Id val g:Graph[Double,Double] = graph.mapVertices((id,attr) => if(id == 0) 0.0 else Double.PositiveInfinity) //pregel底层调用GraphOps的mapReduceTriplets方法,一会儿解释源代码 val result = pregel[Double,Double,Double](g,Double.PositiveInfinity)( (id,vd,newVd) => math.min(vd,newVd),//这个方法的作用是更新节点VertexId的属性值为新值,以利于innerJoin操作 triplets => {//map函数 if(triplets.srcAttr + triplets.attr < triplets.dstAttr){ Iterator((triplets.dstId,triplets.srcAttr + triplets.attr)) } else { Iterator.empty } }, (a,b) => math.min(a,b)//reduce函数 ) //输出结果,注意pregel返回的是更新VertexId属性的graph,而不是VertexRDD[(VertexId,VD)] print("最短节点:"+result.vertices.filter(_._1 != 0).reduce(min));//注意过滤掉源节点 } //找出路径最短的点 def min(a:(VertexId,Double),b:(VertexId,Double)):(VertexId,Double) = { if(a._2 < b._2) a else b } /** * 自定义收集VertexId的neighborIds * @author TongXueQiang */ def collectNeighborIds[T,U](edgeDirection:EdgeDirection,graph:Graph[T,U])(implicit m:scala.reflect.ClassTag[T],n:scala.reflect.ClassTag[U]):VertexRDD[Array[VertexId]] = { val nbrs = graph.mapReduceTriplets[Array[VertexId]]( //map函数 edgeTriplets => { val msgTosrc = (edgeTriplets.srcId,Array(edgeTriplets.dstId)); val msgTodst = (edgeTriplets.dstId,Array(edgeTriplets.srcId)); edgeDirection match { case EdgeDirection.Either =>Iterator(msgTosrc,msgTodst) case EdgeDirection.Out => Iterator(msgTosrc) case EdgeDirection.In => Iterator(msgTodst) case EdgeDirection.Both => throw new SparkException("It doesn‘t make sense to collect neighbors without a " + "direction.(EdgeDirection.Both is not supported.use EdgeDirection.Either instead.)") } },_ ++ _)//reduce函数 nbrs } /** * 自定义pregel函数 * @param graph 图 * @param initialMsg 返回的vertexId属性 * @param maxInterations 迭代次数 * @param activeDirection 边的方向 * @param vprog 更新节点属性的函数,以利于innerJoin操作 * @param sendMsg map函数,返回Iterator[A],一般A为Tuple2,其中id为接受消息方 * @param mergeMsg reduce函数,一般为合并,或者取最小、最大值……操作 * @tparam A 想要得到的VertexId属性 * @tparam VD graph中vertices的属性 * @tparam ED graph中的edge属性 * @return 返回更新后的graph */ def pregel[A:ClassTag,VD:ClassTag,ED:ClassTag](graph:Graph[VD,ED],initialMsg:A,maxInterations:Int = Int.MaxValue,activeDirection:EdgeDirection = EdgeDirection.Either)( vprog:(VertexId,VD,A) => VD, sendMsg:EdgeTriplet[VD,ED] =>Iterator[(VertexId,A)], mergeMsg:(A,A) => A) : Graph[VD,ED] = { Pregel0(graph,initialMsg,maxInterations,activeDirection)(vprog,sendMsg,mergeMsg)//调用apply方法 } //此为节点内连接函数,返回VertexRDD def innerJoin[U:ClassTag,VD:ClassTag](table:RDD[(VertexId,U)])(mapFunc:(VertexId,VD,U) => VertexRDD[(VertexId,VD)]) = { val uf = (id: VertexId, data: VD, o: Option[U]) => { o match { case Some(u) => mapFunc(id, data, u) case None => data } } } //测试Option[T] def test():Unit = { val map = Map("a" -> "1","b" -> "2","c" -> "3"); def show(value:Option[String]):String = { value match{ case Some(x) => x case None => "no value found!" } } println(show(map.get("a")) == "1"); }}
下面重点研究Pregel,为了方便,自己重新定义了一个Pregel0
package com.txq.spark.test import org.apache.spark.Loggingimport org.apache.spark.graphx.{EdgeDirection, EdgeTriplet, Graph, VertexId}import scala.reflect.ClassTag /** * 自定义Pregel object,处理思路: */object Pregel0 extends Logging { def apply[VD:ClassTag,ED:ClassTag,A:ClassTag] (graph:Graph[VD,ED], initialMsg:A, maxIterations:Int = Int.MaxValue, activeDirection:EdgeDirection = EdgeDirection.Either) (vprog:(VertexId,VD,A) => VD, sendMsg:EdgeTriplet[VD,ED] => Iterator[(VertexId,A)], mergeMsg:(A,A) => A) : Graph[VD,ED] = { //①对vertices进行更新操作 var g = graph.mapVertices((vid,vdata) => vprog(vid,vdata,initialMsg)).cache(); //②compute the messages,注意调用的是mapReduceTriplets方法,源代码:
def mapReduceTriplets[A](
map: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
reduce: (A, A) => A),
activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] = None )
: VertexRDD[A]
var messages = g.mapReduceTriplets(sendMsg,mergeMsg); print("messages:"+messages.take(10).mkString("\n")) var activeMessages = messages.count(); //LOAD var prevG:Graph[VD,ED] = null var i = 0; while(activeMessages > 0 && i < maxIterations){ //③Receive the messages.Vertices that didn‘t get any message do not appear in newVerts. //内联操作,返回的结果是VertexRDD,可以参看后面的调试信息 val newVerts = g.vertices.innerJoin(messages)(vprog).cache(); print("newVerts:"+newVerts.take(10).mkString("\n")) //④update the graph with the new vertices. prevG = g;//先把旧的graph备份,以利于后面的graph更新和unpersist掉旧的graph //④外联操作,返回整个更新的graph g = g.outerJoinVertices(newVerts){(vid,old,newOpt) => newOpt.getOrElse(old)}//getOrElse方法,意味,如果newOpt存在,返回newOpt,不存在返回old print(g.vertices.take(10).mkString("\n")) g.cache();//新的graph cache起来,下一次迭代使用 val oldMessages = messages;//备份,同prevG = g操作一样 //Send new messages.Vertices that didn‘t get any message do not appear in newVerts.so //don‘t send messages.We must cache messages.so it can be materialized on the next line. //allowing us to uncache the previous iteration. //⑤下一次迭代要发送的新的messages,先cache起来 messages = g.mapReduceTriplets(sendMsg,mergeMsg,Some((newVerts,activeDirection))).cache() print("下一次迭代要发送的messages:"+messages.take(10).mkString("\n")) activeMessages = messages.count();//⑥ print("下一次迭代要发送的messages的个数:"+ activeMessages)//如果activeMessages==0,迭代结束 logInfo("Pregel finished iteration" + i); //原来,旧的message和graph不可用了,unpersist掉 oldMessages.unpersist(blocking= false); newVerts.unpersist(blocking=false)//unpersist之后,就不可用了 prevG.unpersistVertices(blocking=false) prevG.edges.unpersist(blocking=false) i += 1; } g//返回最后的graph } }输出的调试信息:(距离v0节点最近的节点)第一次跌代:
messages:(11342,1.0)
(824020,1.0)
(867923,1.0)
(891835,1.0)
newVerts:(11342,1.0)
(824020,1.0)
(867923,1.0)
(891835,1.0)
下一次迭代要发送的messages:(302284,2.0)
(760842,2.0)
(417728,2.0)
(322178,2.0)
(387543,2.0)
(846213,2.0)
(857527,2.0)
(856657,2.0)
(695578,2.0)
(27469,2.0)
下一次迭代要发送的messages的个数:29
下一次迭代要发送的messages:(754862,3.0)
(672426,3.0)
(320258,3.0)
(143557,3.0)
(789355,3.0)
(596104,3.0)
(118398,3.0)
(30115,3.0)
下一次迭代要发送的messages的个数:141
依次不断类推,直到activeMessages = 0跌代结束。
上面需要cache的有:graph,messages,newVertis.spark中的创建RDD和transformation操作都是lazy的,存储的只是内存地址,并非真正创建对象,当进行action时,需要从头至尾运行一遍,所以cache之后,重复利用RDD,再次进行action时,速度会大大提升。unpersist之后,就不能后了,所以需要把旧的备份。
一般情况使用mapReduceTriplets可以解决很多问题,为什么Spark GraphX会提供Pregel API?主要是为了更方便地去做迭代操作。因为在GraphX里面,Graph这张图并没有自动cache,而是手动cache。但是为了每次迭代更快,需要手动去做cache,每次迭代完就需要把没用的删除掉而把有用的保留,这比较难以控制。因为Graph中的点和边是分开进行Cache的,而Pregel能够帮助我们。例如,PangeRank就非常适合用Pregel来做。
web-Google.txt.gz文件下载地址:http://snap.stanford.edu/data/web-Google.html
佟氏出品,必属精品!专注spark GraphX、数据挖掘、机器学习的源代码和算法,扎扎实实,写好每一行代码!