Skip to content

Commit 3bcaa2f

Browse files
committed
Added faster version of kcore and fixed compile issue with dataflow pagerank
1 parent 03f5d76 commit 3bcaa2f

File tree

3 files changed

+106
-3
lines changed

3 files changed

+106
-3
lines changed

graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ object Analytics extends Logging {
165165
val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_))
166166

167167
logWarning("Starting kcore")
168-
val result = KCore.run(graph, kmax, kmin)
168+
val result = KCoreFast.run(graph, kmax, kmin)
169169

170170
logWarning("Size of cores: " + result.vertices.map { case (vid,data) => (min(data, kmax), 1) }.reduceByKey((_+_)).collect().mkString(", "))
171171
sc.stop()

graphx/src/main/scala/org/apache/spark/graphx/lib/DataflowPagerank.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -216,9 +216,9 @@ object DataflowPagerank extends Logging {
216216
// emit min ccId to src and dst
217217
.flatMap { case (dst: Long, ((src: Long, srcCC: Long), dstCC)) =>
218218
if (srcCC < dstCC) {
219-
Iterator(dst, srcCC)
219+
Iterator((dst, srcCC))
220220
} else if (dstCC < srcCC) {
221-
Iterator(src, dstCC)
221+
Iterator((src, dstCC))
222222
} else {
223223
Iterator.empty
224224
}
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
package org.apache.spark.graphx.lib
2+
3+
import org.apache.spark.graphx._
4+
import org.apache.spark._
5+
import scala.math._
6+
import org.apache.spark.SparkContext._
7+
import scala.reflect.ClassTag
8+
9+
object KCoreFast extends Logging {
  /**
   * Compute the k-core decomposition of the graph for all k <= kmax. This
   * uses the iterative pruning algorithm discussed by Alvarez-Hamelin et al.
   * in K-Core Decomposition: a Tool For the Visualization of Large Scale Networks
   * (see <a href="http://arxiv.org/abs/cs/0504107">http://arxiv.org/abs/cs/0504107</a>).
   *
   * @tparam VD the vertex attribute type (discarded in the computation)
   * @tparam ED the edge attribute type (preserved in the computation)
   *
   * @param graph the graph for which to compute the k-core decomposition
   * @param kmax the maximum value of k to decompose the graph
   * @param kmin the minimum value of k at which to start the decomposition (default 1)
   *
   * @return a graph where the vertex attribute is the minimum of
   * kmax or the highest value k for which that vertex was a member of
   * the k-core.
   *
   * @note This method has the advantage of returning not just a single kcore of the
   * graph but will yield all the cores for all k in [kmin, kmax].
   */
  def run[VD: ClassTag, ED: ClassTag](
      graph: Graph[VD, ED],
      kmax: Int,
      kmin: Int = 1)
    : Graph[Int, ED] = {

    // Initialize every vertex with its degree; isolated vertices (absent from
    // graph.degrees) get 0. A value of -1 later marks a pruned ("turned off") vertex.
    var g = graph.outerJoinVertices(graph.degrees)((vid, oldData, newData) => newData.getOrElse(0)).cache

    var curK = kmin
    while (curK <= kmax) {
      g = computeCurrentKCore(g, curK).cache
      // Capture the loop variable in a stable val so Spark closures do not
      // serialize a mutable var. (The original used curK in the vertices
      // filter and testK in the triplets closure — unified on testK.)
      val testK = curK
      val vCount = g.vertices.filter { case (vid, vd) => vd >= testK }.count()
      // BUG FIX: the original used .map{...}.count(), which counts every
      // triplet regardless of the predicate, so eCount always equalled the
      // total edge count. Filter first so eCount is the number of edges whose
      // endpoints both survive the current k-core.
      val eCount = g.triplets.filter { t => t.srcAttr >= testK && t.dstAttr >= testK }.count()
      logWarning(s"K=$curK, V=$vCount, E=$eCount")
      curK += 1
    }
    // The final vertex attribute is the last k for which the vertex survived
    // (clamped to kmax), or a value below kmin if it never entered a core.
    g.mapVertices({ case (_, k) => k })
  }

  /**
   * Prune the graph to its k-core for a single value of k.
   *
   * Runs a Pregel computation in which each vertex carries its count of
   * active neighbors; a vertex whose count drops below k is deactivated
   * (attribute set to -1) and its neighbors are told to decrement their
   * counts, iterating until no more vertices are pruned.
   *
   * @param graph graph whose vertex attribute is the active-neighbor count
   *              (or -1 if the vertex is already pruned)
   * @param k the core number to prune for
   * @return the graph with vertices outside the k-core marked -1
   */
  def computeCurrentKCore[ED: ClassTag](graph: Graph[Int, ED], k: Int): Graph[Int, ED] = {
    logWarning(s"Computing kcore for k=$k")

    // Messages: -1 means "turn this vertex off"; a positive value is the
    // number of neighbors pruned this round (to subtract from the count).
    def sendMsg(et: EdgeTriplet[Int, ED]): Iterator[(VertexId, Int)] = {
      if (et.srcAttr < 0 || et.dstAttr < 0) {
        // if either vertex has already been turned off we do nothing
        Iterator.empty
      } else if (et.srcAttr < k && et.dstAttr < k) {
        // tell both vertices to turn off but don't need change count value
        Iterator((et.srcId, -1), (et.dstId, -1))
      } else if (et.srcAttr < k) {
        // if src is being pruned, tell dst to subtract from vertex count
        Iterator((et.srcId, -1), (et.dstId, 1))
      } else if (et.dstAttr < k) {
        // if dst is being pruned, tell src to subtract from vertex count
        Iterator((et.dstId, -1), (et.srcId, 1))
      } else {
        Iterator.empty
      }
    }

    // A single "turn off" (-1) message dominates; otherwise sum the number of
    // neighbors that were pruned this round.
    def mergeMsg(m1: Int, m2: Int): Int = {
      if (m1 < 0 || m2 < 0) {
        -1
      } else {
        m1 + m2
      }
    }

    def vProg(vid: VertexId, data: Int, update: Int): Int = {
      if (update < 0) {
        // if the vertex has turned off, keep it turned off
        -1
      } else {
        // subtract the number of neighbors that have turned off this round from
        // the count of active vertices
        // TODO(crankshaw) can we ever have the case data < update?
        max(data - update, 0)
      }
    }

    // Note that the initial message of 0 has no effect: vProg(_, data, 0) == data.
    Pregel(graph, 0)(vProg, sendMsg, mergeMsg)
  }
}

0 commit comments

Comments
 (0)