Commit befa592 (1 parent: 75c83a4)

passed scala/java tests

File tree: 11 files changed (+74, −72 lines)


mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala

Lines changed: 2 additions & 2 deletions

@@ -42,7 +42,7 @@ abstract class Gradient extends Serializable {
 class LogisticGradient extends Gradient {
   override def compute(data: Vector, label: Double, weights: Vector): (Vector, Double) = {
     val brzData = data.toBreeze
-    val brzWeights = data.toBreeze
+    val brzWeights = weights.toBreeze
     val margin: Double = -1.0 * brzWeights.dot(brzData)
     val gradientMultiplier = (1.0 / (1.0 + math.exp(margin))) - label
     val gradient = brzData * gradientMultiplier
@@ -67,7 +67,7 @@ class LeastSquaresGradient extends Gradient {
   override def compute(data: Vector, label: Double, weights: Vector): (Vector, Double) = {
     val brzData = data.toBreeze
     val brzWeights = weights.toBreeze
-    val diff: Double = brzWeights.dot(brzData) - label
+    val diff = brzWeights.dot(brzData) - label
     val loss = diff * diff
     val gradient = brzData * (2.0 * diff)
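
The first hunk is a real bug fix: brzWeights was mistakenly built from data, so the margin reduced to -data.dot(data) regardless of the model weights. As a reference point, here is a minimal standalone sketch of the corrected logistic-gradient computation over plain arrays rather than the MLlib Vector API; the log-loss expression is an assumption added for completeness, since this hunk does not show it.

// Sketch only: mirrors the math in the patched LogisticGradient.compute.
object LogisticGradientSketch {
  def compute(data: Array[Double], label: Double, weights: Array[Double])
    : (Array[Double], Double) = {
    // margin = -w . x (the fix: dot the *weights* with the data)
    val margin = -1.0 * weights.zip(data).map { case (w, x) => w * x }.sum
    val multiplier = 1.0 / (1.0 + math.exp(margin)) - label
    val gradient = data.map(_ * multiplier)
    // log-loss of this single example (assumed form, standard for labels in {0, 1})
    val loss =
      if (label > 0) math.log(1 + math.exp(margin))
      else math.log(1 + math.exp(margin)) - margin
    (gradient, loss)
  }
}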

mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala

Lines changed: 11 additions & 13 deletions

@@ -17,11 +17,10 @@
 
 package org.apache.spark.mllib.optimization
 
-import org.apache.spark.Logging
-import org.apache.spark.rdd.RDD
-
 import scala.collection.mutable.ArrayBuffer
 
+import org.apache.spark.Logging
+import org.apache.spark.rdd.RDD
 import org.apache.spark.mllib.linalg.{Vectors, Vector}
 
 /**
@@ -92,16 +91,15 @@ class GradientDescent(var gradient: Gradient, var updater: Updater)
   }
 
   def optimize(data: RDD[(Double, Vector)], initialWeights: Vector): Vector = {
-
-    val (weights, stochasticLossHistory) = GradientDescent.runMiniBatchSGD(
-      data,
-      gradient,
-      updater,
-      stepSize,
-      numIterations,
-      regParam,
-      miniBatchFraction,
-      initialWeights)
+    val (weights, _) = GradientDescent.runMiniBatchSGD(
+      data,
+      gradient,
+      updater,
+      stepSize,
+      numIterations,
+      regParam,
+      miniBatchFraction,
+      initialWeights)
     weights
   }
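
The optimize hunk is cosmetic — the loss history is bound to _ since it is unused — but it is a good spot to recall what runMiniBatchSGD does. Below is a toy, single-machine sketch of that loop over plain collections; the Bernoulli sampling, the fixed seed, and the 1/sqrt(iter) decay (SimpleUpdater's schedule, with no regularization) are assumptions for illustration, not the MLlib internals.

// Toy mini-batch SGD over local collections; `grad` has the same shape as
// Gradient.compute: (data, label, weights) => (gradient, loss).
def miniBatchSGD(
    data: Seq[(Double, Array[Double])],
    grad: (Array[Double], Double, Array[Double]) => (Array[Double], Double),
    stepSize: Double,
    numIterations: Int,
    miniBatchFraction: Double,
    init: Array[Double]): (Array[Double], Seq[Double]) = {
  require(data.nonEmpty, "Input data is empty.")
  val rng = new scala.util.Random(42)
  var w = init.clone()
  val losses = scala.collection.mutable.ArrayBuffer.empty[Double]
  for (iter <- 1 to numIterations) {
    // Bernoulli sample of roughly miniBatchFraction * data.size examples
    val batch = data.filter(_ => rng.nextDouble() < miniBatchFraction)
    val sample = if (batch.nonEmpty) batch else data
    // Sum per-example gradients and losses over the batch
    val (gradSum, lossSum) = sample
      .map { case (label, x) => grad(x, label, w) }
      .reduce((a, b) => (a._1.zip(b._1).map { case (u, v) => u + v }, a._2 + b._2))
    losses += lossSum / sample.size
    val thisStep = stepSize / math.sqrt(iter) // SimpleUpdater's decay
    w = w.zip(gradSum).map { case (wi, gi) => wi - thisStep * gi / sample.size }
  }
  (w, losses.toSeq)
}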

mllib/src/main/scala/org/apache/spark/mllib/optimization/Optimizer.scala

Lines changed: 1 addition & 1 deletion

@@ -21,7 +21,7 @@ import org.apache.spark.rdd.RDD
 
 import org.apache.spark.mllib.linalg.Vector
 
-trait Optimizer {
+trait Optimizer extends Serializable {
 
   /**
    * Solve the provided convex optimization problem.

mllib/src/main/scala/org/apache/spark/mllib/optimization/Updater.scala

Lines changed: 9 additions & 3 deletions

@@ -63,8 +63,12 @@ abstract class Updater extends Serializable {
  * Uses a step-size decreasing with the square root of the number of iterations.
  */
 class SimpleUpdater extends Updater {
-  override def compute(weightsOld: Vector, gradient: Vector,
-      stepSize: Double, iter: Int, regParam: Double): (Vector, Double) = {
+  override def compute(
+      weightsOld: Vector,
+      gradient: Vector,
+      stepSize: Double,
+      iter: Int,
+      regParam: Double): (Vector, Double) = {
     val thisIterStepSize = stepSize / math.sqrt(iter)
     val brzWeights = weightsOld.toBreeze - gradient.toBreeze * thisIterStepSize
     (Vectors.fromBreeze(brzWeights), 0)
@@ -101,9 +105,11 @@ class L1Updater extends Updater {
     val brzWeights = weightsOld.toBreeze - gradient.toBreeze * thisIterStepSize
     // Apply proximal operator (soft thresholding)
     val shrinkageVal = regParam * thisIterStepSize
-    (0 until brzWeights.length).foreach { i =>
+    var i = 0
+    while (i < brzWeights.length) {
       val wi = brzWeights(i)
       brzWeights(i) = signum(wi) * max(0.0, abs(wi) - shrinkageVal)
+      i += 1
     }
 
     (Vectors.fromBreeze(brzWeights), brzNorm(brzWeights, 1.0) * regParam)
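
The second hunk replaces a foreach closure with a bare while loop on the hot path; the arithmetic is unchanged. For reference, a standalone sketch of the same soft-thresholding step — the proximal operator of the L1 penalty — over a plain array (the helper name is ours):

import scala.math.{abs, max, signum}

// Shrink each weight toward zero by `shrinkage`, clamping at zero.
def softThreshold(weights: Array[Double], shrinkage: Double): Array[Double] = {
  val out = weights.clone()
  var i = 0
  while (i < out.length) {
    val wi = out(i)
    out(i) = signum(wi) * max(0.0, abs(wi) - shrinkage)
    i += 1
  }
  out
}

// softThreshold(Array(0.8, -0.3, 0.05), 0.1) yields Array(0.7, -0.2, 0.0):
// small weights are zeroed, the rest move 0.1 toward zero.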

mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala

Lines changed: 11 additions & 7 deletions

@@ -118,12 +118,12 @@ abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel]
 
   /** Prepends one to the input vector. */
   private def prependOne(vector: Vector): Vector = {
-    val vectorWithIntercept = vector.toBreeze match {
+    val vector1 = vector.toBreeze match {
      case dv: BDV[Double] => BDV.vertcat(BDV.ones[Double](1), dv)
      case sv: BSV[Double] => BSV.vertcat(new BSV[Double](Array(0), Array(1.0), 1), sv)
      case v: Any => throw new IllegalArgumentException("Do not support vector type " + v.getClass)
     }
-    Vectors.fromBreeze(vectorWithIntercept)
+    Vectors.fromBreeze(vector1)
   }
 
   /**
@@ -151,10 +151,14 @@ abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel]
     }
 
     val weightsWithIntercept = optimizer.optimize(data, initialWeightsWithIntercept)
-    val brzWeightsWithIntercept = weightsWithIntercept.toBreeze
-    val intercept = if (addIntercept) brzWeightsWithIntercept(0) else 0.0
-    val brzWeights = if (addIntercept) brzWeightsWithIntercept(1 to -1) else brzWeightsWithIntercept
-
-    createModel(Vectors.fromBreeze(brzWeights), intercept)
+    val intercept = if (addIntercept) weightsWithIntercept(0) else 0.0
+    val weights =
+      if (addIntercept) {
+        Vectors.dense(weightsWithIntercept.toArray.slice(1, weightsWithIntercept.size))
+      } else {
+        weightsWithIntercept
+      }
+
+    createModel(weights, intercept)
   }
 }

mllib/src/main/scala/org/apache/spark/mllib/regression/Lasso.scala

Lines changed: 8 additions & 17 deletions

@@ -71,9 +71,9 @@ class LassoWithSGD private (
   // We don't want to penalize the intercept, so set this to false.
   super.setIntercept(false)
 
-  var yMean = 0.0
-  var xColMean: BV[Double] = _
-  var xColSd: BV[Double] = _
+  private var yMean = 0.0
+  private var xColMean: BV[Double] = _
+  private var xColSd: BV[Double] = _
 
   /**
    * Construct a Lasso object with default parameters
@@ -141,11 +141,8 @@ object LassoWithSGD {
       stepSize: Double,
       regParam: Double,
       miniBatchFraction: Double,
-      initialWeights: Vector)
-    : LassoModel =
-  {
-    new LassoWithSGD(stepSize, numIterations, regParam, miniBatchFraction).run(input,
-      initialWeights)
+      initialWeights: Vector): LassoModel = {
+    new LassoWithSGD(stepSize, numIterations, regParam, miniBatchFraction).run(input, initialWeights)
   }
 
   /**
@@ -165,9 +162,7 @@ object LassoWithSGD {
       numIterations: Int,
       stepSize: Double,
       regParam: Double,
-      miniBatchFraction: Double)
-    : LassoModel =
-  {
+      miniBatchFraction: Double): LassoModel = {
     new LassoWithSGD(stepSize, numIterations, regParam, miniBatchFraction).run(input)
   }
 
@@ -187,9 +182,7 @@ object LassoWithSGD {
       input: RDD[LabeledPoint],
       numIterations: Int,
       stepSize: Double,
-      regParam: Double)
-    : LassoModel =
-  {
+      regParam: Double): LassoModel = {
     train(input, numIterations, stepSize, regParam, 1.0)
   }
 
@@ -205,9 +198,7 @@ object LassoWithSGD {
    */
   def train(
       input: RDD[LabeledPoint],
-      numIterations: Int)
-    : LassoModel =
-  {
+      numIterations: Int): LassoModel = {
     train(input, numIterations, 1.0, 1.0, 1.0)
   }
 

mllib/src/main/scala/org/apache/spark/mllib/regression/LinearRegression.scala

Lines changed: 6 additions & 14 deletions

@@ -97,11 +97,9 @@ object LinearRegressionWithSGD {
       numIterations: Int,
       stepSize: Double,
       miniBatchFraction: Double,
-      initialWeights: Vector)
-    : LinearRegressionModel =
-  {
-    new LinearRegressionWithSGD(stepSize, numIterations, miniBatchFraction).run(input,
-      initialWeights)
+      initialWeights: Vector): LinearRegressionModel = {
+    new LinearRegressionWithSGD(stepSize, numIterations, miniBatchFraction)
+      .run(input, initialWeights)
   }
 
   /**
@@ -119,9 +117,7 @@ object LinearRegressionWithSGD {
       input: RDD[LabeledPoint],
       numIterations: Int,
       stepSize: Double,
-      miniBatchFraction: Double)
-    : LinearRegressionModel =
-  {
+      miniBatchFraction: Double): LinearRegressionModel = {
     new LinearRegressionWithSGD(stepSize, numIterations, miniBatchFraction).run(input)
   }
 
@@ -139,9 +135,7 @@ object LinearRegressionWithSGD {
   def train(
       input: RDD[LabeledPoint],
       numIterations: Int,
-      stepSize: Double)
-    : LinearRegressionModel =
-  {
+      stepSize: Double): LinearRegressionModel = {
     train(input, numIterations, stepSize, 1.0)
   }
 
@@ -157,9 +151,7 @@ object LinearRegressionWithSGD {
    */
   def train(
       input: RDD[LabeledPoint],
-      numIterations: Int)
-    : LinearRegressionModel =
-  {
+      numIterations: Int): LinearRegressionModel = {
     train(input, numIterations, 1.0, 1.0)
   }
 
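
All four hunks here only reflow signatures into the `name: Type): Result = {` style; behavior is unchanged. For orientation, a minimal driver-side usage sketch of the simplest train overload — the two-point dataset is made up, and sc is assumed to be an existing SparkContext:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.{LabeledPoint, LinearRegressionWithSGD}

val points = sc.parallelize(Seq(
  LabeledPoint(3.0, Vectors.dense(1.0, 1.0)),
  LabeledPoint(5.0, Vectors.dense(2.0, 2.0))))
val model = LinearRegressionWithSGD.train(points, 100) // 100 iterations
println(model.predict(Vectors.dense(1.5, 1.5)))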

mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala

Lines changed: 3 additions & 5 deletions

@@ -82,13 +82,10 @@ object MLUtils {
    * xColMean - Row vector with mean for every column (or feature) of the input data
    * xColSd - Row vector standard deviation for every column (or feature) of the input data.
    */
-  def computeStats(data: RDD[LabeledPoint], numFeatures: Int, numExamples: Long)
-    : (Double, Vector, Vector) = {
-
+  def computeStats(data: RDD[LabeledPoint], numFeatures: Int, numExamples: Long): (Double, Vector, Vector) = {
     val brzData = data.map { case LabeledPoint(label, features) =>
       (label, features.toBreeze)
     }
-
     val aggStats = brzData.aggregate(
       (0L, 0.0, BDV.zeros[Double](numFeatures), BDV.zeros[Double](numFeatures))
     )(
@@ -104,9 +101,10 @@ object MLUtils {
         (n1 + n2, sumLabel1 + sumLabel2, sum1 += sum2, sumSq1 += sumSq2)
       }
     )
-
     val (nl, sumLabel, sum, sumSq) = aggStats
+
     require(nl > 0, "Input data is empty.")
+    require(nl == numExamples)
 
     val n = nl.toDouble
     val yMean = sumLabel / n
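
computeStats makes one pass with aggregate, accumulating (count, label sum, per-column sums, per-column sums of squares). A local sketch of the same computation over plain collections — with std in the population form sqrt(E[x²] − mean²), which is what the new test further down expects:

// Sketch only: single-machine analogue of MLUtils.computeStats.
def computeStatsLocal(data: Seq[(Double, Array[Double])], numFeatures: Int)
  : (Double, Array[Double], Array[Double]) = {
  val sum = Array.fill(numFeatures)(0.0)
  val sumSq = Array.fill(numFeatures)(0.0)
  var n = 0L
  var sumLabel = 0.0
  for ((label, x) <- data) {
    var i = 0
    while (i < numFeatures) { sum(i) += x(i); sumSq(i) += x(i) * x(i); i += 1 }
    n += 1
    sumLabel += label
  }
  require(n > 0, "Input data is empty.")
  val mean = sum.map(_ / n)
  val std = sumSq.zip(mean).map { case (sq, m) => math.sqrt(sq / n - m * m) }
  (sumLabel / n, mean, std)
}

// For the test data in MLUtilsSuite below: yMean = 0.5, mean = (2, 3, 4), std = (1, 1, 1).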

mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentSuite.scala

Lines changed: 2 additions & 3 deletions

@@ -57,8 +57,7 @@ object GradientDescentSuite {
       if (yVal > 0) 1 else 0
     }
 
-    val testData = (0 until nPoints).map(i => LabeledPoint(y(i), Vectors.dense(Array(x1(i)))))
-    testData
+    (0 until nPoints).map(i => LabeledPoint(y(i), Vectors.dense(x1(i))))
   }
 }
 
@@ -86,7 +85,7 @@ class GradientDescentSuite extends FunSuite with LocalSparkContext with ShouldMatchers {
    }
 
    val dataRDD = sc.parallelize(data, 2).cache()
-   val initialWeightsWithIntercept = Vectors.dense(0.0, initialWeights: _*)
+   val initialWeightsWithIntercept = Vectors.dense(1.0, initialWeights: _*)
 
    val (_, loss) = GradientDescent.runMiniBatchSGD(
      dataRDD,

mllib/src/test/scala/org/apache/spark/mllib/regression/LassoSuite.scala

Lines changed: 6 additions & 6 deletions

@@ -34,7 +34,7 @@ class LassoSuite extends FunSuite with LocalSparkContext {
   }
 
   test("Lasso local random SGD") {
-    val nPoints = 10000
+    val nPoints = 1000
 
     val A = 2.0
     val B = -1.5
@@ -46,7 +46,7 @@ class LassoSuite extends FunSuite with LocalSparkContext {
     testRDD.cache()
 
     val ls = new LassoWithSGD()
-    ls.optimizer.setStepSize(1.0).setRegParam(0.01).setNumIterations(20)
+    ls.optimizer.setStepSize(1.0).setRegParam(0.01).setNumIterations(40)
 
     val model = ls.run(testRDD)
     val weight0 = model.weights(0)
@@ -66,23 +66,23 @@ class LassoSuite extends FunSuite with LocalSparkContext {
   }
 
   test("Lasso local random SGD with initial weights") {
-    val nPoints = 10000
+    val nPoints = 1000
 
     val A = 2.0
     val B = -1.5
     val C = 1.0e-2
 
-    val testData = LinearDataGenerator.generateLinearInput(A, Array[Double](B,C), nPoints, 42)
+    val testData = LinearDataGenerator.generateLinearInput(A, Array[Double](B, C), nPoints, 42)
 
     val initialB = -1.0
     val initialC = -1.0
-    val initialWeights = Vectors.dense(Array(initialB, initialC))
+    val initialWeights = Vectors.dense(initialB, initialC)
 
     val testRDD = sc.parallelize(testData, 2)
     testRDD.cache()
 
     val ls = new LassoWithSGD()
-    ls.optimizer.setStepSize(1.0).setRegParam(0.01).setNumIterations(20)
+    ls.optimizer.setStepSize(1.0).setRegParam(0.01).setNumIterations(40)
 
     val model = ls.run(testRDD, initialWeights)
     val weight0 = model.weights(0)

mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala

Lines changed: 15 additions & 1 deletion

@@ -23,8 +23,10 @@ import breeze.linalg.{DenseVector => BDV, SparseVector => BSV, norm => breezeNorm,
   squaredDistance => breezeSquaredDistance}
 
 import org.apache.spark.mllib.util.MLUtils._
+import org.apache.spark.mllib.linalg.Vectors
+import org.apache.spark.mllib.regression.LabeledPoint
 
-class MLUtilsSuite extends FunSuite {
+class MLUtilsSuite extends FunSuite with LocalSparkContext {
 
   test("epsilon computation") {
     assert(1.0 + EPSILON > 1.0, s"EPSILON is too small: $EPSILON.")
@@ -49,4 +51,16 @@ class MLUtilsSuite extends FunSuite {
       assert((fastSquaredDist2 - squaredDist) <= precision * squaredDist, s"failed with m = $m")
     }
   }
+
+  test("compute stats") {
+    val data = Seq.fill(3)(Seq(
+      LabeledPoint(1.0, Vectors.dense(1.0, 2.0, 3.0)),
+      LabeledPoint(0.0, Vectors.dense(3.0, 4.0, 5.0))
+    )).flatten
+    val rdd = sc.parallelize(data, 2)
+    val (meanLabel, mean, std) = MLUtils.computeStats(rdd, 3, 6)
+    assert(meanLabel === 0.5)
+    assert(mean === Vectors.dense(2.0, 3.0, 4.0))
+    assert(std === Vectors.dense(1.0, 1.0, 1.0))
+  }
 }
