Commit f483ca4
Merge pull request apache#3 from dcrankshaw/osdi_with_kcore_for_merge
Osdi with kcore for merge
2 parents: 8d22500 + 9e59642

5 files changed: +203 −9 lines changed

graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala

Lines changed: 4 additions & 1 deletion

@@ -127,6 +127,7 @@ object Pregel extends Logging {
     // Loop
     var prevG: Graph[VD, ED] = null.asInstanceOf[Graph[VD, ED]]
     var i = 0
+    logWarning("Starting pregel.")
     while (activeMessages > 0 && i < maxIterations) {
       // Receive the messages. Vertices that didn't get any messages do not appear in newVerts.
       val newVerts = g.vertices.innerJoin(messages)(vprog).cache()
@@ -150,13 +151,15 @@
         SparkEnv.get.blockManager.shuffleBlockManager.removeAllShuffleStuff()
       }

-      logInfo("Pregel finished iteration " + i)
+      logWarning("Pregel finished iteration " + i)

       // Unpersist the RDDs hidden by newly-materialized RDDs
       oldMessages.unpersist(blocking=false)
       newVerts.unpersist(blocking=false)
       prevG.unpersistVertices(blocking=false)
       // count the iteration
+      logWarning(s"Pregel iteration $i")
+      // println(s"Pregel iteration $i")
       i += 1
     }
graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala

Lines changed: 50 additions & 6 deletions

@@ -18,8 +18,10 @@
 package org.apache.spark.graphx.lib

 import org.apache.spark._
+import scala.math._
 import org.apache.spark.graphx._
 import org.apache.spark.graphx.PartitionStrategy._
+import org.apache.spark.SparkContext._

 /**
  * Driver program for running graph algorithms.
@@ -127,27 +129,69 @@ object Analytics extends Logging {
       val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_))

       val cc = ConnectedComponents.run(graph)
-      println("Components: " + cc.vertices.map{ case (vid,data) => data}.distinct())
+      logWarning("Components: " + cc.vertices.map{ case (vid,data) => data}.distinct().count())
+      sc.stop()
+
+    case "kcore" =>
+      var numEPart = 4
+      var kmax = 4
+      var kmin = 1
+      var partitionStrategy: Option[PartitionStrategy] = None
+
+      options.foreach{
+        case ("numEPart", v) => numEPart = v.toInt
+        case ("kmax", v) => kmax = v.toInt
+        case ("kmin", v) => kmin = v.toInt
+        case ("partStrategy", v) => partitionStrategy = Some(pickPartitioner(v))
+        case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
+      }
+
+      if (kmax < 1) {
+        logWarning("kmax must be positive")
+        sys.exit(1)
+      }
+      if (kmax < kmin) {
+        logWarning("kmax must be greater than or equal to kmin")
+        sys.exit(1)
+      }
+
+      println("======================================")
+      println("|               KCORE                |")
+      println("======================================")
+
+      val sc = new SparkContext(host, "KCore(" + fname + ")", conf)
+      val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname,
+        minEdgePartitions = numEPart).cache()
+      val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_))
+
+      logWarning("Starting kcore")
+      val result = KCore.run(graph, kmax, kmin)
+
+      logWarning("Size of cores: " + result.vertices.map { case (vid,data) => (min(data, kmax), 1) }.reduceByKey((_+_)).collect().mkString(", "))
       sc.stop()

     case "triangles" =>
       var numEPart = 4
       // TriangleCount requires the graph to be partitioned
-      var partitionStrategy: PartitionStrategy = RandomVertexCut
+      var partitionStrategy: Option[PartitionStrategy] = None

       options.foreach{
         case ("numEPart", v) => numEPart = v.toInt
-        case ("partStrategy", v) => partitionStrategy = pickPartitioner(v)
+        case ("partStrategy", v) => partitionStrategy = Some(pickPartitioner(v))
         case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
       }
       println("======================================")
       println("|          Triangle Count            |")
       println("======================================")
       val sc = new SparkContext(host, "TriangleCount(" + fname + ")", conf)
-      val graph = GraphLoader.edgeListFile(sc, fname, canonicalOrientation = true,
-        minEdgePartitions = numEPart).partitionBy(partitionStrategy).cache()
+      val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname, canonicalOrientation = true,
+        minEdgePartitions = numEPart).cache()
+      val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_))
+      // val graph = GraphLoader.edgeListFile(sc, fname, canonicalOrientation = true,
+      //   minEdgePartitions = numEPart).partitionBy(partitionStrategy).cache()
+      logWarning(s"Graph has ${graph.vertices.count} vertices")
       val triangles = TriangleCount.run(graph)
-      println("Triangles: " + triangles.vertices.map {
+      logWarning("Triangles: " + triangles.vertices.map {
         case (vid,data) => data.toLong
       }.reduce(_ + _) / 3)
       sc.stop()
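The new "kcore" driver case above can also be exercised programmatically. A minimal sketch of the same sequence of calls, under assumptions of our own (a local master and the placeholder path "data/edges.txt", neither from this commit):

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._   // for reduceByKey on the (core, 1) pairs
    import org.apache.spark.graphx.GraphLoader
    import org.apache.spark.graphx.lib.KCore
    import scala.math.min

    // Sketch only: master and input path are placeholders.
    val sc = new SparkContext("local[4]", "KCoreExample")
    val graph = GraphLoader.edgeListFile(sc, "data/edges.txt", minEdgePartitions = 4).cache()
    val result = KCore.run(graph, kmax = 4, kmin = 1)
    // Histogram of core numbers, mirroring the "Size of cores" log line above.
    val coreSizes = result.vertices.map { case (_, k) => (min(k, 4), 1) }.reduceByKey(_ + _)
    println(coreSizes.collect().mkString(", "))
    sc.stop()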
graphx/src/main/scala/org/apache/spark/graphx/lib/KCore.scala (new file)

Lines changed: 96 additions & 0 deletions

@@ -0,0 +1,96 @@
+package org.apache.spark.graphx.lib
+
+import org.apache.spark.graphx._
+import org.apache.spark._
+import scala.math._
+import org.apache.spark.SparkContext._
+import scala.reflect.ClassTag
+
+object KCore extends Logging {
+  /**
+   * Compute the k-core decomposition of the graph for all k <= kmax. This
+   * uses the iterative pruning algorithm discussed by Alvarez-Hamelin et al.
+   * in K-Core Decomposition: a Tool For the Visualization of Large Scale Networks
+   * (see <a href="http://arxiv.org/abs/cs/0504107">http://arxiv.org/abs/cs/0504107</a>).
+   *
+   * @tparam VD the vertex attribute type (discarded in the computation)
+   * @tparam ED the edge attribute type (preserved in the computation)
+   *
+   * @param graph the graph for which to compute the k-core decomposition
+   * @param kmax the maximum value of k up to which to decompose the graph
+   *
+   * @return a graph where the vertex attribute is the minimum of
+   * kmax or the highest value k for which that vertex was a member of
+   * the k-core.
+   *
+   * @note This method has the advantage of returning not just a single k-core of
+   * the graph but all the cores for k in [kmin, kmax].
+   */
+  def run[VD: ClassTag, ED: ClassTag](
+      graph: Graph[VD, ED],
+      kmax: Int,
+      kmin: Int = 1)
+    : Graph[Int, ED] = {
+
+    // Graph[(Int, Boolean), ED] - the boolean indicates whether the vertex is still active
+    var g = graph.outerJoinVertices(graph.degrees)((vid, oldData, newData) => (newData.getOrElse(0), true)).cache
+    val degrees = graph.degrees
+    val numVertices = degrees.count
+    // logWarning(s"Numvertices: $numVertices")
+    // logWarning(s"degree sample: ${degrees.take(10).mkString(", ")}")
+    // logWarning("degree distribution: " + degrees.map{ case (vid,data) => (data, 1)}.reduceByKey((_+_)).collect().mkString(", "))
+    // logWarning("degree distribution: " + degrees.map{ case (vid,data) => (data, 1)}.reduceByKey((_+_)).take(10).mkString(", "))
+    var curK = kmin
+    while (curK <= kmax) {
+      g = computeCurrentKCore(g, curK).cache
+      val testK = curK
+      val vCount = g.vertices.filter { case (vid, (vd, _)) => vd >= testK }.count()
+      val eCount = g.triplets.filter { t => t.srcAttr._1 >= testK && t.dstAttr._1 >= testK }.count()
+      logWarning(s"K=$curK, V=$vCount, E=$eCount")
+      curK += 1
+    }
+    g.mapVertices({ case (_, (k, _)) => k })
+  }
+
+  def computeCurrentKCore[ED: ClassTag](graph: Graph[(Int, Boolean), ED], k: Int) = {
+    logWarning(s"Computing kcore for k=$k")
+    def sendMsg(et: EdgeTriplet[(Int, Boolean), ED]): Iterator[(VertexId, (Int, Boolean))] = {
+      if (!et.srcAttr._2 || !et.dstAttr._2) {
+        // if either vertex has already been turned off, do nothing
+        Iterator.empty
+      } else if (et.srcAttr._1 < k && et.dstAttr._1 < k) {
+        // tell both vertices to turn off; there is no need to change the count value
+        Iterator((et.srcId, (0, false)), (et.dstId, (0, false)))
+      } else if (et.srcAttr._1 < k) {
+        // if src is being pruned, tell dst to subtract from its neighbor count but not to turn off
+        Iterator((et.srcId, (0, false)), (et.dstId, (1, true)))
+      } else if (et.dstAttr._1 < k) {
+        // if dst is being pruned, tell src to subtract from its neighbor count but not to turn off
+        Iterator((et.dstId, (0, false)), (et.srcId, (1, true)))
+      } else {
+        // no-op but keep these vertices active?
+        // Iterator((et.srcId, (0, true)), (et.dstId, (0, true)))
+        Iterator.empty
+      }
+    }
+
+    // subtracts removed neighbors from the neighbor count and tells the vertex whether it was turned off
+    def mergeMsg(m1: (Int, Boolean), m2: (Int, Boolean)): (Int, Boolean) = {
+      (m1._1 + m2._1, m1._2 && m2._2)
+    }
+
+    def vProg(vid: VertexId, data: (Int, Boolean), update: (Int, Boolean)): (Int, Boolean) = {
+      var newCount = data._1
+      var on = data._2
+      if (on) {
+        newCount = max(k - 1, data._1 - update._1)
+        on = update._2
+      }
+      (newCount, on)
+    }
+
+    // Note that the initial message should have no effect
+    Pregel(graph, (0, true))(vProg, sendMsg, mergeMsg)
+  }
+}
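To make the pruning protocol concrete, here is a minimal, hand-checkable sketch of our own (the three-vertex path is not part of the commit). On the path 1-2-3 the endpoints have degree 1, so at k = 2 sendMsg turns them off and tells the middle vertex to decrement its count once per pruned neighbor; vProg then clamps the count to max(k - 1, 2 - 2) = 1, so the 2-core is empty and every vertex ends with core number 1.

    import org.apache.spark.SparkContext
    import org.apache.spark.graphx._
    import org.apache.spark.graphx.lib.KCore

    // Sketch only: the path 1-2-3 is our own toy graph.
    val sc = new SparkContext("local[2]", "KCoreSketch")
    val edges = sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(2L, 3L, 1)))
    val path = Graph.fromEdges(edges, 0)
    val cores = KCore.run(path, kmax = 2)
    // A path has no 2-core, so min(coreness, kmax) is 1 everywhere.
    assert(cores.vertices.collect.forall { case (_, k) => k == 1 })
    sc.stop()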

graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala

Lines changed: 9 additions & 2 deletions

@@ -18,6 +18,7 @@
 package org.apache.spark.graphx.lib

 import scala.reflect.ClassTag
+import org.apache.spark.Logging

 import org.apache.spark.graphx._

@@ -36,10 +37,12 @@ import org.apache.spark.graphx._
  * (i.e. the `sourceId` less than `destId`). Also the graph must have been partitioned
  * using [[org.apache.spark.graphx.Graph#partitionBy]].
  */
-object TriangleCount {
+object TriangleCount extends Logging {

   def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD,ED]): Graph[Int, ED] = {
     // Remove redundant edges
+
+    logWarning("Entering Triangle Count.")
     val g = graph.groupEdges((a, b) => a).cache()

     // Construct set representations of the neighborhoods
@@ -56,6 +59,8 @@ object TriangleCount {
       }
       set
     }
+
+    logWarning("Neighbor sets collected.")
     // join the sets with the graph
     val setGraph: Graph[VertexSet, ED] = g.outerJoinVertices(nbrSets) {
       (vid, _, optSet) => optSet.getOrElse(null)
@@ -82,12 +87,14 @@ object TriangleCount {
     // compute the intersection along edges
     val counters: VertexRDD[Int] = setGraph.mapReduceTriplets(edgeFunc, _ + _)
     // Merge counters with the graph and divide by two since each triangle is counted twice
-    g.outerJoinVertices(counters) {
+    val result = g.outerJoinVertices(counters) {
       (vid, _, optCounter: Option[Int]) =>
         val dblCount = optCounter.getOrElse(0)
         // double count should be even (divisible by two)
         assert((dblCount & 1) == 0)
         dblCount / 2
     }
+    logWarning("Triangle count finished.")
+    result
   } // end of TriangleCount
 }
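A note on the counting: run divides each per-vertex count by two (each triangle is seen from both endpoints of every edge), and the "triangles" driver in Analytics.scala then divides the vertex sum by three, since every triangle touches three vertices. A minimal sketch of our own (a single canonical triangle, app name a placeholder):

    import org.apache.spark.SparkContext
    import org.apache.spark.graphx._
    import org.apache.spark.graphx.lib.TriangleCount

    // Sketch only: one 3-cycle with srcId < dstId on every edge, as run requires.
    val sc = new SparkContext("local[2]", "TriangleSketch")
    val edges = sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(2L, 3L, 1), Edge(1L, 3L, 1)))
    val graph = Graph.fromEdges(edges, 0).partitionBy(PartitionStrategy.RandomVertexCut)
    val triCounts = TriangleCount.run(graph)
    // Every triangle touches three vertices, so the per-vertex sum triple-counts it.
    val total = triCounts.vertices.map { case (_, c) => c.toLong }.reduce(_ + _) / 3
    println(s"Triangles: $total") // 1
    sc.stop()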
graphx/src/test/scala/org/apache/spark/graphx/lib/KCoreSuite.scala (new file)

Lines changed: 44 additions & 0 deletions

@@ -0,0 +1,44 @@
+package org.apache.spark.graphx.lib
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.graphx._
+// import org.apache.spark.graphx.util.GraphGenerators
+import org.apache.spark.rdd._
+
+
+class KCoreSuite extends FunSuite with LocalSparkContext {
+
+  def createTriple(sid: VertexId, did: VertexId, sattr: Int, dattr: Int, eattr: Int): EdgeTriplet[Int,Int] = {
+    val et = new EdgeTriplet[Int,Int]
+    et.srcId = sid
+    et.dstId = did
+    et.srcAttr = sattr
+    et.dstAttr = dattr
+    et.attr = eattr
+    et
+  }
+
+  def createKCoreEdges(): Seq[Edge[Int]] = {
+    Seq(Edge(11,31), Edge(12,31), Edge(31,33), Edge(31,32), Edge(31,34), Edge(33,34),
+      Edge(33,32), Edge(34,32), Edge(32,13), Edge(32,23), Edge(34,23), Edge(23,14),
+      Edge(34,21), Edge(34,22), Edge(21,22))
+  }
+
+  test("KCore") {
+    withSpark { sc =>
+      val rawEdges = createKCoreEdges()
+      val vertices = Set((11,1), (12,1), (13,1), (14,1), (21,2), (22,2), (23,2), (31,3), (32,3), (33,3), (34,3))
+      val graph = Graph.fromEdges(sc.parallelize(rawEdges), "a")
+      val resultGraph = KCore.run(graph, 5)
+      val resultVerts = resultGraph.vertices.collect.toSet
+      assert(resultVerts === vertices)
+    }
+  }
+}
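A reading aid for the fixture, our observation rather than a comment from the commit: the vertex ids encode the expected answer. Vertices 11-14 sit only in the 1-core, 21-23 belong to the 2-core but not the 3-core, and 31-34 form the 3-core, which is exactly the set the test asserts. Since kmax = 5 exceeds the largest core in the fixture, the returned values are the true core numbers rather than kmax-clamped ones.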
