/**
 * Spark GraphX demo: builds two small property graphs over a local
 * SparkContext, runs a few vertex/edge filters, and prints the triplet
 * relationships of each graph.
 *
 * Created by root on 9/8/15.
 */
import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

object SparkGraphXTest {
  // Explicit `: Unit =` — the old procedure syntax `def main(...) { }` is deprecated.
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("graphx app").setMaster("local")
    val sc = new SparkContext(conf)
    // try/finally guarantees the SparkContext is released even if a job fails
    // (the original never called sc.stop()).
    try {
      // Vertex attribute is (userName, occupation).
      val users: RDD[(VertexId, (String, String))] = sc.parallelize(
        Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
          (5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))
      val relationships: RDD[Edge[String]] = sc.parallelize(
        Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
          Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))
      // Attribute assigned to any vertex referenced by an edge but absent from `users`.
      val defaultUser = ("John Doe", "Missing")
      val graph = Graph(users, relationships, defaultUser)

      // How many postdocs, and (twice, via two equivalent styles) how many
      // edges point from a higher vertex id to a lower one.
      val count1 = graph.vertices.filter { case (id, (name, pos)) => pos == "postdoc" }.count()
      val count2 = graph.edges.filter(e => e.srcId > e.dstId).count()
      val count3 = graph.edges.filter { case Edge(src, dst, prop) => src > dst }.count()
      println(count1)
      println(count2)
      println(count3)

      // FIX: use ._1 (the user name) rather than ._2 (the occupation), so the
      // sentence reads "rxin is the collab of jgonzal" — matching the canonical
      // GraphX triplets example. The original printed occupations, e.g.
      // "student is the collab of postdoc".
      val facts: RDD[String] = graph.triplets.map(triplet =>
        s"${triplet.srcAttr._1} is the ${triplet.attr} of ${triplet.dstAttr._1}")
      facts.collect().foreach(println)

      // Second graph: vertex attribute extended to (userName, occupation, age).
      val users2: RDD[(VertexId, (String, String, String))] = sc.parallelize(
        Array((3L, ("rxin", "student", "20")), (7L, ("jgonzal", "postdoc", "22")),
          (5L, ("franklin", "prof", "24")), (2L, ("istoica", "prof", "26"))))
      val relationships2: RDD[Edge[String]] = sc.parallelize(
        Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
          Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))
      val defaultUser2 = ("Amy Sun", "aaa", "18")
      val graph2 = Graph(users2, relationships2, defaultUser2)

      // Here the whole attribute tuple is printed on purpose (toString),
      // unlike `facts` above which selects a single field.
      val facts2: RDD[String] = graph2.triplets.map(triplet =>
        s"${triplet.srcAttr.toString()} is the ${triplet.attr} of ${triplet.dstAttr.toString()}")
      facts2.collect().foreach(println)
    } finally {
      sc.stop()
    }
  }
}
// Timestamp: 2024-10-12 13:07:38