[SPARK-1357] [MLLIB] Annotate developer and experimental APIs #298


Closed · wants to merge 12 commits
@@ -19,6 +19,7 @@ package org.apache.spark.mllib.api.python
 
 import java.nio.{ByteBuffer, ByteOrder}
 
+import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.mllib.classification._
 import org.apache.spark.mllib.clustering._
@@ -28,8 +29,11 @@ import org.apache.spark.mllib.regression._
 import org.apache.spark.rdd.RDD
 
 /**
+ * :: DeveloperApi ::
+ *
  * The Java stubs necessary for the Python mllib bindings.
  */
+@DeveloperApi
 class PythonMLLibAPI extends Serializable {
   private def deserializeDoubleVector(bytes: Array[Byte]): Array[Double] = {
     val packetLength = bytes.length
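For readers unfamiliar with these markers: `@DeveloperApi` is a source-level annotation paired with a `:: DeveloperApi ::` scaladoc tag, flagging an API as unstable and developer-facing. As a rough, hypothetical sketch only (Spark's real annotation lives in org.apache.spark.annotation and may well be defined as a Java annotation instead), such a marker could be declared in Scala as:

import scala.annotation.StaticAnnotation

// Documentation-only marker: it carries no runtime behavior, it just labels
// a class or method as an unstable, developer-facing API.
class DeveloperApi extends StaticAnnotation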
@@ -55,7 +55,7 @@ class LogisticRegressionModel(
     this
   }
 
-  override def predictPoint(dataMatrix: Vector, weightMatrix: Vector,
+  override protected def predictPoint(dataMatrix: Vector, weightMatrix: Vector,
       intercept: Double) = {
     val margin = weightMatrix.toBreeze.dot(dataMatrix.toBreeze) + intercept
     val score = 1.0/ (1.0 + math.exp(-margin))
@@ -71,27 +71,27 @@
  * NOTE: Labels used in Logistic Regression should be {0, 1}
  */
 class LogisticRegressionWithSGD private (
-    var stepSize: Double,
-    var numIterations: Int,
-    var regParam: Double,
-    var miniBatchFraction: Double)
+    private var stepSize: Double,
+    private var numIterations: Int,
+    private var regParam: Double,
+    private var miniBatchFraction: Double)
   extends GeneralizedLinearAlgorithm[LogisticRegressionModel] with Serializable {
 
-  val gradient = new LogisticGradient()
-  val updater = new SimpleUpdater()
+  private val gradient = new LogisticGradient()
+  private val updater = new SimpleUpdater()
   override val optimizer = new GradientDescent(gradient, updater)
     .setStepSize(stepSize)
     .setNumIterations(numIterations)
     .setRegParam(regParam)
     .setMiniBatchFraction(miniBatchFraction)
-  override val validators = List(DataValidators.classificationLabels)
+  override protected val validators = List(DataValidators.binaryLabelValidator)
 
   /**
    * Construct a LogisticRegression object with default parameters
    */
   def this() = this(1.0, 100, 0.0, 1.0)
 
-  def createModel(weights: Vector, intercept: Double) = {
+  override protected def createModel(weights: Vector, intercept: Double) = {
     new LogisticRegressionModel(weights, intercept)
   }
 }
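For context, a minimal usage sketch of the API annotated above, assuming the `LogisticRegressionWithSGD.train(input, numIterations)` overload on the companion object (not shown in this diff):

import org.apache.spark.SparkContext
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

object LRExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "LRExample")
    // Labels must be in {0, 1}, which the new binaryLabelValidator enforces.
    val training = sc.parallelize(Seq(
      LabeledPoint(1.0, Vectors.dense(1.0, 0.5)),
      LabeledPoint(0.0, Vectors.dense(-1.0, -0.5))))
    val model = LogisticRegressionWithSGD.train(training, 100) // 100 SGD iterations
    println(model.predict(Vectors.dense(0.8, 0.4)))            // expect 1.0
    sc.stop()
  }
}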
@@ -19,6 +19,7 @@ package org.apache.spark.mllib.classification
 
 import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, argmax => brzArgmax, sum => brzSum}
 
+import org.apache.spark.annotation.Experimental
 import org.apache.spark.{Logging, SparkContext}
 import org.apache.spark.SparkContext._
 import org.apache.spark.mllib.linalg.Vector
@@ -27,11 +28,16 @@ import org.apache.spark.mllib.util.MLUtils
 import org.apache.spark.rdd.RDD
 
 /**
+ * :: Experimental ::
+ *
  * Model for Naive Bayes Classifiers.
  *
- * @param pi Log of class priors, whose dimension is C.
- * @param theta Log of class conditional probabilities, whose dimension is CxD.
+ * @param labels list of labels
+ * @param pi log of class priors, whose dimension is C, number of labels
+ * @param theta log of class conditional probabilities, whose dimension is C-by-D,
+ *              where D is number of features
  */
+@Experimental
 class NaiveBayesModel(
     val labels: Array[Double],
     val pi: Array[Double],
@@ -40,14 +46,17 @@ class NaiveBayesModel(
   private val brzPi = new BDV[Double](pi)
   private val brzTheta = new BDM[Double](theta.length, theta(0).length)
 
-  var i = 0
-  while (i < theta.length) {
-    var j = 0
-    while (j < theta(i).length) {
-      brzTheta(i, j) = theta(i)(j)
-      j += 1
-    }
-    i += 1
-  }
+  {
+    // Need to put an extra pair of braces to prevent Scala treating `i` as a member.
+    var i = 0
+    while (i < theta.length) {
+      var j = 0
+      while (j < theta(i).length) {
+        brzTheta(i, j) = theta(i)(j)
+        j += 1
+      }
+      i += 1
+    }
+  }
 
   override def predict(testData: RDD[Vector]): RDD[Double] = testData.map(predict)
@@ -65,7 +74,7 @@ class NaiveBayesModel(
  * document classification. By making every vector a 0-1 vector, it can also be used as
  * Bernoulli NB ([[http://tinyurl.com/p7c96j6]]).
  */
-class NaiveBayes private (var lambda: Double) extends Serializable with Logging {
+class NaiveBayes private (private var lambda: Double) extends Serializable with Logging {
 
   def this() = this(1.0)
 
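A usage sketch for the class above, assuming the companion object's `NaiveBayes.train(input, lambda)` method (not part of this hunk):

import org.apache.spark.SparkContext
import org.apache.spark.mllib.classification.NaiveBayes
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

object NaiveBayesExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "NaiveBayesExample")
    // Multinomial NB expects nonnegative feature counts.
    val training = sc.parallelize(Seq(
      LabeledPoint(0.0, Vectors.dense(1.0, 0.0)),
      LabeledPoint(1.0, Vectors.dense(0.0, 2.0))))
    val model = NaiveBayes.train(training, lambda = 1.0) // additive smoothing
    println(model.predict(Vectors.dense(0.0, 1.0)))      // expect 1.0
    sc.stop()
  }
}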
@@ -55,7 +55,9 @@ class SVMModel(
     this
   }
 
-  override def predictPoint(dataMatrix: Vector, weightMatrix: Vector,
+  override protected def predictPoint(
+      dataMatrix: Vector,
+      weightMatrix: Vector,
       intercept: Double) = {
     val margin = weightMatrix.toBreeze.dot(dataMatrix.toBreeze) + intercept
     threshold match {
@@ -70,28 +72,27 @@
  * NOTE: Labels used in SVM should be {0, 1}.
  */
 class SVMWithSGD private (
-    var stepSize: Double,
-    var numIterations: Int,
-    var regParam: Double,
-    var miniBatchFraction: Double)
+    private var stepSize: Double,
+    private var numIterations: Int,
+    private var regParam: Double,
+    private var miniBatchFraction: Double)
   extends GeneralizedLinearAlgorithm[SVMModel] with Serializable {
 
-  val gradient = new HingeGradient()
-  val updater = new SquaredL2Updater()
+  private val gradient = new HingeGradient()
+  private val updater = new SquaredL2Updater()
   override val optimizer = new GradientDescent(gradient, updater)
     .setStepSize(stepSize)
     .setNumIterations(numIterations)
     .setRegParam(regParam)
     .setMiniBatchFraction(miniBatchFraction)
-
-  override val validators = List(DataValidators.classificationLabels)
+  override protected val validators = List(DataValidators.binaryLabelValidator)
 
   /**
    * Construct a SVM object with default parameters
    */
   def this() = this(1.0, 100, 1.0, 1.0)
 
-  def createModel(weights: Vector, intercept: Double) = {
+  override protected def createModel(weights: Vector, intercept: Double) = {
     new SVMModel(weights, intercept)
   }
 }
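A matching usage sketch, assuming the `SVMWithSGD.train(input, numIterations)` overload on the companion object (not shown here):

import org.apache.spark.SparkContext
import org.apache.spark.mllib.classification.SVMWithSGD
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

object SVMExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "SVMExample")
    // Labels must be in {0, 1}, as the class doc notes.
    val training = sc.parallelize(Seq(
      LabeledPoint(1.0, Vectors.dense(2.0, 1.0)),
      LabeledPoint(0.0, Vectors.dense(-2.0, -1.0))))
    val model = SVMWithSGD.train(training, 100) // numIterations = 100
    println(model.predict(Vectors.dense(1.5, 0.5)))
    sc.stop()
  }
}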
@@ -21,6 +21,7 @@ import scala.collection.mutable.ArrayBuffer
 
 import breeze.linalg.{DenseVector => BDV, Vector => BV, norm => breezeNorm}
 
+import org.apache.spark.annotation.Experimental
 import org.apache.spark.{Logging, SparkContext}
 import org.apache.spark.SparkContext._
 import org.apache.spark.mllib.linalg.{Vector, Vectors}
@@ -37,12 +38,17 @@ import org.apache.spark.util.random.XORShiftRandom
  * to it should be cached by the user.
  */
 class KMeans private (
-    var k: Int,
-    var maxIterations: Int,
-    var runs: Int,
-    var initializationMode: String,
-    var initializationSteps: Int,
-    var epsilon: Double) extends Serializable with Logging {
+    private var k: Int,
+    private var maxIterations: Int,
+    private var runs: Int,
+    private var initializationMode: String,
+    private var initializationSteps: Int,
+    private var epsilon: Double) extends Serializable with Logging {
+
+  /**
+   * Constructs a KMeans instance with default parameters: {k: 2, maxIterations: 20, runs: 1,
+   * initializationMode: "k-means||", initializationSteps: 5, epsilon: 1e-4}.
+   */
   def this() = this(2, 20, 1, KMeans.K_MEANS_PARALLEL, 5, 1e-4)
 
   /** Set the number of clusters to create (k). Default: 2. */
@@ -71,6 +77,8 @@ class KMeans private (
   }
 
   /**
+   * :: Experimental ::
+   *
    * Set the number of runs of the algorithm to execute in parallel. We initialize the algorithm
    * this many times with random starting conditions (configured by the initialization mode), then
    * return the best clustering found over any run. Default: 1.
@@ -316,15 +324,36 @@ object KMeans {
       data: RDD[Vector],
       k: Int,
       maxIterations: Int,
-      runs: Int = 1,
-      initializationMode: String = K_MEANS_PARALLEL): KMeansModel = {
+      runs: Int,
+      initializationMode: String): KMeansModel = {
     new KMeans().setK(k)
       .setMaxIterations(maxIterations)
       .setRuns(runs)
       .setInitializationMode(initializationMode)
      .run(data)
   }
 
+  /**
+   * Trains a k-means model using specified parameters and the default values for unspecified.
+   */
+  def train(
+      data: RDD[Vector],
+      k: Int,
+      maxIterations: Int): KMeansModel = {
+    train(data, k, maxIterations, 1, K_MEANS_PARALLEL)
+  }
+
+  /**
+   * Trains a k-means model using specified parameters and the default values for unspecified.
+   */
+  def train(
+      data: RDD[Vector],
+      k: Int,
+      maxIterations: Int,
+      runs: Int): KMeansModel = {
+    train(data, k, maxIterations, runs, K_MEANS_PARALLEL)
+  }
+
   /**
    * Returns the index of the closest center to the given point, as well as the squared distance.
   */
@@ -369,6 +398,10 @@ object KMeans {
     MLUtils.fastSquaredDistance(v1.vector, v1.norm, v2.vector, v2.norm)
   }
 
+  /**
+   * :: Experimental ::
+   */
+  @Experimental
  def main(args: Array[String]) {
    if (args.length < 4) {
      println("Usage: KMeans <master> <input_file> <k> <max_iterations> [<runs>]")
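Replacing the default arguments with explicit overloads keeps `train` callable from Java, which cannot supply Scala default parameters. A minimal usage sketch of the new three-argument overload, assuming a local SparkContext:

import org.apache.spark.SparkContext
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

object KMeansExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "KMeansExample")
    // KMeans is iterative, so the input should be cached, as the class doc advises.
    val points = sc.parallelize(Seq(
      Vectors.dense(0.0, 0.0), Vectors.dense(0.1, 0.1),
      Vectors.dense(9.0, 9.0), Vectors.dense(9.1, 9.1))).cache()
    // Uses the new overload; runs and initializationMode take their defaults.
    val model = KMeans.train(points, 2, 20) // k = 2, maxIterations = 20
    println(model.clusterCenters.mkString(", "))
    sc.stop()
  }
}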
19 changes: 11 additions & 8 deletions mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
@@ -64,11 +64,13 @@ trait Vector extends Serializable {
 
 /**
  * Factory methods for [[org.apache.spark.mllib.linalg.Vector]].
+ * We don't use the name `Vector` because Scala imports
+ * [[scala.collection.immutable.Vector]] by default.
  */
 object Vectors {
 
   /**
-   * Creates a dense vector.
+   * Creates a dense vector from its values.
    */
   @varargs
   def dense(firstValue: Double, otherValues: Double*): Vector =
@@ -158,20 +160,21 @@ class DenseVector(val values: Array[Double]) extends Vector {
 /**
  * A sparse vector represented by an index array and an value array.
  *
- * @param n size of the vector.
+ * @param size size of the vector.
  * @param indices index array, assume to be strictly increasing.
  * @param values value array, must have the same length as the index array.
  */
-class SparseVector(val n: Int, val indices: Array[Int], val values: Array[Double]) extends Vector {
-
-  override def size: Int = n
+class SparseVector(
+    override val size: Int,
+    val indices: Array[Int],
+    val values: Array[Double]) extends Vector {
 
   override def toString: String = {
-    "(" + n + "," + indices.zip(values).mkString("[", "," ,"]") + ")"
+    "(" + size + "," + indices.zip(values).mkString("[", "," ,"]") + ")"
   }
 
   override def toArray: Array[Double] = {
-    val data = new Array[Double](n)
+    val data = new Array[Double](size)
     var i = 0
     val nnz = indices.length
     while (i < nnz) {
@@ -181,5 +184,5 @@ class SparseVector(
     data
   }
 
-  private[mllib] override def toBreeze: BV[Double] = new BSV[Double](indices, values, n)
+  private[mllib] override def toBreeze: BV[Double] = new BSV[Double](indices, values, size)
 }
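A short usage sketch of the factory methods, assuming the `Vectors.sparse(size, indices, values)` factory that accompanies the `dense` method shown above:

import org.apache.spark.mllib.linalg.{Vector, Vectors}

// Dense vector [1.0, 0.0, 3.0] built from its values.
val dv: Vector = Vectors.dense(1.0, 0.0, 3.0)
// The same vector in sparse form: size 3, nonzeros at indices 0 and 2.
val sv: Vector = Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0))
assert(dv.toArray.sameElements(sv.toArray))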
@@ -19,6 +19,7 @@ package org.apache.spark.mllib.linalg.distributed
 
 import breeze.linalg.{DenseMatrix => BDM}
 
+import org.apache.spark.annotation.Experimental
 import org.apache.spark.rdd.RDD
 import org.apache.spark.SparkContext._
 import org.apache.spark.mllib.linalg.Vectors
@@ -32,6 +33,8 @@ import org.apache.spark.mllib.linalg.Vectors
 case class MatrixEntry(i: Long, j: Long, value: Double)
 
 /**
+ * :: Experimental ::
+ *
  * Represents a matrix in coordinate format.
  *
  * @param entries matrix entries
@@ -40,6 +43,7 @@ case class MatrixEntry(i: Long, j: Long, value: Double)
  * @param nCols number of columns. A non-positive value means unknown, and then the number of
  *              columns will be determined by the max column index plus one.
  */
+@Experimental
 class CoordinateMatrix(
     val entries: RDD[MatrixEntry],
     private var nRows: Long,
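A usage sketch for the class above, assuming the constructor shown in the hunk with non-positive dimensions left to be inferred from the entries:

import org.apache.spark.SparkContext
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}

object CoordinateMatrixExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "CoordinateMatrixExample")
    // Three nonzero entries of what turns out to be a 3-by-2 matrix.
    val entries = sc.parallelize(Seq(
      MatrixEntry(0, 0, 1.0), MatrixEntry(1, 1, 2.0), MatrixEntry(2, 0, 3.0)))
    // Non-positive nRows/nCols mean "unknown": sizes come from the max indices.
    val mat = new CoordinateMatrix(entries, 0, 0)
    println((mat.numRows(), mat.numCols())) // expect (3, 2)
    sc.stop()
  }
}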
@@ -19,8 +19,6 @@ package org.apache.spark.mllib.linalg.distributed
 
 import breeze.linalg.{DenseMatrix => BDM}
 
-import org.apache.spark.mllib.linalg.Matrix
-
 /**
  * Represents a distributively stored matrix backed by one or more RDDs.
  */
@@ -19,14 +19,22 @@ package org.apache.spark.mllib.linalg.distributed
 
 import breeze.linalg.{DenseMatrix => BDM}
 
+import org.apache.spark.annotation.Experimental
 import org.apache.spark.rdd.RDD
 import org.apache.spark.mllib.linalg._
 import org.apache.spark.mllib.linalg.SingularValueDecomposition
 
-/** Represents a row of [[org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix]]. */
+/**
+ * :: Experimental ::
+ *
+ * Represents a row of [[org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix]].
+ */
+@Experimental
 case class IndexedRow(index: Long, vector: Vector)
 
 /**
+ * :: Experimental ::
+ *
  * Represents a row-oriented [[org.apache.spark.mllib.linalg.distributed.DistributedMatrix]] with
  * indexed rows.
  *
@@ -36,6 +44,7 @@ case class IndexedRow(index: Long, vector: Vector)
  * @param nCols number of columns. A non-positive value means unknown, and then the number of
  *              columns will be determined by the size of the first row.
  */
+@Experimental
 class IndexedRowMatrix(
     val rows: RDD[IndexedRow],
     private var nRows: Long,
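A usage sketch for IndexedRowMatrix, assuming the constructor continues past this hunk with an `nCols: Int` parameter in the same style as the other distributed matrices (an assumption, since the diff is truncated here):

import org.apache.spark.SparkContext
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix}

object IndexedRowMatrixExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "IndexedRowMatrixExample")
    val rows = sc.parallelize(Seq(
      IndexedRow(0L, Vectors.dense(1.0, 2.0)),
      IndexedRow(2L, Vectors.dense(3.0, 4.0)))) // row 1 is implicitly missing
    // Non-positive nRows/nCols mean "unknown"; they are inferred on demand.
    val mat = new IndexedRowMatrix(rows, 0L, 0)
    println((mat.numRows(), mat.numCols())) // expect (3, 2)
    sc.stop()
  }
}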