
Commit 1859701

passed compile
1 parent 834ada2 commit 1859701

11 files changed: +165 -119 lines changed

mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala

Lines changed: 84 additions & 33 deletions
@@ -114,74 +114,125 @@ class PythonMLLibAPI extends Serializable {
       java.util.LinkedList[java.lang.Object] = {
     val data = dataBytesJRDD.rdd.map(xBytes => {
       val x = deserializeDoubleVector(xBytes)
-      LabeledPoint(x(0), x.slice(1, x.length))
+      LabeledPoint(x(0), Vectors.dense(x.slice(1, x.length)))
     })
     val initialWeights = deserializeDoubleVector(initialWeightsBA)
     val model = trainFunc(data, initialWeights)
     val ret = new java.util.LinkedList[java.lang.Object]()
-    ret.add(serializeDoubleVector(model.weights))
+    ret.add(serializeDoubleVector(model.weights.toArray))
     ret.add(model.intercept: java.lang.Double)
     ret
   }
 
   /**
    * Java stub for Python mllib LinearRegressionWithSGD.train()
    */
-  def trainLinearRegressionModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]],
-      numIterations: Int, stepSize: Double, miniBatchFraction: Double,
+  def trainLinearRegressionModelWithSGD(
+      dataBytesJRDD: JavaRDD[Array[Byte]],
+      numIterations: Int,
+      stepSize: Double,
+      miniBatchFraction: Double,
       initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
-    trainRegressionModel((data, initialWeights) =>
-      LinearRegressionWithSGD.train(data, numIterations, stepSize,
-        miniBatchFraction, initialWeights),
-      dataBytesJRDD, initialWeightsBA)
+    trainRegressionModel(
+      (data, initialWeights) =>
+        LinearRegressionWithSGD.train(
+          data,
+          numIterations,
+          stepSize,
+          miniBatchFraction,
+          Vectors.dense(initialWeights)),
+      dataBytesJRDD,
+      initialWeightsBA)
   }
 
   /**
    * Java stub for Python mllib LassoWithSGD.train()
    */
-  def trainLassoModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int,
-      stepSize: Double, regParam: Double, miniBatchFraction: Double,
+  def trainLassoModelWithSGD(
+      dataBytesJRDD: JavaRDD[Array[Byte]],
+      numIterations: Int,
+      stepSize: Double,
+      regParam: Double,
+      miniBatchFraction: Double,
       initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
-    trainRegressionModel((data, initialWeights) =>
-      LassoWithSGD.train(data, numIterations, stepSize, regParam,
-        miniBatchFraction, initialWeights),
-      dataBytesJRDD, initialWeightsBA)
+    trainRegressionModel(
+      (data, initialWeights) =>
+        LassoWithSGD.train(
+          data,
+          numIterations,
+          stepSize,
+          regParam,
+          miniBatchFraction,
+          Vectors.dense(initialWeights)),
+      dataBytesJRDD,
+      initialWeightsBA)
   }
 
   /**
    * Java stub for Python mllib RidgeRegressionWithSGD.train()
    */
-  def trainRidgeModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int,
-      stepSize: Double, regParam: Double, miniBatchFraction: Double,
+  def trainRidgeModelWithSGD(
+      dataBytesJRDD: JavaRDD[Array[Byte]],
+      numIterations: Int,
+      stepSize: Double,
+      regParam: Double,
+      miniBatchFraction: Double,
       initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
-    trainRegressionModel((data, initialWeights) =>
-      RidgeRegressionWithSGD.train(data, numIterations, stepSize, regParam,
-        miniBatchFraction, initialWeights),
-      dataBytesJRDD, initialWeightsBA)
+    trainRegressionModel(
+      (data, initialWeights) =>
+        RidgeRegressionWithSGD.train(
+          data,
+          numIterations,
+          stepSize,
+          regParam,
+          miniBatchFraction,
+          Vectors.dense(initialWeights)),
+      dataBytesJRDD,
+      initialWeightsBA)
   }
 
   /**
    * Java stub for Python mllib SVMWithSGD.train()
    */
-  def trainSVMModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int,
-      stepSize: Double, regParam: Double, miniBatchFraction: Double,
+  def trainSVMModelWithSGD(
+      dataBytesJRDD: JavaRDD[Array[Byte]],
+      numIterations: Int,
+      stepSize: Double,
+      regParam: Double,
+      miniBatchFraction: Double,
       initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
-    trainRegressionModel((data, initialWeights) =>
-      SVMWithSGD.train(data, numIterations, stepSize, regParam,
-        miniBatchFraction, initialWeights),
-      dataBytesJRDD, initialWeightsBA)
+    trainRegressionModel(
+      (data, initialWeights) =>
+        SVMWithSGD.train(
+          data,
+          numIterations,
+          stepSize,
+          regParam,
+          miniBatchFraction,
+          Vectors.dense(initialWeights)),
+      dataBytesJRDD,
+      initialWeightsBA)
   }
 
   /**
    * Java stub for Python mllib LogisticRegressionWithSGD.train()
    */
-  def trainLogisticRegressionModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]],
-      numIterations: Int, stepSize: Double, miniBatchFraction: Double,
+  def trainLogisticRegressionModelWithSGD(
+      dataBytesJRDD: JavaRDD[Array[Byte]],
+      numIterations: Int,
+      stepSize: Double,
+      miniBatchFraction: Double,
       initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
-    trainRegressionModel((data, initialWeights) =>
-      LogisticRegressionWithSGD.train(data, numIterations, stepSize,
-        miniBatchFraction, initialWeights),
-      dataBytesJRDD, initialWeightsBA)
+    trainRegressionModel(
+      (data, initialWeights) =>
+        LogisticRegressionWithSGD.train(
+          data,
+          numIterations,
+          stepSize,
+          miniBatchFraction,
+          Vectors.dense(initialWeights)),
+      dataBytesJRDD,
+      initialWeightsBA)
   }
 
   /**
@@ -192,7 +243,7 @@ class PythonMLLibAPI extends Serializable {
   {
     val data = dataBytesJRDD.rdd.map(xBytes => {
       val x = deserializeDoubleVector(xBytes)
-      LabeledPoint(x(0), x.slice(1, x.length))
+      LabeledPoint(x(0), Vectors.dense(x.slice(1, x.length)))
     })
     val model = NaiveBayes.train(data, lambda)
     val ret = new java.util.LinkedList[java.lang.Object]()
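
Note: every stub in this file follows the same bridge pattern. The Python side still serializes flat double arrays, so the stub wraps the deserialized array with Vectors.dense on the way into the Vector-based Scala trainers and unwraps model.weights with toArray on the way back out, leaving the byte-level Python protocol untouched. A minimal standalone sketch of that round trip (the rawWeights values here are made-up placeholders, not from this commit):

import org.apache.spark.mllib.linalg.{Vector, Vectors}

object VectorBridgeSketch {
  def main(args: Array[String]): Unit = {
    // Stand-in for weights deserialized from a Python byte payload.
    val rawWeights: Array[Double] = Array(0.5, -1.2, 3.0)

    // Into the Scala API: wrap the flat array as a dense MLlib Vector.
    val weights: Vector = Vectors.dense(rawWeights)

    // Back out for serialization to Python: unwrap to Array[Double].
    val roundTripped: Array[Double] = weights.toArray
    assert(rawWeights.sameElements(roundTripped))
  }
}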

mllib/src/main/scala/org/apache/spark/mllib/classification/ClassificationModel.scala

Lines changed: 3 additions & 2 deletions
@@ -18,6 +18,7 @@
 package org.apache.spark.mllib.classification
 
 import org.apache.spark.rdd.RDD
+import org.apache.spark.mllib.linalg.Vector
 
 trait ClassificationModel extends Serializable {
   /**
@@ -26,13 +27,13 @@ trait ClassificationModel extends Serializable {
    * @param testData RDD representing data points to be predicted
    * @return RDD[Int] where each entry contains the corresponding prediction
    */
-  def predict(testData: RDD[Array[Double]]): RDD[Double]
+  def predict(testData: RDD[Vector]): RDD[Double]
 
   /**
    * Predict values for a single data point using the model trained.
    *
    * @param testData array representing a single data point
    * @return Int prediction from the trained model
    */
-  def predict(testData: Array[Double]): Double
+  def predict(testData: Vector): Double
 }
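
Note: both predict overloads now take org.apache.spark.mllib.linalg.Vector instead of Array[Double]. A hypothetical toy implementation (not part of this commit) showing the reshaped contract, classifying by thresholding a point's first feature:

import org.apache.spark.mllib.classification.ClassificationModel
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD

// Hypothetical model, illustration only: predicts 1.0 when the first
// feature exceeds the threshold, 0.0 otherwise.
class FirstFeatureThresholdModel(threshold: Double) extends ClassificationModel {
  override def predict(testData: RDD[Vector]): RDD[Double] =
    testData.map(v => predict(v))

  override def predict(testData: Vector): Double =
    if (testData(0) > threshold) 1.0 else 0.0
}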

mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala

Lines changed: 14 additions & 23 deletions
@@ -25,8 +25,7 @@ import org.apache.spark.mllib.optimization._
 import org.apache.spark.mllib.regression._
 import org.apache.spark.mllib.util.MLUtils
 import org.apache.spark.mllib.util.DataValidators
-
-import org.jblas.DoubleMatrix
+import org.apache.spark.mllib.linalg.Vector
 
 /**
  * Classification model trained using Logistic Regression.
@@ -35,14 +34,14 @@ import org.jblas.DoubleMatrix
  * @param intercept Intercept computed for this model.
  */
 class LogisticRegressionModel(
-    override val weights: Array[Double],
+    override val weights: Vector,
     override val intercept: Double)
   extends GeneralizedLinearModel(weights, intercept)
   with ClassificationModel with Serializable {
 
-  override def predictPoint(dataMatrix: DoubleMatrix, weightMatrix: DoubleMatrix,
+  override def predictPoint(dataMatrix: Vector, weightMatrix: Vector,
       intercept: Double) = {
-    val margin = dataMatrix.mmul(weightMatrix).get(0) + intercept
+    val margin = weightMatrix.toBreeze.dot(dataMatrix.toBreeze) + intercept
     round(1.0/ (1.0 + math.exp(margin * -1)))
   }
 }
@@ -73,7 +72,7 @@ class LogisticRegressionWithSGD private (
    */
   def this() = this(1.0, 100, 0.0, 1.0)
 
-  def createModel(weights: Array[Double], intercept: Double) = {
+  def createModel(weights: Vector, intercept: Double) = {
     new LogisticRegressionModel(weights, intercept)
   }
 }
@@ -105,11 +104,9 @@ object LogisticRegressionWithSGD {
       numIterations: Int,
       stepSize: Double,
       miniBatchFraction: Double,
-      initialWeights: Array[Double])
-    : LogisticRegressionModel =
-  {
-    new LogisticRegressionWithSGD(stepSize, numIterations, 0.0, miniBatchFraction).run(
-      input, initialWeights)
+      initialWeights: Vector): LogisticRegressionModel = {
+    new LogisticRegressionWithSGD(stepSize, numIterations, 0.0, miniBatchFraction)
+      .run(input, initialWeights)
   }
 
   /**
@@ -128,11 +125,9 @@ object LogisticRegressionWithSGD {
       input: RDD[LabeledPoint],
       numIterations: Int,
       stepSize: Double,
-      miniBatchFraction: Double)
-    : LogisticRegressionModel =
-  {
-    new LogisticRegressionWithSGD(stepSize, numIterations, 0.0, miniBatchFraction).run(
-      input)
+      miniBatchFraction: Double): LogisticRegressionModel = {
+    new LogisticRegressionWithSGD(stepSize, numIterations, 0.0, miniBatchFraction)
+      .run(input)
   }
 
   /**
@@ -150,9 +145,7 @@ object LogisticRegressionWithSGD {
   def train(
       input: RDD[LabeledPoint],
       numIterations: Int,
-      stepSize: Double)
-    : LogisticRegressionModel =
-  {
+      stepSize: Double): LogisticRegressionModel = {
     train(input, numIterations, stepSize, 1.0)
   }
 
@@ -168,9 +161,7 @@ object LogisticRegressionWithSGD {
   */
   def train(
       input: RDD[LabeledPoint],
-      numIterations: Int)
-    : LogisticRegressionModel =
-  {
+      numIterations: Int): LogisticRegressionModel = {
     train(input, numIterations, 1.0, 1.0)
   }
 
@@ -183,7 +174,7 @@ object LogisticRegressionWithSGD {
     val sc = new SparkContext(args(0), "LogisticRegression")
     val data = MLUtils.loadLabeledData(sc, args(1))
     val model = LogisticRegressionWithSGD.train(data, args(3).toInt, args(2).toDouble)
-    println("Weights: " + model.weights.mkString("[", ", ", "]"))
+    println("Weights: " + model.weights)
     println("Intercept: " + model.intercept)
 
     sc.stop()
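
Note: the prediction margin that used to be a jblas column-matrix multiply (dataMatrix.mmul(weightMatrix).get(0)) is now a dot product over the vectors' Breeze views. The toBreeze conversion is internal to MLlib, so this standalone sketch of the same arithmetic uses Breeze directly; the weights, features, and intercept values are assumed examples:

import breeze.linalg.{DenseVector => BDV}

object LogisticMarginSketch {
  def main(args: Array[String]): Unit = {
    val weights = BDV(0.8, -0.4) // assumed trained weights
    val features = BDV(1.0, 2.0) // assumed input point
    val intercept = 0.1          // assumed intercept

    // Same arithmetic as predictPoint: dot product plus intercept,
    // squashed through the logistic function, rounded to a 0/1 label.
    val margin = weights.dot(features) + intercept
    val prediction = math.round(1.0 / (1.0 + math.exp(-margin)))
    println(s"margin=$margin, prediction=$prediction")
  }
}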

mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala

Lines changed: 16 additions & 4 deletions
@@ -25,6 +25,7 @@ import org.apache.spark.{SparkContext, Logging}
 import org.apache.spark.mllib.regression.LabeledPoint
 import org.apache.spark.rdd.RDD
 import org.apache.spark.mllib.util.MLUtils
+import org.apache.spark.mllib.linalg.Vector
 
 /**
  * Model for Naive Bayes Classifiers.
@@ -39,9 +40,11 @@ class NaiveBayesModel(val pi: Array[Double], val theta: Array[Array[Double]])
   private val _pi = new DoubleMatrix(pi.length, 1, pi: _*)
   private val _theta = new DoubleMatrix(theta)
 
-  def predict(testData: RDD[Array[Double]]): RDD[Double] = testData.map(predict)
+  override def predict(testData: RDD[Vector]): RDD[Double] = testData.map(predict)
 
-  def predict(testData: Array[Double]): Double = {
+  override def predict(testData: Vector): Double = predict(testData.toArray)
+
+  private def predict(testData: Array[Double]): Double = {
     val dataMatrix = new DoubleMatrix(testData.length, 1, testData: _*)
     val result = _pi.add(_theta.mmul(dataMatrix))
     result.argmax()
@@ -70,17 +73,26 @@ class NaiveBayes private (var lambda: Double)
   /**
    * Run the algorithm with the configured parameters on an input RDD of LabeledPoint entries.
    *
-   * @param data RDD of (label, array of features) pairs.
+   * @param data RDD of [[org.apache.spark.mllib.regression.LabeledPoint]].
    */
   def run(data: RDD[LabeledPoint]) = {
+    runRaw(data.map(v => (v.label, v.features.toArray)))
+  }
+
+  /**
+   * Run the algorithm with the configured parameters on an input RDD of LabeledPoint entries.
+   *
+   * @param data RDD of (label, array of features) pairs.
+   */
+  private def runRaw(data: RDD[(Double, Array[Double])]) = {
     // Aggregates all sample points to driver side to get sample count and summed feature vector
     // for each label. The shape of `zeroCombiner` & `aggregated` is:
     //
     // label: Int -> (count: Int, featuresSum: DoubleMatrix)
     val zeroCombiner = mutable.Map.empty[Int, (Int, DoubleMatrix)]
     val aggregated = data.aggregate(zeroCombiner)({ (combiner, point) =>
       point match {
-        case LabeledPoint(label, features) =>
+        case (label, features) =>
           val (count, featuresSum) = combiner.getOrElse(label.toInt, (0, DoubleMatrix.zeros(1)))
           val fs = new DoubleMatrix(features.length, 1, features: _*)
           combiner += label.toInt -> (count + 1, featuresSum.addi(fs))
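
Note: NaiveBayesModel now satisfies the Vector-based ClassificationModel trait by unwrapping the Vector with toArray and delegating to a private array-based predict, which scores a point as _pi + _theta * x and takes the argmax. A self-contained sketch of that scoring step, with assumed toy parameters (2 classes, 3 features, log-space values that are not from this commit):

import org.jblas.DoubleMatrix

object NaiveBayesScoringSketch {
  def main(args: Array[String]): Unit = {
    // Assumed log class priors (pi) and log conditional probabilities (theta).
    val pi = new DoubleMatrix(2, 1, -0.7, -0.7)
    val theta = new DoubleMatrix(Array(
      Array(-1.0, -2.0, -3.0),
      Array(-3.0, -2.0, -1.0)))

    // Score each class as pi + theta * x and pick the argmax row,
    // mirroring NaiveBayesModel's private array-based predict.
    val x = new DoubleMatrix(3, 1, 0.0, 1.0, 2.0)
    val scores = pi.add(theta.mmul(x))
    println(scores.argmax()) // prints 1: the second class scores higher
  }
}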
