Skip to content

Commit e1a1ff8

Browse files
sboeschhuaweimengxr
authored andcommitted
[SPARK-5503][MLLIB] Example code for Power Iteration Clustering
Author: sboeschhuawei <stephen.boesch@huawei.com> Closes #4495 from javadba/picexamples and squashes the following commits: 3c84b14 [sboeschhuawei] PIC Examples updates from Xiangrui's comments round 5 2878675 [sboeschhuawei] Fourth round with xiangrui on PICExample d7ac350 [sboeschhuawei] Updates to PICExample from Xiangrui's comments round 3 d7f0cba [sboeschhuawei] Updates to PICExample from Xiangrui's comments round 3 cef28f4 [sboeschhuawei] Further updates to PICExample from Xiangrui's comments f7ff43d [sboeschhuawei] Update to PICExample from Xiangrui's comments efeec45 [sboeschhuawei] Update to PICExample from Xiangrui's comments 03e8de4 [sboeschhuawei] Added PICExample c509130 [sboeschhuawei] placeholder for pic examples 5864d4a [sboeschhuawei] placeholder for pic examples
1 parent c0ccd25 commit e1a1ff8

File tree

1 file changed

+160
-0
lines changed

1 file changed

+160
-0
lines changed
Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.examples.mllib
19+
20+
import org.apache.log4j.{Level, Logger}
21+
import scopt.OptionParser
22+
23+
import org.apache.spark.mllib.clustering.PowerIterationClustering
24+
import org.apache.spark.rdd.RDD
25+
import org.apache.spark.{SparkConf, SparkContext}
26+
27+
/**
28+
* An example Power Iteration Clustering http://www.icml2010.org/papers/387.pdf app.
29+
* Takes an input of K concentric circles and the number of points in the innermost circle.
30+
* The output should be K clusters - each cluster containing precisely the points associated
31+
* with each of the input circles.
32+
*
33+
* Run with
34+
* {{{
35+
* ./bin/run-example mllib.PowerIterationClusteringExample [options]
36+
*
37+
* Where options include:
38+
* k: Number of circles/clusters
39+
* n: Number of sampled points on innermost circle.. There are proportionally more points
40+
* within the outer/larger circles
41+
* maxIterations: Number of Power Iterations
42+
* outerRadius: radius of the outermost of the concentric circles
43+
* }}}
44+
*
45+
* Here is a sample run and output:
46+
*
47+
* ./bin/run-example mllib.PowerIterationClusteringExample
48+
* -k 3 --n 30 --maxIterations 15
49+
*
50+
* Cluster assignments: 1 -> [0,1,2,3,4],2 -> [5,6,7,8,9,10,11,12,13,14],
51+
* 0 -> [15,16,17,18,19,20,21,22,23,24,25,26,27,28,29]
52+
*
53+
*
54+
* If you use it as a template to create your own app, please use `spark-submit` to submit your app.
55+
*/
56+
object PowerIterationClusteringExample {
57+
58+
case class Params(
59+
input: String = null,
60+
k: Int = 3,
61+
numPoints: Int = 5,
62+
maxIterations: Int = 10,
63+
outerRadius: Double = 3.0
64+
) extends AbstractParams[Params]
65+
66+
def main(args: Array[String]) {
67+
val defaultParams = Params()
68+
69+
val parser = new OptionParser[Params]("PIC Circles") {
70+
head("PowerIterationClusteringExample: an example PIC app using concentric circles.")
71+
opt[Int]('k', "k")
72+
.text(s"number of circles (/clusters), default: ${defaultParams.k}")
73+
.action((x, c) => c.copy(k = x))
74+
opt[Int]('n', "n")
75+
.text(s"number of points in smallest circle, default: ${defaultParams.numPoints}")
76+
.action((x, c) => c.copy(numPoints = x))
77+
opt[Int]("maxIterations")
78+
.text(s"number of iterations, default: ${defaultParams.maxIterations}")
79+
.action((x, c) => c.copy(maxIterations = x))
80+
opt[Int]('r', "r")
81+
.text(s"radius of outermost circle, default: ${defaultParams.outerRadius}")
82+
.action((x, c) => c.copy(numPoints = x))
83+
}
84+
85+
parser.parse(args, defaultParams).map { params =>
86+
run(params)
87+
}.getOrElse {
88+
sys.exit(1)
89+
}
90+
}
91+
92+
def run(params: Params) {
93+
val conf = new SparkConf()
94+
.setMaster("local")
95+
.setAppName(s"PowerIterationClustering with $params")
96+
val sc = new SparkContext(conf)
97+
98+
Logger.getRootLogger.setLevel(Level.WARN)
99+
100+
val circlesRdd = generateCirclesRdd(sc, params.k, params.numPoints, params.outerRadius)
101+
val model = new PowerIterationClustering()
102+
.setK(params.k)
103+
.setMaxIterations(params.maxIterations)
104+
.run(circlesRdd)
105+
106+
val clusters = model.assignments.collect.groupBy(_._2).mapValues(_.map(_._1))
107+
val assignments = clusters.toList.sortBy { case (k, v) => v.length}
108+
val assignmentsStr = assignments
109+
.map { case (k, v) =>
110+
s"$k -> ${v.sorted.mkString("[", ",", "]")}"
111+
}.mkString(",")
112+
val sizesStr = assignments.map {
113+
_._2.size
114+
}.sorted.mkString("(", ",", ")")
115+
println(s"Cluster assignments: $assignmentsStr\ncluster sizes: $sizesStr")
116+
117+
sc.stop()
118+
}
119+
120+
def generateCircle(radius: Double, n: Int) = {
121+
Seq.tabulate(n) { i =>
122+
val theta = 2.0 * math.Pi * i / n
123+
(radius * math.cos(theta), radius * math.sin(theta))
124+
}
125+
}
126+
127+
def generateCirclesRdd(sc: SparkContext,
128+
nCircles: Int = 3,
129+
nPoints: Int = 30,
130+
outerRadius: Double): RDD[(Long, Long, Double)] = {
131+
132+
val radii = Array.tabulate(nCircles) { cx => outerRadius / (nCircles - cx)}
133+
val groupSizes = Array.tabulate(nCircles) { cx => (cx + 1) * nPoints}
134+
val points = (0 until nCircles).flatMap { cx =>
135+
generateCircle(radii(cx), groupSizes(cx))
136+
}.zipWithIndex
137+
val rdd = sc.parallelize(points)
138+
val distancesRdd = rdd.cartesian(rdd).flatMap { case (((x0, y0), i0), ((x1, y1), i1)) =>
139+
if (i0 < i1) {
140+
Some((i0.toLong, i1.toLong, gaussianSimilarity((x0, y0), (x1, y1), 1.0)))
141+
} else {
142+
None
143+
}
144+
}
145+
distancesRdd
146+
}
147+
148+
/**
149+
* Gaussian Similarity: http://en.wikipedia.org/wiki/Radial_basis_function_kernel
150+
*/
151+
def gaussianSimilarity(p1: (Double, Double), p2: (Double, Double), sigma: Double) = {
152+
val coeff = 1.0 / (math.sqrt(2.0 * math.Pi) * sigma)
153+
val expCoeff = -1.0 / 2.0 * math.pow(sigma, 2.0)
154+
val ssquares = (p1._1 - p2._1) * (p1._1 - p2._1) + (p1._2 - p2._2) * (p1._2 - p2._2)
155+
coeff * math.exp(expCoeff * ssquares)
156+
// math.exp((p1._1 - p2._1) * (p1._1 - p2._1) + (p1._2 - p2._2) * (p1._2 - p2._2))
157+
}
158+
159+
160+
}

0 commit comments

Comments
 (0)