From 90b5e16909c6e8a0fe16b2fcb2b551cc2c32cb45 Mon Sep 17 00:00:00 2001 From: Paco Nathan Date: Sat, 20 Dec 2014 20:38:50 -0800 Subject: [PATCH] found the missing SSSP --- .../com/databricks/apps/graphx/sssp.scala | 33 +++++++++++++++++++ 1 file changed, 33 insertions(+) create mode 100644 src/main/scala/com/databricks/apps/graphx/sssp.scala diff --git a/src/main/scala/com/databricks/apps/graphx/sssp.scala b/src/main/scala/com/databricks/apps/graphx/sssp.scala new file mode 100644 index 0000000..5e17260 --- /dev/null +++ b/src/main/scala/com/databricks/apps/graphx/sssp.scala @@ -0,0 +1,33 @@ +// SSSP impl in Graphx using Pregel +//https://en.wikipedia.org/wiki/Dijkstra%27s_algorithm +//http://spark.apache.org/docs/latest/graphx-programming-guide.html#pregel-api +//http://stackoverflow.com/questions/23700124/how-to-get-sssp-actual-path-by-apache-spark-graphx + +import org.apache.spark.graphx._ +import org.apache.spark.graphx.util.GraphGenerators + +val graph = GraphGenerators.logNormalGraph(sc, numVertices = 5, numEParts = sc.defaultParallelism, mu = 4.0, sigma = 1.3).mapEdges(e => e.attr.toDouble) +graph.edges.foreach(println) + +// initialize all vertices except the root to have distance infinity +val sourceId: VertexId = 0 +val initialGraph : Graph[(Double, List[VertexId]), Double] = graph.mapVertices((id, _) => if (id == sourceId) (0.0, List[VertexId](sourceId)) else (Double.PositiveInfinity, List[VertexId]())) + +val sssp = initialGraph.pregel((Double.PositiveInfinity, List[VertexId]()), Int.MaxValue, EdgeDirection.Out)( + // vertex program + (id, dist, newDist) => if (dist._1 < newDist._1) dist else newDist, + + // send message + triplet => { + if (triplet.srcAttr._1 < triplet.dstAttr._1 - triplet.attr ) { + Iterator((triplet.dstId, (triplet.srcAttr._1 + triplet.attr , triplet.srcAttr._2 :+ triplet.dstId))) + } else { + Iterator.empty + } + }, + + // merge message + (a, b) => if (a._1 < b._1) a else b) + + println(sssp.vertices.collect.mkString("\n") +)