Commit e3441e3

viirya authored and mengxr committed
[SPARK-12363][MLLIB] Remove setRun and fix PowerIterationClustering failed test
JIRA: https://issues.apache.org/jira/browse/SPARK-12363

This issue was pointed out by yanboliang. When `setRuns` is removed from PowerIterationClustering, one of its tests fails. I found that some `dstAttr`s of the normalized graph were not the correct values but 0.0; setting `TripletFields.All` in `mapTriplets` makes it work.

Author: Liang-Chi Hsieh <viirya@gmail.com>
Author: Xiangrui Meng <meng@databricks.com>

Closes #10539 from viirya/fix-poweriter.
1 parent 374c4b2 commit e3441e3
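To make the fix concrete, here is a minimal, self-contained sketch of the patched normalization step. Everything here is illustrative, not code from the patch: the toy edges, the `NormalizeSketch`/`normalize` names, and the `epsilon` literal standing in for `MLUtils.EPSILON`. The two ingredients it demonstrates are the ones the diff changes: constructing the graph with the public `Graph` factory instead of `GraphImpl.fromExistingRDDs`, and declaring every triplet field the map function reads so GraphX ships those attributes to the edge partitions.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.graphx.{Edge, Graph, TripletFields}

object NormalizeSketch {
  def normalize(sc: SparkContext): Graph[Double, Double] = {
    val epsilon = 1e-15 // stand-in for MLUtils.EPSILON
    val edges = sc.parallelize(Seq(
      Edge(0L, 1L, 1.0), Edge(1L, 0L, 1.0),
      Edge(1L, 2L, 0.5), Edge(2L, 1L, 0.5)))
    val graph = Graph.fromEdges(edges, 0.0)
    // Row sums per source vertex; only the edge attribute is read here.
    val vD = graph.aggregateMessages[Double](
      sendMsg = ctx => ctx.sendToSrc(ctx.attr),
      mergeMsg = _ + _,
      TripletFields.EdgeOnly)
    // The fix: build with the public Graph factory rather than
    // GraphImpl.fromExistingRDDs, and declare the triplet fields the map
    // function reads so srcAttr and attr are shipped to the edge partitions.
    Graph(vD, graph.edges)
      .mapTriplets(
        e => e.attr / math.max(e.srcAttr, epsilon),
        new TripletFields(/* useSrc */ true, /* useDst */ false, /* useEdge */ true))
  }
}
```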

4 files changed: +96 −85 lines

examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala

21 additions & 32 deletions

```diff
@@ -40,27 +40,23 @@ import org.apache.spark.rdd.RDD
  *   n:  Number of sampled points on innermost circle.. There are proportionally more points
  *      within the outer/larger circles
  *   maxIterations:   Number of Power Iterations
- *   outerRadius:  radius of the outermost of the concentric circles
  * }}}
  *
  * Here is a sample run and output:
  *
- * ./bin/run-example mllib.PowerIterationClusteringExample -k 3 --n 30 --maxIterations 15
- *
- * Cluster assignments: 1 -> [0,1,2,3,4],2 -> [5,6,7,8,9,10,11,12,13,14],
- * 0 -> [15,16,17,18,19,20,21,22,23,24,25,26,27,28,29]
+ * ./bin/run-example mllib.PowerIterationClusteringExample -k 2 --n 10 --maxIterations 15
  *
+ * Cluster assignments: 1 -> [0,1,2,3,4,5,6,7,8,9],
+ *   0 -> [10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29]
  *
  * If you use it as a template to create your own app, please use `spark-submit` to submit your app.
  */
 object PowerIterationClusteringExample {
 
   case class Params(
-      input: String = null,
-      k: Int = 3,
-      numPoints: Int = 5,
-      maxIterations: Int = 10,
-      outerRadius: Double = 3.0
+      k: Int = 2,
+      numPoints: Int = 10,
+      maxIterations: Int = 15
     ) extends AbstractParams[Params]
 
   def main(args: Array[String]) {
@@ -69,17 +65,14 @@ object PowerIterationClusteringExample {
     val parser = new OptionParser[Params]("PowerIterationClusteringExample") {
       head("PowerIterationClusteringExample: an example PIC app using concentric circles.")
       opt[Int]('k', "k")
-        .text(s"number of circles (/clusters), default: ${defaultParams.k}")
+        .text(s"number of circles (clusters), default: ${defaultParams.k}")
         .action((x, c) => c.copy(k = x))
       opt[Int]('n', "n")
         .text(s"number of points in smallest circle, default: ${defaultParams.numPoints}")
         .action((x, c) => c.copy(numPoints = x))
       opt[Int]("maxIterations")
         .text(s"number of iterations, default: ${defaultParams.maxIterations}")
         .action((x, c) => c.copy(maxIterations = x))
-      opt[Double]('r', "r")
-        .text(s"radius of outermost circle, default: ${defaultParams.outerRadius}")
-        .action((x, c) => c.copy(outerRadius = x))
     }
 
     parser.parse(args, defaultParams).map { params =>
@@ -97,20 +90,21 @@ object PowerIterationClusteringExample {
 
     Logger.getRootLogger.setLevel(Level.WARN)
 
-    val circlesRdd = generateCirclesRdd(sc, params.k, params.numPoints, params.outerRadius)
+    val circlesRdd = generateCirclesRdd(sc, params.k, params.numPoints)
     val model = new PowerIterationClustering()
       .setK(params.k)
       .setMaxIterations(params.maxIterations)
+      .setInitializationMode("degree")
       .run(circlesRdd)
 
     val clusters = model.assignments.collect().groupBy(_.cluster).mapValues(_.map(_.id))
-    val assignments = clusters.toList.sortBy { case (k, v) => v.length}
+    val assignments = clusters.toList.sortBy { case (k, v) => v.length }
     val assignmentsStr = assignments
       .map { case (k, v) =>
         s"$k -> ${v.sorted.mkString("[", ",", "]")}"
-      }.mkString(",")
+      }.mkString(", ")
     val sizesStr = assignments.map {
-      _._2.size
+      _._2.length
     }.sorted.mkString("(", ",", ")")
     println(s"Cluster assignments: $assignmentsStr\ncluster sizes: $sizesStr")
 
@@ -124,20 +118,17 @@ object PowerIterationClusteringExample {
     }
   }
 
-  def generateCirclesRdd(sc: SparkContext,
-      nCircles: Int = 3,
-      nPoints: Int = 30,
-      outerRadius: Double): RDD[(Long, Long, Double)] = {
-
-    val radii = Array.tabulate(nCircles) { cx => outerRadius / (nCircles - cx)}
-    val groupSizes = Array.tabulate(nCircles) { cx => (cx + 1) * nPoints}
-    val points = (0 until nCircles).flatMap { cx =>
-      generateCircle(radii(cx), groupSizes(cx))
+  def generateCirclesRdd(
+      sc: SparkContext,
+      nCircles: Int,
+      nPoints: Int): RDD[(Long, Long, Double)] = {
+    val points = (1 to nCircles).flatMap { i =>
+      generateCircle(i, i * nPoints)
     }.zipWithIndex
     val rdd = sc.parallelize(points)
     val distancesRdd = rdd.cartesian(rdd).flatMap { case (((x0, y0), i0), ((x1, y1), i1)) =>
       if (i0 < i1) {
-        Some((i0.toLong, i1.toLong, gaussianSimilarity((x0, y0), (x1, y1), 1.0)))
+        Some((i0.toLong, i1.toLong, gaussianSimilarity((x0, y0), (x1, y1))))
       } else {
         None
       }
@@ -148,11 +139,9 @@ object PowerIterationClusteringExample {
   /**
    * Gaussian Similarity:  http://en.wikipedia.org/wiki/Radial_basis_function_kernel
    */
-  def gaussianSimilarity(p1: (Double, Double), p2: (Double, Double), sigma: Double): Double = {
-    val coeff = 1.0 / (math.sqrt(2.0 * math.Pi) * sigma)
-    val expCoeff = -1.0 / 2.0 * math.pow(sigma, 2.0)
+  def gaussianSimilarity(p1: (Double, Double), p2: (Double, Double)): Double = {
     val ssquares = (p1._1 - p2._1) * (p1._1 - p2._1) + (p1._2 - p2._2) * (p1._2 - p2._2)
-    coeff * math.exp(expCoeff * ssquares)
+    math.exp(-ssquares / 2.0)
   }
 }
 // scalastyle:on println
```
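With `sigma` fixed at 1 and the normalizing coefficient dropped, the example's affinity reduces to the unnormalized RBF kernel exp(−‖p1 − p2‖² / 2). A plain-Scala sketch (the object name and the sample points are illustrative) shows why the concentric-circle data separates well: neighboring points on the same circle keep a high affinity, while points on different circles have near-zero similarity.

```scala
object GaussianSimilaritySketch {
  // exp(-||p1 - p2||^2 / 2): the unnormalized RBF kernel with sigma = 1,
  // matching the simplified gaussianSimilarity in the patched example.
  def gaussianSimilarity(p1: (Double, Double), p2: (Double, Double)): Double = {
    val ssquares = (p1._1 - p2._1) * (p1._1 - p2._1) + (p1._2 - p2._2) * (p1._2 - p2._2)
    math.exp(-ssquares / 2.0)
  }

  def main(args: Array[String]): Unit = {
    // Adjacent points on a 10-point unit circle stay similar ...
    println(gaussianSimilarity((1.0, 0.0),
      (math.cos(0.2 * math.Pi), math.sin(0.2 * math.Pi)))) // ~0.83
    // ... while points on different circles (radius 1 vs radius 4) do not.
    println(gaussianSimilarity((1.0, 0.0), (4.0, 0.0)))     // ~0.011
  }
}
```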

mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala

14 additions & 10 deletions

```diff
@@ -25,7 +25,6 @@ import org.apache.spark.{Logging, SparkContext, SparkException}
 import org.apache.spark.annotation.Since
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.graphx._
-import org.apache.spark.graphx.impl.GraphImpl
 import org.apache.spark.mllib.linalg.Vectors
 import org.apache.spark.mllib.util.{Loader, MLUtils, Saveable}
 import org.apache.spark.rdd.RDD
@@ -264,10 +263,12 @@ object PowerIterationClustering extends Logging {
       },
       mergeMsg = _ + _,
       TripletFields.EdgeOnly)
-    GraphImpl.fromExistingRDDs(vD, graph.edges)
+    Graph(vD, graph.edges)
       .mapTriplets(
         e => e.attr / math.max(e.srcAttr, MLUtils.EPSILON),
-        TripletFields.Src)
+        new TripletFields(/* useSrc */ true,
+          /* useDst */ false,
+          /* useEdge */ true))
   }
 
   /**
@@ -293,10 +294,12 @@ object PowerIterationClustering extends Logging {
       },
       mergeMsg = _ + _,
       TripletFields.EdgeOnly)
-    GraphImpl.fromExistingRDDs(vD, gA.edges)
+    Graph(vD, gA.edges)
       .mapTriplets(
         e => e.attr / math.max(e.srcAttr, MLUtils.EPSILON),
-        TripletFields.Src)
+        new TripletFields(/* useSrc */ true,
+          /* useDst */ false,
+          /* useEdge */ true))
   }
 
   /**
@@ -317,7 +320,7 @@ object PowerIterationClustering extends Logging {
     }, preservesPartitioning = true).cache()
     val sum = r.values.map(math.abs).sum()
     val v0 = r.mapValues(x => x / sum)
-    GraphImpl.fromExistingRDDs(VertexRDD(v0), g.edges)
+    Graph(VertexRDD(v0), g.edges)
   }
 
   /**
@@ -332,7 +335,7 @@ object PowerIterationClustering extends Logging {
   def initDegreeVector(g: Graph[Double, Double]): Graph[Double, Double] = {
     val sum = g.vertices.values.sum()
     val v0 = g.vertices.mapValues(_ / sum)
-    GraphImpl.fromExistingRDDs(VertexRDD(v0), g.edges)
+    Graph(VertexRDD(v0), g.edges)
   }
 
   /**
@@ -357,7 +360,9 @@ object PowerIterationClustering extends Logging {
       val v = curG.aggregateMessages[Double](
         sendMsg = ctx => ctx.sendToSrc(ctx.attr * ctx.dstAttr),
         mergeMsg = _ + _,
-        TripletFields.Dst).cache()
+        new TripletFields(/* useSrc */ false,
+          /* useDst */ true,
+          /* useEdge */ true)).cache()
       // normalize v
       val norm = v.values.map(math.abs).sum()
       logInfo(s"$msgPrefix: norm(v) = $norm.")
@@ -370,7 +375,7 @@ object PowerIterationClustering extends Logging {
       diffDelta = math.abs(delta - prevDelta)
       logInfo(s"$msgPrefix: diff(delta) = $diffDelta.")
       // update v
-      curG = GraphImpl.fromExistingRDDs(VertexRDD(v1), g.edges)
+      curG = Graph(VertexRDD(v1), g.edges)
       prevDelta = delta
     }
     curG.vertices
@@ -387,7 +392,6 @@ object PowerIterationClustering extends Logging {
     val points = v.mapValues(x => Vectors.dense(x)).cache()
     val model = new KMeans()
       .setK(k)
-      .setRuns(5)
       .setSeed(0L)
       .run(points.values)
     points.mapValues(p => model.predict(p)).cache()
```
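The same field-declaration discipline applies to the power-iteration step itself, where each vertex's new value is a weighted sum of its neighbors' current values and so reads `dstAttr`. A sketch isolating that call (`PowerStepSketch` and `powerStep` are hypothetical wrappers around the `aggregateMessages` call in the diff, not functions in this file; `curG` is assumed to hold the current vector v in vertex attributes and the normalized similarities in edge attributes):

```scala
import org.apache.spark.graphx.{Graph, TripletFields, VertexRDD}

object PowerStepSketch {
  // One power-iteration step, as patched: v' = W * v. Each edge sends
  // attr * dstAttr to its source; messages are summed per vertex.
  def powerStep(curG: Graph[Double, Double]): VertexRDD[Double] =
    curG.aggregateMessages[Double](
      sendMsg = ctx => ctx.sendToSrc(ctx.attr * ctx.dstAttr),
      mergeMsg = _ + _,
      // useDst = true ships dstAttr into the edge partitions; per the commit
      // message, an unshipped dstAttr is what surfaced as a bogus 0.0.
      new TripletFields(/* useSrc */ false, /* useDst */ true, /* useEdge */ true))
}
```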

mllib/src/test/scala/org/apache/spark/mllib/clustering/PowerIterationClusteringSuite.scala

42 additions & 37 deletions

```diff
@@ -30,62 +30,65 @@ class PowerIterationClusteringSuite extends SparkFunSuite with MLlibTestSparkCon
 
   import org.apache.spark.mllib.clustering.PowerIterationClustering._
 
+  /** Generates a circle of points. */
+  private def genCircle(r: Double, n: Int): Array[(Double, Double)] = {
+    Array.tabulate(n) { i =>
+      val theta = 2.0 * math.Pi * i / n
+      (r * math.cos(theta), r * math.sin(theta))
+    }
+  }
+
+  /** Computes Gaussian similarity. */
+  private def sim(x: (Double, Double), y: (Double, Double)): Double = {
+    val dist2 = (x._1 - y._1) * (x._1 - y._1) + (x._2 - y._2) * (x._2 - y._2)
+    math.exp(-dist2 / 2.0)
+  }
+
   test("power iteration clustering") {
-    /*
-     We use the following graph to test PIC. All edges are assigned similarity 1.0 except 0.1 for
-     edge (3, 4).
-
-     15-14 -13 -12
-     |           |
-     4 . 3 - 2  11
-     | | x |    |
-     5  0 - 1  10
-     |           |
-     6 - 7 - 8 - 9
-     */
+    // Generate two circles following the example in the PIC paper.
+    val r1 = 1.0
+    val n1 = 10
+    val r2 = 4.0
+    val n2 = 40
+    val n = n1 + n2
+    val points = genCircle(r1, n1) ++ genCircle(r2, n2)
+    val similarities = for (i <- 1 until n; j <- 0 until i) yield {
+      (i.toLong, j.toLong, sim(points(i), points(j)))
+    }
 
-    val similarities = Seq[(Long, Long, Double)]((0, 1, 1.0), (0, 2, 1.0), (0, 3, 1.0), (1, 2, 1.0),
-      (1, 3, 1.0), (2, 3, 1.0), (3, 4, 0.1), // (3, 4) is a weak edge
-      (4, 5, 1.0), (4, 15, 1.0), (5, 6, 1.0), (6, 7, 1.0), (7, 8, 1.0), (8, 9, 1.0), (9, 10, 1.0),
-      (10, 11, 1.0), (11, 12, 1.0), (12, 13, 1.0), (13, 14, 1.0), (14, 15, 1.0))
     val model = new PowerIterationClustering()
       .setK(2)
+      .setMaxIterations(40)
       .run(sc.parallelize(similarities, 2))
     val predictions = Array.fill(2)(mutable.Set.empty[Long])
     model.assignments.collect().foreach { a =>
       predictions(a.cluster) += a.id
     }
-    assert(predictions.toSet == Set((0 to 3).toSet, (4 to 15).toSet))
+    assert(predictions.toSet == Set((0 until n1).toSet, (n1 until n).toSet))
 
     val model2 = new PowerIterationClustering()
       .setK(2)
+      .setMaxIterations(10)
       .setInitializationMode("degree")
       .run(sc.parallelize(similarities, 2))
     val predictions2 = Array.fill(2)(mutable.Set.empty[Long])
     model2.assignments.collect().foreach { a =>
       predictions2(a.cluster) += a.id
     }
-    assert(predictions2.toSet == Set((0 to 3).toSet, (4 to 15).toSet))
+    assert(predictions2.toSet == Set((0 until n1).toSet, (n1 until n).toSet))
   }
 
   test("power iteration clustering on graph") {
-    /*
-     We use the following graph to test PIC. All edges are assigned similarity 1.0 except 0.1 for
-     edge (3, 4).
-
-     15-14 -13 -12
-     |           |
-     4 . 3 - 2  11
-     | | x |    |
-     5  0 - 1  10
-     |           |
-     6 - 7 - 8 - 9
-     */
-
-    val similarities = Seq[(Long, Long, Double)]((0, 1, 1.0), (0, 2, 1.0), (0, 3, 1.0), (1, 2, 1.0),
-      (1, 3, 1.0), (2, 3, 1.0), (3, 4, 0.1), // (3, 4) is a weak edge
-      (4, 5, 1.0), (4, 15, 1.0), (5, 6, 1.0), (6, 7, 1.0), (7, 8, 1.0), (8, 9, 1.0), (9, 10, 1.0),
-      (10, 11, 1.0), (11, 12, 1.0), (12, 13, 1.0), (13, 14, 1.0), (14, 15, 1.0))
+    // Generate two circles following the example in the PIC paper.
+    val r1 = 1.0
+    val n1 = 10
+    val r2 = 4.0
+    val n2 = 40
+    val n = n1 + n2
+    val points = genCircle(r1, n1) ++ genCircle(r2, n2)
+    val similarities = for (i <- 1 until n; j <- 0 until i) yield {
+      (i.toLong, j.toLong, sim(points(i), points(j)))
+    }
 
     val edges = similarities.flatMap { case (i, j, s) =>
       if (i != j) {
@@ -98,22 +101,24 @@ class PowerIterationClusteringSuite extends SparkFunSuite with MLlibTestSparkCon
 
     val model = new PowerIterationClustering()
       .setK(2)
+      .setMaxIterations(40)
       .run(graph)
     val predictions = Array.fill(2)(mutable.Set.empty[Long])
     model.assignments.collect().foreach { a =>
       predictions(a.cluster) += a.id
     }
-    assert(predictions.toSet == Set((0 to 3).toSet, (4 to 15).toSet))
+    assert(predictions.toSet == Set((0 until n1).toSet, (n1 until n).toSet))
 
     val model2 = new PowerIterationClustering()
      .setK(2)
+      .setMaxIterations(10)
       .setInitializationMode("degree")
       .run(sc.parallelize(similarities, 2))
     val predictions2 = Array.fill(2)(mutable.Set.empty[Long])
     model2.assignments.collect().foreach { a =>
       predictions2(a.cluster) += a.id
     }
-    assert(predictions2.toSet == Set((0 to 3).toSet, (4 to 15).toSet))
+    assert(predictions2.toSet == Set((0 until n1).toSet, (n1 until n).toSet))
   }
 
   test("normalize and powerIter") {
```

python/pyspark/mllib/clustering.py

19 additions & 6 deletions

```diff
@@ -571,12 +571,25 @@ class PowerIterationClusteringModel(JavaModelWrapper, JavaSaveable, JavaLoader):
 
     Model produced by [[PowerIterationClustering]].
 
-    >>> data = [(0, 1, 1.0), (0, 2, 1.0), (0, 3, 1.0), (1, 2, 1.0), (1, 3, 1.0),
-    ...     (2, 3, 1.0), (3, 4, 0.1), (4, 5, 1.0), (4, 15, 1.0), (5, 6, 1.0),
-    ...     (6, 7, 1.0), (7, 8, 1.0), (8, 9, 1.0), (9, 10, 1.0), (10, 11, 1.0),
-    ...     (11, 12, 1.0), (12, 13, 1.0), (13, 14, 1.0), (14, 15, 1.0)]
-    >>> rdd = sc.parallelize(data, 2)
-    >>> model = PowerIterationClustering.train(rdd, 2, 100)
+    >>> import math
+    >>> def genCircle(r, n):
+    ...     points = []
+    ...     for i in range(0, n):
+    ...         theta = 2.0 * math.pi * i / n
+    ...         points.append((r * math.cos(theta), r * math.sin(theta)))
+    ...     return points
+    >>> def sim(x, y):
+    ...     dist2 = (x[0] - y[0]) * (x[0] - y[0]) + (x[1] - y[1]) * (x[1] - y[1])
+    ...     return math.exp(-dist2 / 2.0)
+    >>> r1 = 1.0
+    >>> n1 = 10
+    >>> r2 = 4.0
+    >>> n2 = 40
+    >>> n = n1 + n2
+    >>> points = genCircle(r1, n1) + genCircle(r2, n2)
+    >>> similarities = [(i, j, sim(points[i], points[j])) for i in range(1, n) for j in range(0, i)]
+    >>> rdd = sc.parallelize(similarities, 2)
+    >>> model = PowerIterationClustering.train(rdd, 2, 40)
     >>> model.k
     2
     >>> result = sorted(model.assignments().collect(), key=lambda x: x.id)
```
