Skip to content

Commit 8038da2

Browse files
committed
Merge pull request #2 from jegonzal/GraphXCCIssue
Improving documentation and identifying potential bug in CC calculation.
2 parents 97cd27e + 80e4d98 commit 8038da2

File tree

4 files changed

+89
-22
lines changed

4 files changed

+89
-22
lines changed

docs/graphx-programming-guide.md

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,8 @@ import org.apache.spark.graphx._
8484
import org.apache.spark.rdd.RDD
8585
{% endhighlight %}
8686

87-
If you are not using the Spark shell you will also need a Spark context.
87+
If you are not using the Spark shell you will also need a `SparkContext`. To learn more about
88+
getting started with Spark refer to the [Spark Quick Start Guide](quick-start.html).
8889

8990
# The Property Graph
9091
<a name="property_graph"></a>
@@ -190,7 +191,7 @@ and `graph.edges` members respectively.
190191
{% highlight scala %}
191192
val graph: Graph[(String, String), String] // Constructed from above
192193
// Count all users which are postdocs
193-
graph.vertices.filter { case (id, (name, pos)) => pos == "postdoc"}.count
194+
graph.vertices.filter { case (id, (name, pos)) => pos == "postdoc" }.count
194195
// Count all the edges where src > dst
195196
graph.edges.filter(e => e.srcId > e.dstId).count
196197
{% endhighlight %}
@@ -258,8 +259,10 @@ val graph: Graph[(String, String), String]
258259
val indDegrees: VertexRDD[Int] = graph.inDegrees
259260
{% endhighlight %}
260261

261-
The reason for differentiating between core graph operations and GraphOps is to be able to support
262-
various graph representations in the future.
262+
The reason for differentiating between core graph operations and [`GraphOps`][GraphOps] is to be
263+
able to support different graph representations in the future. Each graph representation must
264+
provide implementations of the core operations and reuse many of the useful operations defined in
265+
[`GraphOps`][GraphOps].
263266

264267
## Property Operators
265268

@@ -334,14 +337,32 @@ interest or eliminate broken links. For example in the following code we remove
334337
[Graph.subgraph]: api/graphx/index.html#org.apache.spark.graphx.Graph@subgraph((EdgeTriplet[VD,ED])⇒Boolean,(VertexID,VD)⇒Boolean):Graph[VD,ED]
335338

336339
{% highlight scala %}
337-
val users: RDD[(VertexId, (String, String))]
338-
val edges: RDD[Edge[String]]
340+
// Create an RDD for the vertices
341+
val users: RDD[(VertexID, (String, String))] =
342+
sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
343+
(5L, ("franklin", "prof")), (2L, ("istoica", "prof")),
344+
(4L, ("peter", "student"))))
345+
// Create an RDD for edges
346+
val relationships: RDD[Edge[String]] =
347+
sc.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
348+
Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"),
349+
Edge(4L, 0L, "student"), Edge(5L, 0L, "colleague")))
339350
// Define a default user in case there are relationship with missing user
340351
val defaultUser = ("John Doe", "Missing")
341352
// Build the initial Graph
342353
val graph = Graph(users, relationships, defaultUser)
354+
// Notice that there is a user 0 (for which we have no information) connecting users
355+
// 4 (peter) and 5 (franklin).
356+
graph.triplets.map(
357+
triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1
358+
).collect.foreach(println(_))
343359
// Remove missing vertices as well as the edges to connected to them
344360
val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")
361+
// The valid subgraph will disconnect users 4 and 5 by removing user 0
362+
validGraph.vertices.collect.foreach(println(_))
363+
validGraph.triplets.map(
364+
triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1
365+
).collect.foreach(println(_))
345366
{% endhighlight %}
346367

347368
> Note in the above example only the vertex predicate is provided. The `subgraph` operator defaults

graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -325,8 +325,8 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) {
325325
*
326326
* @see [[org.apache.spark.graphx.lib.ConnectedComponents]]
327327
*/
328-
def connectedComponents(): Graph[VertexID, ED] = {
329-
ConnectedComponents.run(graph)
328+
def connectedComponents(undirected: Boolean = true): Graph[VertexID, ED] = {
329+
ConnectedComponents.run(graph, undirected)
330330
}
331331

332332
/**

graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala

Lines changed: 30 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -14,26 +14,42 @@ object ConnectedComponents {
1414
* @tparam ED the edge attribute type (preserved in the computation)
1515
*
1616
* @param graph the graph for which to compute the connected components
17+
* @param undirected compute reachability ignoring edge direction.
1718
*
1819
* @return a graph with vertex attributes containing the smallest vertex in each
1920
* connected component
2021
*/
21-
def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[VertexID, ED] = {
22+
def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], undirected: Boolean = true):
23+
Graph[VertexID, ED] = {
2224
val ccGraph = graph.mapVertices { case (vid, _) => vid }
23-
24-
def sendMessage(edge: EdgeTriplet[VertexID, ED]) = {
25-
if (edge.srcAttr < edge.dstAttr) {
26-
Iterator((edge.dstId, edge.srcAttr))
27-
} else if (edge.srcAttr > edge.dstAttr) {
28-
Iterator((edge.srcId, edge.dstAttr))
29-
} else {
30-
Iterator.empty
25+
if (undirected) {
26+
def sendMessage(edge: EdgeTriplet[VertexID, ED]) = {
27+
if (edge.srcAttr < edge.dstAttr) {
28+
Iterator((edge.dstId, edge.srcAttr))
29+
} else if (edge.srcAttr > edge.dstAttr) {
30+
Iterator((edge.srcId, edge.dstAttr))
31+
} else {
32+
Iterator.empty
33+
}
34+
}
35+
val initialMessage = Long.MaxValue
36+
Pregel(ccGraph, initialMessage, activeDirection = EdgeDirection.Both)(
37+
vprog = (id, attr, msg) => math.min(attr, msg),
38+
sendMsg = sendMessage,
39+
mergeMsg = (a, b) => math.min(a, b))
40+
} else {
41+
def sendMessage(edge: EdgeTriplet[VertexID, ED]) = {
42+
if (edge.srcAttr < edge.dstAttr) {
43+
Iterator((edge.dstId, edge.srcAttr))
44+
} else {
45+
Iterator.empty
46+
}
3147
}
48+
val initialMessage = Long.MaxValue
49+
Pregel(ccGraph, initialMessage, activeDirection = EdgeDirection.Out)(
50+
vprog = (id, attr, msg) => math.min(attr, msg),
51+
sendMsg = sendMessage,
52+
mergeMsg = (a, b) => math.min(a, b))
3253
}
33-
val initialMessage = Long.MaxValue
34-
Pregel(ccGraph, initialMessage)(
35-
vprog = (id, attr, msg) => math.min(attr, msg),
36-
sendMsg = sendMessage,
37-
mergeMsg = (a, b) => math.min(a, b))
3854
} // end of connectedComponents
3955
}

graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,4 +80,34 @@ class ConnectedComponentsSuite extends FunSuite with LocalSparkContext {
8080
}
8181
} // end of reverse chain connected components
8282

83+
test("Connected Components on a Toy Connected Graph") {
84+
withSpark { sc =>
85+
// Create an RDD for the vertices
86+
val users: RDD[(VertexID, (String, String))] =
87+
sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
88+
(5L, ("franklin", "prof")), (2L, ("istoica", "prof")),
89+
(4L, ("peter", "student"))))
90+
// Create an RDD for edges
91+
val relationships: RDD[Edge[String]] =
92+
sc.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
93+
Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"),
94+
Edge(4L, 0L, "student"), Edge(5L, 0L, "colleague")))
95+
// Edges are:
96+
// 2 ---> 5 ---> 3
97+
// | \
98+
// V \|
99+
// 4 ---> 0 7
100+
//
101+
// Define a default user in case there are relationship with missing user
102+
val defaultUser = ("John Doe", "Missing")
103+
// Build the initial Graph
104+
val graph = Graph(users, relationships, defaultUser)
105+
val ccGraph = graph.connectedComponents(undirected = true)
106+
val vertices = ccGraph.vertices.collect
107+
for ( (id, cc) <- vertices ) {
108+
assert(cc == 0)
109+
}
110+
}
111+
} // end of toy connected components
112+
83113
}

0 commit comments

Comments
 (0)