/**
 * Custom collection of the neighbour vertex ids of every vertex in a graph.
 *
 * For each edge, the id of the vertex at one end is sent to the vertex at the
 * other end, and all ids arriving at the same vertex are concatenated into a
 * single array.
 *
 * The direction is validated on the driver before any Spark job is defined, so
 * an unsupported direction fails immediately at the call site rather than
 * later inside a task.
 *
 * @param edgeDirection which neighbours to collect:
 *                      `EdgeDirection.Out` gives each vertex the destination ids of its out-edges,
 *                      `EdgeDirection.In` gives each vertex the source ids of its in-edges,
 *                      `EdgeDirection.Either` gives both.
 * @param graph         the graph whose edges are scanned
 * @param m             class tag for the vertex attribute type; not used by the body,
 *                      retained so the signature stays unchanged for callers
 * @param n             class tag for the edge attribute type; retained for the same reason
 * @tparam T vertex attribute type
 * @tparam U edge attribute type
 * @return the neighbour ids keyed by vertex id. Per the GraphX documentation of
 *         `aggregateMessages`, vertices that receive no message do not appear
 *         in the result.
 * @throws SparkException if `edgeDirection` is `EdgeDirection.Both`
 * @author TongXueQiang
 */
def collectNeighborIds[T, U](edgeDirection: EdgeDirection, graph: Graph[T, U])(
    implicit m: scala.reflect.ClassTag[T],
    n: scala.reflect.ClassTag[U]): VertexRDD[Array[VertexId]] = {
  edgeDirection match {
    case EdgeDirection.Either =>
      graph.aggregateMessages[Array[VertexId]](
        // send step: each endpoint learns the id of the opposite endpoint
        ctx => {
          ctx.sendToSrc(Array(ctx.dstId))
          ctx.sendToDst(Array(ctx.srcId))
        },
        // merge step: concatenate the id arrays arriving at the same vertex
        _ ++ _)
    case EdgeDirection.Out =>
      graph.aggregateMessages[Array[VertexId]](
        ctx => ctx.sendToSrc(Array(ctx.dstId)),
        _ ++ _)
    case EdgeDirection.In =>
      graph.aggregateMessages[Array[VertexId]](
        ctx => ctx.sendToDst(Array(ctx.srcId)),
        _ ++ _)
    case EdgeDirection.Both =>
      throw new SparkException("It doesn‘t make sense to collect neighbors without a " + "direction.(EdgeDirection.Both is not supported.use EdgeDirection.Either instead.)")
  }
}

// Test driver
object Test {
  // Location of the local Hadoop installation used by Spark on this machine.
  System.setProperty("hadoop.home.dir", "D://hadoop-2.6.2")

  val conf = new SparkConf().setMaster("local").setAppName("SparkGraph")
  val sc = new SparkContext(conf)

  /**
   * Generates a 100-vertex log-normal random graph, collects the in-neighbour
   * ids of every vertex and prints one line per vertex in the form
   * `vertexId:neighbour1 neighbour2 ... `.
   */
  def main(args: Array[String]): Unit = {
    try {
      // Graph has no `map`; mapVertices is the vertex-attribute transformation.
      val graph = GraphGenerators.logNormalGraph(sc, numVertices = 100).mapVertices((id, _) => id.toDouble)

      // Bring the (small) result to the driver before printing so the output
      // is written by a single thread, one complete line per vertex.
      collectNeighborIds(EdgeDirection.In, graph).collect().foreach { case (vertexId, neighborIds) =>
        println(vertexId + ":" + neighborIds.map(id => id + " ").mkString)
      }
    } finally {
      // Release the SparkContext even if the job fails.
      sc.stop()
    }
  }
}
时间: 2024-08-11 17:21:32