Skip to content

Commit 76a6a54

Browse files
committed
Fixed kcore. now works.
1 parent 1e924df commit 76a6a54

File tree

2 files changed

+11
-4
lines changed

2 files changed

+11
-4
lines changed

graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala

+4-1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.spark.graphx.lib
1919

2020
import org.apache.spark._
21+
import scala.math._
2122
import org.apache.spark.graphx._
2223
import org.apache.spark.graphx.PartitionStrategy._
2324
import org.apache.spark.SparkContext._
@@ -163,8 +164,10 @@ object Analytics extends Logging {
163164
minEdgePartitions = numEPart).cache()
164165
val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_))
165166

167+
logWarning("Starting kcore")
166168
val result = KCore.run(graph, kmax, kmin)
167-
println("Size of cores: " + result.vertices.map{ case (vid,data) => (data, 1)}.reduceByKey((_+_)).collect().mkString(", "))
169+
170+
logWarning("Size of cores: " + result.vertices.map { case (vid,data) => (min(data, kmax), 1) }.reduceByKey((_+_)).collect().mkString(", "))
168171
sc.stop()
169172

170173
case "triangles" =>

graphx/src/main/scala/org/apache/spark/graphx/lib/KCore.scala

+7-3
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,12 @@ object KCore extends Logging {
3535

3636
// Graph[(Int, Boolean), ED] - boolean indicates whether it is active or not
3737
var g = graph.outerJoinVertices(graph.degrees)((vid, oldData, newData) => (newData.getOrElse(0), true)).cache
38-
var degrees = graph.degrees
39-
println("degree distribution: " + degrees.map{ case (vid,data) => (data, 1)}.reduceByKey((_+_)).collect().mkString(", "))
38+
val degrees = graph.degrees
39+
val numVertices = degrees.count
40+
// logWarning(s"Numvertices: $numVertices")
41+
// logWarning(s"degree sample: ${degrees.take(10).mkString(", ")}")
42+
// logWarning("degree distribution: " + degrees.map{ case (vid,data) => (data, 1)}.reduceByKey((_+_)).collect().mkString(", "))
43+
// logWarning("degree distribution: " + degrees.map{ case (vid,data) => (data, 1)}.reduceByKey((_+_)).take(10).mkString(", "))
4044
var curK = kmin
4145
while (curK <= kmax) {
4246
g = computeCurrentKCore(g, curK).cache
@@ -50,6 +54,7 @@ object KCore extends Logging {
5054
}
5155

5256
def computeCurrentKCore[ED: ClassTag](graph: Graph[(Int, Boolean), ED], k: Int) = {
57+
logWarning(s"Computing kcore for k=$k")
5358
def sendMsg(et: EdgeTriplet[(Int, Boolean), ED]): Iterator[(VertexId, (Int, Boolean))] = {
5459
if (!et.srcAttr._2 || !et.dstAttr._2) {
5560
// if either vertex has already been turned off we do nothing
@@ -86,7 +91,6 @@ object KCore extends Logging {
8691
}
8792

8893
// Note that initial message should have no effect
89-
logWarning("kcore starting pregel")
9094
Pregel(graph, (0, true))(vProg, sendMsg, mergeMsg)
9195
}
9296
}

0 commit comments

Comments
 (0)