@@ -19,28 +19,47 @@ package org.apache.spark.examples.mllib
19
19
20
20
import org .apache .log4j .{Level , Logger }
21
21
import org .apache .spark .mllib .clustering .PowerIterationClustering
22
- import org .apache .spark .mllib .linalg .Vectors
23
22
import org .apache .spark .rdd .RDD
24
23
import org .apache .spark .{SparkConf , SparkContext }
25
24
import scopt .OptionParser
26
25
27
26
/**
28
- * An example Power Iteration Clustering app. Run with
27
+ * An example Power Iteration Clustering app. Takes an input of K concentric circles
28
+ * with a total of "n" sampled points (total here means "across ALL of the circles").
29
+ * The output should be K clusters, each containing precisely the points sampled
30
+ * from one of the input circles.
31
+ *
32
+ * Run with
29
33
* {{{
30
- * ./bin/run-example org.apache.spark.examples.mllib.PowerIterationClusteringExample
31
- * [options] <input>
34
+ * ./bin/run-example org.apache.spark.examples.mllib.PowerIterationClusteringExample [options]
35
+ *
36
+ * Where options include:
37
+ * k: Number of circles/clusters
38
+ * n: Total number of sampled points. There are proportionally more points within the
39
+ * outer/larger circles
40
+ * numIterations: Number of Power Iterations
41
+ * outerRadius: radius of the outermost of the concentric circles
32
42
* }}}
43
+ *
44
+ * Here is a sample run and output:
45
+ *
46
+ * ./bin/run-example org.apache.spark.examples.mllib.PowerIterationClusteringExample -k 3 --n 30 --numIterations 15
47
+ *
48
+ * Cluster assignments: 1 -> [0,1,2,3,4],2 -> [5,6,7,8,9,10,11,12,13,14],
49
+ * 0 -> [15,16,17,18,19,20,21,22,23,24,25,26,27,28,29]
50
+ *
51
+ *
33
52
* If you use it as a template to create your own app, please use `spark-submit` to submit your app.
34
53
*/
35
54
object PowerIterationClusteringExample {
36
55
37
56
/**
 * Command-line parameters for this example.
 *
 * @param input input location. NOTE(review): the only visible use (a text-file read in the
 *              run section) has been removed, so this may now be unused; confirm before
 *              dropping it.
 * @param k number of concentric circles, also passed to the model as the number of clusters
 * @param numPoints total number of points sampled across all of the circles
 * @param numIterations number of power iterations
 * @param outerRadius radius of the outermost of the concentric circles
 */
case class Params(
    input: String = null,
    k: Int = 3,
    numPoints: Int = 30,
    numIterations: Int = 10,
    outerRadius: Double = 3.0) extends AbstractParams[Params]
44
63
45
64
def main (args : Array [String ]) {
46
65
val defaultParams = Params ()
@@ -112,21 +131,21 @@ object PowerIterationClusteringExample {
112
131
val rdd = sc.parallelize(points)
113
132
val distancesRdd = rdd.cartesian(rdd).flatMap { case ((i0, (x0, y0)), (i1, (x1, y1))) =>
114
133
if (i0 < i1) {
115
- val sim = Some ((i0.toLong, i1.toLong, similarity((x0, y0), (x1, y1))))
116
- sim
134
+ Some ((i0.toLong, i1.toLong, similarity((x0, y0), (x1, y1))))
117
135
} else {
118
136
None
119
137
}
120
138
}
121
- val coll = distancesRdd.collect
122
139
distancesRdd
123
140
}
124
141
142
/**
 * Gaussian similarity between two 2-D points:
 *
 * {{{
 *   (1 / (sqrt(2 * Pi) * sigma)) * exp(-((x1 - x2)^2 + (y1 - y2)^2) / (2 * sigma^2))
 * }}}
 *
 * The value is largest when the points coincide and decays as they move apart.
 *
 * Reference: http://www.stat.wisc.edu/~mchung/teaching/MIA/reading/diffusion.gaussian.kernel.pdf
 *
 * @param p1 first point as (x, y)
 * @param p2 second point as (x, y)
 * @param sigma width of the Gaussian kernel; it is not validated here, and sigma == 0
 *              produces a non-finite result instead of throwing
 * @return the similarity of `p1` and `p2`
 */
def gaussianSimilarity(p1: (Double, Double), p2: (Double, Double), sigma: Double): Double = {
  // Normalization factor of the kernel; depends only on sigma.
  val normalization = 1.0 / (math.sqrt(2.0 * math.Pi) * sigma)
  // Squared Euclidean distance between the two points.
  val squaredDistance = math.pow(p1._1 - p2._1, 2) + math.pow(p1._2 - p2._2, 2)
  normalization * math.exp((-1.0 / (2.0 * math.pow(sigma, 2.0))) * squaredDistance)
}
131
150
132
151
private [mllib] def similarity (p1 : (Double , Double ), p2 : (Double , Double )) = {
@@ -141,10 +160,6 @@ object PowerIterationClusteringExample {
141
160
142
161
Logger .getRootLogger.setLevel(Level .WARN )
143
162
144
- val examples = sc.textFile(params.input).map { line =>
145
- Vectors .dense(line.split(' ' ).map(_.toDouble))
146
- }.cache()
147
-
148
163
val circlesRdd = generateCirclesRdd(sc, params.k, params.numPoints, params.outerRadius)
149
164
val model = new PowerIterationClustering ()
150
165
.setK(params.k)
@@ -153,7 +168,7 @@ object PowerIterationClusteringExample {
153
168
154
169
val clusters = model.assignments.collect.groupBy(_._2).mapValues(_.map(_._1))
155
170
println(s " Cluster assignments: "
156
- + s " ${clusters.map { case (k, v) => s " $k -> ${v.sorted.mkString(" [" , " ," , " ]" )}" }
171
+ + s " ${clusters.toList.sortBy{ case (k,v) => v.length}. map { case (k, v) => s " $k -> ${v.sorted.mkString(" [" , " ," , " ]" )}" }
157
172
.mkString(" ," )}" )
158
173
159
174
sc.stop()
0 commit comments