Spark GraphX Graph Computation [Code Implementation, Source Analysis]

I. Introduction

  Reference: https://www.cnblogs.com/yszd/p/10186556.html

II. Code Implementation

package big.data.analyse.graphx

import org.apache.log4j.{Level, Logger}
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

class VertexProperty()
case class UserProperty(val name: String) extends VertexProperty
case class ProductProperty(val name: String, val price: Double) extends VertexProperty

/*class Graph[VD, ED]{
  val vertices: VertexRDD[VD]
  val edges: EdgeRDD[ED]
}*/

/**
  * Created by zhen on 2019/10/4.
  */
object GraphXTest {
  /**
    * Set the log level
    */
  Logger.getLogger("org").setLevel(Level.WARN)
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("GraphXTest").master("local[2]").getOrCreate()
    val sc = spark.sparkContext
    /**
      * Create the vertex RDD
      */
    val users: RDD[(VertexId, (String, String))] = sc.parallelize(
      Array((3L, ("Spark", "GraphX")), (7L, ("Hadoop", "Java")),
            (5L, ("HBase", "Mysql")), (2L, ("Hive", "Mysql"))))

    /**
      * Create the edge RDD
      */
    val relationships: RDD[Edge[String]] = sc.parallelize(
      Array(Edge(3L, 7L, "Fast"), Edge(5L, 3L, "Relation"),
            Edge(2L, 5L, "colleague"), Edge(5L, 7L, "colleague")))

    /**
      * Define the default vertex attribute, used when an edge
      * references a vertex id that is missing from `users`
      */
    val defaultUser = ("Machical", "Missing")

    /**
      * Build the initial graph
      */
    val graph = Graph(users, relationships, defaultUser)

    /**
      * Use the triplet view to show the relationships between vertices
      */
    val facts: RDD[String] = graph.triplets.map(triplet =>
      triplet.srcAttr._1 + " is the " + triplet.attr + " with " + triplet.dstAttr._1)
    facts.collect().foreach(println)

    graph.vertices.foreach(println) // vertices
    graph.edges.foreach(println) // edges
    graph.ops.degrees.foreach(println) // degree of each vertex
    graph.triplets.foreach(println) // vertex, edge, relation
    println(graph.ops.numEdges) // number of edges
    println(graph.ops.numVertices) // number of vertices
  }
}
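The graph built above can also be transformed without being rebuilt. The sketch below is not part of the original example; it assumes the `graph` value from the code above is in scope, and shows `subgraph` keeping only the "colleague" edges and `mapVertices` rewriting the vertex attribute:

    // Keep only the "colleague" edges; all vertices are kept by default.
    val colleagues = graph.subgraph(epred = triplet => triplet.attr == "colleague")
    colleagues.triplets.map(t => t.srcAttr._1 + " -> " + t.dstAttr._1).foreach(println)

    // Replace each (name, tag) attribute pair with just the name,
    // producing a Graph[String, String] with the same structure.
    val named = graph.mapVertices((id, attr) => attr._1)
    named.vertices.foreach(println) // e.g. (3,Spark), (7,Hadoop)

Both operators return a new Graph, so they can be chained before the actions that print the results.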

III. Results

  1. Triplet view (facts)

  2. Vertices

  3. Edges

  4. Degree of each vertex

  5. Triplets (vertex, edge, relation)

  6. Edge/vertex counts

  (The original post showed a screenshot of the output under each item.)
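For reference, the content of results 1 and 6 can be derived directly from the sample data (the print order of an RDD may vary across runs). The triplet view produces one line per edge:

    Spark is the Fast with Hadoop
    HBase is the Relation with Spark
    Hive is the colleague with HBase
    HBase is the colleague with Hadoop

and the counts printed at the end are 4 edges and 4 vertices.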

IV. Source Code Analysis

class Graph[VD, ED] {
  // Information about the Graph
  val numEdges: Long
  val numVertices: Long
  val inDegrees: VertexRDD[Int]
  val outDegrees: VertexRDD[Int]
  val degrees: VertexRDD[Int]

  // Views of the graph as collections
  val vertices: VertexRDD[VD]
  val edges: EdgeRDD[ED]
  val triplets: RDD[EdgeTriplet[VD, ED]]

  // Functions for caching graphs
  def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED] // the default storage level is MEMORY_ONLY
  def cache(): Graph[VD, ED]
  def unpersistVertices(blocking: Boolean = true): Graph[VD, ED]

  // Change the partitioning heuristic
  def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]

  // Transform vertex and edge attributes
  def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
  def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
  def mapEdges[ED2](map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2]
  def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
  def mapTriplets[ED2](map: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2]): Graph[VD, ED2]

  // Modify the graph structure
  def reverse: Graph[VD, ED]
  def subgraph(epred: EdgeTriplet[VD, ED] => Boolean, vpred: (VertexId, VD) => Boolean): Graph[VD, ED]
  def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED] // restricts this graph to the subgraph it has in common with the other graph
  def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]

  // Join RDDs with the graph
  def joinVertices[U](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD): Graph[VD, ED]
  def outerJoinVertices[U, VD2](other: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, Option[U]) => VD2): Graph[VD2, ED]

  // Aggregate information about adjacent triplets
  def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
  def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]]
  def aggregateMessages[Msg: ClassTag](
      sendMsg: EdgeContext[VD, ED, Msg] => Unit,
      mergeMsg: (Msg, Msg) => Msg,
      tripletFields: TripletFields = TripletFields.All): VertexRDD[Msg]

  // Iterative graph-parallel computation
  def pregel[A](initialMsg: A, maxIterations: Int, activeDirection: EdgeDirection)(
      vprog: (VertexId, VD, A) => VD,
      sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
      mergeMsg: (A, A) => A): Graph[VD, ED]

  // Basic graph algorithms
  def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double]
  def connectedComponents(): Graph[VertexId, ED]
  def triangleCount(): Graph[Int, ED]
  def stronglyConnectedComponents(numIter: Int): Graph[VertexId, ED]
}
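To make the listing concrete, here is a minimal sketch of `aggregateMessages`, assuming the `graph: Graph[(String, String), String]` from section II is in scope. It recomputes each vertex's in-degree, the same style of computation the built-in degree views perform:

    // Each edge sends the message 1 to its destination vertex;
    // messages arriving at the same vertex are summed.
    val inDeg: VertexRDD[Int] = graph.aggregateMessages[Int](
      ctx => ctx.sendToDst(1),
      (a, b) => a + b
    )
    inDeg.foreach(println) // with the sample data: (7,2), (3,1), (5,1)

Note that a vertex that receives no message (vertex 2 here) is absent from the result, which is one reason to prefer the built-in `inDegrees` view when zero counts matter.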

Original article: https://www.cnblogs.com/yszd/p/11621791.html

