diff --git a/docs/mllib-classification-regression.md b/docs/mllib-classification-regression.md index cc8acf15ac5ee..2c42f60c2ecce 100644 --- a/docs/mllib-classification-regression.md +++ b/docs/mllib-classification-regression.md @@ -356,16 +356,17 @@ error. import org.apache.spark.SparkContext import org.apache.spark.mllib.classification.SVMWithSGD import org.apache.spark.mllib.regression.LabeledPoint +import org.apache.spark.mllib.linalg.Vectors // Load and parse the data file val data = sc.textFile("mllib/data/sample_svm_data.txt") val parsedData = data.map { line => - val parts = line.split(' ') - LabeledPoint(parts(0).toDouble, parts.tail.map(x => x.toDouble).toArray) + val parts = line.split(' ').map(_.toDouble) + LabeledPoint(parts(0), Vectors.dense(parts.tail)) } // Run training algorithm to build the model -val numIterations = 20 +val numIterations = 100 val model = SVMWithSGD.train(parsedData, numIterations) // Evaluate model on training examples and compute training error @@ -401,21 +402,22 @@ val modelL1 = svmAlg.run(parsedData) The following example demonstrate how to load training data, parse it as an RDD of LabeledPoint. The example then uses LinearRegressionWithSGD to build a simple linear model to predict label values. We compute the Mean Squared Error at the end to evaluate -[goodness of fit](http://en.wikipedia.org/wiki/Goodness_of_fit) +[goodness of fit](http://en.wikipedia.org/wiki/Goodness_of_fit). {% highlight scala %} import org.apache.spark.mllib.regression.LinearRegressionWithSGD import org.apache.spark.mllib.regression.LabeledPoint +import org.apache.spark.mllib.linalg.Vectors // Load and parse the data val data = sc.textFile("mllib/data/ridge-data/lpsa.data") val parsedData = data.map { line => val parts = line.split(',') - LabeledPoint(parts(0).toDouble, parts(1).split(' ').map(x => x.toDouble).toArray) + LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).split(' ').map(_.toDouble))) } // Building the model -val numIterations = 20 +val numIterations = 100 val model = LinearRegressionWithSGD.train(parsedData, numIterations) // Evaluate model on training examples and compute training error @@ -423,7 +425,7 @@ val valuesAndPreds = parsedData.map { point => val prediction = model.predict(point.features) (point.label, prediction) } -val MSE = valuesAndPreds.map{ case(v, p) => math.pow((v - p), 2)}.reduce(_ + _)/valuesAndPreds.count +val MSE = valuesAndPreds.map{case(v, p) => math.pow((v - p), 2)}.reduce(_ + _) / valuesAndPreds.count println("training Mean Squared Error = " + MSE) {% endhighlight %} @@ -518,18 +520,22 @@ and make predictions with the resulting model to compute the training error. 
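Since this patch also teaches the Python bindings to accept sparse input, the same training call shown below works when the `LabeledPoint`s carry `SparseVector` features. A minimal sketch, mirroring the doctests this patch adds to `pyspark/mllib/classification.py` (it assumes an already-created `SparkContext` named `sc`, as in the surrounding examples):

{% highlight python %}
from numpy import array
from pyspark.mllib.classification import LogisticRegressionWithSGD
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.linalg import SparseVector

# Two-dimensional points in which only one coordinate is ever non-zero
sparse_data = [
    LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
    LabeledPoint(1.0, SparseVector(2, {1: 1.0})),
    LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
    LabeledPoint(1.0, SparseVector(2, {1: 2.0}))
]

model = LogisticRegressionWithSGD.train(sc.parallelize(sparse_data))

# Dense and sparse vectors can be mixed freely at prediction time
print model.predict(SparseVector(2, {1: 1.0}))  # expected to print 1
print model.predict(array([0.0, 0.0]))          # expected to print 0
{% endhighlight %}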
{% highlight python %} from pyspark.mllib.classification import LogisticRegressionWithSGD +from pyspark.mllib.regression import LabeledPoint from numpy import array # Load and parse the data +def parsePoint(line): + values = [float(x) for x in line.split(' ')] + return LabeledPoint(values[0], values[1:]) + data = sc.textFile("mllib/data/sample_svm_data.txt") -parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')])) -model = LogisticRegressionWithSGD.train(parsedData) +parsedData = data.map(parsePoint) # Build the model -labelsAndPreds = parsedData.map(lambda point: (int(point.item(0)), - model.predict(point.take(range(1, point.size))))) +model = LogisticRegressionWithSGD.train(parsedData) # Evaluating the model on training data +labelsAndPreds = parsedData.map(lambda p: (p.label, model.predict(p.features))) trainErr = labelsAndPreds.filter(lambda (v, p): v != p).count() / float(parsedData.count()) print("Training Error = " + str(trainErr)) {% endhighlight %} @@ -538,22 +544,25 @@ print("Training Error = " + str(trainErr)) The following example demonstrate how to load training data, parse it as an RDD of LabeledPoint. The example then uses LinearRegressionWithSGD to build a simple linear model to predict label values. We compute the Mean Squared Error at the end to evaluate -[goodness of fit](http://en.wikipedia.org/wiki/Goodness_of_fit) +[goodness of fit](http://en.wikipedia.org/wiki/Goodness_of_fit). {% highlight python %} -from pyspark.mllib.regression import LinearRegressionWithSGD +from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD from numpy import array # Load and parse the data +def parsePoint(line): + values = [float(x) for x in line.replace(',', ' ').split(' ')] + return LabeledPoint(values[0], values[1:]) + data = sc.textFile("mllib/data/ridge-data/lpsa.data") -parsedData = data.map(lambda line: array([float(x) for x in line.replace(',', ' ').split(' ')])) +parsedData = data.map(parsePoint) # Build the model model = LinearRegressionWithSGD.train(parsedData) # Evaluate the model on training data -valuesAndPreds = parsedData.map(lambda point: (point.item(0), - model.predict(point.take(range(1, point.size))))) -MSE = valuesAndPreds.map(lambda (v, p): (v - p)**2).reduce(lambda x, y: x + y)/valuesAndPreds.count() +valuesAndPreds = parsedData.map(lambda p: (p.label, model.predict(p.features))) +MSE = valuesAndPreds.map(lambda (v, p): (v - p)**2).reduce(lambda x, y: x + y) / valuesAndPreds.count() print("Mean Squared Error = " + str(MSE)) -{% endhighlight %} \ No newline at end of file +{% endhighlight %} diff --git a/docs/mllib-clustering.md b/docs/mllib-clustering.md index 65ed75b82ea5b..50a8671560737 100644 --- a/docs/mllib-clustering.md +++ b/docs/mllib-clustering.md @@ -48,14 +48,15 @@ optimal *k* is usually one where there is an "elbow" in the WSSSE graph. 
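To scan several values of *k*, a short Python sketch of that elbow search can reuse the WSSSE computation from the Python example further below; it assumes `parsedData` is an RDD of NumPy feature vectors (as produced by the parsing step shown there), and the helper name `wssse` is only for illustration:

{% highlight python %}
from math import sqrt
from pyspark.mllib.clustering import KMeans

def wssse(model, data):
    """Within Set Sum of Squared Errors of a fitted model on a dataset."""
    def error(point):
        center = model.clusterCenters[model.predict(point)]
        return sqrt(sum([x**2 for x in (point - center)]))
    return data.map(error).reduce(lambda x, y: x + y)

# Train a model for each candidate k and look for the "elbow" in the curve
for k in range(1, 6):
    model = KMeans.train(parsedData, k, maxIterations=10, runs=10)
    print "k = %d, WSSSE = %f" % (k, wssse(model, parsedData))
{% endhighlight %}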
{% highlight scala %} import org.apache.spark.mllib.clustering.KMeans +import org.apache.spark.mllib.linalg.Vectors // Load and parse the data -val data = sc.textFile("kmeans_data.txt") -val parsedData = data.map( _.split(' ').map(_.toDouble)) +val data = sc.textFile("data/kmeans_data.txt") +val parsedData = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble))) // Cluster the data into two classes using KMeans -val numIterations = 20 val numClusters = 2 +val numIterations = 20 val clusters = KMeans.train(parsedData, numClusters, numIterations) // Evaluate clustering by computing Within Set Sum of Squared Errors @@ -85,12 +86,12 @@ from numpy import array from math import sqrt # Load and parse the data -data = sc.textFile("kmeans_data.txt") +data = sc.textFile("data/kmeans_data.txt") parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')])) # Build the model (cluster the data) clusters = KMeans.train(parsedData, 2, maxIterations=10, - runs=30, initialization_mode="random") + runs=10, initialization_mode="random") # Evaluate clustering by computing Within Set Sum of Squared Errors def error(point): diff --git a/docs/mllib-guide.md b/docs/mllib-guide.md index 1ac5cc13db0b1..4236b0c8b6c99 100644 --- a/docs/mllib-guide.md +++ b/docs/mllib-guide.md @@ -7,8 +7,9 @@ title: Machine Learning Library (MLlib) MLlib is a Spark implementation of some common machine learning (ML) functionality, as well associated tests and data generators. MLlib currently supports four common types of machine learning problem settings, -namely, binary classification, regression, clustering and collaborative -filtering, as well as an underlying gradient descent optimization primitive. +namely classification, regression, clustering and collaborative filtering, +as well as an underlying gradient descent optimization primitive and several +linear algebra methods. # Available Methods The following links provide a detailed explanation of the methods and usage examples for each of them: @@ -32,6 +33,28 @@ The following links provide a detailed explanation of the methods and usage exam * Singular Value Decomposition * Principal Component Analysis +# Data Types + +Most MLlib algorithms operate on RDDs containing vectors. In Java and Scala, the +[Vector](api/mllib/index.html#org.apache.spark.mllib.linalg.Vector) class is used to +represent vectors. You can create either dense or sparse vectors using the +[Vectors](api/mllib/index.html#org.apache.spark.mllib.linalg.Vectors$) factory. + +In Python, MLlib can take the following vector types: + +* [NumPy](http://www.numpy.org) arrays +* Standard Python lists (e.g. `[1, 2, 3]`) +* The MLlib [SparseVector](api/pyspark/pyspark.mllib.linalg.SparseVector-class.html) class +* [SciPy sparse matrices](http://docs.scipy.org/doc/scipy/reference/sparse.html) + +For efficiency, we recommend using NumPy arrays over lists, and using the +[CSC format](http://docs.scipy.org/doc/scipy/reference/generated/scipy.sparse.csc_matrix.html#scipy.sparse.csc_matrix) +for SciPy matrices, or MLlib's own SparseVector class. + +Several other simple data types are used throughout the library, e.g. the LabeledPoint +class ([Java/Scala](api/mllib/index.html#org.apache.spark.mllib.regression.LabeledPoint), +[Python](api/pyspark/pyspark.mllib.regression.LabeledPoint-class.html)) for labeled data. + # Dependencies MLlib uses the [jblas](https://github.com/mikiobraun/jblas) linear algebra library, which itself depends on native Fortran routines. 
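To make the new Data Types section above concrete, here is a small sketch of the same 4-dimensional vector in each Python representation MLlib accepts (the SciPy lines assume SciPy is installed; MLlib works without it):

{% highlight python %}
import numpy as np
import scipy.sparse as sps
from pyspark.mllib.linalg import SparseVector, Vectors

# Dense: a NumPy array (preferred) or a plain Python list
dense1 = np.array([1.0, 0.0, 0.0, 5.5])
dense2 = [1.0, 0.0, 0.0, 5.5]

# Sparse: MLlib's own SparseVector(size, indices, values), directly or via the factory
sparse1 = SparseVector(4, [0, 3], [1.0, 5.5])
sparse2 = Vectors.sparse(4, {0: 1.0, 3: 5.5})

# Sparse: a SciPy column vector in CSC format, built from (values, row indices, column pointers)
sparse3 = sps.csc_matrix((np.array([1.0, 5.5]), np.array([0, 3]), np.array([0, 2])),
                         shape=(4, 1))
{% endhighlight %}

Any of these can be used as the `features` of a `LabeledPoint`, or collected into an RDD and passed to the training methods.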
You may need to install the diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index a6c049e517ee0..7c65b0d4750fa 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -23,7 +23,7 @@ import org.apache.spark.annotation.DeveloperApi import org.apache.spark.api.java.JavaRDD import org.apache.spark.mllib.classification._ import org.apache.spark.mllib.clustering._ -import org.apache.spark.mllib.linalg.Vectors +import org.apache.spark.mllib.linalg.{SparseVector, Vector, Vectors} import org.apache.spark.mllib.recommendation._ import org.apache.spark.mllib.regression._ import org.apache.spark.rdd.RDD @@ -31,56 +31,112 @@ import org.apache.spark.rdd.RDD /** * :: DeveloperApi :: * The Java stubs necessary for the Python mllib bindings. + * + * See python/pyspark/mllib/_common.py for the mutually agreed upon data format. */ @DeveloperApi class PythonMLLibAPI extends Serializable { - private def deserializeDoubleVector(bytes: Array[Byte]): Array[Double] = { - val packetLength = bytes.length - if (packetLength < 16) { - throw new IllegalArgumentException("Byte array too short.") - } - val bb = ByteBuffer.wrap(bytes) - bb.order(ByteOrder.nativeOrder()) - val magic = bb.getLong() - if (magic != 1) { + private val DENSE_VECTOR_MAGIC: Byte = 1 + private val SPARSE_VECTOR_MAGIC: Byte = 2 + private val DENSE_MATRIX_MAGIC: Byte = 3 + private val LABELED_POINT_MAGIC: Byte = 4 + + private def deserializeDoubleVector(bytes: Array[Byte], offset: Int = 0): Vector = { + require(bytes.length - offset >= 5, "Byte array too short") + val magic = bytes(offset) + if (magic == DENSE_VECTOR_MAGIC) { + deserializeDenseVector(bytes, offset) + } else if (magic == SPARSE_VECTOR_MAGIC) { + deserializeSparseVector(bytes, offset) + } else { throw new IllegalArgumentException("Magic " + magic + " is wrong.") } - val length = bb.getLong() - if (packetLength != 16 + 8 * length) { - throw new IllegalArgumentException("Length " + length + " is wrong.") - } + } + + private def deserializeDenseVector(bytes: Array[Byte], offset: Int = 0): Vector = { + val packetLength = bytes.length - offset + require(packetLength >= 5, "Byte array too short") + val bb = ByteBuffer.wrap(bytes, offset, bytes.length - offset) + bb.order(ByteOrder.nativeOrder()) + val magic = bb.get() + require(magic == DENSE_VECTOR_MAGIC, "Invalid magic: " + magic) + val length = bb.getInt() + require (packetLength == 5 + 8 * length, "Invalid packet length: " + packetLength) val db = bb.asDoubleBuffer() val ans = new Array[Double](length.toInt) db.get(ans) - ans + Vectors.dense(ans) } - private def serializeDoubleVector(doubles: Array[Double]): Array[Byte] = { + private def deserializeSparseVector(bytes: Array[Byte], offset: Int = 0): Vector = { + val packetLength = bytes.length - offset + require(packetLength >= 9, "Byte array too short") + val bb = ByteBuffer.wrap(bytes, offset, bytes.length - offset) + bb.order(ByteOrder.nativeOrder()) + val magic = bb.get() + require(magic == SPARSE_VECTOR_MAGIC, "Invalid magic: " + magic) + val size = bb.getInt() + val nonZeros = bb.getInt() + require (packetLength == 9 + 12 * nonZeros, "Invalid packet length: " + packetLength) + val ib = bb.asIntBuffer() + val indices = new Array[Int](nonZeros) + ib.get(indices) + bb.position(bb.position() + 4 * nonZeros) + val db = bb.asDoubleBuffer() + val values = 
new Array[Double](nonZeros) + db.get(values) + Vectors.sparse(size, indices, values) + } + + private def serializeDenseVector(doubles: Array[Double]): Array[Byte] = { val len = doubles.length - val bytes = new Array[Byte](16 + 8 * len) + val bytes = new Array[Byte](5 + 8 * len) val bb = ByteBuffer.wrap(bytes) bb.order(ByteOrder.nativeOrder()) - bb.putLong(1) - bb.putLong(len) + bb.put(DENSE_VECTOR_MAGIC) + bb.putInt(len) val db = bb.asDoubleBuffer() db.put(doubles) bytes } + private def serializeSparseVector(vector: SparseVector): Array[Byte] = { + val nonZeros = vector.indices.length + val bytes = new Array[Byte](9 + 12 * nonZeros) + val bb = ByteBuffer.wrap(bytes) + bb.order(ByteOrder.nativeOrder()) + bb.put(SPARSE_VECTOR_MAGIC) + bb.putInt(vector.size) + bb.putInt(nonZeros) + val ib = bb.asIntBuffer() + ib.put(vector.indices) + bb.position(bb.position() + 4 * nonZeros) + val db = bb.asDoubleBuffer() + db.put(vector.values) + bytes + } + + private def serializeDoubleVector(vector: Vector): Array[Byte] = vector match { + case s: SparseVector => + serializeSparseVector(s) + case _ => + serializeDenseVector(vector.toArray) + } + private def deserializeDoubleMatrix(bytes: Array[Byte]): Array[Array[Double]] = { val packetLength = bytes.length - if (packetLength < 24) { + if (packetLength < 9) { throw new IllegalArgumentException("Byte array too short.") } val bb = ByteBuffer.wrap(bytes) bb.order(ByteOrder.nativeOrder()) - val magic = bb.getLong() - if (magic != 2) { + val magic = bb.get() + if (magic != DENSE_MATRIX_MAGIC) { throw new IllegalArgumentException("Magic " + magic + " is wrong.") } - val rows = bb.getLong() - val cols = bb.getLong() - if (packetLength != 24 + 8 * rows * cols) { + val rows = bb.getInt() + val cols = bb.getInt() + if (packetLength != 9 + 8 * rows * cols) { throw new IllegalArgumentException("Size " + rows + "x" + cols + " is wrong.") } val db = bb.asDoubleBuffer() @@ -98,12 +154,12 @@ class PythonMLLibAPI extends Serializable { if (rows > 0) { cols = doubles(0).length } - val bytes = new Array[Byte](24 + 8 * rows * cols) + val bytes = new Array[Byte](9 + 8 * rows * cols) val bb = ByteBuffer.wrap(bytes) bb.order(ByteOrder.nativeOrder()) - bb.putLong(2) - bb.putLong(rows) - bb.putLong(cols) + bb.put(DENSE_MATRIX_MAGIC) + bb.putInt(rows) + bb.putInt(cols) val db = bb.asDoubleBuffer() for (i <- 0 until rows) { db.put(doubles(i)) @@ -111,18 +167,27 @@ class PythonMLLibAPI extends Serializable { bytes } + private def deserializeLabeledPoint(bytes: Array[Byte]): LabeledPoint = { + require(bytes.length >= 9, "Byte array too short") + val magic = bytes(0) + if (magic != LABELED_POINT_MAGIC) { + throw new IllegalArgumentException("Magic " + magic + " is wrong.") + } + val labelBytes = ByteBuffer.wrap(bytes, 1, 8) + labelBytes.order(ByteOrder.nativeOrder()) + val label = labelBytes.asDoubleBuffer().get(0) + LabeledPoint(label, deserializeDoubleVector(bytes, 9)) + } + private def trainRegressionModel( - trainFunc: (RDD[LabeledPoint], Array[Double]) => GeneralizedLinearModel, + trainFunc: (RDD[LabeledPoint], Vector) => GeneralizedLinearModel, dataBytesJRDD: JavaRDD[Array[Byte]], initialWeightsBA: Array[Byte]): java.util.LinkedList[java.lang.Object] = { - val data = dataBytesJRDD.rdd.map(xBytes => { - val x = deserializeDoubleVector(xBytes) - LabeledPoint(x(0), Vectors.dense(x.slice(1, x.length))) - }) + val data = dataBytesJRDD.rdd.map(deserializeLabeledPoint) val initialWeights = deserializeDoubleVector(initialWeightsBA) val model = trainFunc(data, initialWeights) val ret = new 
java.util.LinkedList[java.lang.Object]() - ret.add(serializeDoubleVector(model.weights.toArray)) + ret.add(serializeDoubleVector(model.weights)) ret.add(model.intercept: java.lang.Double) ret } @@ -143,7 +208,7 @@ class PythonMLLibAPI extends Serializable { numIterations, stepSize, miniBatchFraction, - Vectors.dense(initialWeights)), + initialWeights), dataBytesJRDD, initialWeightsBA) } @@ -166,7 +231,7 @@ class PythonMLLibAPI extends Serializable { stepSize, regParam, miniBatchFraction, - Vectors.dense(initialWeights)), + initialWeights), dataBytesJRDD, initialWeightsBA) } @@ -189,7 +254,7 @@ class PythonMLLibAPI extends Serializable { stepSize, regParam, miniBatchFraction, - Vectors.dense(initialWeights)), + initialWeights), dataBytesJRDD, initialWeightsBA) } @@ -212,7 +277,7 @@ class PythonMLLibAPI extends Serializable { stepSize, regParam, miniBatchFraction, - Vectors.dense(initialWeights)), + initialWeights), dataBytesJRDD, initialWeightsBA) } @@ -233,7 +298,7 @@ class PythonMLLibAPI extends Serializable { numIterations, stepSize, miniBatchFraction, - Vectors.dense(initialWeights)), + initialWeights), dataBytesJRDD, initialWeightsBA) } @@ -244,14 +309,11 @@ class PythonMLLibAPI extends Serializable { def trainNaiveBayes( dataBytesJRDD: JavaRDD[Array[Byte]], lambda: Double): java.util.List[java.lang.Object] = { - val data = dataBytesJRDD.rdd.map(xBytes => { - val x = deserializeDoubleVector(xBytes) - LabeledPoint(x(0), Vectors.dense(x.slice(1, x.length))) - }) + val data = dataBytesJRDD.rdd.map(deserializeLabeledPoint) val model = NaiveBayes.train(data, lambda) val ret = new java.util.LinkedList[java.lang.Object]() - ret.add(serializeDoubleVector(model.labels)) - ret.add(serializeDoubleVector(model.pi)) + ret.add(serializeDoubleVector(Vectors.dense(model.labels))) + ret.add(serializeDoubleVector(Vectors.dense(model.pi))) ret.add(serializeDoubleMatrix(model.theta)) ret } @@ -265,7 +327,7 @@ class PythonMLLibAPI extends Serializable { maxIterations: Int, runs: Int, initializationMode: String): java.util.List[java.lang.Object] = { - val data = dataBytesJRDD.rdd.map(xBytes => Vectors.dense(deserializeDoubleVector(xBytes))) + val data = dataBytesJRDD.rdd.map(bytes => deserializeDoubleVector(bytes)) val model = KMeans.train(data, k, maxIterations, runs, initializationMode) val ret = new java.util.LinkedList[java.lang.Object]() ret.add(serializeDoubleMatrix(model.clusterCenters.map(_.toArray))) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala index 99a849f1c66b1..7cdf6bd56acd9 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala @@ -130,9 +130,11 @@ object Vectors { private[mllib] def fromBreeze(breezeVector: BV[Double]): Vector = { breezeVector match { case v: BDV[Double] => - require(v.offset == 0, s"Do not support non-zero offset ${v.offset}.") - require(v.stride == 1, s"Do not support stride other than 1, but got ${v.stride}.") - new DenseVector(v.data) + if (v.offset == 0 && v.stride == 1) { + new DenseVector(v.data) + } else { + new DenseVector(v.toArray) // Can't use underlying array directly, so make a new one + } case v: BSV[Double] => new SparseVector(v.length, v.index, v.data) case v: BV[_] => diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala index 8a200310e0bb1..cfe8a27fcb71e 100644 --- 
a/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala @@ -82,4 +82,22 @@ class VectorsSuite extends FunSuite { assert(v.## != another.##) } } + + test("indexing dense vectors") { + val vec = Vectors.dense(1.0, 2.0, 3.0, 4.0) + assert(vec(0) === 1.0) + assert(vec(3) === 4.0) + } + + test("indexing sparse vectors") { + val vec = Vectors.sparse(7, Array(0, 2, 4, 6), Array(1.0, 2.0, 3.0, 4.0)) + assert(vec(0) === 1.0) + assert(vec(1) === 0.0) + assert(vec(2) === 2.0) + assert(vec(3) === 0.0) + assert(vec(6) === 4.0) + val vec2 = Vectors.sparse(8, Array(0, 2, 4, 6), Array(1.0, 2.0, 3.0, 4.0)) + assert(vec2(6) === 4.0) + assert(vec2(7) === 0.0) + } } diff --git a/python/epydoc.conf b/python/epydoc.conf index 95a6af0974806..081ed215ae60c 100644 --- a/python/epydoc.conf +++ b/python/epydoc.conf @@ -33,5 +33,6 @@ target: docs/ private: no exclude: pyspark.cloudpickle pyspark.worker pyspark.join - pyspark.java_gateway pyspark.examples pyspark.shell pyspark.test + pyspark.java_gateway pyspark.examples pyspark.shell pyspark.tests pyspark.rddsampler pyspark.daemon pyspark.mllib._common + pyspark.mllib.tests diff --git a/python/examples/kmeans.py b/python/examples/kmeans.py index ba31af92fca25..d8387b0b183e6 100755 --- a/python/examples/kmeans.py +++ b/python/examples/kmeans.py @@ -16,8 +16,13 @@ # """ -This example requires numpy (http://www.numpy.org/) +The K-means algorithm written from scratch against PySpark. In practice, +one may prefer to use the KMeans algorithm in MLlib, as shown in +python/examples/mllib/kmeans.py. + +This example requires NumPy (http://www.numpy.org/). """ + import sys import numpy as np @@ -49,9 +54,7 @@ def closestPoint(p, centers): K = int(sys.argv[3]) convergeDist = float(sys.argv[4]) - # TODO: change this after we port takeSample() - #kPoints = data.takeSample(False, K, 34) - kPoints = data.take(K) + kPoints = data.takeSample(False, K, 1) tempDist = 1.0 while tempDist > convergeDist: diff --git a/python/examples/logistic_regression.py b/python/examples/logistic_regression.py index 1117dea5380e7..28d52e6a40b45 100755 --- a/python/examples/logistic_regression.py +++ b/python/examples/logistic_regression.py @@ -16,9 +16,13 @@ # """ -A logistic regression implementation that uses NumPy (http://www.numpy.org) to act on batches -of input data using efficient matrix operations. +A logistic regression implementation that uses NumPy (http://www.numpy.org) +to act on batches of input data using efficient matrix operations. + +In practice, one may prefer to use the LogisticRegression algorithm in +MLlib, as shown in python/examples/mllib/logistic_regression.py. """ + from collections import namedtuple from math import exp from os.path import realpath diff --git a/python/examples/mllib/kmeans.py b/python/examples/mllib/kmeans.py new file mode 100755 index 0000000000000..dec82ff34fbac --- /dev/null +++ b/python/examples/mllib/kmeans.py @@ -0,0 +1,44 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +A K-means clustering program using MLlib. + +This example requires NumPy (http://www.numpy.org/). +""" + +import sys + +import numpy as np +from pyspark import SparkContext +from pyspark.mllib.clustering import KMeans + + +def parseVector(line): + return np.array([float(x) for x in line.split(' ')]) + + +if __name__ == "__main__": + if len(sys.argv) < 4: + print >> sys.stderr, "Usage: kmeans " + exit(-1) + sc = SparkContext(sys.argv[1], "KMeans") + lines = sc.textFile(sys.argv[2]) + data = lines.map(parseVector) + k = int(sys.argv[3]) + model = KMeans.train(data, k) + print "Final centers: " + str(model.clusterCenters) diff --git a/python/examples/mllib/logistic_regression.py b/python/examples/mllib/logistic_regression.py new file mode 100755 index 0000000000000..8631051d00ff2 --- /dev/null +++ b/python/examples/mllib/logistic_regression.py @@ -0,0 +1,50 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +Logistic regression using MLlib. + +This example requires NumPy (http://www.numpy.org/). +""" + +from math import exp +import sys + +import numpy as np +from pyspark import SparkContext +from pyspark.mllib.regression import LabeledPoint +from pyspark.mllib.classification import LogisticRegressionWithSGD + + +# Parse a line of text into an MLlib LabeledPoint object +def parsePoint(line): + values = [float(s) for s in line.split(' ')] + if values[0] == -1: # Convert -1 labels to 0 for MLlib + values[0] = 0 + return LabeledPoint(values[0], values[1:]) + + +if __name__ == "__main__": + if len(sys.argv) != 4: + print >> sys.stderr, "Usage: logistic_regression " + exit(-1) + sc = SparkContext(sys.argv[1], "PythonLR") + points = sc.textFile(sys.argv[2]).map(parsePoint) + iterations = int(sys.argv[3]) + model = LogisticRegressionWithSGD.train(points, iterations) + print "Final weights: " + str(model.weights) + print "Final intercept: " + str(model.intercept) diff --git a/python/pyspark/mllib/_common.py b/python/pyspark/mllib/_common.py index e19f5d2aaa958..e6f0953810ed7 100644 --- a/python/pyspark/mllib/_common.py +++ b/python/pyspark/mllib/_common.py @@ -15,38 +15,86 @@ # limitations under the License. 
# -from numpy import ndarray, float64, int64, int32, ones, array_equal, array, dot, shape, complex, issubdtype +import struct +import numpy +from numpy import ndarray, float64, int64, int32, array_equal, array from pyspark import SparkContext, RDD -import numpy as np - +from pyspark.mllib.linalg import SparseVector from pyspark.serializers import Serializer -import struct -# Double vector format: +""" +Common utilities shared throughout MLlib, primarily for dealing with +different data types. These include: +- Serialization utilities to / from byte arrays that Java can handle +- Serializers for other data types, like ALS Rating objects +- Common methods for linear models +- Methods to deal with the different vector types we support, such as + SparseVector and scipy.sparse matrices. +""" + + +# Check whether we have SciPy. MLlib works without it too, but if we have it, some methods, +# such as _dot and _serialize_double_vector, start to support scipy.sparse matrices. + +_have_scipy = False +_scipy_issparse = None +try: + import scipy.sparse + _have_scipy = True + _scipy_issparse = scipy.sparse.issparse +except: + # No SciPy in environment, but that's okay + pass + + +# Serialization functions to and from Scala. These use the following formats, understood +# by the PythonMLLibAPI class in Scala: +# +# Dense double vector format: +# +# [1-byte 1] [4-byte length] [length*8 bytes of data] # -# [8-byte 1] [8-byte length] [length*8 bytes of data] +# Sparse double vector format: +# +# [1-byte 2] [4-byte length] [4-byte nonzeros] [nonzeros*4 bytes of indices] [nonzeros*8 bytes of values] # # Double matrix format: # -# [8-byte 2] [8-byte rows] [8-byte cols] [rows*cols*8 bytes of data] +# [1-byte 3] [4-byte rows] [4-byte cols] [rows*cols*8 bytes of data] +# +# LabeledPoint format: +# +# [1-byte 4] [8-byte label] [dense or sparse vector] # # This is all in machine-endian. That means that the Java interpreter and the # Python interpreter must agree on what endian the machine is. -def _deserialize_byte_array(shape, ba, offset): - """Wrapper around ndarray aliasing hack. + +DENSE_VECTOR_MAGIC = 1 +SPARSE_VECTOR_MAGIC = 2 +DENSE_MATRIX_MAGIC = 3 +LABELED_POINT_MAGIC = 4 + + +def _deserialize_numpy_array(shape, ba, offset, dtype=float64): + """ + Deserialize a numpy array of the given type from an offset in + bytearray ba, assigning it the given shape. >>> x = array([1.0, 2.0, 3.0, 4.0, 5.0]) - >>> array_equal(x, _deserialize_byte_array(x.shape, x.data, 0)) + >>> array_equal(x, _deserialize_numpy_array(x.shape, x.data, 0)) True >>> x = array([1.0, 2.0, 3.0, 4.0]).reshape(2,2) - >>> array_equal(x, _deserialize_byte_array(x.shape, x.data, 0)) + >>> array_equal(x, _deserialize_numpy_array(x.shape, x.data, 0)) + True + >>> x = array([1, 2, 3], dtype=int32) + >>> array_equal(x, _deserialize_numpy_array(x.shape, x.data, 0, dtype=int32)) True """ - ar = ndarray(shape=shape, buffer=ba, offset=offset, dtype="float64", - order='C') + ar = ndarray(shape=shape, buffer=ba, offset=offset, dtype=dtype, order='C') return ar.copy() + def _serialize_double_vector(v): """Serialize a double vector into a mutually understood format. 
@@ -55,160 +103,231 @@ def _serialize_double_vector(v): >>> array_equal(y, array([1.0, 2.0, 3.0])) True """ - if type(v) != ndarray: - raise TypeError("_serialize_double_vector called on a %s; " - "wanted ndarray" % type(v)) - """complex is only datatype that can't be converted to float64""" - if issubdtype(v.dtype, complex): + v = _convert_vector(v) + if type(v) == ndarray: + return _serialize_dense_vector(v) + elif type(v) == SparseVector: + return _serialize_sparse_vector(v) + else: raise TypeError("_serialize_double_vector called on a %s; " - "wanted ndarray" % type(v)) - if v.dtype != float64: - v = v.astype(float64) + "wanted ndarray or SparseVector" % type(v)) + + +def _serialize_dense_vector(v): + """Serialize a dense vector given as a NumPy array.""" if v.ndim != 1: raise TypeError("_serialize_double_vector called on a %ddarray; " "wanted a 1darray" % v.ndim) + if v.dtype != float64: + if numpy.issubdtype(v.dtype, numpy.complex): + raise TypeError("_serialize_double_vector called on an ndarray of %s; " + "wanted ndarray of float64" % v.dtype) + v = v.astype(float64) length = v.shape[0] - ba = bytearray(16 + 8*length) - header = ndarray(shape=[2], buffer=ba, dtype="int64") - header[0] = 1 - header[1] = length - arr_mid = ndarray(shape=[length], buffer=ba, offset=16, dtype="float64") - arr_mid[...] = v + ba = bytearray(5 + 8 * length) + ba[0] = DENSE_VECTOR_MAGIC + length_bytes = ndarray(shape=[1], buffer=ba, offset=1, dtype=int32) + length_bytes[0] = length + _copyto(v, buffer=ba, offset=5, shape=[length], dtype=float64) + return ba + + +def _serialize_sparse_vector(v): + """Serialize a pyspark.mllib.linalg.SparseVector.""" + nonzeros = len(v.indices) + ba = bytearray(9 + 12 * nonzeros) + ba[0] = SPARSE_VECTOR_MAGIC + header = ndarray(shape=[2], buffer=ba, offset=1, dtype=int32) + header[0] = v.size + header[1] = nonzeros + _copyto(v.indices, buffer=ba, offset=9, shape=[nonzeros], dtype=int32) + values_offset = 9 + 4 * nonzeros + _copyto(v.values, buffer=ba, offset=values_offset, shape=[nonzeros], dtype=float64) return ba + def _deserialize_double_vector(ba): """Deserialize a double vector from a mutually understood format. 
>>> x = array([1.0, 2.0, 3.0, 4.0, -1.0, 0.0, -0.0]) >>> array_equal(x, _deserialize_double_vector(_serialize_double_vector(x))) True + >>> s = SparseVector(4, [1, 3], [3.0, 5.5]) + >>> s == _deserialize_double_vector(_serialize_double_vector(s)) + True """ if type(ba) != bytearray: raise TypeError("_deserialize_double_vector called on a %s; " "wanted bytearray" % type(ba)) - if len(ba) < 16: + if len(ba) < 5: raise TypeError("_deserialize_double_vector called on a %d-byte array, " "which is too short" % len(ba)) - if (len(ba) & 7) != 0: - raise TypeError("_deserialize_double_vector called on a %d-byte array, " - "which is not a multiple of 8" % len(ba)) - header = ndarray(shape=[2], buffer=ba, dtype="int64") - if header[0] != 1: + if ba[0] == DENSE_VECTOR_MAGIC: + return _deserialize_dense_vector(ba) + elif ba[0] == SPARSE_VECTOR_MAGIC: + return _deserialize_sparse_vector(ba) + else: raise TypeError("_deserialize_double_vector called on bytearray " "with wrong magic") - length = header[1] - if len(ba) != 8*length + 16: - raise TypeError("_deserialize_double_vector called on bytearray " + + +def _deserialize_dense_vector(ba): + """Deserialize a dense vector into a numpy array.""" + if len(ba) < 5: + raise TypeError("_deserialize_dense_vector called on a %d-byte array, " + "which is too short" % len(ba)) + length = ndarray(shape=[1], buffer=ba, offset=1, dtype=int32)[0] + if len(ba) != 8 * length + 5: + raise TypeError("_deserialize_dense_vector called on bytearray " + "with wrong length") + return _deserialize_numpy_array([length], ba, 5) + + +def _deserialize_sparse_vector(ba): + """Deserialize a sparse vector into a MLlib SparseVector object.""" + if len(ba) < 9: + raise TypeError("_deserialize_sparse_vector called on a %d-byte array, " + "which is too short" % len(ba)) + header = ndarray(shape=[2], buffer=ba, offset=1, dtype=int32) + size = header[0] + nonzeros = header[1] + if len(ba) != 9 + 12 * nonzeros: + raise TypeError("_deserialize_sparse_vector called on bytearray " "with wrong length") - return _deserialize_byte_array([length], ba, 16) + indices = _deserialize_numpy_array([nonzeros], ba, 9, dtype=int32) + values = _deserialize_numpy_array([nonzeros], ba, 9 + 4 * nonzeros, dtype=float64) + return SparseVector(int(size), indices, values) + def _serialize_double_matrix(m): """Serialize a double matrix into a mutually understood format.""" - if (type(m) == ndarray and m.dtype == float64 and m.ndim == 2): + if (type(m) == ndarray and m.ndim == 2): + if m.dtype != float64: + if numpy.issubdtype(m.dtype, numpy.complex): + raise TypeError("_serialize_double_matrix called on an ndarray of %s; " + "wanted ndarray of float64" % m.dtype) + m = m.astype(float64) rows = m.shape[0] cols = m.shape[1] - ba = bytearray(24 + 8 * rows * cols) - header = ndarray(shape=[3], buffer=ba, dtype="int64") - header[0] = 2 - header[1] = rows - header[2] = cols - arr_mid = ndarray(shape=[rows, cols], buffer=ba, offset=24, - dtype="float64", order='C') - arr_mid[...] 
= m + ba = bytearray(9 + 8 * rows * cols) + ba[0] = DENSE_MATRIX_MAGIC + lengths = ndarray(shape=[3], buffer=ba, offset=1, dtype=int32) + lengths[0] = rows + lengths[1] = cols + _copyto(m, buffer=ba, offset=9, shape=[rows, cols], dtype=float64) return ba else: raise TypeError("_serialize_double_matrix called on a " "non-double-matrix") + def _deserialize_double_matrix(ba): """Deserialize a double matrix from a mutually understood format.""" if type(ba) != bytearray: raise TypeError("_deserialize_double_matrix called on a %s; " "wanted bytearray" % type(ba)) - if len(ba) < 24: + if len(ba) < 9: raise TypeError("_deserialize_double_matrix called on a %d-byte array, " "which is too short" % len(ba)) - if (len(ba) & 7) != 0: - raise TypeError("_deserialize_double_matrix called on a %d-byte array, " - "which is not a multiple of 8" % len(ba)) - header = ndarray(shape=[3], buffer=ba, dtype="int64") - if (header[0] != 2): + if ba[0] != DENSE_MATRIX_MAGIC: raise TypeError("_deserialize_double_matrix called on bytearray " "with wrong magic") - rows = header[1] - cols = header[2] - if (len(ba) != 8*rows*cols + 24): + lengths = ndarray(shape=[2], buffer=ba, offset=1, dtype=int32) + rows = lengths[0] + cols = lengths[1] + if (len(ba) != 8 * rows * cols + 9): raise TypeError("_deserialize_double_matrix called on bytearray " "with wrong length") - return _deserialize_byte_array([rows, cols], ba, 24) + return _deserialize_numpy_array([rows, cols], ba, 9) + + +def _serialize_labeled_point(p): + """Serialize a LabeledPoint with a features vector of any type.""" + from pyspark.mllib.regression import LabeledPoint + serialized_features = _serialize_double_vector(p.features) + header = bytearray(9) + header[0] = LABELED_POINT_MAGIC + header_float = ndarray(shape=[1], buffer=header, offset=1, dtype=float64) + header_float[0] = p.label + return header + serialized_features + + +def _copyto(array, buffer, offset, shape, dtype): + """ + Copy the contents of a vector to a destination bytearray at the + given offset. + + TODO: In the future this could use numpy.copyto on NumPy 1.7+, but + we should benchmark that to see whether it provides a benefit. + """ + temp_array = ndarray(shape=shape, buffer=buffer, offset=offset, dtype=dtype, order='C') + temp_array[...] = array -def _linear_predictor_typecheck(x, coeffs): - """Check that x is a one-dimensional vector of the right shape. - This is a temporary hackaround until I actually implement bulk predict.""" - if type(x) == ndarray: - if x.ndim == 1: - if x.shape == coeffs.shape: - pass - else: - raise RuntimeError("Got array of %d elements; wanted %d" - % (shape(x)[0], shape(coeffs)[0])) - else: - raise RuntimeError("Bulk predict not yet supported.") - elif (type(x) == RDD): - raise RuntimeError("Bulk predict not yet supported.") - else: - raise TypeError("Argument of type " + type(x).__name__ + " unsupported") def _get_unmangled_rdd(data, serializer): dataBytes = data.map(serializer) dataBytes._bypass_serializer = True - dataBytes.cache() + dataBytes.cache() # TODO: users should unpersist() this later! 
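    # At this point dataBytes is an RDD of raw bytearrays in the wire format described
    # at the top of this file; _bypass_serializer tells PySpark to ship them as-is
    # (without pickling them again) so the JVM-side PythonMLLibAPI can decode them.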
return dataBytes -# Map a pickled Python RDD of numpy double vectors to a Java RDD of + +# Map a pickled Python RDD of Python dense or sparse vectors to a Java RDD of # _serialized_double_vectors def _get_unmangled_double_vector_rdd(data): return _get_unmangled_rdd(data, _serialize_double_vector) -class LinearModel(object): - """Something that has a vector of coefficients and an intercept.""" - def __init__(self, coeff, intercept): - self._coeff = coeff - self._intercept = intercept -class LinearRegressionModelBase(LinearModel): - """A linear regression model. +# Map a pickled Python RDD of LabeledPoint to a Java RDD of _serialized_labeled_points +def _get_unmangled_labeled_point_rdd(data): + return _get_unmangled_rdd(data, _serialize_labeled_point) - >>> lrmb = LinearRegressionModelBase(array([1.0, 2.0]), 0.1) - >>> abs(lrmb.predict(array([-1.03, 7.777])) - 14.624) < 1e-6 - True + +# Common functions for dealing with and training linear models + +def _linear_predictor_typecheck(x, coeffs): """ - def predict(self, x): - """Predict the value of the dependent variable given a vector x""" - """containing values for the independent variables.""" - _linear_predictor_typecheck(x, self._coeff) - return dot(self._coeff, x) + self._intercept + Check that x is a one-dimensional vector of the right shape. + This is a temporary hackaround until we actually implement bulk predict. + """ + x = _convert_vector(x) + if type(x) == ndarray: + if x.ndim == 1: + if x.shape != coeffs.shape: + raise RuntimeError("Got array of %d elements; wanted %d" + % (numpy.shape(x)[0], coeffs.shape[0])) + else: + raise RuntimeError("Bulk predict not yet supported.") + elif type(x) == SparseVector: + if x.size != coeffs.shape[0]: + raise RuntimeError("Got sparse vector of size %d; wanted %d" + % (x.size, coeffs.shape[0])) + elif (type(x) == RDD): + raise RuntimeError("Bulk predict not yet supported.") + else: + raise TypeError("Argument of type " + type(x).__name__ + " unsupported") + # If we weren't given initial weights, take a zero vector of the appropriate # length. def _get_initial_weights(initial_weights, data): if initial_weights is None: - initial_weights = data.first() - if type(initial_weights) != ndarray: - raise TypeError("At least one data element has type " - + type(initial_weights).__name__ + " which is not ndarray") - if initial_weights.ndim != 1: - raise TypeError("At least one data element has " - + initial_weights.ndim + " dimensions, which is not 1") - initial_weights = ones([initial_weights.shape[0] - 1]) + initial_weights = _convert_vector(data.first().features) + if type(initial_weights) == ndarray: + if initial_weights.ndim != 1: + raise TypeError("At least one data element has " + + initial_weights.ndim + " dimensions, which is not 1") + initial_weights = numpy.zeros([initial_weights.shape[0]]) + elif type(initial_weights) == SparseVector: + initial_weights = numpy.zeros([initial_weights.size]) return initial_weights + # train_func should take two parameters, namely data and initial_weights, and # return the result of a call to the appropriate JVM stub. # _regression_train_wrapper is responsible for setup and error checking. 
def _regression_train_wrapper(sc, train_func, klass, data, initial_weights): initial_weights = _get_initial_weights(initial_weights, data) - dataBytes = _get_unmangled_double_vector_rdd(data) + dataBytes = _get_unmangled_labeled_point_rdd(data) ans = train_func(dataBytes, _serialize_double_vector(initial_weights)) if len(ans) != 2: raise RuntimeError("JVM call result had unexpected length") @@ -220,6 +339,9 @@ def _regression_train_wrapper(sc, train_func, klass, data, initial_weights): + type(ans[0]).__name__ + " which is not float") return klass(_deserialize_double_vector(ans[0]), ans[1]) + +# Functions for serializing ALS Rating objects and tuples + def _serialize_rating(r): ba = bytearray(16) intpart = ndarray(shape=[2], buffer=ba, dtype=int32) @@ -227,11 +349,12 @@ def _serialize_rating(r): intpart[0], intpart[1], doublepart[0] = r return ba + class RatingDeserializer(Serializer): def loads(self, stream): length = struct.unpack("!i", stream.read(4))[0] ba = stream.read(length) - res = ndarray(shape=(3, ), buffer=ba, dtype="float64", offset=4) + res = ndarray(shape=(3, ), buffer=ba, dtype=float64, offset=4) return int(res[0]), int(res[1]), res[2] def load_stream(self, stream): @@ -243,12 +366,86 @@ def load_stream(self, stream): except EOFError: return + def _serialize_tuple(t): ba = bytearray(8) intpart = ndarray(shape=[2], buffer=ba, dtype=int32) intpart[0], intpart[1] = t return ba + +# Vector math functions that support all of our vector types + +def _convert_vector(vec): + """ + Convert a vector to a format we support internally. This does + the following: + + * For dense NumPy vectors (ndarray), returns them as is + * For our SparseVector class, returns that as is + * For Python lists, converts them to NumPy vectors + * For scipy.sparse.*_matrix column vectors, converts them to + our own SparseVector type. + + This should be called before passing any data to our algorithms + or attempting to serialize it to Java. + """ + if type(vec) == ndarray or type(vec) == SparseVector: + return vec + elif type(vec) == list: + return array(vec, dtype=float64) + elif _have_scipy: + if _scipy_issparse(vec): + assert vec.shape[1] == 1, "Expected column vector" + csc = vec.tocsc() + return SparseVector(vec.shape[0], csc.indices, csc.data) + raise TypeError("Expected NumPy array, SparseVector, or scipy.sparse matrix") + + +def _squared_distance(v1, v2): + """ + Squared distance of two NumPy or sparse vectors. + + >>> dense1 = array([1., 2.]) + >>> sparse1 = SparseVector(2, [0, 1], [1., 2.]) + >>> dense2 = array([2., 1.]) + >>> sparse2 = SparseVector(2, [0, 1], [2., 1.]) + >>> _squared_distance(dense1, dense2) + 2.0 + >>> _squared_distance(dense1, sparse2) + 2.0 + >>> _squared_distance(sparse1, dense2) + 2.0 + >>> _squared_distance(sparse1, sparse2) + 2.0 + """ + v1 = _convert_vector(v1) + v2 = _convert_vector(v2) + if type(v1) == ndarray and type(v2) == ndarray: + diff = v1 - v2 + return diff.dot(diff) + elif type(v1) == ndarray: + return v2.squared_distance(v1) + else: + return v1.squared_distance(v2) + + +def _dot(vec, target): + """ + Compute the dot product of a vector of the types we support + (Numpy array, list, SparseVector, or SciPy sparse) and a target + NumPy array that is either 1- or 2-dimensional. Equivalent to + calling numpy.dot of the two vectors, but for SciPy ones, we + have to transpose them because they're column vectors. 
+ """ + if type(vec) == ndarray or type(vec) == SparseVector: + return vec.dot(target) + elif type(vec) == list: + return _convert_vector(vec).dot(target) + else: + return vec.transpose().dot(target)[0] + + def _test(): import doctest globs = globals().copy() @@ -259,5 +456,6 @@ def _test(): if failure_count: exit(-1) + if __name__ == "__main__": _test() diff --git a/python/pyspark/mllib/classification.py b/python/pyspark/mllib/classification.py index d2f9cdb3f4298..3a23e0801fe7b 100644 --- a/python/pyspark/mllib/classification.py +++ b/python/pyspark/mllib/classification.py @@ -17,30 +17,55 @@ import numpy -from numpy import array, dot, shape +from numpy import array, shape from pyspark import SparkContext from pyspark.mllib._common import \ - _get_unmangled_rdd, _get_unmangled_double_vector_rdd, \ + _dot, _get_unmangled_rdd, _get_unmangled_double_vector_rdd, \ _serialize_double_matrix, _deserialize_double_matrix, \ _serialize_double_vector, _deserialize_double_vector, \ _get_initial_weights, _serialize_rating, _regression_train_wrapper, \ - LinearModel, _linear_predictor_typecheck + _linear_predictor_typecheck, _get_unmangled_labeled_point_rdd +from pyspark.mllib.linalg import SparseVector +from pyspark.mllib.regression import LabeledPoint, LinearModel from math import exp, log class LogisticRegressionModel(LinearModel): """A linear binary classification model derived from logistic regression. - >>> data = array([0.0, 0.0, 1.0, 1.0, 1.0, 2.0, 1.0, 3.0]).reshape(4,2) + >>> data = [ + ... LabeledPoint(0.0, [0.0]), + ... LabeledPoint(1.0, [1.0]), + ... LabeledPoint(1.0, [2.0]), + ... LabeledPoint(1.0, [3.0]) + ... ] >>> lrm = LogisticRegressionWithSGD.train(sc.parallelize(data)) >>> lrm.predict(array([1.0])) > 0 True + >>> lrm.predict(array([0.0])) <= 0 + True + >>> sparse_data = [ + ... LabeledPoint(0.0, SparseVector(2, {0: 0.0})), + ... LabeledPoint(1.0, SparseVector(2, {1: 1.0})), + ... LabeledPoint(0.0, SparseVector(2, {0: 0.0})), + ... LabeledPoint(1.0, SparseVector(2, {1: 2.0})) + ... ] + >>> lrm = LogisticRegressionWithSGD.train(sc.parallelize(sparse_data)) + >>> lrm.predict(array([0.0, 1.0])) > 0 + True + >>> lrm.predict(array([0.0, 0.0])) <= 0 + True + >>> lrm.predict(SparseVector(2, {1: 1.0})) > 0 + True + >>> lrm.predict(SparseVector(2, {1: 0.0})) <= 0 + True """ def predict(self, x): _linear_predictor_typecheck(x, self._coeff) - margin = dot(x, self._coeff) + self._intercept + margin = _dot(x, self._coeff) + self._intercept prob = 1/(1 + exp(-margin)) return 1 if prob > 0.5 else 0 + class LogisticRegressionWithSGD(object): @classmethod def train(cls, data, iterations=100, step=1.0, @@ -55,14 +80,30 @@ def train(cls, data, iterations=100, step=1.0, class SVMModel(LinearModel): """A support vector machine. - >>> data = array([0.0, 0.0, 1.0, 1.0, 1.0, 2.0, 1.0, 3.0]).reshape(4,2) + >>> data = [ + ... LabeledPoint(0.0, [0.0]), + ... LabeledPoint(1.0, [1.0]), + ... LabeledPoint(1.0, [2.0]), + ... LabeledPoint(1.0, [3.0]) + ... ] >>> svm = SVMWithSGD.train(sc.parallelize(data)) >>> svm.predict(array([1.0])) > 0 True + >>> sparse_data = [ + ... LabeledPoint(0.0, SparseVector(2, {0: 0.0})), + ... LabeledPoint(1.0, SparseVector(2, {1: 1.0})), + ... LabeledPoint(0.0, SparseVector(2, {0: 0.0})), + ... LabeledPoint(1.0, SparseVector(2, {1: 2.0})) + ... 
] + >>> svm = SVMWithSGD.train(sc.parallelize(sparse_data)) + >>> svm.predict(SparseVector(2, {1: 1.0})) > 0 + True + >>> svm.predict(SparseVector(2, {1: 0.0})) <= 0 + True """ def predict(self, x): _linear_predictor_typecheck(x, self._coeff) - margin = dot(x, self._coeff) + self._intercept + margin = _dot(x, self._coeff) + self._intercept return 1 if margin >= 0 else 0 class SVMWithSGD(object): @@ -84,12 +125,26 @@ class NaiveBayesModel(object): - pi: vector of logs of class priors (dimension C) - theta: matrix of logs of class conditional probabilities (CxD) - >>> data = array([0.0, 0.0, 1.0, 0.0, 0.0, 2.0, 1.0, 1.0, 0.0]).reshape(3,3) + >>> data = [ + ... LabeledPoint(0.0, [0.0, 0.0]), + ... LabeledPoint(0.0, [0.0, 1.0]), + ... LabeledPoint(1.0, [1.0, 0.0]), + ... ] >>> model = NaiveBayes.train(sc.parallelize(data)) >>> model.predict(array([0.0, 1.0])) 0.0 >>> model.predict(array([1.0, 0.0])) 1.0 + >>> sparse_data = [ + ... LabeledPoint(0.0, SparseVector(2, {1: 0.0})), + ... LabeledPoint(0.0, SparseVector(2, {1: 1.0})), + ... LabeledPoint(1.0, SparseVector(2, {0: 1.0})) + ... ] + >>> model = NaiveBayes.train(sc.parallelize(sparse_data)) + >>> model.predict(SparseVector(2, {1: 1.0})) + 0.0 + >>> model.predict(SparseVector(2, {0: 1.0})) + 1.0 """ def __init__(self, labels, pi, theta): @@ -99,7 +154,7 @@ def __init__(self, labels, pi, theta): def predict(self, x): """Return the most likely class for a data vector x""" - return self.labels[numpy.argmax(self.pi + dot(x, self.theta))] + return self.labels[numpy.argmax(self.pi + _dot(x, self.theta))] class NaiveBayes(object): @classmethod @@ -119,7 +174,7 @@ def train(cls, data, lambda_=1.0): @param lambda_: The smoothing parameter """ sc = data.context - dataBytes = _get_unmangled_double_vector_rdd(data) + dataBytes = _get_unmangled_labeled_point_rdd(data) ans = sc._jvm.PythonMLLibAPI().trainNaiveBayes(dataBytes._jrdd, lambda_) return NaiveBayesModel( _deserialize_double_vector(ans[0]), diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py index 30862918c3f86..f65088c9170e0 100644 --- a/python/pyspark/mllib/clustering.py +++ b/python/pyspark/mllib/clustering.py @@ -19,37 +19,61 @@ from math import sqrt from pyspark import SparkContext from pyspark.mllib._common import \ - _get_unmangled_rdd, _get_unmangled_double_vector_rdd, \ + _get_unmangled_rdd, _get_unmangled_double_vector_rdd, _squared_distance, \ _serialize_double_matrix, _deserialize_double_matrix, \ _serialize_double_vector, _deserialize_double_vector, \ _get_initial_weights, _serialize_rating, _regression_train_wrapper +from pyspark.mllib.linalg import SparseVector + class KMeansModel(object): """A clustering model derived from the k-means method. >>> data = array([0.0,0.0, 1.0,1.0, 9.0,8.0, 8.0,9.0]).reshape(4,2) - >>> clusters = KMeans.train(sc.parallelize(data), 2, maxIterations=10, runs=30, initializationMode="random") - >>> clusters.predict(array([0.0, 0.0])) == clusters.predict(array([1.0, 1.0])) + >>> model = KMeans.train(sc.parallelize(data), 2, maxIterations=10, runs=30, initializationMode="random") + >>> model.predict(array([0.0, 0.0])) == model.predict(array([1.0, 1.0])) + True + >>> model.predict(array([8.0, 9.0])) == model.predict(array([9.0, 8.0])) + True + >>> model = KMeans.train(sc.parallelize(data), 2) + >>> sparse_data = [ + ... SparseVector(3, {1: 1.0}), + ... SparseVector(3, {1: 1.1}), + ... SparseVector(3, {2: 1.0}), + ... SparseVector(3, {2: 1.1}) + ... 
] + >>> model = KMeans.train(sc.parallelize(sparse_data), 2, initializationMode="k-means||") + >>> model.predict(array([0., 1., 0.])) == model.predict(array([0, 1.1, 0.])) + True + >>> model.predict(array([0., 0., 1.])) == model.predict(array([0, 0, 1.1])) + True + >>> model.predict(sparse_data[0]) == model.predict(sparse_data[1]) True - >>> clusters.predict(array([8.0, 9.0])) == clusters.predict(array([9.0, 8.0])) + >>> model.predict(sparse_data[2]) == model.predict(sparse_data[3]) True - >>> clusters = KMeans.train(sc.parallelize(data), 2) + >>> type(model.clusterCenters) + """ - def __init__(self, centers_): - self.centers = centers_ + def __init__(self, centers): + self.centers = centers + + @property + def clusterCenters(self): + """Get the cluster centers, represented as a list of NumPy arrays.""" + return self.centers def predict(self, x): """Find the cluster to which x belongs in this model.""" best = 0 - best_distance = 1e75 - for i in range(0, self.centers.shape[0]): - diff = x - self.centers[i] - distance = sqrt(dot(diff, diff)) + best_distance = float("inf") + for i in range(0, len(self.centers)): + distance = _squared_distance(x, self.centers[i]) if distance < best_distance: best = i best_distance = distance return best + class KMeans(object): @classmethod def train(cls, data, k, maxIterations=100, runs=1, @@ -64,7 +88,9 @@ def train(cls, data, k, maxIterations=100, runs=1, elif type(ans[0]) != bytearray: raise RuntimeError("JVM call result had first element of type " + type(ans[0]) + " which is not bytearray") - return KMeansModel(_deserialize_double_matrix(ans[0])) + matrix = _deserialize_double_matrix(ans[0]) + return KMeansModel([row for row in matrix]) + def _test(): import doctest @@ -76,5 +102,6 @@ def _test(): if failure_count: exit(-1) + if __name__ == "__main__": _test() diff --git a/python/pyspark/mllib/linalg.py b/python/pyspark/mllib/linalg.py new file mode 100644 index 0000000000000..0aa3a51de706b --- /dev/null +++ b/python/pyspark/mllib/linalg.py @@ -0,0 +1,245 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +MLlib utilities for linear algebra. For dense vectors, MLlib +uses the NumPy C{array} type, so you can simply pass NumPy arrays +around. For sparse vectors, users can construct a L{SparseVector} +object from MLlib or pass SciPy C{scipy.sparse} column vectors if +SciPy is available in their environment. +""" + +from numpy import array, array_equal, ndarray, float64, int32 + + +class SparseVector(object): + """ + A simple sparse vector class for passing data to MLlib. Users may + alternatively pass SciPy's {scipy.sparse} data types. 
+ """ + + def __init__(self, size, *args): + """ + Create a sparse vector, using either a dictionary, a list of + (index, value) pairs, or two separate arrays of indices and + values (sorted by index). + + @param size: Size of the vector. + @param args: Non-zero entries, as a dictionary, list of tupes, + or two sorted lists containing indices and values. + + >>> print SparseVector(4, {1: 1.0, 3: 5.5}) + [1: 1.0, 3: 5.5] + >>> print SparseVector(4, [(1, 1.0), (3, 5.5)]) + [1: 1.0, 3: 5.5] + >>> print SparseVector(4, [1, 3], [1.0, 5.5]) + [1: 1.0, 3: 5.5] + """ + assert type(size) == int, "first argument must be an int" + self.size = size + assert 1 <= len(args) <= 2, "must pass either 2 or 3 arguments" + if len(args) == 1: + pairs = args[0] + if type(pairs) == dict: + pairs = pairs.items() + pairs = sorted(pairs) + self.indices = array([p[0] for p in pairs], dtype=int32) + self.values = array([p[1] for p in pairs], dtype=float64) + else: + assert len(args[0]) == len(args[1]), "index and value arrays not same length" + self.indices = array(args[0], dtype=int32) + self.values = array(args[1], dtype=float64) + for i in xrange(len(self.indices) - 1): + if self.indices[i] >= self.indices[i + 1]: + raise TypeError("indices array must be sorted") + + def dot(self, other): + """ + Dot product with a SparseVector or 1- or 2-dimensional Numpy array. + + >>> a = SparseVector(4, [1, 3], [3.0, 4.0]) + >>> a.dot(a) + 25.0 + >>> a.dot(array([1., 2., 3., 4.])) + 22.0 + >>> b = SparseVector(4, [2, 4], [1.0, 2.0]) + >>> a.dot(b) + 0.0 + >>> a.dot(array([[1, 1], [2, 2], [3, 3], [4, 4]])) + array([ 22., 22.]) + """ + if type(other) == ndarray: + if other.ndim == 1: + result = 0.0 + for i in xrange(len(self.indices)): + result += self.values[i] * other[self.indices[i]] + return result + elif other.ndim == 2: + results = [self.dot(other[:,i]) for i in xrange(other.shape[1])] + return array(results) + else: + raise Exception("Cannot call dot with %d-dimensional array" % other.ndim) + else: + result = 0.0 + i, j = 0, 0 + while i < len(self.indices) and j < len(other.indices): + if self.indices[i] == other.indices[j]: + result += self.values[i] * other.values[j] + i += 1 + j += 1 + elif self.indices[i] < other.indices[j]: + i += 1 + else: + j += 1 + return result + + def squared_distance(self, other): + """ + Squared distance from a SparseVector or 1-dimensional NumPy array. 
+ + >>> a = SparseVector(4, [1, 3], [3.0, 4.0]) + >>> a.squared_distance(a) + 0.0 + >>> a.squared_distance(array([1., 2., 3., 4.])) + 11.0 + >>> b = SparseVector(4, [2, 4], [1.0, 2.0]) + >>> a.squared_distance(b) + 30.0 + >>> b.squared_distance(a) + 30.0 + """ + if type(other) == ndarray: + if other.ndim == 1: + result = 0.0 + j = 0 # index into our own array + for i in xrange(other.shape[0]): + if j < len(self.indices) and self.indices[j] == i: + diff = self.values[j] - other[i] + result += diff * diff + j += 1 + else: + result += other[i] * other[i] + return result + else: + raise Exception("Cannot call squared_distance with %d-dimensional array" % + other.ndim) + else: + result = 0.0 + i, j = 0, 0 + while i < len(self.indices) and j < len(other.indices): + if self.indices[i] == other.indices[j]: + diff = self.values[i] - other.values[j] + result += diff * diff + i += 1 + j += 1 + elif self.indices[i] < other.indices[j]: + result += self.values[i] * self.values[i] + i += 1 + else: + result += other.values[j] * other.values[j] + j += 1 + while i < len(self.indices): + result += self.values[i] * self.values[i] + i += 1 + while j < len(other.indices): + result += other.values[j] * other.values[j] + j += 1 + return result + + def __str__(self): + inds = self.indices + vals = self.values + entries = ", ".join(["{0}: {1}".format(inds[i], vals[i]) for i in xrange(len(inds))]) + return "[" + entries + "]" + + def __repr__(self): + inds = self.indices + vals = self.values + entries = ", ".join(["{0}: {1}".format(inds[i], vals[i]) for i in xrange(len(inds))]) + return "SparseVector({0}, {{{1}}})".format(self.size, entries) + + def __eq__(self, other): + """ + Test SparseVectors for equality. + + >>> v1 = SparseVector(4, [(1, 1.0), (3, 5.5)]) + >>> v2 = SparseVector(4, [(1, 1.0), (3, 5.5)]) + >>> v1 == v2 + True + >>> v1 != v2 + False + """ + + return (isinstance(other, self.__class__) + and other.size == self.size + and array_equal(other.indices, self.indices) + and array_equal(other.values, self.values)) + + def __ne__(self, other): + return not self.__eq__(other) + + + +class Vectors(object): + """ + Factory methods for working with vectors. Note that dense vectors + are simply represented as NumPy array objects, so there is no need + to covert them for use in MLlib. For sparse vectors, the factory + methods in this class create an MLlib-compatible type, or users + can pass in SciPy's C{scipy.sparse} column vectors. + """ + + @staticmethod + def sparse(size, *args): + """ + Create a sparse vector, using either a dictionary, a list of + (index, value) pairs, or two separate arrays of indices and + values (sorted by index). + + @param size: Size of the vector. + @param args: Non-zero entries, as a dictionary, list of tupes, + or two sorted lists containing indices and values. + + >>> print Vectors.sparse(4, {1: 1.0, 3: 5.5}) + [1: 1.0, 3: 5.5] + >>> print Vectors.sparse(4, [(1, 1.0), (3, 5.5)]) + [1: 1.0, 3: 5.5] + >>> print Vectors.sparse(4, [1, 3], [1.0, 5.5]) + [1: 1.0, 3: 5.5] + """ + return SparseVector(size, *args) + + @staticmethod + def dense(elements): + """ + Create a dense vector of 64-bit floats from a Python list. Always + returns a NumPy array. 
+ + >>> Vectors.dense([1, 2, 3]) + array([ 1., 2., 3.]) + """ + return array(elements, dtype=float64) + + +def _test(): + import doctest + (failure_count, test_count) = doctest.testmod(optionflags=doctest.ELLIPSIS) + if failure_count: + exit(-1) + +if __name__ == "__main__": + _test() diff --git a/python/pyspark/mllib/regression.py b/python/pyspark/mllib/regression.py index 7656db07f61cc..266b31d3fab0e 100644 --- a/python/pyspark/mllib/regression.py +++ b/python/pyspark/mllib/regression.py @@ -15,41 +15,98 @@ # limitations under the License. # -from numpy import array, dot +from numpy import array, ndarray from pyspark import SparkContext from pyspark.mllib._common import \ - _get_unmangled_rdd, _get_unmangled_double_vector_rdd, \ + _dot, _get_unmangled_rdd, _get_unmangled_double_vector_rdd, \ _serialize_double_matrix, _deserialize_double_matrix, \ _serialize_double_vector, _deserialize_double_vector, \ _get_initial_weights, _serialize_rating, _regression_train_wrapper, \ - _linear_predictor_typecheck + _linear_predictor_typecheck, _have_scipy, _scipy_issparse +from pyspark.mllib.linalg import SparseVector + + +class LabeledPoint(object): + """ + The features and labels of a data point. + + @param label: Label for this data point. + @param features: Vector of features for this point (NumPy array, list, + pyspark.mllib.linalg.SparseVector, or scipy.sparse column matrix) + """ + def __init__(self, label, features): + self.label = label + if (type(features) == ndarray or type(features) == SparseVector + or (_have_scipy and _scipy_issparse(features))): + self.features = features + elif type(features) == list: + self.features = array(features) + else: + raise TypeError("Expected NumPy array, list, SparseVector, or scipy.sparse matrix") + class LinearModel(object): - """Something that has a vector of coefficients and an intercept.""" - def __init__(self, coeff, intercept): - self._coeff = coeff + """A linear model that has a vector of coefficients and an intercept.""" + def __init__(self, weights, intercept): + self._coeff = weights self._intercept = intercept + @property + def weights(self): + return self._coeff + + @property + def intercept(self): + return self._intercept + + class LinearRegressionModelBase(LinearModel): """A linear regression model. >>> lrmb = LinearRegressionModelBase(array([1.0, 2.0]), 0.1) >>> abs(lrmb.predict(array([-1.03, 7.777])) - 14.624) < 1e-6 True + >>> abs(lrmb.predict(SparseVector(2, {0: -1.03, 1: 7.777})) - 14.624) < 1e-6 + True """ def predict(self, x): """Predict the value of the dependent variable given a vector x""" """containing values for the independent variables.""" _linear_predictor_typecheck(x, self._coeff) - return dot(self._coeff, x) + self._intercept + return _dot(x, self._coeff) + self._intercept + class LinearRegressionModel(LinearRegressionModelBase): """A linear regression model derived from a least-squares fit. - >>> data = array([0.0, 0.0, 1.0, 1.0, 3.0, 2.0, 2.0, 3.0]).reshape(4,2) + >>> from pyspark.mllib.regression import LabeledPoint + >>> data = [ + ... LabeledPoint(0.0, [0.0]), + ... LabeledPoint(1.0, [1.0]), + ... LabeledPoint(3.0, [2.0]), + ... LabeledPoint(2.0, [3.0]) + ... ] >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0])) + >>> abs(lrm.predict(array([0.0])) - 0) < 0.5 + True + >>> abs(lrm.predict(array([1.0])) - 1) < 0.5 + True + >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5 + True + >>> data = [ + ... LabeledPoint(0.0, SparseVector(1, {0: 0.0})), + ... 
LabeledPoint(1.0, SparseVector(1, {0: 1.0})), + ... LabeledPoint(3.0, SparseVector(1, {0: 2.0})), + ... LabeledPoint(2.0, SparseVector(1, {0: 3.0})) + ... ] + >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0])) + >>> abs(lrm.predict(array([0.0])) - 0) < 0.5 + True + >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5 + True """ + class LinearRegressionWithSGD(object): @classmethod def train(cls, data, iterations=100, step=1.0, @@ -61,14 +118,39 @@ def train(cls, data, iterations=100, step=1.0, d._jrdd, iterations, step, miniBatchFraction, i), LinearRegressionModel, data, initialWeights) + class LassoModel(LinearRegressionModelBase): """A linear regression model derived from a least-squares fit with an l_1 penalty term. - >>> data = array([0.0, 0.0, 1.0, 1.0, 3.0, 2.0, 2.0, 3.0]).reshape(4,2) + >>> from pyspark.mllib.regression import LabeledPoint + >>> data = [ + ... LabeledPoint(0.0, [0.0]), + ... LabeledPoint(1.0, [1.0]), + ... LabeledPoint(3.0, [2.0]), + ... LabeledPoint(2.0, [3.0]) + ... ] >>> lrm = LassoWithSGD.train(sc.parallelize(data), initialWeights=array([1.0])) + >>> abs(lrm.predict(array([0.0])) - 0) < 0.5 + True + >>> abs(lrm.predict(array([1.0])) - 1) < 0.5 + True + >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5 + True + >>> data = [ + ... LabeledPoint(0.0, SparseVector(1, {0: 0.0})), + ... LabeledPoint(1.0, SparseVector(1, {0: 1.0})), + ... LabeledPoint(3.0, SparseVector(1, {0: 2.0})), + ... LabeledPoint(2.0, SparseVector(1, {0: 3.0})) + ... ] + >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0])) + >>> abs(lrm.predict(array([0.0])) - 0) < 0.5 + True + >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5 + True """ + class LassoWithSGD(object): @classmethod def train(cls, data, iterations=100, step=1.0, regParam=1.0, @@ -80,14 +162,39 @@ def train(cls, data, iterations=100, step=1.0, regParam=1.0, iterations, step, regParam, miniBatchFraction, i), LassoModel, data, initialWeights) + class RidgeRegressionModel(LinearRegressionModelBase): """A linear regression model derived from a least-squares fit with an l_2 penalty term. - >>> data = array([0.0, 0.0, 1.0, 1.0, 3.0, 2.0, 2.0, 3.0]).reshape(4,2) + >>> from pyspark.mllib.regression import LabeledPoint + >>> data = [ + ... LabeledPoint(0.0, [0.0]), + ... LabeledPoint(1.0, [1.0]), + ... LabeledPoint(3.0, [2.0]), + ... LabeledPoint(2.0, [3.0]) + ... ] >>> lrm = RidgeRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0])) + >>> abs(lrm.predict(array([0.0])) - 0) < 0.5 + True + >>> abs(lrm.predict(array([1.0])) - 1) < 0.5 + True + >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5 + True + >>> data = [ + ... LabeledPoint(0.0, SparseVector(1, {0: 0.0})), + ... LabeledPoint(1.0, SparseVector(1, {0: 1.0})), + ... LabeledPoint(3.0, SparseVector(1, {0: 2.0})), + ... LabeledPoint(2.0, SparseVector(1, {0: 3.0})) + ... 
] + >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0])) + >>> abs(lrm.predict(array([0.0])) - 0) < 0.5 + True + >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5 + True """ + class RidgeRegressionWithSGD(object): @classmethod def train(cls, data, iterations=100, step=1.0, regParam=1.0, @@ -99,6 +206,7 @@ def train(cls, data, iterations=100, step=1.0, regParam=1.0, iterations, step, regParam, miniBatchFraction, i), RidgeRegressionModel, data, initialWeights) + def _test(): import doctest globs = globals().copy() diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py new file mode 100644 index 0000000000000..d4771d779f9f4 --- /dev/null +++ b/python/pyspark/mllib/tests.py @@ -0,0 +1,302 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +Fuller unit tests for Python MLlib. +""" + +from numpy import array, array_equal +import unittest + +from pyspark.mllib._common import _convert_vector, _serialize_double_vector, \ + _deserialize_double_vector, _dot, _squared_distance +from pyspark.mllib.linalg import SparseVector +from pyspark.mllib.regression import LabeledPoint +from pyspark.tests import PySparkTestCase + + +_have_scipy = False +try: + import scipy.sparse + _have_scipy = True +except: + # No SciPy, but that's okay, we'll skip those tests + pass + + +class VectorTests(unittest.TestCase): + def test_serialize(self): + sv = SparseVector(4, {1: 1, 3: 2}) + dv = array([1., 2., 3., 4.]) + lst = [1, 2, 3, 4] + self.assertTrue(sv is _convert_vector(sv)) + self.assertTrue(dv is _convert_vector(dv)) + self.assertTrue(array_equal(dv, _convert_vector(lst))) + self.assertEquals(sv, + _deserialize_double_vector(_serialize_double_vector(sv))) + self.assertTrue(array_equal(dv, + _deserialize_double_vector(_serialize_double_vector(dv)))) + self.assertTrue(array_equal(dv, + _deserialize_double_vector(_serialize_double_vector(lst)))) + + def test_dot(self): + sv = SparseVector(4, {1: 1, 3: 2}) + dv = array([1., 2., 3., 4.]) + lst = [1, 2, 3, 4] + mat = array([[1., 2., 3., 4.], + [1., 2., 3., 4.], + [1., 2., 3., 4.], + [1., 2., 3., 4.]]) + self.assertEquals(10.0, _dot(sv, dv)) + self.assertTrue(array_equal(array([3., 6., 9., 12.]), _dot(sv, mat))) + self.assertEquals(30.0, _dot(dv, dv)) + self.assertTrue(array_equal(array([10., 20., 30., 40.]), _dot(dv, mat))) + self.assertEquals(30.0, _dot(lst, dv)) + self.assertTrue(array_equal(array([10., 20., 30., 40.]), _dot(lst, mat))) + + def test_squared_distance(self): + sv = SparseVector(4, {1: 1, 3: 2}) + dv = array([1., 2., 3., 4.]) + lst = [4, 3, 2, 1] + self.assertEquals(15.0, _squared_distance(sv, dv)) + self.assertEquals(25.0, _squared_distance(sv, lst)) + self.assertEquals(20.0, _squared_distance(dv, lst)) + self.assertEquals(15.0, _squared_distance(dv, 
sv)) + self.assertEquals(25.0, _squared_distance(lst, sv)) + self.assertEquals(20.0, _squared_distance(lst, dv)) + self.assertEquals(0.0, _squared_distance(sv, sv)) + self.assertEquals(0.0, _squared_distance(dv, dv)) + self.assertEquals(0.0, _squared_distance(lst, lst)) + + +class ListTests(PySparkTestCase): + """ + Test MLlib algorithms on plain lists, to make sure they're passed through + as NumPy arrays. + """ + + def test_clustering(self): + from pyspark.mllib.clustering import KMeans + data = [ + [0, 1.1], + [0, 1.2], + [1.1, 0], + [1.2, 0], + ] + clusters = KMeans.train(self.sc.parallelize(data), 2, initializationMode="k-means||") + self.assertEquals(clusters.predict(data[0]), clusters.predict(data[1])) + self.assertEquals(clusters.predict(data[2]), clusters.predict(data[3])) + + def test_classification(self): + from pyspark.mllib.classification import LogisticRegressionWithSGD, SVMWithSGD, NaiveBayes + data = [ + LabeledPoint(0.0, [1, 0]), + LabeledPoint(1.0, [0, 1]), + LabeledPoint(0.0, [2, 0]), + LabeledPoint(1.0, [0, 2]) + ] + rdd = self.sc.parallelize(data) + features = [p.features.tolist() for p in data] + + lr_model = LogisticRegressionWithSGD.train(rdd) + self.assertTrue(lr_model.predict(features[0]) <= 0) + self.assertTrue(lr_model.predict(features[1]) > 0) + self.assertTrue(lr_model.predict(features[2]) <= 0) + self.assertTrue(lr_model.predict(features[3]) > 0) + + svm_model = SVMWithSGD.train(rdd) + self.assertTrue(svm_model.predict(features[0]) <= 0) + self.assertTrue(svm_model.predict(features[1]) > 0) + self.assertTrue(svm_model.predict(features[2]) <= 0) + self.assertTrue(svm_model.predict(features[3]) > 0) + + nb_model = NaiveBayes.train(rdd) + self.assertTrue(nb_model.predict(features[0]) <= 0) + self.assertTrue(nb_model.predict(features[1]) > 0) + self.assertTrue(nb_model.predict(features[2]) <= 0) + self.assertTrue(nb_model.predict(features[3]) > 0) + + def test_regression(self): + from pyspark.mllib.regression import LinearRegressionWithSGD, LassoWithSGD, \ + RidgeRegressionWithSGD + data = [ + LabeledPoint(-1.0, [0, -1]), + LabeledPoint(1.0, [0, 1]), + LabeledPoint(-1.0, [0, -2]), + LabeledPoint(1.0, [0, 2]) + ] + rdd = self.sc.parallelize(data) + features = [p.features.tolist() for p in data] + + lr_model = LinearRegressionWithSGD.train(rdd) + self.assertTrue(lr_model.predict(features[0]) <= 0) + self.assertTrue(lr_model.predict(features[1]) > 0) + self.assertTrue(lr_model.predict(features[2]) <= 0) + self.assertTrue(lr_model.predict(features[3]) > 0) + + lasso_model = LassoWithSGD.train(rdd) + self.assertTrue(lasso_model.predict(features[0]) <= 0) + self.assertTrue(lasso_model.predict(features[1]) > 0) + self.assertTrue(lasso_model.predict(features[2]) <= 0) + self.assertTrue(lasso_model.predict(features[3]) > 0) + + rr_model = RidgeRegressionWithSGD.train(rdd) + self.assertTrue(rr_model.predict(features[0]) <= 0) + self.assertTrue(rr_model.predict(features[1]) > 0) + self.assertTrue(rr_model.predict(features[2]) <= 0) + self.assertTrue(rr_model.predict(features[3]) > 0) + + +@unittest.skipIf(not _have_scipy, "SciPy not installed") +class SciPyTests(PySparkTestCase): + """ + Test both vector operations and MLlib algorithms with SciPy sparse matrices, + if SciPy is available. 
+ """ + + def test_serialize(self): + from scipy.sparse import lil_matrix + lil = lil_matrix((4, 1)) + lil[1, 0] = 1 + lil[3, 0] = 2 + sv = SparseVector(4, {1: 1, 3: 2}) + self.assertEquals(sv, _convert_vector(lil)) + self.assertEquals(sv, _convert_vector(lil.tocsc())) + self.assertEquals(sv, _convert_vector(lil.tocoo())) + self.assertEquals(sv, _convert_vector(lil.tocsr())) + self.assertEquals(sv, _convert_vector(lil.todok())) + self.assertEquals(sv, + _deserialize_double_vector(_serialize_double_vector(lil))) + self.assertEquals(sv, + _deserialize_double_vector(_serialize_double_vector(lil.tocsc()))) + self.assertEquals(sv, + _deserialize_double_vector(_serialize_double_vector(lil.tocsr()))) + self.assertEquals(sv, + _deserialize_double_vector(_serialize_double_vector(lil.todok()))) + + def test_dot(self): + from scipy.sparse import lil_matrix + lil = lil_matrix((4, 1)) + lil[1, 0] = 1 + lil[3, 0] = 2 + dv = array([1., 2., 3., 4.]) + sv = SparseVector(4, {0: 1, 1: 2, 2: 3, 3: 4}) + mat = array([[1., 2., 3., 4.], + [1., 2., 3., 4.], + [1., 2., 3., 4.], + [1., 2., 3., 4.]]) + self.assertEquals(10.0, _dot(lil, dv)) + self.assertTrue(array_equal(array([3., 6., 9., 12.]), _dot(lil, mat))) + + def test_squared_distance(self): + from scipy.sparse import lil_matrix + lil = lil_matrix((4, 1)) + lil[1, 0] = 3 + lil[3, 0] = 2 + dv = array([1., 2., 3., 4.]) + sv = SparseVector(4, {0: 1, 1: 2, 2: 3, 3: 4}) + self.assertEquals(15.0, _squared_distance(lil, dv)) + self.assertEquals(15.0, _squared_distance(lil, sv)) + self.assertEquals(15.0, _squared_distance(dv, lil)) + self.assertEquals(15.0, _squared_distance(sv, lil)) + + def scipy_matrix(self, size, values): + """Create a column SciPy matrix from a dictionary of values""" + from scipy.sparse import lil_matrix + lil = lil_matrix((size, 1)) + for key, value in values.items(): + lil[key, 0] = value + return lil + + def test_clustering(self): + from pyspark.mllib.clustering import KMeans + data = [ + self.scipy_matrix(3, {1: 1.0}), + self.scipy_matrix(3, {1: 1.1}), + self.scipy_matrix(3, {2: 1.0}), + self.scipy_matrix(3, {2: 1.1}) + ] + clusters = KMeans.train(self.sc.parallelize(data), 2, initializationMode="k-means||") + self.assertEquals(clusters.predict(data[0]), clusters.predict(data[1])) + self.assertEquals(clusters.predict(data[2]), clusters.predict(data[3])) + + def test_classification(self): + from pyspark.mllib.classification import LogisticRegressionWithSGD, SVMWithSGD, NaiveBayes + data = [ + LabeledPoint(0.0, self.scipy_matrix(2, {0: 1.0})), + LabeledPoint(1.0, self.scipy_matrix(2, {1: 1.0})), + LabeledPoint(0.0, self.scipy_matrix(2, {0: 2.0})), + LabeledPoint(1.0, self.scipy_matrix(2, {1: 2.0})) + ] + rdd = self.sc.parallelize(data) + features = [p.features for p in data] + + lr_model = LogisticRegressionWithSGD.train(rdd) + self.assertTrue(lr_model.predict(features[0]) <= 0) + self.assertTrue(lr_model.predict(features[1]) > 0) + self.assertTrue(lr_model.predict(features[2]) <= 0) + self.assertTrue(lr_model.predict(features[3]) > 0) + + svm_model = SVMWithSGD.train(rdd) + self.assertTrue(svm_model.predict(features[0]) <= 0) + self.assertTrue(svm_model.predict(features[1]) > 0) + self.assertTrue(svm_model.predict(features[2]) <= 0) + self.assertTrue(svm_model.predict(features[3]) > 0) + + nb_model = NaiveBayes.train(rdd) + self.assertTrue(nb_model.predict(features[0]) <= 0) + self.assertTrue(nb_model.predict(features[1]) > 0) + self.assertTrue(nb_model.predict(features[2]) <= 0) + self.assertTrue(nb_model.predict(features[3]) > 0) + + def 
test_regression(self): + from pyspark.mllib.regression import LinearRegressionWithSGD, LassoWithSGD, \ + RidgeRegressionWithSGD + data = [ + LabeledPoint(-1.0, self.scipy_matrix(2, {1: -1.0})), + LabeledPoint(1.0, self.scipy_matrix(2, {1: 1.0})), + LabeledPoint(-1.0, self.scipy_matrix(2, {1: -2.0})), + LabeledPoint(1.0, self.scipy_matrix(2, {1: 2.0})) + ] + rdd = self.sc.parallelize(data) + features = [p.features for p in data] + + lr_model = LinearRegressionWithSGD.train(rdd) + self.assertTrue(lr_model.predict(features[0]) <= 0) + self.assertTrue(lr_model.predict(features[1]) > 0) + self.assertTrue(lr_model.predict(features[2]) <= 0) + self.assertTrue(lr_model.predict(features[3]) > 0) + + lasso_model = LassoWithSGD.train(rdd) + self.assertTrue(lasso_model.predict(features[0]) <= 0) + self.assertTrue(lasso_model.predict(features[1]) > 0) + self.assertTrue(lasso_model.predict(features[2]) <= 0) + self.assertTrue(lasso_model.predict(features[3]) > 0) + + rr_model = RidgeRegressionWithSGD.train(rdd) + self.assertTrue(rr_model.predict(features[0]) <= 0) + self.assertTrue(rr_model.predict(features[1]) > 0) + self.assertTrue(rr_model.predict(features[2]) <= 0) + self.assertTrue(rr_model.predict(features[3]) > 0) + + +if __name__ == "__main__": + if not _have_scipy: + print "NOTE: Skipping SciPy tests as it does not seem to be installed" + unittest.main() + if not _have_scipy: + print "NOTE: SciPy tests were skipped as it does not seem to be installed" diff --git a/python/run-tests b/python/run-tests index dabb714da9f5b..7bbf10d05a817 100755 --- a/python/run-tests +++ b/python/run-tests @@ -34,7 +34,7 @@ rm -rf metastore warehouse function run_test() { SPARK_TESTING=0 $FWDIR/bin/pyspark $1 2>&1 | tee -a > unit-tests.log FAILED=$((PIPESTATUS[0]||$FAILED)) - + # Fail and exit on the first test failure. if [[ $FAILED != 0 ]]; then cat unit-tests.log | grep -v "^[0-9][0-9]*" # filter all lines starting with a number. @@ -57,8 +57,10 @@ run_test "pyspark/tests.py" run_test "pyspark/mllib/_common.py" run_test "pyspark/mllib/classification.py" run_test "pyspark/mllib/clustering.py" +run_test "pyspark/mllib/linalg.py" run_test "pyspark/mllib/recommendation.py" run_test "pyspark/mllib/regression.py" +run_test "pyspark/mllib/tests.py" if [[ $FAILED == 0 ]]; then echo -en "\033[32m" # Green
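
As a quick reference, the snippets below sketch how the Python API introduced by this patch can be exercised from a PySpark shell. They are illustrative only: they assume a running SparkContext `sc`, and the vector operations mirror the doctests added in `pyspark/mllib/linalg.py`.

{% highlight python %}
from numpy import array
from pyspark.mllib.linalg import SparseVector, Vectors

# Three equivalent ways to build a size-4 vector with non-zeros at indices 1 and 3.
sv = Vectors.sparse(4, {1: 1.0, 3: 5.5})
sv2 = SparseVector(4, [(1, 1.0), (3, 5.5)])
sv3 = SparseVector(4, [1, 3], [1.0, 5.5])
print sv == sv2 == sv3            # True

# Dense vectors are plain NumPy arrays, so no conversion is needed.
dv = Vectors.dense([1.0, 2.0, 3.0, 4.0])

# SparseVector supports mixed sparse/dense operations.
print sv.dot(dv)                  # 1.0 * 2.0 + 5.5 * 4.0 = 24.0
print sv.squared_distance(dv)     # sum of squared elementwise differences
{% endhighlight %}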
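LabeledPoint accepts a NumPy array, a Python list, a SparseVector, or a SciPy column matrix as its features, so the regression and classification wrappers run unchanged on sparse data. A minimal sketch using the same toy data as the new doctests in `pyspark/mllib/regression.py`, again assuming a running SparkContext `sc`:

{% highlight python %}
from numpy import array
from pyspark.mllib.linalg import SparseVector
from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD

# Labels paired with one-dimensional sparse feature vectors.
data = [
    LabeledPoint(0.0, SparseVector(1, {0: 0.0})),
    LabeledPoint(1.0, SparseVector(1, {0: 1.0})),
    LabeledPoint(3.0, SparseVector(1, {0: 2.0})),
    LabeledPoint(2.0, SparseVector(1, {0: 3.0}))
]

model = LinearRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))

# The fitted model exposes its coefficients and accepts dense or sparse inputs.
print model.weights, model.intercept
print model.predict(array([1.0]))
print model.predict(SparseVector(1, {0: 1.0}))
{% endhighlight %}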
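The same applies to clustering: KMeans.train consumes an RDD of sparse vectors directly, and predictions agree whether a query point is passed as a dense array or a SparseVector. A sketch based on the updated `pyspark/mllib/clustering.py` doctest:

{% highlight python %}
from numpy import array
from pyspark.mllib.clustering import KMeans
from pyspark.mllib.linalg import SparseVector

# Two clear clusters in R^3, one along dimension 1 and one along dimension 2.
sparse_data = [
    SparseVector(3, {1: 1.0}),
    SparseVector(3, {1: 1.1}),
    SparseVector(3, {2: 1.0}),
    SparseVector(3, {2: 1.1})
]

model = KMeans.train(sc.parallelize(sparse_data), 2, initializationMode="k-means||")

# Equivalent dense and sparse queries land in the same cluster.
print model.predict(array([0.0, 1.0, 0.0])) == model.predict(sparse_data[0])   # True
print model.clusterCenters   # cluster centers as a list of NumPy arrays
{% endhighlight %}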
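Finally, when SciPy is installed, `scipy.sparse` column matrices can be passed wherever MLlib expects a feature vector, as exercised by the new SciPyTests suite. The helper below (named `column` here purely for illustration) builds such a matrix from a dictionary of non-zero entries:

{% highlight python %}
from scipy.sparse import lil_matrix
from pyspark.mllib.classification import LogisticRegressionWithSGD
from pyspark.mllib.regression import LabeledPoint

def column(size, values):
    """Build a size x 1 SciPy column matrix from a dict of index -> value."""
    m = lil_matrix((size, 1))
    for i, v in values.items():
        m[i, 0] = v
    return m

# SciPy column vectors are accepted as LabeledPoint features...
data = [
    LabeledPoint(0.0, column(2, {0: 1.0})),
    LabeledPoint(1.0, column(2, {1: 1.0})),
    LabeledPoint(0.0, column(2, {0: 2.0})),
    LabeledPoint(1.0, column(2, {1: 2.0}))
]
model = LogisticRegressionWithSGD.train(sc.parallelize(data))

# ... and as prediction inputs.
print model.predict(column(2, {1: 1.0}))
{% endhighlight %}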