Spark Graphx编程指南

问题导读

1.GraphX提供了几种方式从RDD或者磁盘上的顶点和边集合构造图?
2.PageRank算法在图中发挥什么作用?
3.三角形计数算法的作用是什么?

Spark中文手册-编程指南
Spark之一个快速的例子Spark之基本概念
Spark之基本概念
Spark之基本概念(2)
Spark之基本概念(3)
Spark-sql由入门到精通
Spark-sql由入门到精通续
spark GraphX编程指南(1)

Pregel API

图本身是递归数据结构,顶点的属性依赖于它们邻居的属性,这些邻居的属性又依赖于自己邻居的属性。所以许多重要的图算法都是迭代的重新计算每个顶点的属性,直到满足某个确定的条件。 一系列的graph-parallel抽象已经被提出来用来表达这些迭代算法。GraphX公开了一个类似Pregel的操作,它是广泛使用的Pregel和GraphLab抽象的一个融合。

在GraphX中,更高级的Pregel操作是一个约束到图拓扑的批量同步(bulk-synchronous)并行消息抽象。Pregel操作者执行一系列的超级步骤(super steps),在这些步骤中,顶点从 之前的超级步骤中接收进入(inbound)消息的总和,为顶点属性计算一个新的值,然后在以后的超级步骤中发送消息到邻居顶点。不像Pregel而更像GraphLab,消息作为一个边三元组的函数被并行 计算,消息计算既访问了源顶点特征也访问了目的顶点特征。在超级步中,没有收到消息的顶点被跳过。当没有消息遗留时,Pregel操作停止迭代并返回最终的图。

注意,与更标准的Pregel实现不同的是,GraphX中的顶点仅仅能发送信息给邻居顶点,并利用用户自定义的消息函数构造消息。这些限制允许在GraphX进行额外的优化。

一下是[url=https://spark.apache.org/docs/latest/api/scala/index.html#[email protected][A](A,Int,EdgeDirection]Pregel操作[/url]((VertexId,VD,A)?VD,(EdgeTriplet[VD,ED])?Iterator[(VertexId,A)],(A,A)?A)(ClassTag[A]):Graph[VD,ED])的类型签名以及实现草图(注意,访问graph.cache已经被删除)

  1. class GraphOps[VD, ED] {
  2. def pregel[A]
  3. (initialMsg: A,
  4. maxIter: Int = Int.MaxValue,
  5. activeDir: EdgeDirection = EdgeDirection.Out)
  6. (vprog: (VertexId, VD, A) => VD,
  7. sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
  8. mergeMsg: (A, A) => A)
  9. : Graph[VD, ED] = {
  10. // Receive the initial message at each vertex
  11. var g = mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg) ).cache()
  12. // compute the messages
  13. var messages = g.mapReduceTriplets(sendMsg, mergeMsg)
  14. var activeMessages = messages.count()
  15. // Loop until no messages remain or maxIterations is achieved
  16. var i = 0
  17. while (activeMessages > 0 && i < maxIterations) {
  18. // Receive the messages: -----------------------------------------------------------------------
  19. // Run the vertex program on all vertices that receive messages
  20. val newVerts = g.vertices.innerJoin(messages)(vprog).cache()
  21. // Merge the new vertex values back into the graph
  22. g = g.outerJoinVertices(newVerts) { (vid, old, newOpt) => newOpt.getOrElse(old) }.cache()
  23. // Send Messages: ------------------------------------------------------------------------------
  24. // Vertices that didn‘t receive a message above don‘t appear in newVerts and therefore don‘t
  25. // get to send messages.  More precisely the map phase of mapReduceTriplets is only invoked
  26. // on edges in the activeDir of vertices in newVerts
  27. messages = g.mapReduceTriplets(sendMsg, mergeMsg, Some((newVerts, activeDir))).cache()
  28. activeMessages = messages.count()
  29. i += 1
  30. }
  31. g
  32. }
  33. }

复制代码

注意,pregel有两个参数列表(graph.pregel(list1)(list2))。第一个参数列表包含配置参数初始消息、最大迭代数、发送消息的边的方向(默认是沿边方向出)。第二个参数列表包含用户 自定义的函数用来接收消息(vprog)、计算消息(sendMsg)、合并消息(mergeMsg)。

