大数据技术之_19_Spark学习_05_Spark GraphX 应用解析小结

========== Spark GraphX 概述 ==========
1、Spark GraphX是什么?
  (1)Spark GraphX 是 Spark 的一个模块,主要用于进行以图为核心的计算还有分布式图的计算。
  (2)GraphX 他的底层计算也是 RDD 计算,它和 RDD 共用一种存储形态,在展示形态上可以以数据集来表示,也可以图的形式来表示。

2、Spark GraphX 有哪些抽象?
(1)顶点。
  顶点的表示用 RDD[(VertexId, VD)] 来表示,(VertexId, VD) 这个元组用来具体表示一个顶点,VertexID 表示顶点的 ID,是 Long 类型的别名,VD 是顶点的属性,是一个类型参数,可以是任何类型。
(2)边。
  边的表示用 RDD[Edge[ED]] 来表示,Edge 用来具体表示一个边,Edge 里面包含一个 ED 类型参数来设定的属性,ED 类型中包括 一个源顶点的 ID 和一个目标顶点的 ID。
(3)三元组。
  三元组结构用 RDD[EdgeTriplet[VD, ED]] 来表示,EdgeTriplet[VD, ED] 来表示一个三元组,三元组包含了一个边、边的属性、源顶点 ID、源顶点属性、目标顶点 ID、目标顶点属性。VD 和 ED 是类型参数,VD 表示顶点的属性,ED 表示边的属性。
(4)图。
  图在 Spark 中用 Graph[VD, ED] 来表示,可以通过顶点和边来构建。

========== Spark GraphX 图的构建 ==========
1、对于 Vertex 顶点的构建:
(1)对于 RDD[(VertexId, VD)] 这种版本:

val users: RDD[(VertexId, (String, String))] = sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),(5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))

(2)对于 VertexRDD[VD] 这种版本:是顶点的构建的优化版本。说白了,就是对上面版本的包装,包装中进行了一些优化!

val users1: VertexRDD[(String, String)] = VertexRDD[(String, String)](users)

2、对于 Edge 边的构建:
(1)对于 RDD[Edge[ED]] 这种版本:

val relationships: RDD[Edge[String]] = sc.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"), Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))

(2)对于 EdgeRDD[ED] 这种版本:是边的构建的优化版本。说白了,就是对上面版本的包装,包装中进行了一些优化!

val relationships1: EdgeRDD[String] = EdgeRDD.fromEdges(relationships)

3、对于 Graph 图的构建:
Graph[VD: ClassTag, ED: ClassTag]
(1)通过 Graph 类的 apply 方法来构建。

val users: RDD[(VertexId, (String, String))] = sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),(5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))val relationships: RDD[Edge[String]] = sc.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"), Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))val defaultUser = ("John Doe", "Missing")val graph = Graph(users, relationships) 

def apply[VD: ClassTag, ED: ClassTag](    vertices: RDD[(VertexId, VD)],    edges: RDD[Edge[ED]],    defaultVertexAttr: VD = null.asInstanceOf[VD],    edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,    vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]

测试代码:

scala> val users: RDD[(VertexId, (String, String))] = sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),(5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))users: org.apache.spark.rdd.RDD[(org.apache.spark.graphx.VertexId, (String, String))] = ParallelCollectionRDD[0] at parallelize at <console>:26

scala> val relationships: RDD[Edge[String]] = sc.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"), Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))relationships: org.apache.spark.rdd.RDD[org.apache.spark.graphx.Edge[String]] = ParallelCollectionRDD[1] at parallelize at <console>:26

scala> val defaultUser = ("John Doe", "Missing")defaultUser: (String, String) = (John Doe,Missing)

scala> val graph = Graph(users, relationships)graph: org.apache.spark.graphx.Graph[(String, String),String] = [email protected]4285b1bd

scala> graph.vertices.collect.foreach(println _)(5,(franklin,prof))                                                             (2,(istoica,prof))(3,(rxin,student))(7,(jgonzal,postdoc))

scala> graph.edges.collect.foreach(println _)Edge(3,7,collab)Edge(5,3,advisor)Edge(2,5,colleague)Edge(5,7,pi)

(2)通过 Graph 类提供 fromEdges 方法来构建。注意:对于顶点的属性是使用提供的默认属性。

val relationships: RDD[Edge[String]] = sc.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"), Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))val defaultUser = ("aaa", "bbb")val graph2 = Graph.fromEdges(relationships, defaultUser)

