Commit 1cc979e

mengxr authored and tdas committed
[SPARK-1273] MLlib bug fixes, improvements, and doc updates for v0.9.1
Cherry-picked a few MLlib commits that are bug fixes, optimization, or doc updates for the v0.9.1 release.

JIRA: https://spark-project.atlassian.net/browse/SPARK-1273

Author: Xiangrui Meng <meng@databricks.com>
Author: Sean Owen <sowen@cloudera.com>
Author: Andrew Tulloch <andrew@tullo.ch>
Author: Chen Chao <crazyjvm@gmail.com>

Closes #175 from mengxr/branch-0.9 and squashes the following commits:

d8928ea [Xiangrui Meng] add Apache header to LocalSparkContext
a66d386 [Xiangrui Meng] Merge remote-tracking branch 'apache/branch-0.9' into branch-0.9
a899894 [Xiangrui Meng] [SPARK-1237, 1238] Improve the computation of YtY for implicit ALS
46fe493 [Xiangrui Meng] [SPARK-1260]: faster construction of features with intercept
6340a18 [Sean Owen] MLLIB-22. Support negative implicit input in ALS
f27441a [Chen Chao] MLLIB-24: url of "Collaborative Filtering for Implicit Feedback Datasets" in ALS is invalid now
a26ac90 [Sean Owen] Merge pull request #460 from srowen/RandomInitialALSVectors
0564985 [Andrew Tulloch] Fixed import order
2512e67 [Andrew Tulloch] LocalSparkContext for MLlib
1 parent a4eef65 commit 1cc979e

File tree

13 files changed: +259 -195 lines changed

mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala

Lines changed: 141 additions & 60 deletions
@@ -18,6 +18,7 @@
 package org.apache.spark.mllib.recommendation
 
 import scala.collection.mutable.{ArrayBuffer, BitSet}
+import scala.math.{abs, sqrt}
 import scala.util.Random
 import scala.util.Sorting
 
@@ -63,7 +64,7 @@ case class Rating(val user: Int, val product: Int, val rating: Double)
  * Alternating Least Squares matrix factorization.
  *
  * ALS attempts to estimate the ratings matrix `R` as the product of two lower-rank matrices,
- * `X` and `Y`, i.e. `Xt * Y = R`. Typically these approximations are called 'factor' matrices.
+ * `X` and `Y`, i.e. `X * Yt = R`. Typically these approximations are called 'factor' matrices.
  * The general approach is iterative. During each iteration, one of the factor matrices is held
  * constant, while the other is solved for using least squares. The newly-solved factor matrix is
  * then held constant while solving for the other factor matrix.
@@ -80,17 +81,22 @@ case class Rating(val user: Int, val product: Int, val rating: Double)
  *
  * For implicit preference data, the algorithm used is based on
  * "Collaborative Filtering for Implicit Feedback Datasets", available at
- * [[http://research.yahoo.com/pub/2433]], adapted for the blocked approach used here.
+ * [[http://dx.doi.org/10.1109/ICDM.2008.22]], adapted for the blocked approach used here.
  *
  * Essentially instead of finding the low-rank approximations to the rating matrix `R`,
  * this finds the approximations for a preference matrix `P` where the elements of `P` are 1 if r > 0
  * and 0 if r = 0. The ratings then act as 'confidence' values related to strength of indicated user
  * preferences rather than explicit ratings given to items.
  */
-class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var lambda: Double,
-    var implicitPrefs: Boolean, var alpha: Double)
-  extends Serializable with Logging
-{
+class ALS private (
+    var numBlocks: Int,
+    var rank: Int,
+    var iterations: Int,
+    var lambda: Double,
+    var implicitPrefs: Boolean,
+    var alpha: Double,
+    var seed: Long = System.nanoTime()
+  ) extends Serializable with Logging {
   def this() = this(-1, 10, 10, 0.01, false, 1.0)
 
   /**
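As the corrected doc above states, ALS approximates `R` as `X * Yt`, so the predicted rating for user u and product i is the dot product of their factor vectors. A one-line illustration with hypothetical factor vectors (not part of the commit):

// Predicted rating = dot product of a user's row of X and a product's row of Y.
val userFactors = Array(0.2, 1.3, -0.5)      // row u of X
val productFactors = Array(1.0, 0.4, 0.1)    // row i of Y
val predictedRating = userFactors.zip(productFactors).map { case (a, b) => a * b }.sum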
@@ -130,6 +136,12 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l
     this
   }
 
+  /** Sets a random seed to have deterministic results. */
+  def setSeed(seed: Long): ALS = {
+    this.seed = seed
+    this
+  }
+
   /**
    * Run ALS with the configured parameters on an input RDD of (user, product, rating) triples.
    * Returns a MatrixFactorizationModel with feature vectors for each user and product.
@@ -151,9 +163,9 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l
     val (userInLinks, userOutLinks) = makeLinkRDDs(numBlocks, ratingsByUserBlock)
     val (productInLinks, productOutLinks) = makeLinkRDDs(numBlocks, ratingsByProductBlock)
 
-    // Initialize user and product factors randomly, but use a deterministic seed for each partition
-    // so that fault recovery works
-    val seedGen = new Random()
+    // Initialize user and product factors randomly, but use a deterministic seed for each
+    // partition so that fault recovery works
+    val seedGen = new Random(seed)
     val seed1 = seedGen.nextInt()
     val seed2 = seedGen.nextInt()
     // Hash an integer to propagate random bits at all positions, similar to java.util.HashTable
@@ -208,21 +220,46 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l
    */
   def computeYtY(factors: RDD[(Int, Array[Array[Double]])]) = {
     if (implicitPrefs) {
-      Option(
-        factors.flatMapValues { case factorArray =>
-          factorArray.view.map { vector =>
-            val x = new DoubleMatrix(vector)
-            x.mmul(x.transpose())
-          }
-        }.reduceByKeyLocally((a, b) => a.addi(b))
-         .values
-         .reduce((a, b) => a.addi(b))
-      )
+      val n = rank * (rank + 1) / 2
+      val LYtY = factors.values.aggregate(new DoubleMatrix(n))( seqOp = (L, Y) => {
+        Y.foreach(y => dspr(1.0, new DoubleMatrix(y), L))
+        L
+      }, combOp = (L1, L2) => {
+        L1.addi(L2)
+      })
+      val YtY = new DoubleMatrix(rank, rank)
+      fillFullMatrix(LYtY, YtY)
+      Option(YtY)
     } else {
       None
     }
   }
 
+  /**
+   * Adds alpha * x * x.t to a matrix in-place. This is the same as BLAS's DSPR.
+   *
+   * @param L the lower triangular part of the matrix packed in an array (row major)
+   */
+  private def dspr(alpha: Double, x: DoubleMatrix, L: DoubleMatrix) = {
+    val n = x.length
+    var i = 0
+    var j = 0
+    var idx = 0
+    var axi = 0.0
+    val xd = x.data
+    val Ld = L.data
+    while (i < n) {
+      axi = alpha * xd(i)
+      j = 0
+      while (j <= i) {
+        Ld(idx) += axi * xd(j)
+        j += 1
+        idx += 1
+      }
+      i += 1
+    }
+  }
+
   /**
    * Flatten out blocked user or product factors into an RDD of (id, factor vector) pairs
    */
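For reference, the packed layout `dspr` writes into stores only the lower triangle of Y^T Y, that is rank * (rank + 1) / 2 values in row-major order (0,0), (1,0), (1,1), (2,0), ... Below is a minimal plain-Scala sketch of the same idea on raw arrays (illustrative helper names, no jblas `DoubleMatrix`), showing how the packed triangle is accumulated and then expanded into the full symmetric matrix:

// Packed rank-1 update: accumulate alpha * y * y.t into the lower triangle `l`,
// stored row-major as (0,0), (1,0), (1,1), (2,0), (2,1), (2,2), ...
def packedDspr(alpha: Double, y: Array[Double], l: Array[Double]): Unit = {
  var i = 0
  var idx = 0
  while (i < y.length) {
    val ayi = alpha * y(i)
    var j = 0
    while (j <= i) {           // lower triangle only, diagonal included
      l(idx) += ayi * y(j)
      idx += 1
      j += 1
    }
    i += 1
  }
}

// Expand the packed triangle into the full symmetric rank x rank matrix
// (the role fillFullMatrix plays in the diff above).
def expand(l: Array[Double], rank: Int): Array[Array[Double]] = {
  val full = Array.ofDim[Double](rank, rank)
  var i = 0
  var idx = 0
  while (i < rank) {
    var j = 0
    while (j <= i) {
      full(i)(j) = l(idx)
      full(j)(i) = l(idx)
      idx += 1
      j += 1
    }
    i += 1
  }
  full
}

val rank = 3
val packed = new Array[Double](rank * (rank + 1) / 2)
// Sum y * y.t over two example factor vectors, then expand.
Seq(Array(1.0, 2.0, 3.0), Array(0.5, 0.0, 1.0)).foreach(y => packedDspr(1.0, y, packed))
val yty = expand(packed, rank)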
@@ -301,7 +338,14 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l
    * Make a random factor vector with the given random.
    */
   private def randomFactor(rank: Int, rand: Random): Array[Double] = {
-    Array.fill(rank)(rand.nextDouble)
+    // Choose a unit vector uniformly at random from the unit sphere, but from the
+    // "first quadrant" where all elements are nonnegative. This can be done by choosing
+    // elements distributed as Normal(0,1) and taking the absolute value, and then normalizing.
+    // This appears to create factorizations that have a slightly better reconstruction
+    // (<1%) compared picking elements uniformly at random in [0,1].
+    val factor = Array.fill(rank)(abs(rand.nextGaussian()))
+    val norm = sqrt(factor.map(x => x * x).sum)
+    factor.map(x => x / norm)
   }
 
   /**
@@ -365,51 +409,41 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l
     for (productBlock <- 0 until numBlocks) {
       for (p <- 0 until blockFactors(productBlock).length) {
         val x = new DoubleMatrix(blockFactors(productBlock)(p))
-        fillXtX(x, tempXtX)
+        tempXtX.fill(0.0)
+        dspr(1.0, x, tempXtX)
         val (us, rs) = inLinkBlock.ratingsForBlock(productBlock)(p)
         for (i <- 0 until us.length) {
           implicitPrefs match {
             case false =>
               userXtX(us(i)).addi(tempXtX)
               SimpleBlas.axpy(rs(i), x, userXy(us(i)))
             case true =>
-              userXtX(us(i)).addi(tempXtX.mul(alpha * rs(i)))
-              SimpleBlas.axpy(1 + alpha * rs(i), x, userXy(us(i)))
+              // Extension to the original paper to handle rs(i) < 0. confidence is a function
+              // of |rs(i)| instead so that it is never negative:
+              val confidence = 1 + alpha * abs(rs(i))
+              SimpleBlas.axpy(confidence - 1.0, tempXtX, userXtX(us(i)))
+              // For rs(i) < 0, the corresponding entry in P is 0 now, not 1 -- negative rs(i)
+              // means we try to reconstruct 0. We add terms only where P = 1, so, term below
+              // is now only added for rs(i) > 0:
+              if (rs(i) > 0) {
+                SimpleBlas.axpy(confidence, x, userXy(us(i)))
+              }
           }
         }
       }
     }
 
     // Solve the least-squares problem for each user and return the new feature vectors
-    userXtX.zipWithIndex.map{ case (triangularXtX, index) =>
+    Array.range(0, numUsers).map { index =>
       // Compute the full XtX matrix from the lower-triangular part we got above
-      fillFullMatrix(triangularXtX, fullXtX)
+      fillFullMatrix(userXtX(index), fullXtX)
      // Add regularization
      (0 until rank).foreach(i => fullXtX.data(i*rank + i) += lambda)
      // Solve the resulting matrix, which is symmetric and positive-definite
      implicitPrefs match {
        case false => Solve.solvePositive(fullXtX, userXy(index)).data
-        case true => Solve.solvePositive(fullXtX.add(YtY.value.get), userXy(index)).data
-      }
-    }
-  }
-
-  /**
-   * Set xtxDest to the lower-triangular part of x transpose * x. For efficiency in summing
-   * these matrices, we store xtxDest as only rank * (rank+1) / 2 values, namely the values
-   * at (0,0), (1,0), (1,1), (2,0), (2,1), (2,2), etc in that order.
-   */
-  private def fillXtX(x: DoubleMatrix, xtxDest: DoubleMatrix) {
-    var i = 0
-    var pos = 0
-    while (i < x.length) {
-      var j = 0
-      while (j <= i) {
-        xtxDest.data(pos) = x.data(i) * x.data(j)
-        pos += 1
-        j += 1
+        case true => Solve.solvePositive(fullXtX.addi(YtY.value.get), userXy(index)).data
      }
-      i += 1
    }
  }
 
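To make the negative-rating extension above concrete: each observed value r contributes with confidence 1 + alpha * |r|, while the reconstructed preference is 1 only when r > 0. A tiny illustrative helper (not part of the commit; names are hypothetical):

// Illustrative only: confidence/preference derived from a single implicit rating r.
def implicitContribution(r: Double, alpha: Double): (Double, Int) = {
  val confidence = 1.0 + alpha * math.abs(r)   // never negative, even for r < 0
  val preference = if (r > 0) 1 else 0         // negative r now reconstructs 0, not 1
  (confidence, preference)
}

// implicitContribution(-3.0, 0.01) == (1.03, 0): confident that the preference is 0
// implicitContribution( 3.0, 0.01) == (1.03, 1): equally confident that it is 1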
@@ -436,9 +470,10 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l
 
 
 /**
- * Top-level methods for calling Alternating Least Squares (ALS) matrix factorizaton.
+ * Top-level methods for calling Alternating Least Squares (ALS) matrix factorization.
  */
 object ALS {
+
   /**
    * Train a matrix factorization model given an RDD of ratings given by users to some products,
    * in the form of (userID, productID, rating) pairs. We approximate the ratings matrix as the
@@ -451,15 +486,39 @@ object ALS {
    * @param iterations number of iterations of ALS (recommended: 10-20)
    * @param lambda     regularization factor (recommended: 0.01)
    * @param blocks     level of parallelism to split computation into
+   * @param seed       random seed
    */
   def train(
       ratings: RDD[Rating],
       rank: Int,
       iterations: Int,
       lambda: Double,
-      blocks: Int)
-    : MatrixFactorizationModel =
-  {
+      blocks: Int,
+      seed: Long
+    ): MatrixFactorizationModel = {
+    new ALS(blocks, rank, iterations, lambda, false, 1.0, seed).run(ratings)
+  }
+
+  /**
+   * Train a matrix factorization model given an RDD of ratings given by users to some products,
+   * in the form of (userID, productID, rating) pairs. We approximate the ratings matrix as the
+   * product of two lower-rank matrices of a given rank (number of features). To solve for these
+   * features, we run a given number of iterations of ALS. This is done using a level of
+   * parallelism given by `blocks`.
+   *
+   * @param ratings    RDD of (userID, productID, rating) pairs
+   * @param rank       number of features to use
+   * @param iterations number of iterations of ALS (recommended: 10-20)
+   * @param lambda     regularization factor (recommended: 0.01)
+   * @param blocks     level of parallelism to split computation into
+   */
+  def train(
+      ratings: RDD[Rating],
+      rank: Int,
+      iterations: Int,
+      lambda: Double,
+      blocks: Int
+    ): MatrixFactorizationModel = {
     new ALS(blocks, rank, iterations, lambda, false, 1.0).run(ratings)
   }
 
@@ -476,8 +535,7 @@ object ALS {
    * @param lambda regularization factor (recommended: 0.01)
    */
   def train(ratings: RDD[Rating], rank: Int, iterations: Int, lambda: Double)
-    : MatrixFactorizationModel =
-  {
+    : MatrixFactorizationModel = {
     train(ratings, rank, iterations, lambda, -1)
   }
 
@@ -493,8 +551,7 @@ object ALS {
    * @param iterations number of iterations of ALS (recommended: 10-20)
    */
   def train(ratings: RDD[Rating], rank: Int, iterations: Int)
-    : MatrixFactorizationModel =
-  {
+    : MatrixFactorizationModel = {
     train(ratings, rank, iterations, 0.01, -1)
   }
 
@@ -511,16 +568,42 @@ object ALS {
    * @param lambda     regularization factor (recommended: 0.01)
    * @param blocks     level of parallelism to split computation into
    * @param alpha      confidence parameter (only applies when immplicitPrefs = true)
+   * @param seed       random seed
    */
   def trainImplicit(
       ratings: RDD[Rating],
       rank: Int,
       iterations: Int,
       lambda: Double,
       blocks: Int,
-      alpha: Double)
-    : MatrixFactorizationModel =
-  {
+      alpha: Double,
+      seed: Long
+    ): MatrixFactorizationModel = {
+    new ALS(blocks, rank, iterations, lambda, true, alpha, seed).run(ratings)
+  }
+
+  /**
+   * Train a matrix factorization model given an RDD of 'implicit preferences' given by users
+   * to some products, in the form of (userID, productID, preference) pairs. We approximate the
+   * ratings matrix as the product of two lower-rank matrices of a given rank (number of features).
+   * To solve for these features, we run a given number of iterations of ALS. This is done using
+   * a level of parallelism given by `blocks`.
+   *
+   * @param ratings    RDD of (userID, productID, rating) pairs
+   * @param rank       number of features to use
+   * @param iterations number of iterations of ALS (recommended: 10-20)
+   * @param lambda     regularization factor (recommended: 0.01)
+   * @param blocks     level of parallelism to split computation into
+   * @param alpha      confidence parameter (only applies when immplicitPrefs = true)
+   */
+  def trainImplicit(
+      ratings: RDD[Rating],
+      rank: Int,
+      iterations: Int,
+      lambda: Double,
+      blocks: Int,
+      alpha: Double
+    ): MatrixFactorizationModel = {
     new ALS(blocks, rank, iterations, lambda, true, alpha).run(ratings)
   }
 
@@ -537,8 +620,7 @@ object ALS {
    * @param lambda regularization factor (recommended: 0.01)
    */
   def trainImplicit(ratings: RDD[Rating], rank: Int, iterations: Int, lambda: Double, alpha: Double)
-    : MatrixFactorizationModel =
-  {
+    : MatrixFactorizationModel = {
     trainImplicit(ratings, rank, iterations, lambda, -1, alpha)
   }
 
@@ -555,8 +637,7 @@ object ALS {
    * @param iterations number of iterations of ALS (recommended: 10-20)
    */
   def trainImplicit(ratings: RDD[Rating], rank: Int, iterations: Int)
-    : MatrixFactorizationModel =
-  {
+    : MatrixFactorizationModel = {
     trainImplicit(ratings, rank, iterations, 0.01, -1, 1.0)
   }
 
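A minimal usage sketch of the seed-taking overloads added above, runnable from the Spark shell where `sc` is predefined (assumes Spark 0.9.x MLlib; the input path and parameter values are illustrative):

import org.apache.spark.mllib.recommendation.{ALS, Rating}

// Parse (user, product, rating) triples from a hypothetical CSV file.
val ratings = sc.textFile("data/ratings.csv").map { line =>
  val Array(user, product, rating) = line.split(',')
  Rating(user.toInt, product.toInt, rating.toDouble)
}

// Explicit feedback: fixing the seed makes repeated runs produce the same factorization.
val model = ALS.train(ratings, 10, 10, 0.01, -1, 42L)   // rank, iterations, lambda, blocks, seed

// Implicit feedback: ratings act as confidence values; negative values are handled as above.
val implicitModel = ALS.trainImplicit(ratings, 10, 10, 0.01, -1, 0.01, 42L)

val predicted = model.predict(1, 7)   // predicted rating of product 7 by user 1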
mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala

Lines changed: 4 additions & 4 deletions
@@ -119,7 +119,7 @@ abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel]
    */
   def run(input: RDD[LabeledPoint]) : M = {
     val nfeatures: Int = input.first().features.length
-    val initialWeights = Array.fill(nfeatures)(1.0)
+    val initialWeights = new Array[Double](nfeatures)
     run(input, initialWeights)
   }
 
@@ -134,15 +134,15 @@ abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel]
       throw new SparkException("Input validation failed.")
     }
 
-    // Add a extra variable consisting of all 1.0's for the intercept.
+    // Prepend an extra variable consisting of all 1.0's for the intercept.
     val data = if (addIntercept) {
-      input.map(labeledPoint => (labeledPoint.label, Array(1.0, labeledPoint.features:_*)))
+      input.map(labeledPoint => (labeledPoint.label, labeledPoint.features.+:(1.0)))
     } else {
       input.map(labeledPoint => (labeledPoint.label, labeledPoint.features))
     }
 
     val initialWeightsWithIntercept = if (addIntercept) {
-      Array(1.0, initialWeights:_*)
+      initialWeights.+:(1.0)
     } else {
       initialWeights
     }
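The intercept change (SPARK-1260) swaps the varargs form `Array(1.0, features: _*)` for a direct prepend with `+:`, which builds the new array without first passing the existing one through a varargs Seq. Both forms produce the same result; a quick illustrative check:

val features = Array(2.0, 3.0, 5.0)
val oldForm = Array(1.0, features: _*)   // old form: goes through a varargs Seq before copying
val newForm = features.+:(1.0)           // new form: prepends directly on the array
assert(oldForm.sameElements(newForm))    // both are Array(1.0, 2.0, 3.0, 5.0)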
