Skip to content

Commit 4ddc552

Browse files
committed
Added kcore implementation.
Conflicts: graph/src/main/scala/org/apache/spark/graph/Pregel.scala
1 parent 250199a commit 4ddc552

File tree

3 files changed

+281
-0
lines changed

3 files changed

+281
-0
lines changed
Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
package org.apache.spark.graph
2+
3+
4+
/**
5+
* This object implements a Pregel-like bulk-synchronous
6+
* message-passing API. However, unlike the original Pregel API the
7+
* GraphX pregel API factors the sendMessage computation over edges,
8+
* enables the message sending computation to read both vertex
9+
* attributes, and finally constrains messages to the graph structure.
10+
* These changes allow for substantially more efficient distributed
11+
* execution while also exposing greater flexibility for graph based
12+
* computation.
13+
*
14+
* @example We can use the Pregel abstraction to implement PageRank
15+
* {{{
16+
* val pagerankGraph: Graph[Double, Double] = graph
17+
* // Associate the degree with each vertex
18+
* .outerJoinVertices(graph.outDegrees){
19+
* (vid, vdata, deg) => deg.getOrElse(0)
20+
* }
21+
* // Set the weight on the edges based on the degree
22+
* .mapTriplets( e => 1.0 / e.srcAttr )
23+
* // Set the vertex attributes to the initial pagerank values
24+
* .mapVertices( (id, attr) => 1.0 )
25+
*
26+
* def vertexProgram(id: Vid, attr: Double, msgSum: Double): Double =
27+
* resetProb + (1.0 - resetProb) * msgSum
28+
* def sendMessage(edge: EdgeTriplet[Double, Double]): Iterator[(Vid, Double)] =
29+
* Iterator((edge.dstId, edge.srcAttr * edge.attr))
30+
* def messageCombiner(a: Double, b: Double): Double = a + b
31+
* val initialMessage = 0.0
32+
* // Execute pregel for a fixed number of iterations.
33+
* Pregel(pagerankGraph, initialMessage, numIter)(
34+
* vertexProgram, sendMessage, messageCombiner)
35+
* }}}
36+
*
37+
*/
38+
object Pregel {
39+
40+
/**
41+
* Execute a Pregel-like iterative vertex-parallel abstraction. The
42+
* user-defined vertex-program `vprog` is executed in parallel on
43+
* each vertex receiving any inbound messages and computing a new
44+
* value for the vertex. The `sendMsg` function is then invoked on
45+
* all out-edges and is used to compute an optional message to the
46+
* destination vertex. The `mergeMsg` function is a commutative
47+
* associative function used to combine messages destined to the
48+
* same vertex.
49+
*
50+
* On the first iteration all vertices receive the `initialMsg` and
51+
* on subsequent iterations if a vertex does not receive a message
52+
* then the vertex-program is not invoked.
53+
*
54+
* This function iterates until there are no remaining messages, or
55+
* for maxIterations iterations.
56+
*
57+
* @tparam VD the vertex data type
58+
* @tparam ED the edge data type
59+
* @tparam A the Pregel message type
60+
*
61+
* @param graph the input graph.
62+
*
63+
* @param initialMsg the message each vertex will receive on
64+
* the first iteration.
65+
*
66+
* @param maxIterations the maximum number of iterations to run for.
67+
*
68+
* @param vprog the user-defined vertex program which runs on each
69+
* vertex and receives the inbound message and computes a new vertex
70+
* value. On the first iteration the vertex program is invoked on
71+
* all vertices and is passed the default message. On subsequent
72+
* iterations the vertex program is only invoked on those vertices
73+
* that receive messages.
74+
*
75+
* @param sendMsg a user supplied function that is applied to out
76+
* edges of vertices that received messages in the current
77+
* iteration.
78+
*
79+
* @param mergeMsg a user supplied function that takes two incoming
80+
* messages of type A and merges them into a single message of type
81+
* A. ''This function must be commutative and associative and
82+
* ideally the size of A should not increase.''
83+
*
84+
* @return the resulting graph at the end of the computation
85+
*
86+
*/
87+
def apply[VD: ClassManifest, ED: ClassManifest, A: ClassManifest]
  (graph: Graph[VD, ED], initialMsg: A, maxIterations: Int = Int.MaxValue)(
    vprog: (Vid, VD, A) => VD,
    sendMsg: EdgeTriplet[VD, ED] => Iterator[(Vid,A)],
    mergeMsg: (A, A) => A)
  : Graph[VD, ED] = {

  // Before the loop: every vertex runs the vertex program once with the initial message.
  var current = graph.mapVertices { (id, attr) => vprog(id, attr, initialMsg) }
  // First batch of messages. Cached because it is counted here and joined inside the loop.
  var inbox = current.mapReduceTriplets(sendMsg, mergeMsg).cache()
  var pending = inbox.count()
  var iteration = 0

  // Keep going while messages remain and the iteration budget is not used up.
  while (pending > 0 && iteration < maxIterations) {
    // Deliver the messages. The inner join drops vertices that received nothing,
    // so `updated` holds only the vertices whose program actually ran.
    val updated = current.vertices.innerJoin(inbox)(vprog).cache()
    // Fold the recomputed values back in; every other vertex keeps its previous value.
    current = current.outerJoinVertices(updated) { (id, previous, fresh) =>
      fresh.getOrElse(previous)
    }

    val staleInbox = inbox
    // Produce the next batch. `updated` is passed as the active set with
    // EdgeDirection.Out, so vertices that got no message do not send.
    inbox = current.mapReduceTriplets(
      sendMsg, mergeMsg, Some((updated, EdgeDirection.Out))).cache()
    pending = inbox.count()
    // The new batch has been counted, so the previous one can be released.
    staleInbox.unpersist(blocking = false)
    iteration += 1
  }

  current
} // end of apply
119+
120+
121+
/**
 * Runs Pregel but treats the graph as undirected, i.e. messages are sent
 * along both in and out edges. The loop is the same as in `apply`; the
 * only difference is that the active set is passed to `mapReduceTriplets`
 * with `EdgeDirection.Both` instead of `EdgeDirection.Out`.
 *
 * @param graph the input graph
 * @param initialMsg the message passed to `vprog` for every vertex before the loop
 * @param maxIterations upper bound on the number of loop iterations
 * @param vprog vertex program: (vertex id, current value, merged message) => new value
 * @param sendMsg produces (target vertex id, message) pairs for an edge triplet
 * @param mergeMsg combines two messages addressed to the same vertex
 *
 * @return the graph as it stands when the loop stops
 */
def undirectedRun[VD: ClassManifest, ED: ClassManifest, A: ClassManifest]
  (graph: Graph[VD, ED], initialMsg: A, maxIterations: Int = Int.MaxValue)(
    vprog: (Vid, VD, A) => VD,
    sendMsg: EdgeTriplet[VD, ED] => Iterator[(Vid,A)],
    mergeMsg: (A, A) => A)
  : Graph[VD, ED] = {

  // Before the loop: every vertex runs the vertex program once with the initial message.
  var current = graph.mapVertices { (id, attr) => vprog(id, attr, initialMsg) }
  // First batch of messages. Cached because it is counted here and joined inside the loop.
  var inbox = current.mapReduceTriplets(sendMsg, mergeMsg).cache()
  var pending = inbox.count()
  var iteration = 0

  // Keep going while messages remain and the iteration budget is not used up.
  while (pending > 0 && iteration < maxIterations) {
    // Deliver the messages. The inner join drops vertices that received nothing,
    // so `updated` holds only the vertices whose program actually ran.
    val updated = current.vertices.innerJoin(inbox)(vprog).cache()
    // Fold the recomputed values back in; every other vertex keeps its previous value.
    current = current.outerJoinVertices(updated) { (id, previous, fresh) =>
      fresh.getOrElse(previous)
    }

    val staleInbox = inbox
    // Produce the next batch. `updated` is passed as the active set with
    // EdgeDirection.Both, so vertices that got no message do not send.
    inbox = current.mapReduceTriplets(
      sendMsg, mergeMsg, Some((updated, EdgeDirection.Both))).cache()
    pending = inbox.count()
    // The new batch has been counted, so the previous one can be released.
    staleInbox.unpersist(blocking = false)
    iteration += 1
  }

  current
} // end of undirectedRun
154+
155+
} // end of object Pregel
156+
157+
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
package org.apache.spark.graph.algorithms
2+
3+
import org.apache.spark.graph._
4+
import org.apache.spark._
5+
import scala.math._
6+
7+
object KCore extends Logging {
8+
/**
9+
* Compute the k-core decomposition of the graph for all k <= kmax. This
10+
* uses the iterative pruning algorithm discussed by Alvarez-Hamelin et al.
11+
* in K-Core Decomposition: a Tool For the Visualization of Large Scale Networks
12+
* (see <a href="http://arxiv.org/abs/cs/0504107">http://arxiv.org/abs/cs/0504107</a>).
13+
*
14+
* @tparam VD the vertex attribute type (discarded in the computation)
15+
* @tparam ED the edge attribute type (preserved in the computation)
16+
*
17+
* @param graph the graph for which to compute the k-core decomposition
18+
* @param kmax the maximum value of k to decompose the graph
19+
*
20+
* @return a graph where the vertex attribute is the minimum of
21+
* kmax or the highest value k for which that vertex was a member of
22+
* the k-core.
23+
*
24+
* @note This method has the advantage of returning not just a single kcore of the
25+
* graph but will yield all the cores for all k in [1, kmax].
26+
*/
27+
28+
def run[VD: Manifest, ED: Manifest](
    graph: Graph[VD, ED],
    kmax: Int)
  : Graph[Int, ED] = {

  // Replace each vertex attribute with (degree, active). Vertices that have no
  // entry in graph.degrees start with a count of 0; everyone starts active.
  val seeded = graph.outerJoinVertices(graph.degrees) { (vid, oldData, newData) =>
    (newData.getOrElse(0), true)
  }

  // Run one pruning pass per k = 1, 2, ..., kmax, feeding the output of each
  // pass into the next. An empty range (kmax < 1) leaves the seeded graph as is.
  val pruned = (1 to kmax).foldLeft(seeded) { (g, curK) => computeCurrentKCore(g, curK) }

  // Drop the active flag. The stored count can exceed kmax for vertices that were
  // never pruned, so clamp it: the documented result is the minimum of kmax and
  // the highest k for which the vertex belongs to the k-core.
  pruned.mapVertices({ case (_, (k, _)) => min(k, kmax) })
}
42+
43+
/**
 * Runs one pruning pass for a single value of `k` via `Pregel.undirectedRun`.
 * Each vertex attribute is a pair (neighbour count, active flag).
 *
 * @param graph graph whose vertices carry (count, active)
 * @param k the core level being computed in this pass
 */
def computeCurrentKCore[ED: Manifest](graph: Graph[(Int, Boolean), ED], k: Int) = {

  // Decide per edge what to tell its endpoints. A message is (amount to subtract
  // from the receiver's count, whether the receiver stays active).
  def sendMsg(et: EdgeTriplet[(Int, Boolean), ED]): Iterator[(Vid, (Int, Boolean))] = {
    val (srcCount, srcOn) = et.srcAttr
    val (dstCount, dstOn) = et.dstAttr
    (srcOn && dstOn, srcCount < k, dstCount < k) match {
      // An endpoint is already switched off: nothing to do for this edge.
      case (false, _, _) =>
        Iterator.empty
      // Both endpoints fall below k: switch both off, counts need no adjustment.
      case (_, true, true) =>
        Iterator((et.srcId, (0, false)), (et.dstId, (0, false)))
      // Only src falls below k: switch src off and have dst subtract one neighbour.
      case (_, true, false) =>
        Iterator((et.srcId, (0, false)), (et.dstId, (1, true)))
      // Only dst falls below k: switch dst off and have src subtract one neighbour.
      case (_, false, true) =>
        Iterator((et.dstId, (0, false)), (et.srcId, (1, true)))
      // Both endpoints are active and at or above k: no message.
      case _ =>
        Iterator.empty
    }
  }

  // Sum the neighbour decrements; the receiver stays active only if every
  // incoming message says so.
  def mergeMsg(m1: (Int, Boolean), m2: (Int, Boolean)): (Int, Boolean) =
    (m1._1 + m2._1, m1._2 && m2._2)

  // An inactive vertex is left untouched. An active one subtracts the removed
  // neighbours (never going below k - 1) and adopts the flag from the message.
  def vProg(vid: Vid, data: (Int, Boolean), update: (Int, Boolean)): (Int, Boolean) =
    if (data._2) (max(k - 1, data._1 - update._1), update._2)
    else data

  // Note that initial message should have no effect
  Pregel.undirectedRun(graph, (0, true))(vProg, sendMsg, mergeMsg)
}
80+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package org.apache.spark.graph.algorithms
2+
3+
import org.scalatest.FunSuite
4+
5+
import org.apache.spark.SparkContext
6+
import org.apache.spark.SparkContext._
7+
import org.apache.spark.graph._
8+
import org.apache.spark.graph.util.GraphGenerators
9+
import org.apache.spark.rdd._
10+
11+
12+
/** Tests for [[KCore]] using a small hand-built graph. */
class KCoreSuite extends FunSuite with LocalSparkContext {

