Spark GraphX: A Study of Graph.scala

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.graphx

import scala.language.implicitConversions
import scala.reflect.ClassTag

import org.apache.spark.graphx.impl._
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

/**
 * The Graph abstractly represents a graph with arbitrary objects
 * associated with vertices and edges.  The graph provides basic
 * operations to access and manipulate the data associated with
 * vertices and edges as well as the underlying structure.  Like Spark
 * RDDs, the graph is a functional data-structure in which mutating
 * operations return new graphs.
 *
 * @note [[GraphOps]] contains additional convenience operations and graph algorithms.
 *
 * @tparam VD the vertex attribute type
 * @tparam ED the edge attribute type
 */
abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializable {

  /**
   * An RDD containing the vertices and their associated attributes.
   *
   * @note vertex ids are unique.
   * @return an RDD containing the vertices in this graph
   */
  @transient val vertices: VertexRDD[VD]

  /**
   * An RDD containing the edges and their associated attributes.  The entries in the RDD contain
   * just the source id and target id along with the edge data.
   *
   * @return an RDD containing the edges in this graph
   *
   * @see [[Edge]] for the edge type.
   * @see [[triplets]] to get an RDD which contains all the edges
   * along with their vertex data.
   *
   */
  @transient val edges: EdgeRDD[ED, VD]

  /**
   * An RDD containing the edge triplets, which are edges along with the vertex data associated with
   * the adjacent vertices. The caller should use [[edges]] if the vertex data are not needed, i.e.
   * if only the edge data and adjacent vertex ids are needed.
   *
   * @return an RDD containing edge triplets
   *
   * @example This operation might be used to evaluate a graph
   * coloring where we would like to check that both vertices are a
   * different color.
   * {{{
   * type Color = Int
   * val graph: Graph[Color, Int] = GraphLoader.edgeListFile(sc, "hdfs://file.tsv")
   * val numInvalid = graph.triplets.map(e => if (e.srcAttr == e.dstAttr) 1 else 0).sum
   * }}}
   */
  @transient val triplets: RDD[EdgeTriplet[VD, ED]]

  /**
   * Caches the vertices and edges associated with this graph at the specified storage level,
   * ignoring any target storage levels previously set.
   *
   * @param newLevel the level at which to cache the graph.
   *
   * @return A reference to this graph for convenience.
   */
  def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]

  /**
   * Caches the vertices and edges associated with this graph at the previously-specified target
   * storage levels, which default to `MEMORY_ONLY`. This is used to pin a graph in memory enabling
   * multiple queries to reuse the same construction process.
   */
  def cache(): Graph[VD, ED]

  /**
   * Uncaches only the vertices of this graph, leaving the edges alone. This is useful in iterative
   * algorithms that modify the vertex attributes but reuse the edges. This method can be used to
   * uncache the vertex attributes of previous iterations once they are no longer needed, improving
   * GC performance.
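   *
   * @example A hedged sketch of the iterative pattern (the graph `g: Graph[Double, Int]` and
   * the update step are illustrative assumptions):
   * {{{
   * var cur = g.cache()
   * for (i <- 1 to 10) {
   *   val next = cur.mapVertices((vid, attr) => attr + 1.0).cache()
   *   next.edges.count()                       // materialize the new graph
   *   cur.unpersistVertices(blocking = false)  // edges are shared and stay cached
   *   cur = next
   * }
   * }}}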
   */
  def unpersistVertices(blocking: Boolean = true): Graph[VD, ED]

  /**
   * Repartitions the edges in the graph according to `partitionStrategy`.
   *
   * @param partitionStrategy the partitioning strategy to use when partitioning the edges
   * in the graph.
   */
  def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]

  /**
   * Repartitions the edges in the graph according to `partitionStrategy`.
   *
   * @param partitionStrategy the partitioning strategy to use when partitioning the edges
   * in the graph.
   * @param numPartitions the number of edge partitions in the new graph.
   */
  def partitionBy(partitionStrategy: PartitionStrategy, numPartitions: Int): Graph[VD, ED]

  /**
   * Transforms each vertex attribute in the graph using the map function.
   *
   * @note The new graph has the same structure.  As a consequence the underlying index structures
   * can be reused.
   *
   * @param map the function from a vertex object to a new vertex value
   *
   * @tparam VD2 the new vertex data type
   *
   * @example We might use this operation to change the vertex values
   * from one type to another to initialize an algorithm.
   * {{{
   * val rawGraph: Graph[Int, Int] = GraphLoader.edgeListFile(sc, "hdfs://file")
   * val root = 42
   * var bfsGraph = rawGraph.mapVertices[Int]((vid, data) => if (vid == root) 0 else Int.MaxValue)
   * }}}
   *
   */
  def mapVertices[VD2: ClassTag](map: (VertexId, VD) => VD2)
    (implicit eq: VD =:= VD2 = null): Graph[VD2, ED]

  /**
   * Transforms each edge attribute in the graph using the map function.  The map function is not
   * passed the vertex value for the vertices adjacent to the edge.  If vertex values are desired,
   * use `mapTriplets`.
   *
   * @note This graph is not changed, and the new graph has the
   * same structure.  As a consequence the underlying index structures
   * can be reused.
   *
   * @param map the function from an edge object to a new edge value.
   *
   * @tparam ED2 the new edge data type
   *
   * @example This function might be used to initialize edge
   * attributes.
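   * A minimal sketch (the graph `g: Graph[Int, Double]` is an assumed input):
   * {{{
   * // Reset every edge attribute to 1.0, leaving vertices and structure untouched.
   * val unit: Graph[Int, Double] = g.mapEdges(e => 1.0)
   * }}}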
   *
   */
  def mapEdges[ED2: ClassTag](map: Edge[ED] => ED2): Graph[VD, ED2] = {
    mapEdges((pid, iter) => iter.map(map))
  }

  /**
   * Transforms each edge attribute using the map function, passing it a whole partition at a
   * time. The map function is given an iterator over edges within a logical partition as well as
   * the partition's ID, and it should return a new iterator over the new values of each edge. The
   * new iterator's elements must correspond one-to-one with the old iterator's elements. If
   * adjacent vertex values are desired, use `mapTriplets`.
   *
   * @note This does not change the structure of the
   * graph or modify the values of this graph.  As a consequence
   * the underlying index structures can be reused.
   *
   * @param map a function that takes a partition id and an iterator
   * over all the edges in the partition, and must return an iterator over
   * the new values for each edge in the order of the input iterator
   *
   * @tparam ED2 the new edge data type
   *
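   * @example A hedged sketch (assuming `g: Graph[Int, Double]`):
   * {{{
   * // Negate every edge weight, one partition at a time; output order matches input order.
   * val negated: Graph[Int, Double] = g.mapEdges((pid, iter) => iter.map(e => -e.attr))
   * }}}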
   */
  def mapEdges[ED2: ClassTag](map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2])
    : Graph[VD, ED2]

  /**
   * Transforms each edge attribute using the map function, passing it the adjacent vertex
   * attributes as well. If adjacent vertex values are not required,
   * consider using `mapEdges` instead.
   *
   * @note This does not change the structure of the
   * graph or modify the values of this graph.  As a consequence
   * the underlying index structures can be reused.
   *
   * @param map the function from an edge object to a new edge value.
   *
   * @tparam ED2 the new edge data type
   *
   * @example This function might be used to initialize edge
   * attributes based on the attributes associated with each vertex.
   * {{{
   * val rawGraph: Graph[Int, Int] = someLoadFunction()
   * val graph = rawGraph.mapTriplets[Int]( edge =>
   *   edge.srcAttr - edge.dstAttr)
   * }}}
   *
   */
  def mapTriplets[ED2: ClassTag](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] = {
    mapTriplets((pid, iter) => iter.map(map))
  }

  /**
   * Transforms each edge attribute a partition at a time using the map function, passing it the
   * adjacent vertex attributes as well. The map function is given an iterator over edge triplets
   * within a logical partition and should yield a new iterator over the new values of each edge in
   * the order in which they are provided.  If adjacent vertex values are not required, consider
   * using `mapEdges` instead.
   *
   * @note This does not change the structure of the
   * graph or modify the values of this graph.  As a consequence
   * the underlying index structures can be reused.
   *
   * @param map the iterator transform
   *
   * @tparam ED2 the new edge data type
   *
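   * @example A hedged sketch (assuming `g: Graph[Int, Double]`):
   * {{{
   * // Scale each edge weight by its source vertex attribute, partition by partition.
   * val scaled = g.mapTriplets((pid, iter) => iter.map(t => t.attr * t.srcAttr))
   * }}}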
   */
  def mapTriplets[ED2: ClassTag](map: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2])
    : Graph[VD, ED2]

  /**
   * Reverses all edges in the graph.  If this graph contains an edge from a to b then the returned
   * graph contains an edge from b to a.
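   *
   * @example A minimal sketch (assuming `g: Graph[Int, String]`):
   * {{{
   * // An edge (a, b, "follows") becomes (b, a, "follows"); attributes are preserved.
   * val reversed: Graph[Int, String] = g.reverse
   * }}}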
   */
  def reverse: Graph[VD, ED]

  /**
   * Restricts the graph to only the vertices and edges satisfying the predicates. The resulting
   * subgraph satisfies
   *
   * {{{
   * V' = {v : for all v in V where vpred(v)}
   * E' = {(u,v): for all (u,v) in E where epred((u,v)) && vpred(u) && vpred(v)}
   * }}}
   *
   * @param epred the edge predicate, which takes a triplet and
   * evaluates to true if the edge is to remain in the subgraph.  Note
   * that only edges where both vertices satisfy the vertex
   * predicate are considered.
   *
   * @param vpred the vertex predicate, which takes a vertex object and
   * evaluates to true if the vertex is to be included in the subgraph
   *
   * @return the subgraph containing only the vertices and edges that
   * satisfy the predicates
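   *
   * @example A minimal sketch (assuming `g: Graph[String, Int]`; the predicate bodies are
   * illustrative):
   * {{{
   * // Keep only positive-weight edges between vertices with non-empty labels.
   * val sub = g.subgraph(
   *   epred = t => t.attr > 0,
   *   vpred = (vid, label) => label.nonEmpty)
   * }}}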
   */
  def subgraph(
      epred: EdgeTriplet[VD,ED] => Boolean = (x => true),
      vpred: (VertexId, VD) => Boolean = ((v, d) => true))
    : Graph[VD, ED]

  /**
   * Restricts the graph to only the vertices and edges that are also in `other`, but keeps the
   * attributes from this graph.
   * @param other the graph to project this graph onto
   * @return a graph with vertices and edges that exist in both the current graph and `other`,
   * with vertex and edge data from the current graph
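   *
   * @example A hedged sketch of the common "project a computed answer back" pattern
   * (`g` is an assumed input graph):
   * {{{
   * // Restrict g to the component labelled 0, keeping g's own attributes.
   * val ccGraph = g.connectedComponents()
   * val valid = ccGraph.subgraph(vpred = (vid, cc) => cc == 0L)
   * val validGraph = g.mask(valid)
   * }}}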
   */
  def mask[VD2: ClassTag, ED2: ClassTag](other: Graph[VD2, ED2]): Graph[VD, ED]

  /**
   * Merges multiple edges between two vertices into a single edge. For correct results, the graph
   * must have been partitioned using [[partitionBy]].
   *
   * @param merge the user-supplied commutative associative function to merge edge attributes
   *              for duplicate edges.
   *
   * @return The resulting graph with a single edge for each (source, dest) vertex pair.
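   *
   * @example A minimal sketch (assuming `g: Graph[Int, Int]`; the strategy choice is
   * illustrative):
   * {{{
   * // Co-locate parallel edges, then sum their attributes into a single edge.
   * val summed = g.partitionBy(PartitionStrategy.EdgePartition2D).groupEdges(_ + _)
   * }}}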
   */
  def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]

  /**
   * Aggregates values from the neighboring edges and vertices of each vertex.  The user supplied
   * `mapFunc` function is invoked on each edge of the graph, generating 0 or more "messages" to be
   * "sent" to either vertex in the edge.  The `reduceFunc` is then used to combine the output of
   * the map phase destined to each vertex.
   *
   * @tparam A the type of "message" to be sent to each vertex
   *
   * @param mapFunc the user defined map function which returns 0 or
   * more messages to neighboring vertices
   *
   * @param reduceFunc the user defined reduce function which should
   * be commutative and associative and is used to combine the output
   * of the map phase
   *
   * @param activeSetOpt optionally, a set of "active" vertices and a direction of edges to
   * consider when running `mapFunc`. If the direction is `In`, `mapFunc` will only be run on
   * edges with destination in the active set.  If the direction is `Out`,
   * `mapFunc` will only be run on edges originating from vertices in the active set. If the
   * direction is `Either`, `mapFunc` will be run on edges with *either* vertex in the active set.
   * If the direction is `Both`, `mapFunc` will be run on edges with *both* vertices in the
   * active set. The active set must have the same index as the graph's vertices.
   *
   * @example We can use this function to compute the in-degree of each
   * vertex
   * {{{
   * val rawGraph: Graph[Int, Int] = GraphLoader.edgeListFile(sc, "twittergraph")
   * val inDeg: RDD[(VertexId, Int)] =
   *   rawGraph.mapReduceTriplets[Int](et => Iterator((et.dstId, 1)), _ + _)
   * }}}
   *
   * @note By expressing computation at the edge level we achieve
   * maximum parallelism.  This is one of the core functions in the
   * Graph API in that it enables neighborhood-level computation. For
   * example this function can be used to count neighbors satisfying a
   * predicate or implement PageRank.
   *
   */
  def mapReduceTriplets[A: ClassTag](
      mapFunc: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
      reduceFunc: (A, A) => A,
      activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] = None)
    : VertexRDD[A]

  /**
   * Joins the vertices with entries in the `other` RDD and merges the results using `mapFunc`.  The
   * input table should contain at most one entry for each vertex.  If no entry in `other` is
   * provided for a particular vertex in the graph, the map function receives `None`.
   *
   * @tparam U the type of entry in the table of updates
   * @tparam VD2 the new vertex value type
   *
   * @param other the table to join with the vertices in the graph.
   *              The table should contain at most one entry for each vertex.
   * @param mapFunc the function used to compute the new vertex values.
   *                The map function is invoked for all vertices, even those
   *                that do not have a corresponding entry in the table.
   *
   * @example This function is used to update the vertices with new values based on external data.
   *          For example we could add the out-degree to each vertex record:
   *
   * {{{
   * val rawGraph: Graph[Int, Int] = GraphLoader.edgeListFile(sc, "webgraph")
   * val outDeg: RDD[(VertexId, Int)] = rawGraph.outDegrees
   * val graph = rawGraph.outerJoinVertices(outDeg) {
   *   (vid, data, optDeg) => optDeg.getOrElse(0)
   * }
   * }}}
   */
  def outerJoinVertices[U: ClassTag, VD2: ClassTag](other: RDD[(VertexId, U)])
      (mapFunc: (VertexId, VD, Option[U]) => VD2)(implicit eq: VD =:= VD2 = null)
    : Graph[VD2, ED]

  /**
   * The associated [[GraphOps]] object.
   */
  // Save a copy of the GraphOps object so there is always one unique GraphOps object
  // for a given Graph object, and thus the lazy vals in GraphOps would work as intended.
  val ops = new GraphOps(this)
} // end of Graph

/**
 * The Graph object contains a collection of routines used to construct graphs from RDDs.
 */
object Graph {

  /**
   * Construct a graph from a collection of edges encoded as vertex id pairs.
   *
   * @param rawEdges a collection of edges in (src, dst) form
   * @param defaultValue the vertex attributes with which to create vertices referenced by the edges
   * @param uniqueEdges if multiple identical edges are found they are combined and the edge
   * attribute is set to the sum.  Otherwise duplicate edges are treated as separate. To enable
   * `uniqueEdges`, a [[PartitionStrategy]] must be provided.
   * @param edgeStorageLevel the desired storage level at which to cache the edges if necessary
   * @param vertexStorageLevel the desired storage level at which to cache the vertices if necessary
   *
   * @return a graph with edge attributes containing either the count of duplicate edges or 1
   * (if `uniqueEdges` is `None`) and vertex attributes set to `defaultValue`.
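   *
   * @example A minimal usage sketch (assuming a SparkContext `sc`; the data is illustrative):
   * {{{
   * val pairs: RDD[(VertexId, VertexId)] = sc.parallelize(Seq((1L, 2L), (1L, 2L), (2L, 3L)))
   * val g: Graph[Int, Int] = Graph.fromEdgeTuples(pairs, defaultValue = 1,
   *   uniqueEdges = Some(PartitionStrategy.RandomVertexCut))
   * // The duplicate (1, 2) edges collapse into one edge with attribute 2.
   * }}}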
   */
  def fromEdgeTuples[VD: ClassTag](
      rawEdges: RDD[(VertexId, VertexId)],
      defaultValue: VD,
      uniqueEdges: Option[PartitionStrategy] = None,
      edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
      vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, Int] =
  {
    val edges = rawEdges.map(p => Edge(p._1, p._2, 1))
    val graph = GraphImpl(edges, defaultValue, edgeStorageLevel, vertexStorageLevel)
    uniqueEdges match {
      case Some(p) => graph.partitionBy(p).groupEdges((a, b) => a + b)
      case None => graph
    }
  }

  /**
   * Construct a graph from a collection of edges.
   *
   * @param edges the RDD containing the set of edges in the graph
   * @param defaultValue the default vertex attribute to use for each vertex
   * @param edgeStorageLevel the desired storage level at which to cache the edges if necessary
   * @param vertexStorageLevel the desired storage level at which to cache the vertices if necessary
   *
   * @return a graph with edge attributes described by `edges` and vertices
   *         given by all vertices in `edges` with value `defaultValue`
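   *
   * @example A minimal usage sketch (assuming a SparkContext `sc`):
   * {{{
   * val es: RDD[Edge[String]] = sc.parallelize(Seq(Edge(1L, 2L, "follows")))
   * val g: Graph[Int, String] = Graph.fromEdges(es, defaultValue = 0)
   * }}}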
   */
  def fromEdges[VD: ClassTag, ED: ClassTag](
      edges: RDD[Edge[ED]],
      defaultValue: VD,
      edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
      vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED] = {
    GraphImpl(edges, defaultValue, edgeStorageLevel, vertexStorageLevel)
  }

  /**
   * Construct a graph from a collection of vertices and
   * edges with attributes.  Duplicate vertices are picked arbitrarily and
   * vertices found in the edge collection but not in the input
   * vertices are assigned the default attribute.
   *
   * @tparam VD the vertex attribute type
   * @tparam ED the edge attribute type
   * @param vertices the "set" of vertices and their attributes
   * @param edges the collection of edges in the graph
   * @param defaultVertexAttr the default vertex attribute to use for vertices that are
   *                          mentioned in edges but not in vertices
   * @param edgeStorageLevel the desired storage level at which to cache the edges if necessary
   * @param vertexStorageLevel the desired storage level at which to cache the vertices if necessary
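   *
   * @example A minimal usage sketch (assuming a SparkContext `sc`; the data is illustrative):
   * {{{
   * val vs: RDD[(VertexId, String)] = sc.parallelize(Seq((1L, "alice"), (2L, "bob")))
   * val es: RDD[Edge[Int]] = sc.parallelize(Seq(Edge(1L, 2L, 7), Edge(2L, 3L, 4)))
   * // Vertex 3 appears only in the edges, so it receives the default attribute.
   * val g: Graph[String, Int] = Graph(vs, es, defaultVertexAttr = "unknown")
   * }}}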
   */
  def apply[VD: ClassTag, ED: ClassTag](
      vertices: RDD[(VertexId, VD)],
      edges: RDD[Edge[ED]],
      defaultVertexAttr: VD = null.asInstanceOf[VD],
      edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
      vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED] = {
    GraphImpl(vertices, edges, defaultVertexAttr, edgeStorageLevel, vertexStorageLevel)
  }

  /**
   * Implicitly extracts the [[GraphOps]] member from a graph.
   *
   * To improve modularity the Graph type only contains a small set of basic operations.
   * All the convenience operations are defined in the [[GraphOps]] class which may be
   * shared across multiple graph implementations.
   */
  implicit def graphToGraphOps[VD: ClassTag, ED: ClassTag]
      (g: Graph[VD, ED]): GraphOps[VD, ED] = g.ops
} // end of Graph object