我们可以用Pregel操作表达计算单源最短路径( single source shortest path)。

  1. import org.apache.spark.graphx._
  2. // Import random graph generation library
  3. import org.apache.spark.graphx.util.GraphGenerators
  4. // A graph with edge attributes containing distances
  5. val graph: Graph[Int, Double] =
  6. GraphGenerators.logNormalGraph(sc, numVertices = 100).mapEdges(e => e.attr.toDouble)
  7. val sourceId: VertexId = 42 // The ultimate source
  8. // Initialize the graph such that all vertices except the root have distance infinity.
  9. val initialGraph = graph.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity)
  10. val sssp = initialGraph.pregel(Double.PositiveInfinity)(
  11. (id, dist, newDist) => math.min(dist, newDist), // Vertex Program
  12. triplet => {  // Send Message
  13. if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
  14. Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
  15. } else {
  16. Iterator.empty
  17. }
  18. },
  19. (a,b) => math.min(a,b) // Merge Message
  20. )
  21. println(sssp.vertices.collect.mkString("\n"))

复制代码

图构造者

GraphX提供了几种方式从RDD或者磁盘上的顶点和边集合构造图。默认情况下,没有哪个图构造者为图的边重新分区,而是把边保留在默认的分区中(例如HDFS中它们的原始块)。Graph.groupEdges?ED):Graph[VD,ED]) 需要重新分区图,因为它假定相同的边将会被分配到同一个分区,所以你必须在调用groupEdges之前调用Graph.partitionBy:Graph[VD,ED])

  1. object GraphLoader {
  2. def edgeListFile(
  3. sc: SparkContext,
  4. path: String,
  5. canonicalOrientation: Boolean = false,
  6. minEdgePartitions: Int = 1)
  7. : Graph[Int, Int]
  8. }

复制代码

GraphLoader.edgeListFile:Graph[Int,Int]) 提供了一个方式从磁盘上的边列表中加载一个图。它解析如下形式(源顶点ID,目标顶点ID)的连接表,跳过以#开头的注释行。

  1. # This is a comment
  2. 2 1
  3. 4 1
  4. 1 2

复制代码

它从指定的边创建一个图,自动地创建边提及的所有顶点。所有的顶点和边的属性默认都是1。canonicalOrientation参数允许重定向正方向(srcId < dstId)的边。这在connected components 算法中需要用到。minEdgePartitions参数指定生成的边分区的最少数量。边分区可能比指定的分区更多,例如,一个HDFS文件包含更多的块。

  1. object Graph {
  2. def apply[VD, ED](
  3. vertices: RDD[(VertexId, VD)],
  4. edges: RDD[Edge[ED]],
  5. defaultVertexAttr: VD = null)
  6. : Graph[VD, ED]
  7. def fromEdges[VD, ED](
  8. edges: RDD[Edge[ED]],
  9. defaultValue: VD): Graph[VD, ED]
  10. def fromEdgeTuples[VD](
  11. rawEdges: RDD[(VertexId, VertexId)],
  12. defaultValue: VD,
  13. uniqueEdges: Option[PartitionStrategy] = None): Graph[VD, Int]
  14. }

复制代码