  /** Builds an `EdgeTriplet[Int,Int]` by assigning its ids and attributes directly. */
  def createTriple(sid: Vid, did: Vid, sattr: Int, dattr: Int, eattr: Int): EdgeTriplet[Int,Int] = {
    val et = new EdgeTriplet[Int,Int]
    et.srcId = sid
    et.dstId = did
    et.srcAttr = sattr
    et.dstAttr = dattr
    et.attr = eattr
    et
  }

  /** Edge list of the fixture graph used by the "KCore" test. */
  def createKCoreEdges(): Seq[Edge[Int]] = {
    Seq(Edge(11,31), Edge(12,31), Edge(31,33), Edge(31,32), Edge(31,34), Edge(33,34),
        Edge(33,32), Edge(34,32), Edge(32,13), Edge(32,23), Edge(34,23), Edge(23,14),
        Edge(34,21), Edge(34,22), Edge(21,22))
  }

  test("KCore") {
    withSpark { sc =>
      val rawEdges = createKCoreEdges()
      // Expected (vertex id, value) pairs that KCore.run should produce for the fixture.
      val vertices = Set((11, 1), (12,1), (13,1), (14,1), (21,2), (22,2), (23,2), (31, 3), (32,3), (33,3), (34,3))
      val graph = Graph.fromEdges(sc.parallelize(rawEdges), "a")
      // KCore.run is declared as run(graph, kmax); the previous three-argument
      // call did not match that signature. Decompose up to kmax = 5.
      val resultGraph = KCore.run(graph, 5)
      val resultVerts = resultGraph.vertices.collect.toSet
      assert(resultVerts === vertices)
    }
  }

}

0 commit comments

Comments
 (0)