def fromEdges[VD: ClassTag, ED: ClassTag](    edges: RDD[Edge[ED]],    defaultValue: VD,    edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,    vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]

测试代码:

scala> val relationships: RDD[Edge[String]] = sc.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"), Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))relationships: org.apache.spark.rdd.RDD[org.apache.spark.graphx.Edge[String]] = ParallelCollectionRDD[15] at parallelize at <console>:26

scala> val defaultUser = ("aaa", "bbb")defaultUser: (String, String) = (aaa,bbb)

scala> val graph2 = Graph.fromEdges(relationships, defaultUser)graph2: org.apache.spark.graphx.Graph[(String, String),String] = [email protected]52fb37d7

scala> graph2.vertices.collect.foreach(println _)(5,(aaa,bbb))(2,(aaa,bbb))(3,(aaa,bbb))(7,(aaa,bbb))

scala> graph2.edges.collect.foreach(println _)Edge(3,7,collab)Edge(5,3,advisor)Edge(2,5,colleague)Edge(5,7,pi)

(3)通过 Graph 类提供的 fromEdgeTuples 方法来构建。注意:对于顶点的属性是使用提供的默认属性,对于边的属性是相同边的数量。

val relationships: RDD[(VertexId, VertexId)] = sc.parallelize(Array((3L, 7L), (5L, 3L), (2L, 5L), (5L, 7L)))val defaultUser = ("haha", "heihei")val graph3 = Graph.fromEdgeTuples[(String, String)](relationships, defaultUser)

def fromEdgeTuples[VD: ClassTag](    rawEdges: RDD[(VertexId, VertexId)],    defaultValue: VD,    uniqueEdges: Option[PartitionStrategy] = None,    edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,    vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, Int]

测试代码:

scala> val relationships: RDD[(VertexId, VertexId)] = sc.parallelize(Array((3L, 7L), (5L, 3L), (2L, 5L), (5L, 7L)))relationships: org.apache.spark.rdd.RDD[(org.apache.spark.graphx.VertexId, org.apache.spark.graphx.VertexId)] = ParallelCollectionRDD[26] at parallelize at <console>:26

scala> val defaultUser = ("haha", "heihei")defaultUser: (String, String) = (haha,heihei)

scala> val graph3 = Graph.fromEdgeTuples[(String, String)](relationships, defaultUser)graph3: org.apache.spark.graphx.Graph[(String, String),Int] = [email protected]5cb7311b

scala> graph3.vertices.collect.foreach(println _)(5,(haha,heihei))(2,(haha,heihei))(3,(haha,heihei))(7,(haha,heihei))

scala> graph3.edges.collect.foreach(println _)Edge(3,7,1)     第三个元素“1”表示的是相同边的数量Edge(5,3,1)Edge(2,5,1)Edge(5,7,1)

========== Spark GraphX 图的基本信息转换 ==========
1、graph.numEdges 返回当前图的边的数量
2、graph.numVertices 返回当前图的顶点的数量
3、graph.inDegrees 返回当前图每个顶点入度的数量,返回类型为 VertexRDD[Int]
4、graph.outDegrees 返回当前图每个顶点出度的数量,返回的类型为 VertexRDD[Int]
5、graph.degrees 返回当前图每个顶点入度和出度的和,返回的类型为 VertexRDD[Int]

========== Spark GraphX 图的转换操作 ==========
1、def mapVertices[VD2: ClassTag](map: (VertexId, VD) => VD2) (implicit eq: VD =:= VD2 = null): Graph[VD2, ED]
  对当前图每一个顶点应用提供的 map 函数来修改顶点的属性,返回一个新的图。
2、def mapEdges[ED2: ClassTag](map: Edge[ED] => ED2): Graph[VD, ED2]
  对当前图每一条边应用提供的 map 函数来修改边的属性,返回一个新图。
