Skip to content

Commit 62969fa

Browse files
committed
use BDV directly in symmetricEigs
change the computation mode to local-svd, local-eigs, and dist-eigs update tests and docs
1 parent c273771 commit 62969fa

File tree

3 files changed

+120
-139
lines changed

3 files changed

+120
-139
lines changed

mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ private[mllib] object EigenValueDecomposition {
4747
* function.
4848
*/
4949
private[mllib] def symmetricEigs(
50-
mul: DenseVector => DenseVector,
50+
mul: BDV[Double] => BDV[Double],
5151
n: Int,
5252
k: Int,
5353
tol: Double,
@@ -102,9 +102,9 @@ private[mllib] object EigenValueDecomposition {
102102
// multiply working vector with the matrix
103103
val inputOffset = ipntr(0) - 1
104104
val outputOffset = ipntr(1) - 1
105-
val x = w(inputOffset until inputOffset + n)
106-
val y = w(outputOffset until outputOffset + n)
107-
y := BDV(mul(Vectors.fromBreeze(x).asInstanceOf[DenseVector]).toArray)
105+
val x = w.slice(inputOffset, inputOffset + n)
106+
val y = w.slice(outputOffset, outputOffset + n)
107+
y := mul(x)
108108
// call ARPACK's reverse communication
109109
arpack.dsaupd(ido, bmat, n, which, nev.`val`, tolW, resid, ncv, v, n, iparam, ipntr,
110110
workd, workl, workl.length, info)
@@ -143,13 +143,12 @@ private[mllib] object EigenValueDecomposition {
143143

144144
// copy eigenvectors in descending order of eigenvalues
145145
val sortedU = BDM.zeros[Double](n, computed)
146-
sortedEigenPairs.zipWithIndex.foreach { r => {
147-
val b = r._2 * n
148-
var i = 0
149-
while (i < n) {
150-
sortedU.data(b + i) = r._1._2(i)
151-
i += 1
152-
}
146+
sortedEigenPairs.zipWithIndex.foreach { r =>
147+
val b = r._2 * n
148+
var i = 0
149+
while (i < n) {
150+
sortedU.data(b + i) = r._1._2(i)
151+
i += 1
153152
}
154153
}
155154

mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala

Lines changed: 88 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
package org.apache.spark.mllib.linalg.distributed
1919

20-
import java.util
20+
import java.util.Arrays
2121

2222
import breeze.linalg.{Vector => BV, DenseMatrix => BDM, DenseVector => BDV, SparseVector => BSV}
2323
import breeze.linalg.{svd => brzSvd, axpy => brzAxpy}
@@ -207,9 +207,9 @@ class RowMatrix(
207207
* @param v a local DenseVector whose length must match the number of columns of this matrix.
208208
* @return a local DenseVector representing the product.
209209
*/
210-
private[mllib] def multiplyGramianMatrixBy(v: DenseVector): DenseVector = {
210+
private[mllib] def multiplyGramianMatrixBy(v: BDV[Double]): BDV[Double] = {
211211
val n = numCols().toInt
212-
val vbr = rows.context.broadcast(v.toBreeze)
212+
val vbr = rows.context.broadcast(v)
213213

214214
val bv = rows.aggregate(BDV.zeros[Double](n))(
215215
seqOp = (U, r) => {
@@ -227,7 +227,7 @@ class RowMatrix(
227227
combOp = (U1, U2) => U1 += U2
228228
)
229229

230-
Vectors.fromBreeze(bv).asInstanceOf[DenseVector]
230+
bv
231231
}
232232

233233
/**
@@ -250,49 +250,51 @@ class RowMatrix(
250250
}
251251

252252
/**
253-
* Computes singular value decomposition of this matrix. Denote this matrix by A (m x n), this
253+
* Computes singular value decomposition of this matrix. Denote this matrix by A (m x n). This
254254
* will compute matrices U, S, V such that A ~= U * S * V', where S contains the leading k
255255
* singular values, U and V contain the corresponding singular vectors.
256256
*
257-
* This approach assumes n is smaller than m, and invokes a dense matrix implementation when n is
258-
* small (n < 100) or the number of requested singular values is the same as n (k == n). For
259-
* problems with large n (n >= 100) and k < n, this approach invokes a sparse matrix
260-
* implementation that provides a function to ARPACK to multiply a vector with A'A. It iteratively
261-
* calls ARPACK-dsaupd on the master node, from which we recover S and V. Then we compute U via
262-
* easy matrix multiplication as U = A * (V * S^{-1}).
257+
* At most k largest non-zero singular values and associated vectors are returned. If there are k
258+
* such values, then the dimensions of the return will be:
259+
* - U is a RowMatrix of size m x k that satisfies U' * U = eye(k),
260+
* - s is a Vector of size k, holding the singular values in descending order,
261+
* - V is a Matrix of size n x k that satisfies V' * V = eye(k).
263262
*
264-
* The dense implementation requires `n^2` doubles to fit in memory and `O(n^3)` time on the
265-
* master node.
266-
*
267-
* The sparse implementation requires `n * (6 * k + 4)` doubles to fit in memory on the master
268-
* node and approximately `O(k * nnz(A))` time distributed on all worker nodes. There is no
269-
* restriction on m (number of rows).
263+
* We assume n is smaller than m. The singular values and the right singular vectors are derived
264+
* from the eigenvalues and the eigenvectors of the Gramian matrix A' * A. U, the matrix
265+
* storing the right singular vectors, is computed via matrix multiplication as
266+
* U = A * (V * S^{-1}), if requested by user. The actual method to use is determined
267+
* automatically based on the cost:
268+
* - If n is small (n < 100) or k is large compared with n (k > n / 2), we compute the Gramian
269+
* matrix first and then compute its top eigenvalues and eigenvectors locally on the driver.
270+
* This requires a single pass with O(n^2) storage on each executor and on the driver, and
271+
* O(n^2 k) time on the driver.
272+
* - Otherwise, we compute (A' * A) * v in a distributive way and send it to ARPACK's DSAUPD to
273+
* compute (A' * A)'s top eigenvalues and eigenvectors on the driver node. This requires O(k)
274+
* passes, O(n) storage on each executor, and O(n k) storage on the driver.
270275
*
271276
* Several internal parameters are set to default values. The reciprocal condition number rCond
272277
* is set to 1e-9. All singular values smaller than rCond * sigma(0) are treated as zeros, where
273278
* sigma(0) is the largest singular value. The maximum number of Arnoldi update iterations for
274-
* ARPACK is set to 300 or k * 3, whichever is larger. The numerical tolerance for ARPACK
279+
* ARPACK is set to 300 or k * 3, whichever is larger. The numerical tolerance for ARPACK's
275280
* eigen-decomposition is set to 1e-10.
276281
*
277-
* At most k largest non-zero singular values and associated vectors are returned.
278-
* If there are k such values, then the dimensions of the return will be:
279-
*
280-
* U is a RowMatrix of size m x k that satisfies U'U = eye(k),
281-
* s is a Vector of size k, holding the singular values in descending order,
282-
* and V is a Matrix of size n x k that satisfies V'V = eye(k).
282+
* @note The conditions that decide which method to use internally and the default parameters are
283+
* subject to change.
283284
*
284-
* @param k number of leading singular values to keep (0 < k <= n). It might return less than
285-
* k if there are numerically zero singular values or there are not enough Ritz values
285+
* @param k number of leading singular values to keep (0 < k <= n). It might return less than k if
286+
* there are numerically zero singular values or there are not enough Ritz values
286287
* converged before the maximum number of Arnoldi update iterations is reached (in case
287288
* that matrix A is ill-conditioned).
288-
* @param computeU whether to compute U.
289+
* @param computeU whether to compute U
289290
* @param rCond the reciprocal condition number. All singular values smaller than rCond * sigma(0)
290291
* are treated as zero, where sigma(0) is the largest singular value.
291-
* @return SingularValueDecomposition(U, s, V), U = null if computeU = false
292+
* @return SingularValueDecomposition(U, s, V). U = null if computeU = false.
292293
*/
293-
def computeSVD(k: Int,
294-
computeU: Boolean = false,
295-
rCond: Double = 1e-9): SingularValueDecomposition[RowMatrix, Matrix] = {
294+
def computeSVD(
295+
k: Int,
296+
computeU: Boolean = false,
297+
rCond: Double = 1e-9): SingularValueDecomposition[RowMatrix, Matrix] = {
296298
// maximum number of Arnoldi update iterations for invoking ARPACK
297299
val maxIter = math.max(300, k * 3)
298300
// numerical tolerance for invoking ARPACK
@@ -301,87 +303,78 @@ class RowMatrix(
301303
}
302304

303305
/**
304-
* Actual SVD computation, visible for testing.
306+
* The actual SVD implementation, visible for testing.
307+
*
308+
* @param k number of leading singular values to keep (0 < k <= n)
309+
* @param computeU whether to compute U
310+
* @param rCond the reciprocal condition number
311+
* @param maxIter max number of iterations (if ARPACK is used)
312+
* @param tol termination tolerance (if ARPACK is used)
313+
* @param mode computation mode (auto: determine automatically which mode to use,
314+
* local-svd: compute gram matrix and computes its full SVD locally,
315+
* local-eigs: compute gram matrix and computes its top eigenvalues locally,
316+
* dist-eigs: compute the top eigenvalues of the gram matrix distributively)
317+
* @return SingularValueDecomposition(U, s, V). U = null if computeU = false.
305318
*/
306-
private[mllib] def computeSVD(k: Int,
307-
computeU: Boolean,
308-
rCond: Double,
309-
maxIter: Int,
310-
tol: Double,
311-
mode: String): SingularValueDecomposition[RowMatrix, Matrix] = {
319+
private[mllib] def computeSVD(
320+
k: Int,
321+
computeU: Boolean,
322+
rCond: Double,
323+
maxIter: Int,
324+
tol: Double,
325+
mode: String): SingularValueDecomposition[RowMatrix, Matrix] = {
312326
val n = numCols().toInt
327+
require(k > 0 && k <= n, s"Request up to n singular values but got k=$k and n=$n.")
313328

314329
object SVDMode extends Enumeration {
315-
val DenseARPACK, DenseLAPACK, SparseARPACK = Value
330+
val LocalARPACK, LocalLAPACK, DistARPACK = Value
316331
}
317332

318-
val derivedMode = mode match {
319-
case "auto" => if (n < 100 || k == n) {
320-
// invoke dense implementation when n is small or k == n (since ARPACK requires k < n)
321-
require(k > 0 && k <= n, s"Request up to n singular values k=$k n=$n.")
322-
"dense"
323-
} else {
324-
// invoke sparse implementation with ARPACK when n is large
325-
require(k > 0 && k < n, s"Request up to n - 1 singular values for ARPACK k=$k n=$n.")
326-
"sparse"
327-
}
328-
case "dense" => "dense"
329-
case "sparse" => "sparse"
333+
val computeMode = mode match {
334+
case "auto" =>
335+
// TODO: The conditions below are not fully tested.
336+
if (n < 100 || k > n / 2) {
337+
// If n is small or k is large compared with n, we better compute the Gramian matrix first
338+
// and then compute its eigenvalues locally, instead of making multiple passes.
339+
if (k < n / 3) {
340+
SVDMode.LocalARPACK
341+
} else {
342+
SVDMode.LocalLAPACK
343+
}
344+
} else {
345+
// If k is small compared with n, we use ARPACK with distributed multiplication.
346+
SVDMode.DistARPACK
347+
}
348+
case "local-svd" => SVDMode.LocalLAPACK
349+
case "local-eigs" => SVDMode.LocalARPACK
350+
case "dist-eigs" => SVDMode.DistARPACK
330351
case _ => throw new IllegalArgumentException(s"Do not support mode $mode.")
331352
}
332353

333-
val computeMode = derivedMode match {
334-
case "dense" => if (k < n / 2) {
335-
// when k is small, call ARPACK
336-
require(k > 0 && k < n, s"Request up to n - 1 singular values for ARPACK k=$k n=$n.")
337-
SVDMode.DenseARPACK
338-
} else {
339-
// when k is large, call LAPACK
340-
SVDMode.DenseLAPACK
341-
}
342-
case "sparse" => SVDMode.SparseARPACK
343-
}
344-
354+
// Compute the eigen-decomposition of A' * A.
345355
val (sigmaSquares: BDV[Double], u: BDM[Double]) = computeMode match {
346-
case SVDMode.DenseARPACK => {
356+
case SVDMode.LocalARPACK =>
357+
require(k < n, s"k must be smaller than n in local-eigs mode but got k=$k and n=$n.")
347358
val G = computeGramianMatrix().toBreeze.asInstanceOf[BDM[Double]]
348-
def multiplyDenseGramianMatrixBy(v: DenseVector): DenseVector = {
349-
Vectors.fromBreeze(G * v.toBreeze).asInstanceOf[DenseVector]
350-
}
351-
EigenValueDecomposition.symmetricEigs(multiplyDenseGramianMatrixBy, n, k, tol, maxIter)
352-
}
353-
case SVDMode.DenseLAPACK => {
359+
EigenValueDecomposition.symmetricEigs(v => G * v, n, k, tol, maxIter)
360+
case SVDMode.LocalLAPACK =>
354361
val G = computeGramianMatrix().toBreeze.asInstanceOf[BDM[Double]]
355-
val (uFull: BDM[Double], sigmaSquaresFull: BDV[Double], vFull: BDM[Double]) = brzSvd(G)
362+
val (uFull: BDM[Double], sigmaSquaresFull: BDV[Double], _) = brzSvd(G)
356363
(sigmaSquaresFull, uFull)
357-
}
358-
case SVDMode.SparseARPACK => {
364+
case SVDMode.DistARPACK =>
365+
require(k < n, s"k must be smaller than n in dist-eigs mode but got k=$k and n=$n.")
359366
EigenValueDecomposition.symmetricEigs(multiplyGramianMatrixBy, n, k, tol, maxIter)
360-
}
361367
}
362368

363-
computeSVDEffectiveRank(k, n, computeU, rCond, sigmaSquares, u)
364-
}
365-
366-
/**
367-
* Determine effective rank of SVD result and compute left singular vectors if required.
368-
*/
369-
private def computeSVDEffectiveRank(
370-
k: Int,
371-
n: Int,
372-
computeU: Boolean,
373-
rCond: Double,
374-
sigmaSquares: BDV[Double],
375-
u: BDM[Double]): SingularValueDecomposition[RowMatrix, Matrix] = {
376369
val sigmas: BDV[Double] = brzSqrt(sigmaSquares)
377370

378-
// Determine effective rank.
371+
// Determine the effective rank.
379372
val sigma0 = sigmas(0)
380373
val threshold = rCond * sigma0
381374
var i = 0
382-
// sigmas might have a length smaller than k, if some Ritz values do not satisfy the
383-
// convergence criterion specified by tol after maxIterations.
384-
// Thus use i < min(k, sigmas.length) instead of i < k
375+
// sigmas might have a length smaller than k, if some Ritz values do not satisfy the convergence
376+
// criterion specified by tol after max number of iterations.
377+
// Thus use i < min(k, sigmas.length) instead of i < k.
385378
if (sigmas.length < k) {
386379
logWarning(s"Requested $k singular values but only found ${sigmas.length} converged.")
387380
}
@@ -394,12 +387,12 @@ class RowMatrix(
394387
logWarning(s"Requested $k singular values but only found $sk nonzeros.")
395388
}
396389

397-
val s = Vectors.dense(util.Arrays.copyOfRange(sigmas.data, 0, sk))
398-
val V = Matrices.dense(n, sk, util.Arrays.copyOfRange(u.data, 0, n * sk))
390+
val s = Vectors.dense(Arrays.copyOfRange(sigmas.data, 0, sk))
391+
val V = Matrices.dense(n, sk, Arrays.copyOfRange(u.data, 0, n * sk))
399392

400393
if (computeU) {
401394
// N = Vk * Sk^{-1}
402-
val N = new BDM[Double](n, sk, util.Arrays.copyOfRange(u.data, 0, n * sk))
395+
val N = new BDM[Double](n, sk, Arrays.copyOfRange(u.data, 0, n * sk))
403396
var i = 0
404397
var j = 0
405398
while (j < sk) {
@@ -484,7 +477,7 @@ class RowMatrix(
484477
if (k == n) {
485478
Matrices.dense(n, k, u.data)
486479
} else {
487-
Matrices.dense(n, k, util.Arrays.copyOfRange(u.data, 0, n * k))
480+
Matrices.dense(n, k, Arrays.copyOfRange(u.data, 0, n * k))
488481
}
489482
}
490483

mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala

Lines changed: 22 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -95,51 +95,40 @@ class RowMatrixSuite extends FunSuite with LocalSparkContext {
9595
}
9696

9797
test("svd of a full-rank matrix") {
98-
for (denseSVD <- Seq(true, false)) {
99-
for (mat <- Seq(denseMat, sparseMat)) {
98+
for (mat <- Seq(denseMat, sparseMat)) {
99+
for (mode <- Seq("auto", "local-svd", "local-eigs", "dist-eigs")) {
100100
val localMat = mat.toBreeze()
101101
val (localU, localSigma, localVt) = brzSvd(localMat)
102102
val localV: BDM[Double] = localVt.t.toDenseMatrix
103103
for (k <- 1 to n) {
104-
val svd = if (k < n) {
105-
if (denseSVD) {
106-
mat.computeSVD(k, computeU = true, 1e-9, 300, 1e-10, mode = "dense")
107-
} else {
108-
mat.computeSVD(k, computeU = true, 1e-9, 300, 1e-10, mode = "sparse")
109-
}
110-
} else {
111-
// when k = n, always use dense SVD
112-
mat.computeSVD(k, computeU = true, 1e-9, 300, 1e-10, mode = "dense")
104+
val skip = (mode == "local-eigs" || mode == "dist-eigs") && k == n
105+
if (!skip) {
106+
val svd = mat.computeSVD(k, computeU = true, 1e-9, 300, 1e-10, mode)
107+
val U = svd.U
108+
val s = svd.s
109+
val V = svd.V
110+
assert(U.numRows() === m)
111+
assert(U.numCols() === k)
112+
assert(s.size === k)
113+
assert(V.numRows === n)
114+
assert(V.numCols === k)
115+
assertColumnEqualUpToSign(U.toBreeze(), localU, k)
116+
assertColumnEqualUpToSign(V.toBreeze.asInstanceOf[BDM[Double]], localV, k)
117+
assert(closeToZero(s.toBreeze.asInstanceOf[BDV[Double]] - localSigma(0 until k)))
113118
}
114-
val U = svd.U
115-
val s = svd.s
116-
val V = svd.V
117-
assert(U.numRows() === m)
118-
assert(U.numCols() === k)
119-
assert(s.size === k)
120-
assert(V.numRows === n)
121-
assert(V.numCols === k)
122-
assertColumnEqualUpToSign(U.toBreeze(), localU, k)
123-
assertColumnEqualUpToSign(V.toBreeze.asInstanceOf[BDM[Double]], localV, k)
124-
assert(closeToZero(s.toBreeze.asInstanceOf[BDV[Double]] - localSigma(0 until k)))
125-
}
126-
val svdWithoutU = if (denseSVD) {
127-
mat.computeSVD(n - 1, computeU = false, 1e-9, 300, 1e-10, mode = "dense")
128-
} else {
129-
mat.computeSVD(n - 1, computeU = false, 1e-9, 300, 1e-10, mode = "sparse")
130119
}
120+
val svdWithoutU = mat.computeSVD(1, computeU = false, 1e-9, 300, 1e-10, mode)
131121
assert(svdWithoutU.U === null)
132122
}
133123
}
134124
}
135125

136126
test("svd of a low-rank matrix") {
137-
for (denseSVD <- Seq(true, false)) {
138-
val rows = sc.parallelize(Array.fill(4)(Vectors.dense(1.0, 1.0, 1.0)), 2)
139-
val mat = new RowMatrix(rows, 4, 3)
140-
val svd = if (denseSVD) mat.computeSVD(2, computeU = true, 1e-9, 300, 1e-10, mode = "dense")
141-
else mat.computeSVD(2, computeU = true, 1e-9, 300, 1e-10, mode = "sparse")
142-
assert(svd.s.size === 1, "should not return zero singular values")
127+
val rows = sc.parallelize(Array.fill(4)(Vectors.dense(1.0, 1.0, 1.0)), 2)
128+
val mat = new RowMatrix(rows, 4, 3)
129+
for (mode <- Seq("auto", "local-svd", "local-eigs", "dist-eigs")) {
130+
val svd = mat.computeSVD(2, computeU = true, 1e-6, 300, 1e-10, mode)
131+
assert(svd.s.size === 1, s"should not return zero singular values but got ${svd.s}")
143132
assert(svd.U.numRows() === 4)
144133
assert(svd.U.numCols() === 1)
145134
assert(svd.V.numRows === 3)

0 commit comments

Comments
 (0)