
Commit 92d4752

Move the Gaussian/affinity matrix calcs out of PIC. Presently in the test suite.
1 parent 7ebd149 commit 92d4752

4 files changed: +337 −213 lines changed

Lines changed: 40 additions & 0 deletions
@@ -0,0 +1,40 @@
package org.apache.spark.examples.mllib

import org.apache.log4j.{Level, Logger}
import org.apache.spark.mllib.clustering.PowerIterationClustering
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.{SparkConf, SparkContext}
import scopt.OptionParser
import breeze.linalg.{DenseVector => BDV}

import scala.util.Random

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

/**
 * An example Power Iteration Clustering app. Run with
 * {{{
 * ./bin/run-example org.apache.spark.examples.mllib.PowerIterationClusteringExample [options] <input>
 * }}}
 * If you use it as a template to create your own app, please use `spark-submit` to submit your app.
 */
object PowerIterationClusteringExample {

}
Lines changed: 88 additions & 0 deletions
@@ -0,0 +1,88 @@
package org.apache.spark.examples.mllib

import org.apache.log4j.{Level, Logger}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.mllib.clustering.PowerIterationClustering
import scopt.OptionParser

import scala.util.Random

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

/**
 * An example Power Iteration Clustering app for Gaussian similarity inputs. Run with
 * {{{
 * ./bin/run-example org.apache.spark.examples.mllib.PowerIterationClusteringForGaussian [options] <input>
 * }}}
 * If you use it as a template to create your own app, please use `spark-submit` to submit your app.
 */
object PowerIterationClusteringForGaussian {

  val PIC = PowerIterationClustering

  case class Params(
      input: String = null,
      k: Int = -1,
      numIterations: Int = PIC.defaultIterations
    ) extends AbstractParams[Params]

  def main(args: Array[String]) {
    val defaultParams = Params()

    val parser = new OptionParser[Params]("PowerIterationClusteringForGaussian") {
      head("Power Iteration Clustering for Gaussian Similarity inputs.")
      opt[Int]('k', "k")
        .required()
        .text("number of clusters, required")
        .action((x, c) => c.copy(k = x))
      opt[Int]("numIterations")
        .text(s"number of iterations, default: ${defaultParams.numIterations}")
        .action((x, c) => c.copy(numIterations = x))
      arg[String]("<input>")
        .text("input paths to examples")
        .required()
        .action((x, c) => c.copy(input = x))
    }

    parser.parse(args, defaultParams).map { params =>
      run(params)
    }.getOrElse {
      sys.exit(1)
    }
  }

  def run(params: Params) {
    val conf = new SparkConf().setAppName(s"PowerIterationClusteringForGaussian with $params")
    val sc = new SparkContext(conf)

    Logger.getRootLogger.setLevel(Level.WARN)

    // Each input line is a whitespace-separated dense vector of doubles
    val examples = sc.textFile(params.input).map { line =>
      Vectors.dense(line.split(' ').map(_.toDouble))
    }.cache()

    val numExamples = examples.count()

    sc.stop()
  }
}
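
Note: run() above stops after counting the parsed vectors; the call into PIC is not wired up yet in this commit. For orientation, here is a minimal, self-contained sketch of the Gaussian affinity computation this example is expected to feed into PIC, mirroring the gaussianDist helper removed from PowerIterationClustering below. The object name GaussianAffinitySketch is hypothetical, the snippet only assumes Breeze on the classpath, and it is not part of the commit.

import breeze.linalg.{DenseVector => BDV}

// Hypothetical standalone sketch (not part of this commit)
object GaussianAffinitySketch {

  // Gaussian similarity between two points: exp(-||x - y||^2 / (2 * sigma^2))
  def gaussianDist(v1: BDV[Double], v2: BDV[Double], sigma: Double): Double = {
    val sqDist = v1.toArray.zip(v2.toArray).map { case (a, b) => math.pow(a - b, 2) }.sum
    math.exp(-sqDist / (2.0 * math.pow(sigma, 2.0)))
  }

  def main(args: Array[String]): Unit = {
    val points = Array(BDV(0.0, 0.0), BDV(0.1, 0.0), BDV(5.0, 5.0))
    val sigma = 1.0
    // Dense affinity matrix with a zeroed diagonal, printed row by row
    val affinities = for (i <- points.indices; j <- points.indices)
      yield if (i == j) 0.0 else gaussianDist(points(i), points(j), sigma)
    affinities.grouped(points.length).foreach(row => println(row.mkString(" ")))
  }
}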

mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala

Lines changed: 2 additions & 205 deletions
@@ -44,19 +44,11 @@ object PowerIterationClustering
   type DGraph = Graph[Double, Double]
   type IndexedVector[Double] = (Long, BDV[Double])

-
   // Terminate iteration when norm changes by less than this value
-  private[mllib] val defaultMinNormChange: Double = 1e-11
-
-  // Default sigma for Gaussian Distance calculations
-  private[mllib] val defaultSigma = 1.0
+  val defaultMinNormChange: Double = 1e-11

   // Default number of iterations for PIC loop
-  private[mllib] val defaultIterations: Int = 20
-
-  // Default minimum affinity between points - lower than this it is considered
-  // zero and no edge will be created
-  private[mllib] val defaultMinAffinity = 1e-11
+  val defaultIterations: Int = 20

   // Do not allow divide by zero: change to this value instead
   val defaultDivideByZeroVal: Double = 1e-15
@@ -73,11 +65,6 @@
    * @param nClusters Number of clusters to create
    * @param nIterations Number of iterations of the PIC algorithm
    *                    that calculates primary PseudoEigenvector and Eigenvalue
-   * @param sigma Sigma for Gaussian distribution calculation according to
-   *              [1/2 * sqrt(pi * sigma)] * exp(-(x - y)**2 / (2 * sigma**2))
-   * @param minAffinity Minimum Affinity between two Points in the input dataset: below
-   *                    this threshold the affinity will be considered "close to" zero and
-   *                    no Edge will be created between those Points in the sparse matrix
    * @param nRuns Number of runs for the KMeans clustering
    * @return Tuple of (Seq[(Cluster Id, Cluster Center)],
    *         Seq[(VertexId, ClusterID Membership)])
@@ -86,8 +73,6 @@
       G: Graph[Double, Double],
       nClusters: Int,
       nIterations: Int = defaultIterations,
-      sigma: Double = defaultSigma,
-      minAffinity: Double = defaultMinAffinity,
       nRuns: Int = defaultKMeansRuns)
     : (Seq[(Int, Vector)], Seq[((VertexId, Vector), Int)]) = {
     val (gUpdated, lambda, vt) = getPrincipalEigen(sc, G, nIterations)
@@ -122,45 +107,6 @@
     (ccs, estCollected)
   }

-  /**
-   *
-   * Create an affinity matrix
-   *
-   * @param sc Spark Context
-   * @param points Input Points in format of [(VertexId, (x, y))]
-   *               where VertexId is a Long
-   * @param sigma Sigma for Gaussian distribution calculation according to
-   *              [1/2 * sqrt(pi * sigma)] * exp(-(x - y)**2 / (2 * sigma**2))
-   * @param minAffinity Minimum Affinity between two Points in the input dataset: below
-   *                    this threshold the affinity will be considered "close to" zero and
-   *                    no Edge will be created between those Points in the sparse matrix
-   * @return Tuple of (Seq[(Cluster Id, Cluster Center)],
-   *         Seq[(VertexId, ClusterID Membership)])
-   */
-  def createGaussianAffinityMatrix(sc: SparkContext,
-                                   points: Points,
-                                   sigma: Double = defaultSigma,
-                                   minAffinity: Double = defaultMinAffinity)
-    : Graph[Double, Double] = {
-    val vidsRdd = sc.parallelize(points.map(_._1).sorted)
-    val nVertices = points.length
-
-    val (wRdd, rowSums) = createNormalizedAffinityMatrix(sc, points, sigma)
-    val initialVt = createInitialVector(sc, points.map(_._1), rowSums)
-    if (logger.isDebugEnabled) {
-      logger.debug(s"Vt(0)=${
-        printVector(new BDV(initialVt.map {
-          _._2
-        }.toArray))
-      }")
-    }
-    val edgesRdd = createSparseEdgesRdd(sc, wRdd, minAffinity)
-    val G = createGraphFromEdges(sc, edgesRdd, points.size, Some(initialVt))
-    if (logger.isDebugEnabled) {
-      logger.debug(printMatrixFromEdges(G.edges))
-    }
-    G
-  }

   /**
    * Create a Graph given an initial Vt0 and a set of Edges that
@@ -270,154 +216,5 @@
     initialVt
   }

-  /**
-   * Calculate the Gaussian distance between two Vectors according to:
-   *
-   * exp(-(X1 - X2)^2 / (2 * sigma^2))
-   *
-   * where X1 and X2 are Vectors
-   *
-   * @param vect1 Input Vector1
-   * @param vect2 Input Vector2
-   * @param sigma Gaussian parameter sigma
-   * @return
-   */
-  private[mllib] def gaussianDist(vect1: BDV[Double], vect2: BDV[Double], sigma: Double) = {
-    val c1c2 = vect1.toArray.zip(vect2.toArray)
-    val dist = Math.exp((-0.5 / Math.pow(sigma, 2.0)) * c1c2.foldLeft(0.0) {
-      case (dist: Double, (c1: Double, c2: Double)) =>
-        dist + Math.pow(c1 - c2, 2)
-    })
-    dist
-  }
-
-  /**
-   * Create a sparse EdgeRDD from an array of dense vectors. The elements that
-   * are "close to" zero - as configured by the minAffinity value - do not
-   * result in an Edge being created.
-   *
-   * @param sc
-   * @param wRdd
-   * @param minAffinity
-   * @return
-   */
-  private[mllib] def createSparseEdgesRdd(sc: SparkContext, wRdd: RDD[IndexedVector[Double]],
-                                          minAffinity: Double = defaultMinAffinity) = {
-    val labels = wRdd.map { case (vid, vect) => vid }.collect
-    val edgesRdd = wRdd.flatMap { case (vid, vect) =>
-      for ((dval, ix) <- vect.toArray.zipWithIndex
-           if Math.abs(dval) >= minAffinity)
-        yield Edge(vid, labels(ix), dval)
-    }
-    edgesRdd
-  }
-
-  /**
-   * Create the normalized affinity matrix "W" given a set of Points
-   *
-   * @param sc SparkContext
-   * @param points Input Points in format of [(VertexId, (x, y))]
-   *               where VertexId is a Long
-   * @param sigma Gaussian parameter sigma
-   * @return
-   */
-  private[mllib] def createNormalizedAffinityMatrix(sc: SparkContext,
-                                                    points: Points, sigma: Double) = {
-    val nVertices = points.length
-    val affinityRddNotNorm = sc.parallelize({
-      val ivect = new Array[IndexedVector[Double]](nVertices)
-      for (i <- 0 until points.size) {
-        ivect(i) = new IndexedVector(points(i)._1, new BDV(Array.fill(nVertices)(100.0)))
-        for (j <- 0 until points.size) {
-          val dist = if (i != j) {
-            gaussianDist(points(i)._2, points(j)._2, sigma)
-          } else {
-            0.0
-          }
-          ivect(i)._2(j) = dist
-        }
-      }
-      ivect.zipWithIndex.map { case (vect, ix) =>
-        (ix, vect)
-      }
-    }, nVertices)
-    if (logger.isDebugEnabled) {
-      logger.debug(s"Affinity:\n${
-        printMatrix(affinityRddNotNorm.map(_._2), nVertices, nVertices)
-      }")
-    }
-    val rowSums = affinityRddNotNorm.map { case (ix, (vid, vect)) =>
-      vect.foldLeft(0.0) {
-        _ + _
-      }
-    }
-    val materializedRowSums = rowSums.collect
-    val similarityRdd = affinityRddNotNorm.map { case (rowx, (vid, vect)) =>
-      (vid, vect.map {
-        _ / materializedRowSums(rowx)
-      })
-    }
-    if (logger.isDebugEnabled) {
-      logger.debug(s"W:\n${printMatrix(similarityRdd, nVertices, nVertices)}")
-    }
-    (similarityRdd, materializedRowSums)
-  }
-
-  private[mllib] def printMatrix(denseVectorRDD: RDD[LabeledPoint], i: Int, i1: Int) = {
-    denseVectorRDD.collect.map {
-      case (vid, dvect) => dvect.toArray
-    }.flatten
-  }
-
-  private[mllib] def printMatrixFromEdges(edgesRdd: EdgeRDD[_]) = {
-    val edgec = edgesRdd.collect
-    val sorted = edgec.sortWith { case (e1, e2) =>
-      e1.srcId < e2.srcId || (e1.srcId == e2.srcId && e1.dstId <= e2.dstId)
-    }
-
-  }
-
-  private[mllib] def makeNonZero(dval: Double, tol: Double = defaultDivideByZeroVal) = {
-    if (Math.abs(dval) < tol) {
-      Math.signum(dval) * tol
-    } else {
-      dval
-    }
-  }
-
-  private[mllib] def printMatrix(mat: BDM[Double]): String
-    = printMatrix(mat, mat.rows, mat.cols)
-
-  private[mllib] def printMatrix(mat: BDM[Double], numRows: Int, numCols: Int): String
-    = printMatrix(mat.toArray, numRows, numCols)
-
-  private[mllib] def printMatrix(vectors: Array[BDV[Double]]): String = {
-    printMatrix(vectors.map {
-      _.toArray
-    }.flatten, vectors.length, vectors.length)
-  }
-
-  private[mllib] def printMatrix(vect: Array[Double], numRows: Int, numCols: Int): String = {
-    val darr = vect
-    val stride = darr.length / numCols
-    val sb = new StringBuilder
-    def leftJust(s: String, len: Int) = {
-      " ".substring(0, len - Math.min(len, s.length)) + s
-    }
-
-    assert(darr.length == numRows * numCols,
-      s"Input array is not correct length (${darr.length}) given #rows/cols=$numRows/$numCols")
-    for (r <- 0 until numRows) {
-      for (c <- 0 until numCols) {
-        sb.append(leftJust(f"${darr(r * stride + c)}%.6f", 9) + " ")
-      }
-      sb.append("\n")
-    }
-    sb.toString
-  }
-
-  private[mllib] def printVector(dvect: BDV[Double]) = {
-    dvect.toArray.mkString(",")
-  }

 }
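
Most of the removal above is the Gaussian affinity pipeline (gaussianDist, createSparseEdgesRdd, createNormalizedAffinityMatrix and the printing helpers), which per the commit message now lives in the test suite. As a reminder of what the normalization step computes, here is a minimal local sketch of turning a raw affinity matrix into the row-normalized matrix "W" (each row divided by its sum), as the removed createNormalizedAffinityMatrix did. The object name RowNormalizationSketch is hypothetical; the snippet uses plain Breeze with no Spark and is not part of the commit.

import breeze.linalg.{sum, DenseMatrix => BDM, DenseVector => BDV}

// Hypothetical local sketch (not part of this commit)
object RowNormalizationSketch {
  def main(args: Array[String]): Unit = {
    // Raw symmetric affinity matrix A with a zero diagonal
    val a = BDM(
      (0.0, 0.8, 0.1),
      (0.8, 0.0, 0.2),
      (0.1, 0.2, 0.0))

    // W(i, j) = A(i, j) / rowSum(i), so every row of W sums to 1
    val rowSums = BDV((0 until a.rows).map(i => sum(a(i, ::).t)).toArray)
    val w = BDM.tabulate(a.rows, a.cols)((i, j) => a(i, j) / rowSums(i))

    println(w)
  }
}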