3、def mapTriplets[ED2: ClassTag](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
  对当前图每一个三元组应用提供的 map 函数来修改边的属性,返回一个新图。

========== Spark GraphX 图的结构操作 ==========
1、def reverse: Graph[VD, ED]
  该操作反转一个图,产生一个新图,新图中的每条边的方向和原图每条边的方向相反。
2、def subgraph(epred: EdgeTriplet[VD, ED] => Boolean = (x => true), vpred: (VertexId, VD) => Boolean = ((v, d) => true)) : Graph[VD, ED]
  该操作返回一个当前图的子图,通过传入 epred 函数来过滤边,通过传入 vpred 函数来过滤顶点,返回满足 epred 函数值为 true 的边和满足 vpred 函数值为 true 顶点组成子图。
3、def mask[VD2: ClassTag, ED2: ClassTag](other: Graph[VD2, ED2]): Graph[VD, ED]
  mask 函数用于求一张图和 other 这张图的交集,该交集的判别条件指的是:1、对于顶点,只对比顶点的 ID。2、对于边,只对比边的 srcID、dstID,如果 other 和当前图的交集中的边、顶点的属性不一致,那么 mask 产生的图默认采用当前图的属性。
4、def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]
  该操作实现将当前图中的两条相同边(边的 srcID 和 dstID 相同)合并。你需要传入一个 merge 函数,用于合并这两边的属性返回一个新的属性。注意:合并两条边的前提是,两条边在一个分区。

========== Spark GraphX 顶点关联操作 ==========
1、def joinVertices[U: ClassTag](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD): Graph[VD, ED]
  该操作通过 mapFunc 函数将 table 中提供的数据更新到相同 VertexId 的属性里。
2、def outerJoinVertices[U: ClassTag, VD2: ClassTag](other: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, Option[U]) => VD2)(implicit eq: VD =:= VD2 = null): Graph[VD2, ED]
  该操作和 joinVertices 提供了相同的功能,但是,如果 table 中不存在相对应的顶点(也就是不存 VertexId),这个时候 U 默认是 None。

========== Spark GraphX 聚合操作 ==========
1、def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]]
  该操作返回 EdgeDirection 定义的方向中相邻顶点的 ID 和属性的集合。
2、def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
  改操作返回 EdgeDirection 定义的方向中相邻顶点的 ID 的集合。
3、def aggregateMessages[A: ClassTag](sendMsg: EdgeContext[VD, ED, A] => Unit,mergeMsg: (A, A) => A,tripletFields: TripletFields = TripletFields.All): VertexRDD[A]
  该函数用于聚合发送到顶点的信息,A 是发送的信息的类型,sendMsg 是每一条边都会自动触发,到底有没有消息能够发送到顶点,使用 EdgeContext 里面的 sendToSrc和sendToDst 来实现。mergeMsg
是每一个顶点都会在接受到所有消息之后调用,主要用于所有接收到的消息的聚合。然后整个函数返回消息的顶点集合 VertexRDD[A]。

原文地址:https://www.cnblogs.com/chenmingjun/p/10797815.html

时间: 2024-10-29 19:10:49

大数据技术之_19_Spark学习_05_Spark GraphX 应用解析小结的相关文章

大数据技术之_19_Spark学习_05_Spark GraphX 应用解析 + Spark GraphX 概述、解析 + 计算模式 + Pregel API + 图算法参考代码 + PageRank 实例

第1章 Spark GraphX 概述1.1 什么是 Spark GraphX1.2 弹性分布式属性图1.3 运行图计算程序第2章 Spark GraphX 解析2.1 存储模式2.1.1 图存储模式2.1.2 GraphX 存储模式2.2 vertices.edges 以及 triplets2.2.1 vertices2.2.2 edges2.2.3 triplets2.3 图的构建2.3.1 构建图的方法2.3.2 构建图的过程2.4 计算模式2.4.1 BSP 计算模式2.4.2 图操作一

大数据技术之_19_Spark学习_02_Spark Core 应用解析小结

