Spark中GraphX图运算pregel详解

由于本人文字表达能力不足,还是多多以代码形式表述,首先展示测试代码,然后解释:

package com.txq.spark.test

import org.apache.spark.graphx.util.GraphGeneratorsimport org.apache.spark.graphx._import org.apache.spark.rdd.RDDimport org.apache.spark.{SparkConf, SparkContext, SparkException, graphx}

import scala.reflect.ClassTag

/**  * spark GraphX 测试  * @authorTongXueQiang  */object test {

System.setProperty("hadoop.home.dir","D://hadoop-2.6.2");    val conf = new SparkConf().setMaster("local").setAppName("testRDDMethod");    val sc = new SparkContext(conf);

def main(args: Array[String]): Unit = {      /*      val rdd = sc.textFile("hdfs://spark:9000/user/spark/data/SogouQ.sample");//搜狗搜索日志解析      val rdd1 = rdd.map(_.split("\t")).map(line=>line(3)).map(_.split(" "));      println("共有"+rdd1.count+"行");      val rdd2 = rdd1.filter(_(0).toInt == 1).filter(_(1).toInt == 1);      println("搜索结果和点击率均排第一的共有"+rdd2.count+"行");      val users:RDD[(VertexId,(String,String))] = sc.parallelize(Array((3L,("rxin","student")),(7L,("jgonzal","postdoc")),(5L,("franklin","prof")),(2L,("istoica","prof"))));      val relationships:RDD[Edge[String]] = sc.parallelize(Array(Edge(3L,7L,"collab"),Edge(5L,3L,"advisor"),Edge(2L,5L,"colleague"),Edge(5L,7L,"pi")));      val defaultUser = ("jone","Missing");      val graph = Graph(users,relationships,defaultUser);      val result = graph.vertices.filter{case(id,(name,pos)) => pos == "prof"}.count();      println("职位名称为prof的个数有:" + result + "个");      println(graph.edges.filter(e => e.srcId > e.dstId).count());      graph.triplets.collect().foreach(println)      graph.edges.collect().foreach(println)*/      /*      val graph:Graph[Double,Int] = GraphGenerators.logNormalGraph(sc,numVertices = 100).mapVertices((id,_) => id.toDouble)      println(graph);      println(graph.vertices)*/

/*      val oderFollowers:VertexRDD[(Int,Double)] = graph.mapReduceTriplets[(Int,Double)](        triplet =>{          if(triplet.srcAttr > triplet.dstAttr){            Iterator((triplet.dstId,(1,triplet.srcAttr)));          } else {            Iterator.empty          }        },        (a,b) =>(a._1 + b._1,a._2 + b._2)      )      val avgAgeOfolderFollower:VertexRDD[Double] = oderFollowers.mapValues((id,value) => {        value match{          case (count,totalAge) => totalAge / count        }      })

avgAgeOfolderFollower.collect().foreach(println)*/    //收集邻居节点,后面有自定义方法      //collectNeighborIds(EdgeDirection.In,graph).foreach(line => {print(line._1+":"); for (elem <- line._2) {print(elem + " ")};println;});   //以Google的网页链接文件(后面由下载地址)为例,演示pregel方法,找出从v0网站出发,得到经过的步数最少的链接网站,类似于附近地图最短路径算法      val graph:Graph[Double,Double] = GraphLoader.edgeListFile(sc,"hdfs://spark/user/spark/data/web-Google.txt",numEdgePartitions = 4).mapVertices((id,_) => id.toDouble).mapEdges(edge => edge.attr.toDouble);      val sourceId:VertexId = 0;//定义源网页Id      val g:Graph[Double,Double] = graph.mapVertices((id,attr) => if(id == 0) 0.0 else Double.PositiveInfinity)      //pregel底层调用GraphOps的mapReduceTriplets方法,一会儿解释源代码      val result = pregel[Double,Double,Double](g,Double.PositiveInfinity)(        (id,vd,newVd) => math.min(vd,newVd),//这个方法的作用是更新节点VertexId的属性值为新值,以利于innerJoin操作        triplets => {//map函数          if(triplets.srcAttr + triplets.attr < triplets.dstAttr){            Iterator((triplets.dstId,triplets.srcAttr + triplets.attr))          } else {            Iterator.empty          }        },        (a,b) => math.min(a,b)//reduce函数      )   //输出结果,注意pregel返回的是更新VertexId属性的graph,而不是VertexRDD[(VertexId,VD)]      print("最短节点:"+result.vertices.filter(_._1 != 0).reduce(min));//注意过滤掉源节点    }   //找出路径最短的点    def min(a:(VertexId,Double),b:(VertexId,Double)):(VertexId,Double) = {    if(a._2 < b._2) a else b   }  /**    * 自定义收集VertexId的neighborIds    * @author TongXueQiang    */  def collectNeighborIds[T,U](edgeDirection:EdgeDirection,graph:Graph[T,U])(implicit m:scala.reflect.ClassTag[T],n:scala.reflect.ClassTag[U]):VertexRDD[Array[VertexId]] = {    val nbrs = graph.mapReduceTriplets[Array[VertexId]](      //map函数      edgeTriplets => {        val msgTosrc = (edgeTriplets.srcId,Array(edgeTriplets.dstId));        val msgTodst = (edgeTriplets.dstId,Array(edgeTriplets.srcId));        edgeDirection match {          case EdgeDirection.Either =>Iterator(msgTosrc,msgTodst)          case EdgeDirection.Out => Iterator(msgTosrc)          case EdgeDirection.In => Iterator(msgTodst)          case EdgeDirection.Both =>  throw new SparkException("It doesn‘t make sense to collect neighbors without a " + "direction.(EdgeDirection.Both is not supported.use EdgeDirection.Either instead.)")        }      },_ ++ _)//reduce函数    nbrs  }

/**    * 自定义pregel函数    * @param graph 图    * @param initialMsg 返回的vertexId属性    * @param maxInterations 迭代次数    * @param activeDirection 边的方向    * @param vprog 更新节点属性的函数,以利于innerJoin操作    * @param sendMsg map函数,返回Iterator[A],一般A为Tuple2,其中id为接受消息方    * @param mergeMsg reduce函数,一般为合并,或者取最小、最大值……操作    * @tparam A 想要得到的VertexId属性    * @tparam VD graph中vertices的属性    * @tparam ED graph中的edge属性    * @return 返回更新后的graph    */  def pregel[A:ClassTag,VD:ClassTag,ED:ClassTag](graph:Graph[VD,ED],initialMsg:A,maxInterations:Int = Int.MaxValue,activeDirection:EdgeDirection =  EdgeDirection.Either)(      vprog:(VertexId,VD,A) => VD,      sendMsg:EdgeTriplet[VD,ED] =>Iterator[(VertexId,A)],      mergeMsg:(A,A) => A)  : Graph[VD,ED] = {    Pregel0(graph,initialMsg,maxInterations,activeDirection)(vprog,sendMsg,mergeMsg)//调用apply方法  } 

 //此为节点内连接函数,返回VertexRDD  def innerJoin[U:ClassTag,VD:ClassTag](table:RDD[(VertexId,U)])(mapFunc:(VertexId,VD,U) => VertexRDD[(VertexId,VD)]) = {    val uf = (id: VertexId, data: VD, o: Option[U]) => {      o match {        case Some(u) => mapFunc(id, data, u)        case None => data      }    }  } //测试Option[T]  def test():Unit = {    val map = Map("a" -> "1","b" -> "2","c" -> "3");    def show(value:Option[String]):String = {      value match{        case Some(x) => x        case None => "no value found!"      }    }    println(show(map.get("a")) == "1");  }}

下面重点研究Pregel,为了方便,自己重新定义了一个Pregel0

package com.txq.spark.test

import org.apache.spark.Loggingimport org.apache.spark.graphx.{EdgeDirection, EdgeTriplet, Graph, VertexId}import scala.reflect.ClassTag

/**  * 自定义Pregel object,处理思路:  */object Pregel0 extends Logging {  def apply[VD:ClassTag,ED:ClassTag,A:ClassTag]  (graph:Graph[VD,ED],   initialMsg:A,   maxIterations:Int = Int.MaxValue,   activeDirection:EdgeDirection = EdgeDirection.Either)  (vprog:(VertexId,VD,A) => VD,   sendMsg:EdgeTriplet[VD,ED] => Iterator[(VertexId,A)],   mergeMsg:(A,A) => A)  : Graph[VD,ED] =  {  //①对vertices进行更新操作    var g = graph.mapVertices((vid,vdata) => vprog(vid,vdata,initialMsg)).cache();    //②compute the messages,注意调用的是mapReduceTriplets方法,源代码:  

def mapReduceTriplets[A](

map: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],

reduce: (A, A) => A),

activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] = None )

: VertexRDD[A]


var messages = g.mapReduceTriplets(sendMsg,mergeMsg);    print("messages:"+messages.take(10).mkString("\n"))    var activeMessages = messages.count();    //LOAD    var prevG:Graph[VD,ED] = null    var i = 0;    while(activeMessages > 0 && i < maxIterations){        //③Receive the messages.Vertices that didn‘t get any message do not appear in newVerts.        //内联操作,返回的结果是VertexRDD,可以参看后面的调试信息        val newVerts = g.vertices.innerJoin(messages)(vprog).cache();        print("newVerts:"+newVerts.take(10).mkString("\n"))        //④update the graph with the new vertices.        prevG = g;//先把旧的graph备份,以利于后面的graph更新和unpersist掉旧的graph     //④外联操作,返回整个更新的graph        g = g.outerJoinVertices(newVerts){(vid,old,newOpt) => newOpt.getOrElse(old)}//getOrElse方法,意味,如果newOpt存在,返回newOpt,不存在返回old        print(g.vertices.take(10).mkString("\n"))        g.cache();//新的graph cache起来,下一次迭代使用

val oldMessages = messages;//备份,同prevG = g操作一样      //Send new messages.Vertices that didn‘t get any message do not appear in newVerts.so      //don‘t send messages.We must cache messages.so it can be materialized on the next line.      //allowing us to uncache the previous iteration.    //⑤下一次迭代要发送的新的messages,先cache起来      messages = g.mapReduceTriplets(sendMsg,mergeMsg,Some((newVerts,activeDirection))).cache()      print("下一次迭代要发送的messages:"+messages.take(10).mkString("\n"))      activeMessages = messages.count();//⑥      print("下一次迭代要发送的messages的个数:"+ activeMessages)//如果activeMessages==0,迭代结束      logInfo("Pregel finished iteration" + i);    //原来,旧的message和graph不可用了,unpersist掉      oldMessages.unpersist(blocking= false);      newVerts.unpersist(blocking=false)//unpersist之后,就不可用了      prevG.unpersistVertices(blocking=false)      prevG.edges.unpersist(blocking=false)      i += 1;    }    g//返回最后的graph  }

}输出的调试信息:(距离v0节点最近的节点)第一次跌代:

messages:(11342,1.0)
(824020,1.0)
(867923,1.0)
(891835,1.0)
newVerts:(11342,1.0)
(824020,1.0)
(867923,1.0)
(891835,1.0)
下一次迭代要发送的messages:(302284,2.0)
(760842,2.0)
(417728,2.0)
(322178,2.0)
(387543,2.0)
(846213,2.0)
(857527,2.0)
(856657,2.0)
(695578,2.0)
(27469,2.0)
下一次迭代要发送的messages的个数:29

下一次迭代要发送的messages:(754862,3.0)
(672426,3.0)
(320258,3.0)
(143557,3.0)
(789355,3.0)
(596104,3.0)
(118398,3.0)
(30115,3.0)
下一次迭代要发送的messages的个数:141
依次不断类推,直到activeMessages = 0跌代结束。

上面需要cache的有:graph,messages,newVertis.spark中的创建RDD和transformation操作都是lazy的,存储的只是内存地址,并非真正创建对象,当进行action时,需要从头至尾运行一遍,所以cache之后,重复利用RDD,再次进行action时,速度会大大提升。unpersist之后,就不能后了,所以需要把旧的备份。

一般情况使用mapReduceTriplets可以解决很多问题,为什么Spark GraphX会提供Pregel API?主要是为了更方便地去做迭代操作。因为在GraphX里面,Graph这张图并没有自动cache,而是手动cache。但是为了每次迭代更快,需要手动去做cache,每次迭代完就需要把没用的删除掉而把有用的保留,这比较难以控制。因为Graph中的点和边是分开进行Cache的,而Pregel能够帮助我们。例如,PangeRank就非常适合用Pregel来做。

web-Google.txt.gz文件下载地址:http://snap.stanford.edu/data/web-Google.html

佟氏出品,必属精品!专注spark GraphX、数据挖掘、机器学习的源代码和算法,扎扎实实,写好每一行代码!

时间: 2024-08-02 10:57:34

Spark中GraphX图运算pregel详解的相关文章

Python中 and or 运算顺序详解 --- 短路逻辑

核心思想 表达式从左至右运算,若 or 的左侧逻辑值为 True ,则短路 or 后所有的表达式(不管是 and 还是 or),直接输出 or 左侧表达式 . 表达式从左至右运算,若 and 的左侧逻辑值为 False ,则短路其后所有 and 表达式,直到有 or 出现,输出 and 左侧表达式到 or 的左侧,参与接下来的逻辑运算. 若 or 的左侧为 False ,或者 and 的左侧为 True 则不能使用短路逻辑. 详情参见:http://www.cnblogs.com/an9wer/

图的应用详解-数据结构

图的应用详解-数据结构 概述 最小生成树——无向连通图的所有生成树中有一棵边的权值总和最小的生成树 拓扑排序 ——由偏序定义得到拓扑有序的操作便是拓扑排序.建立模型是AOV网 关键路径——在AOE-网中有些活动可以并行地进行,所以完成工程的最短时间是从开始点到完成点的最长路径的长度,路径长度最长的路径叫做关键路径(Critical Path). 最短路径——最短路径问题是图研究中的一个经典算法问题, 旨在寻找图(由结点和路径组成的)中两结点之间的最短路径. 1.最小生成树 1.1 问题背景:  

Swift使用WKWebView在iOS应用中调用Web的方法详解

这篇文章主要介绍了Swift使用WKWebView在iOS应用中调用Web的方法详解,使用WKWebView便等于使用和Safari中相同的JavaScript解释器,用来替代过去的UIWebView,需要的朋友可以参考下 自从iOS8开始,Apple引入了WKWebView欲代替UIWebView.相比而言,WKWebView消耗内从更少,功能也更加强大.让我们来看看WKWebView怎么使用吧! 0.初始化(1)首先需要引入WebKit库 复制代码代码如下: #import <WebKit/

spark读写压缩文件API使用详解

最近研究了下Spark如何读写压缩格式的文件,主要有如下三种方式,这里以lzo方式压缩为例     /*******************old hadoop api*************************/     val confHadoop = new JobConf     confHadoop.set("mapred.output.compress", "true")     confHadoop.set("mapred.output

批处理中的echo命令图文详解

批处理中的echo命令图文详解 1. Echo 显示当前ECHO的状态:ECHO ON 或者ECHO OFF 2. ECHO ON 将ECHO状态设置为ON,将显示命令行,也就是前面的C:\>类似的标志,如图所示: 3. ECHO OFF 将ECHO状态设置为OFF,将不显示命令行,也就是前面的C:\>类似的标志,其他功能一样,用户也可以输入命令,也可以显示命令结果,如图所示: 4. ECHO 字符串 将输入的字符串显示在CMD屏幕上.如图所示 5. ECHO 字符串 &ECHO 字符

Android 中的 Service 全面总结详解【下】

上一篇文章Android 中的 Service 全面总结详解[下] 介绍了Service的一些知识以及本地Service的使用,如果对Service还不太了解的建议先看下上篇文章:本文主要接着上一篇讲下远程服务的使用: 在说到远程服务的时候,我们需要先了解一些预备的知识: 首先来了解一下AIDL机制: AIDL的作用 由于每个应用程序都运行在自己的进程空间,并且可以从应用程序UI运行另一个服务进程,而且经常会在不同的进程间传递对象.在Android平台,一个进程通常不能访问另一个进程的内存空间,

【转】Android中measure过程、WRAP_CONTENT详解以及xml布局文件解析流程浅析(下)

转载请注明出处:http://blog.csdn.net/qinjuning 上篇文章<<Android中measure过程.WRAP_CONTENT详解以及xml布局文件解析流程浅析(上)>>中,我们 了解了View树的转换过程以及如何设置View的LayoutParams的.本文继续沿着既定轨迹继续未完成的job. 主要知识点如下:                 1.MeasureSpc类说明                 2.measure过程详解(揭秘其细节);   

C#中的Linq to Xml详解

这篇文章主要介绍了C#中的Linq to Xml详解,本文给出转换步骤以及大量实例,讲解了生成xml.查询并修改xml.监听xml事件.处理xml流等内容,需要的朋友可以参考下 操作之前,需要引入 程序集和命名空间 System.Xml.Linq;  一.生成Xml 为了能够在结构有一定的组织,笔者建议大家新建一个控制台项目,并且新建一个CreateXml类(以下部分都属于该类中). 并在其中写入以下属性: 代码如下: public static String Path { get { Stri

oc中字典的实现方法详解

一:字典的基本概念 Foundation中的字典(NSDictionary,NSMutableDictionary)是由键-值对组成的数据集合.正如,我们在字典里查找单词的定义一样. 通过key(键),查找的对应的value(值),key通常是字符串对象,也可以是其他任意类型对象.在一个字典对象中,key的值必须是唯一的. 此外,字典对象的键和值不可以为空(nil),如果需要在字典中加入一个空值,可以加入NSNull对象 二:不可变字典-NSDictionary 1:初始化(以一个元素和多个元素