package org.apache.spark.examples.mllib

import org.apache.log4j.{Level, Logger}
- import org.apache.spark.mllib.clustering.KMeans
+ import org.apache.spark.mllib.clustering.PowerIterationClustering
import org.apache.spark.mllib.linalg.Vectors
+ import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import scopt.OptionParser

/**
- * An example k-means app. Run with
+ * An example Power Iteration Clustering app. Run with
 * {{{
- * ./bin/run-example org.apache.spark.examples.mllib.DenseKMeans [options] <input>
+ * ./bin/run-example org.apache.spark.examples.mllib.PowerIterationClusteringExample
+ *   [options] <input>
 * }}}
 * If you use it as a template to create your own app, please use `spark-submit` to submit your app.
 */
object PowerIterationClusteringExample {

-   object InitializationMode extends Enumeration {
-     type InitializationMode = Value
-     val Random, Parallel = Value
-   }
-
-   import org.apache.spark.examples.mllib.PowerIterationClusteringExample.InitializationMode._
-
  case class Params(
-     input: String = null,
-     k: Int = -1,
-     numIterations: Int = 10,
-     initializationMode: InitializationMode = Parallel) extends AbstractParams[Params]
+     input: String = null,
+     k: Int = 3,
+     numPoints: Int = 30,
+     numIterations: Int = 10,
+     outerRadius: Double = 3.0
+   ) extends AbstractParams[Params]

  def main(args: Array[String]) {
    val defaultParams = Params()

-     val parser = new OptionParser[Params]("DenseKMeans") {
-       head("DenseKMeans: an example k-means app for dense data.")
+     val parser = new OptionParser[Params]("PIC Circles") {
+       head("PowerIterationClusteringExample: an example PIC app using concentric circles.")
      opt[Int]('k', "k")
-         .required()
-         .text(s"number of clusters, required")
+         .text(s"number of circles (clusters), default: ${defaultParams.k}")
        .action((x, c) => c.copy(k = x))
+       opt[Int]('n', "n")
+         .text(s"number of points, default: ${defaultParams.numPoints}")
+         .action((x, c) => c.copy(numPoints = x))
      opt[Int]("numIterations")
-         .text(s"number of iterations, default; ${defaultParams.numIterations}")
+         .text(s"number of iterations, default: ${defaultParams.numIterations}")
        .action((x, c) => c.copy(numIterations = x))
-       opt[String]("initMode")
-         .text(s"initialization mode (${InitializationMode.values.mkString(",")}), " +
-           s"default: ${defaultParams.initializationMode}")
-         .action((x, c) => c.copy(initializationMode = InitializationMode.withName(x)))
-       arg[String]("<input>")
-         .text("input paths to examples")
-         .required()
-         .action((x, c) => c.copy(input = x))
    }

    parser.parse(args, defaultParams).map { params =>
@@ -74,8 +65,78 @@ object PowerIterationClusteringExample {
    }
  }

+   def generateCircle(n: Int, r: Double): Array[(Double, Double)] = {
+     val pi2 = 2 * math.Pi
+     (0.0 until pi2 by pi2 / n).map { x =>
+       (r * math.cos(x), r * math.sin(x))
+     }.toArray
+   }
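+   // For example, generateCircle(4, 1.0) yields (up to floating-point rounding)
+   // the four unit-circle points (1,0), (0,1), (-1,0), (0,-1).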
+
+   def generateCirclesRdd(sc: SparkContext, nCircles: Int = 3, nTotalPoints: Int = 30,
+       outerRadius: Double): RDD[(Long, Long, Double)] = {
+     // The circles are generated as follows:
+     // The radii grow from a small fraction of outerRadius for the innermost circle
+     // (index 0) up to outerRadius for the outermost circle (index nCircles - 1).
+     // The number of points in each circle (and thus in each final cluster) is
+     //   x, 2x, .., nCircles*x
+     // where x = ceil(nTotalPoints / (C * (C + 1) / 2)) and C = nCircles.
+     // The number of points in the LAST circle is adjusted downwards so that the
+     // total equals nTotalPoints.
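+     // E.g. with the defaults nCircles = 3 and nTotalPoints = 30:
+     //   x = ceil(30 / 6) = 5, so the group sizes are 5, 10 and 15
+     //   (sum 30, no adjustment needed).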
+
+     val smallestRad = math.ceil(nTotalPoints / (nCircles * (nCircles + 1) / 2.0))
+     var groupSizes = (1 to nCircles).map(gs => (gs * smallestRad).toInt)
+     // Shrink the last group so the sizes sum exactly to nTotalPoints.
+     groupSizes = groupSizes.zipWithIndex.map { case (gs, ix) =>
+       ix match {
+         case _ if ix == groupSizes.length - 1 => gs - (groupSizes.sum - nTotalPoints)
+         case _ => gs
+       }
+     }
+
+     val radii = for (cx <- 0 until nCircles) yield {
+       cx match {
+         case 0 => 0.1 * outerRadius / nCircles
+         case _ if cx == nCircles - 1 => outerRadius
+         case _ => outerRadius * cx / (nCircles - 1)
+       }
+     }
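+     // E.g. with the defaults (nCircles = 3, outerRadius = 3.0) the radii are
+     // 0.1, 1.5 and 3.0.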
+     var ix = 0
+     val points = for (cx <- 0 until nCircles;
+                       px <- 0 until groupSizes(cx)) yield {
+       val theta = 2.0 * math.Pi * px / groupSizes(cx)
+       val out = (ix, (radii(cx) * math.cos(theta), radii(cx) * math.sin(theta)))
+       ix += 1
+       out
+     }
+     val rdd = sc.parallelize(points)
+     // Build the upper-triangular pairwise affinities: emit each pair once, with i0 < i1.
+     val distancesRdd = rdd.cartesian(rdd).flatMap { case ((i0, (x0, y0)), (i1, (x1, y1))) =>
+       if (i0 < i1) {
+         Some((i0.toLong, i1.toLong, similarity((x0, y0), (x1, y1))))
+       } else {
+         None
+       }
+     }
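+     // PowerIterationClustering assumes a symmetric affinity matrix, so listing
+     // each (srcId, dstId, similarity) pair in one orientation is sufficient.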
+     distancesRdd
+   }
+
+   def gaussianSimilarity(p1: (Double, Double), p2: (Double, Double), sigma: Double) = {
+     val coeff = 1.0 / (math.sqrt(2.0 * math.Pi) * sigma)
+     val expCoeff = -1.0 / (2.0 * math.pow(sigma, 2.0))
+     val ssquares = math.pow(p1._1 - p2._1, 2) + math.pow(p1._2 - p2._2, 2)
+     coeff * math.exp(expCoeff * ssquares)
+   }
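+   // With sigma = 1.0, as used by similarity below, this reduces to
+   // exp(-d * d / 2) / sqrt(2 * Pi) for Euclidean distance d, so nearby points
+   // get larger affinities than distant ones.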
+
+   private[mllib] def similarity(p1: (Double, Double), p2: (Double, Double)) = {
+     gaussianSimilarity(p1, p2, 1.0)
+   }
+
  def run(params: Params) {
-     val conf = new SparkConf().setAppName(s"DenseKMeans with $params")
+     val conf = new SparkConf()
+       .setMaster("local")
+       .setAppName(s"PowerIterationClustering with $params")

    val sc = new SparkContext(conf)

    Logger.getRootLogger.setLevel(Level.WARN)
@@ -84,24 +145,16 @@ object PowerIterationClusteringExample {
      Vectors.dense(line.split(' ').map(_.toDouble))
    }.cache()

-     val numExamples = examples.count()
-
-     println(s"numExamples = $numExamples.")
-
-     val initMode = params.initializationMode match {
-       case Random => KMeans.RANDOM
-       case Parallel => KMeans.K_MEANS_PARALLEL
-     }
-
-     val model = new KMeans()
-       .setInitializationMode(initMode)
+     val circlesRdd = generateCirclesRdd(sc, params.k, params.numPoints, params.outerRadius)
+     val model = new PowerIterationClustering()
      .setK(params.k)
      .setMaxIterations(params.numIterations)
-       .run(examples)
-
-     val cost = model.computeCost(examples)
+       .run(circlesRdd)

-     println(s"Total cost = $cost.")
+     // Group the point ids by their assigned cluster and print each cluster's members.
+     val clusters = model.assignments.collect.groupBy(_._2).mapValues(_.map(_._1))
+     println(s"Cluster assignments: ${clusters.map { case (k, v) =>
+       s"$k -> ${v.sorted.mkString("[", ",", "]")}"
+     }.mkString(", ")}")

    sc.stop()
  }
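
For a quick end-to-end sanity check of the API used above, here is a minimal standalone sketch. The object name PICQuickCheck and the affinity values are illustrative, not part of this commit; it assumes the tuple-based model.assignments: RDD[(Long, Int)] shown in the diff.

import org.apache.spark.mllib.clustering.PowerIterationClustering
import org.apache.spark.{SparkConf, SparkContext}

object PICQuickCheck {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("PICQuickCheck"))
    // Two obvious groups: 0-1 and 2-3 are strongly similar within themselves,
    // with only a weak link between the groups.
    val affinities = sc.parallelize(Seq(
      (0L, 1L, 1.0),
      (2L, 3L, 1.0),
      (1L, 2L, 0.01)))
    val model = new PowerIterationClustering()
      .setK(2)
      .setMaxIterations(20)
      .run(affinities)
    // The tightly linked pairs (0,1) and (2,3) should typically land in
    // different clusters; cluster ids themselves are arbitrary.
    model.assignments.collect().sortBy(_._1).foreach(println)
    sc.stop()
  }
}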