1.RDD 全称 弹性分布式数据集 Resilient Distributed Dataset它就是一个 class. abstract class RDD[T: ClassTag](    @transient private var _sc: SparkContext,    @transient private var deps: Seq[Dependency[_]]  ) extends Serializable with Logging { 继承了 Serializable 和具有 L

大数据技术之_19_Spark学习_06_Spark 源码解析小结

========== Spark 通信架构 ========== 1.spark 一开始使用 akka 作为网络通信框架,spark 2.X 版本以后完全抛弃 akka,而使用 netty 作为新的网络通信框架.最主要原因:spark 对 akka 没有维护,需要 akka 更新,spark 的发展受到了 akka 的牵制,akka 版本之间无法通信,即 akka 兼容性问题.2.RpcEnv:RPC 上下文环境,每个 Rpc 端点运行时依赖的上下文环境称之为 RpcEnv.类似于 SparkC

大数据技术之_19_Spark学习_02_Spark Core 应用解析+ RDD 概念 + RDD 编程 + 键值对 RDD + 数据读取与保存主要方式 + RDD 编程进阶 + Spark Core 实例练习

第1章 RDD 概念1.1 RDD 为什么会产生1.2 RDD 概述1.2.1 什么是 RDD1.2.2 RDD 的属性1.3 RDD 弹性1.4 RDD 特点1.4.1 分区1.4.2 只读1.4.3 依赖1.4.4 缓存1.4.5 CheckPoint第2章 RDD 编程2.1 RDD 编程模型2.2 RDD 创建2.2.1 由一个已经存在的 Scala 集合创建,即集合并行化(测试用)2.2.2 由外部存储系统的数据集创建(开发用)2.3 RDD 编程2.3.1 Transformatio

大数据技术之_16_Scala学习_11_客户信息管理系统+并发编程模型 Akka+Akka 网络编程-小黄鸡客服案例+Akka 网络编程-Spark Master Worker 进程通讯项目

第十五章 客户信息管理系统15.1 项目的开发流程15.2 项目的需求分析15.3 项目的界面15.4 项目的设计-程序框架图15.5 项目的功能实现15.5.1 完成 Customer 类15.5.2 完成显示主菜单和退出软件功能15.5.3 完成显示客户列表的功能15.5.4 完成添加客户的功能15.5.5 完成删除客户的功能15.5.6 完善退出确认功能15.5.7 完善删除确认功能15.5.8 完成修改客户的功能第十六章 并发编程模型 Akka16.1 Akka 的介绍16.2 Acto

大数据技术之_20_Elasticsearch学习_01_概述 + 快速入门 + Java API 操作 + 创建、删除索引 + 新建、搜索、更新删除文档 + 条件查询 + 映射操作

一 概述1.1 什么是搜索?1.2 如果用数据库做搜索会怎么样?1.3 什么是全文检索和 Lucene?1.4 什么是 Elasticsearch?1.5 Elasticsearch 的适用场景1.6 Elasticsearch 的特点1.7 Elasticsearch 的核心概念1.7.1 近实时1.7.2 Cluster(集群)1.7.3 Node(节点)1.7.4 Index(索引 --> 数据库)1.7.5 Type(类型 --> 表)1.7.6 Document(文档 -->

大数据技术之_03_Hadoop学习_02_入门_Hadoop运行模式+【本地运行模式+伪分布式运行模式+完全分布式运行模式(开发重点)】+Hadoop编译源码(面试重点)+常见错误及解决方案

第4章 Hadoop运行模式4.1 本地运行模式4.1.1 官方Grep案例4.1.2 官方WordCount案例4.2 伪分布式运行模式4.2.1 启动HDFS并运行MapReduce程序4.2.2 启动YARN并运行MapReduce程序4.2.3 配置历史服务器4.2.4 配置日志的聚集4.2.5 配置文件说明4.3 完全分布式运行模式(开发重点)4.3.1 虚拟机准备4.3.2 编写集群分发脚本xsync4.3.3 集群配置4.3.4 集群单点启动4.3.5 SSH无密登录配置4.3.6

大数据技术之_08_Hive学习_01_Hive入门+Hive安装、配置和使用+Hive数据类型

第1章 Hive入门1.1 什么是Hive1.2 Hive的优缺点1.2.1 优点1.2.2 缺点1.3 Hive架构原理1.4 Hive和数据库比较1.4.1 查询语言1.4.2 数据存储位置1.4.3 数据更新1.4.4 索引1.4.5 执行1.4.6 执行延迟1.4.7 可扩展性1.4.8 数据规模第2章 Hive安装.配置和使用2.1 Hive安装地址2.2 Hive安装部署2.3 将本地文件导入Hive案例2.4 MySql安装2.4.1 安装包准备2.4.2 安装MySql服务器2.

大数据技术之_16_Scala学习_08_数据结构(下)-集合操作+模式匹配

第十一章 数据结构(下)-集合操作11.1 集合元素的映射-map11.1.1 map 映射函数的操作11.1.2 高阶函数基本使用案例1+案例211.1.3 使用 map 映射函数来解决11.1.4 模拟实现 map 映射函数的机制11.1.5 课堂练习11.2 集合元素的扁平-flatMap11.3 集合元素的过滤-filter11.4 集合元素的化简-reduce11.5 集合元素的折叠-fold11.6 集合元素的扫描-scan11.7 集合的综合应用案例11.8 集合的合并-zip11