[url=https://spark.apache.org/docs/latest/api/scala/index.html#[email protected][VD,ED](RDD[(VertexId,VD]Graph.apply[/url]],RDD[Edge[ED]],VD)(ClassTag[VD],ClassTag[ED]):Graph[VD,ED]) 允许从顶点和边的RDD上创建一个图。重复的顶点可以任意的选择其中一个,在边RDD中而不是在顶点RDD中发现的顶点分配默认的属性。

[url=https://spark.apache.org/docs/latest/api/scala/index.html#[email protected][VD,ED](RDD[Edge[ED]],VD]Graph.fromEdges[/url](ClassTag[VD],ClassTag[ED]):Graph[VD,ED]) 允许仅仅从一个边RDD上创建一个图,它自动地创建边提及的顶点,并分配这些顶点默认的值。

[url=https://spark.apache.org/docs/latest/api/scala/index.html#[email protected][VD](RDD[(VertexId,VertexId]Graph.fromEdgeTuples[/url]],VD,Option[PartitionStrategy])(ClassTag[VD]):Graph[VD,Int]) 允许仅仅从一个边元组组成的RDD上创建一个图。分配给边的值为1。它自动地创建边提及的顶点,并分配这些顶点默认的值。它还支持删除边。为了删除边,需要传递一个PartitionStrategy 为值的Some作为uniqueEdges参数(如uniqueEdges = Some(PartitionStrategy.RandomVertexCut))。分配相同的边到同一个分区从而使它们可以被删除,一个分区策略是必须的。

顶点和边RDDs

GraphX暴露保存在图中的顶点和边的RDD。然而,因为GraphX包含的顶点和边拥有优化的数据结构,这些数据结构提供了额外的功能。顶点和边分别返回VertexRDD和EdgeRDD。这一章 我们将学习它们的一些有用的功能。

VertexRDDs

VertexRDD[A]继承自RDD[(VertexID, A)]并且添加了额外的限制,那就是每个VertexID只能出现一次。此外,VertexRDD[A]代表了一组属性类型为A的顶点。在内部,这通过 保存顶点属性到一个可重复使用的hash-map数据结构来获得。所以,如果两个VertexRDDs从相同的基本VertexRDD获得(如通过filter或者mapValues),它们能够在固定的时间内连接 而不需要hash评价。为了利用这个索引数据结构,VertexRDD暴露了一下附加的功能:

  1. class VertexRDD[VD] extends RDD[(VertexID, VD)] {
  2. // Filter the vertex set but preserves the internal index
  3. def filter(pred: Tuple2[VertexId, VD] => Boolean): VertexRDD[VD]
  4. // Transform the values without changing the ids (preserves the internal index)
  5. def mapValues[VD2](map: VD => VD2): VertexRDD[VD2]
  6. def mapValues[VD2](map: (VertexId, VD) => VD2): VertexRDD[VD2]
  7. // Remove vertices from this set that appear in the other set
  8. def diff(other: VertexRDD[VD]): VertexRDD[VD]
  9. // Join operators that take advantage of the internal indexing to accelerate joins (substantially)
  10. def leftJoin[VD2, VD3](other: RDD[(VertexId, VD2)])(f: (VertexId, VD, Option[VD2]) => VD3): VertexRDD[VD3]
  11. def innerJoin[U, VD2](other: RDD[(VertexId, U)])(f: (VertexId, VD, U) => VD2): VertexRDD[VD2]
  12. // Use the index on this RDD to accelerate a `reduceByKey` operation on the input RDD.
  13. def aggregateUsingIndex[VD2](other: RDD[(VertexId, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2]
  14. }

复制代码

举个例子,filter操作如何返回一个VertexRDD。过滤器实际使用一个BitSet实现,因此它能够重用索引以及保留和其它VertexRDDs做连接时速度快的能力。同样的,mapValues操作 不允许map函数改变VertexID,因此可以保证相同的HashMap数据结构能够重用。当连接两个从相同的hashmap获取的VertexRDDs和使用线性扫描而不是昂贵的点查找实现连接操作时,leftJoin 和innerJoin都能够使用。

从一个RDD[(VertexID, A)]高效地构建一个新的VertexRDD,aggregateUsingIndex操作是有用的。概念上,如果我通过一组顶点构造了一个VertexRDD[B],而VertexRDD[B]是 一些RDD[(VertexID, A)]中顶点的超集,那么我们就可以在聚合以及随后索引RDD[(VertexID, A)]中重用索引。例如:

  1. val setA: VertexRDD[Int] = VertexRDD(sc.parallelize(0L until 100L).map(id => (id, 1)))
  2. val rddB: RDD[(VertexId, Double)] = sc.parallelize(0L until 100L).flatMap(id => List((id, 1.0), (id, 2.0)))
  3. // There should be 200 entries in rddB
  4. rddB.count
  5. val setB: VertexRDD[Double] = setA.aggregateUsingIndex(rddB, _ + _)
  6. // There should be 100 entries in setB
  7. setB.count
  8. // Joining A and B should now be fast!
  9. val setC: VertexRDD[Double] = setA.innerJoin(setB)((id, a, b) => a + b)

复制代码

EdgeRDDs

EdgeRDD[ED]继承自RDD[Edge[ED]],使用定义在PartitionStrategy的 各种分区策略中的一个在块分区中组织边。在每个分区中,边属性和相邻结构被分别保存,当属性值改变时,它们可以最大化的重用。

EdgeRDD暴露了三个额外的函数

  1. // Transform the edge attributes while preserving the structure
  2. def mapValues[ED2](f: Edge[ED] => ED2): EdgeRDD[ED2]
  3. // Revere the edges reusing both attributes and structure
  4. def reverse: EdgeRDD[ED]
  5. // Join two `EdgeRDD`s partitioned using the same partitioning strategy.
  6. def innerJoin[ED2, ED3](other: EdgeRDD[ED2])(f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3]

复制代码

在大多数的应用中,我们发现,EdgeRDD操作可以通过图操作者(graph operators)或者定义在基本RDD中的操作来完成。

图算法

GraphX包括一组图算法来简化分析任务。这些算法包含在org.apache.spark.graphx.lib包中,可以被直接访问。

PageRank算法

PageRank度量一个图中每个顶点的重要程度,假定从u到v的一条边代表v的重要性标签。例如,一个Twitter用户被许多其它人粉,该用户排名很高。GraphX带有静态和动态PageRank的实现方法 ,这些方法在PageRank object中。静态的PageRank运行固定次数 的迭代,而动态的PageRank一直运行,直到收敛。GraphOps允许直接调用这些算法作为图上的方法。

GraphX包含一个我们可以运行PageRank的社交网络数据集的例子。用户集在graphx/data/users.txt中,用户之间的关系在graphx/data/followers.txt中。我们通过下面的方法计算 每个用户的PageRank。

  1. // Load the edges as a graph
  2. val graph = GraphLoader.edgeListFile(sc, "graphx/data/followers.txt")
  3. // Run PageRank
  4. val ranks = graph.pageRank(0.0001).vertices
  5. // Join the ranks with the usernames
  6. val users = sc.textFile("graphx/data/users.txt").map { line =>
  7. val fields = line.split(",")
  8. (fields(0).toLong, fields(1))
  9. }
  10. val ranksByUsername = users.join(ranks).map {
  11. case (id, (username, rank)) => (username, rank)
  12. }
  13. // Print the result
  14. println(ranksByUsername.collect().mkString("\n"))

复制代码

连通体算法

连通体算法用id标注图中每个连通体,将连通体中序号最小的顶点的id作为连通体的id。例如,在社交网络中,连通体可以近似为集群。GraphX在ConnectedComponents object 中包含了一个算法的实现,我们通过下面的方法计算社交网络数据集中的连通体。

  1. / Load the graph as in the PageRank example
  2. val graph = GraphLoader.edgeListFile(sc, "graphx/data/followers.txt")
  3. // Find the connected components
  4. val cc = graph.connectedComponents().vertices
  5. // Join the connected components with the usernames
  6. val users = sc.textFile("graphx/data/users.txt").map { line =>
  7. val fields = line.split(",")
  8. (fields(0).toLong, fields(1))
  9. }
  10. val ccByUsername = users.join(cc).map {
  11. case (id, (username, cc)) => (username, cc)
  12. }
  13. // Print the result
  14. println(ccByUsername.collect().mkString("\n"))

复制代码

三角形计数算法

一个顶点有两个相邻的顶点以及相邻顶点之间的边时,这个顶点是一个三角形的一部分。GraphX在TriangleCount object 中实现了一个三角形计数算法,它计算通过每个顶点的三角形的数量。需要注意的是,在计算社交网络数据集的三角形计数时,TriangleCount需要边的方向是规范的方向(srcId < dstId), 并且图通过Graph.partitionBy分片过。

  1. // Load the edges in canonical order and partition the graph for triangle count
  2. val graph = GraphLoader.edgeListFile(sc, "graphx/data/followers.txt", true).partitionBy(PartitionStrategy.RandomVertexCut)
  3. // Find the triangle count for each vertex
  4. val triCounts = graph.triangleCount().vertices
  5. // Join the triangle counts with the usernames
  6. val users = sc.textFile("graphx/data/users.txt").map { line =>
  7. val fields = line.split(",")
  8. (fields(0).toLong, fields(1))
  9. }
  10. val triCountByUsername = users.join(triCounts).map { case (id, (username, tc)) =>
  11. (username, tc)
  12. }
  13. // Print the result
  14. println(triCountByUsername.collect().mkString("\n"))

复制代码

例子

假定我们想从一些文本文件中构建一个图,限制这个图包含重要的关系和用户,并且在子图上运行page-rank,最后返回与top用户相关的属性。可以通过如下方式实现.

  1. // Connect to the Spark cluster
  2. val sc = new SparkContext("spark://master.amplab.org", "research")
  3. // Load my user data and parse into tuples of user id and attribute list
  4. val users = (sc.textFile("graphx/data/users.txt")
  5. .map(line => line.split(",")).map( parts => (parts.head.toLong, parts.tail) ))
  6. // Parse the edge data which is already in userId -> userId format
  7. val followerGraph = GraphLoader.edgeListFile(sc, "graphx/data/followers.txt")
  8. // Attach the user attributes
  9. val graph = followerGraph.outerJoinVertices(users) {
  10. case (uid, deg, Some(attrList)) => attrList
  11. // Some users may not have attributes so we set them as empty
  12. case (uid, deg, None) => Array.empty[String]
  13. }
  14. // Restrict the graph to users with usernames and names
  15. val subgraph = graph.subgraph(vpred = (vid, attr) => attr.size == 2)
  16. // Compute the PageRank
  17. val pagerankGraph = subgraph.pageRank(0.001)
  18. // Get the attributes of the top pagerank users
  19. val userInfoWithPageRank = subgraph.outerJoinVertices(pagerankGraph.vertices) {
  20. case (uid, attrList, Some(pr)) => (pr, attrList.toList)
  21. case (uid, attrList, None) => (0.0, attrList.toList)
  22. }
  23. println(userInfoWithPageRank.vertices.top(5)(Ordering.by(_._2._1)).mkString("\n"))

复制代码

相关内容:

Spark中文手册1-编程指南
http://www.aboutyun.com/thread-11413-1-1.html

Spark中文手册2:Spark之一个快速的例子
http://www.aboutyun.com/thread-11484-1-1.html

Spark中文手册3:Spark之基本概念
http://www.aboutyun.com/thread-11502-1-1.html

Spark中文手册4:Spark之基本概念(2)
http://www.aboutyun.com/thread-11516-1-1.html

Spark中文手册5:Spark之基本概念(3)
http://www.aboutyun.com/thread-11535-1-1.html

Spark中文手册6:Spark-sql由入门到精通
http://www.aboutyun.com/thread-11562-1-1.html

Spark中文手册7:Spark-sql由入门到精通【续】
http://www.aboutyun.com/thread-11575-1-1.html

Spark中文手册8:spark GraphX编程指南(1)
http://www.aboutyun.com/thread-11589-1-1.html

Spark中文手册10:spark部署:提交应用程序及独立部署模式
http://www.aboutyun.com/thread-11615-1-1.html

Spark中文手册11:Spark 配置指南
http://www.aboutyun.com/thread-10652-1-1.html

时间: 2024-10-14 21:08:59

Spark Graphx编程指南的相关文章

Apache Spark 2.2.0 中文文档 - Spark Streaming 编程指南 | ApacheCN

Spark Streaming 编程指南 概述 一个入门示例 基础概念 依赖 初始化 StreamingContext Discretized Streams (DStreams)(离散化流) Input DStreams 和 Receivers(接收器) DStreams 上的 Transformations(转换) DStreams 上的输出操作 DataFrame 和 SQL 操作 MLlib 操作 缓存 / 持久性 Checkpointing Accumulators, Broadcas

Spark Streaming编程指南

本文基于Spark Streaming Programming Guide原文翻译, 加上一些自己的理解和小实验的结果. 一.概述 Spark Streaming是基于Core Spark API的可扩展,高吞吐量,并具有容错能力的用于处理实时数据流的一个组件.Spark Streaming可以接收各种数据源传递来的数据,比如Kafka, Flume, Kinesis或者TCP等,对接收到的数据还可以使用一些用高阶函数(比如map, reduce, join及window)进行封装的复杂算法做进

Spark SQL编程指南(Python)

前言 Spark SQL允许我们在Spark环境中使用SQL或者Hive SQL执行关系型查询.它的核心是一个特殊类型的Spark RDD:SchemaRDD. SchemaRDD类似于传统关系型数据库的一张表,由两部分组成: Rows:数据行对象 Schema:数据行模式:列名.列数据类型.列可否为空等 Schema可以通过四种方式被创建: (1)Existing RDD (2)Parquet File (3)JSON Dataset (4)By running Hive SQL 考虑到Par

Spark SQL编程指南(Python)【转】

转自:http://www.cnblogs.com/yurunmiao/p/4685310.html 前言 Spark SQL允许我们在Spark环境中使用SQL或者Hive SQL执行关系型查询.它的核心是一个特殊类型的Spark RDD:SchemaRDD. SchemaRDD类似于传统关系型数据库的一张表,由两部分组成: Rows:数据行对象 Schema:数据行模式:列名.列数据类型.列可否为空等 Schema可以通过四种方式被创建: (1)Existing RDD (2)Parquet

Spark(1.6.1) Sql 编程指南+实战案例分析

首先看看从官网学习后总结的一个思维导图 概述(Overview) Spark SQL是Spark的一个模块,用于结构化数据处理.它提供了一个编程的抽象被称为DataFrames,也可以作为分布式SQL查询引擎. 开始Spark SQL Spark SQL中所有功能的入口点是SQLContext类,或者它子类中的一个.为了创建一个基本的SQLContext,你所需要的是一个SparkContext. 除了基本的SQLContext,你还可以创建一个HiveContext,它提供了基本的SQLCon

Apache Spark 2.2.0 中文文档 - Structured Streaming 编程指南 | ApacheCN

Structured Streaming 编程指南 概述 快速示例 Programming Model (编程模型) 基本概念 处理 Event-time 和延迟数据 容错语义 API 使用 Datasets 和 DataFrames 创建 streaming DataFrames 和 streaming Datasets Input Sources (输入源) streaming DataFrames/Datasets 的模式接口和分区 streaming DataFrames/Dataset

明风:分布式图计算的平台Spark GraphX 在淘宝的实践

快刀初试:Spark GraphX在淘宝的实践 作者:明风 (本文由团队中梧苇和我一起撰写,并由团队中的林岳,岩岫,世仪等多人Review,发表于程序员的8月刊,由于篇幅原因,略作删减,本文为完整版) 对于网络科学而言,世间万物都可以抽象成点,而事物之间的关系都可以抽象成边,并根据不同的应用场景,生成不同的网络,因此整个世界都可以用一个巨大的复杂网络来代表.有关复杂网络和图算法的研究,在最近的十几年取得了巨大的进展,并在多个领域有重要的应用. 作为最大的电商平台,淘宝上数亿买家和卖家,每天产生数

Spark GraphX学习笔记

概述 GraphX是 Spark中用于图(如Web-Graphs and Social Networks)和图并行计算(如 PageRank and Collaborative Filtering)的API,可以认为是GraphLab(C++)和Pregel(C++)在Spark(Scala)上的重写及优化,跟其他分布式 图计算框架相比,GraphX最大的贡献是,在Spark之上提供一站式数据解决方案,可以方便且高效地完成图计算的一整套流水作业. Graphx是Spark生态中的非常重要的组件,

Spark GraphX宝刀出鞘,图文并茂研习图计算秘笈与熟练的掌握Scala语言【大数据Spark

Spark GraphX宝刀出鞘,图文并茂研习图计算秘笈 大数据的概念与应用,正随着智能手机.平板电脑的快速流行而日渐普及,大数据中图的并行化处理一直是一个非常热门的话题.图计算正在被广泛地应用于社交网络.电子商务,地图等领域.对于图计算的两个核心问题:图存储模式和图计算模型,Spark GraphX给出了近乎完美的答案, 而Spark GraphX作为图计算领域的屠龙宝刀,对Pregel  API的支持更是让Spark GraphX如虎添翼.Spark GraphX可以轻而易举的完成基于度分布