Commit c48e85a

Added some tests for passing lists as input, and added mllib/tests.py to
run-tests script.
mateiz committed Apr 15, 2014
1 parent a07ba10 commit c48e85a
Showing 2 changed files with 128 additions and 24 deletions.
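
For context on what the new tests exercise: MLlib's Python entry points run each vector argument through _convert_vector in pyspark.mllib._common, which is what lets plain Python lists stand in for NumPy arrays. A minimal sketch of that conversion, matching only the behavior the tests below assert (the function name here is hypothetical; the real helper lives in _common.py):

    from numpy import array, ndarray
    from pyspark.mllib.linalg import SparseVector

    def _convert_vector_sketch(v):
        # NumPy arrays and SparseVectors pass through unchanged
        # (the tests assert identity with "is").
        if isinstance(v, (ndarray, SparseVector)):
            return v
        # Plain lists and tuples are promoted to float64 NumPy arrays.
        if isinstance(v, (list, tuple)):
            return array(v, dtype='float64')
        raise TypeError("cannot convert %s to a vector" % type(v))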
151 changes: 127 additions & 24 deletions python/pyspark/mllib/tests.py
@@ -25,6 +25,7 @@
from pyspark.mllib._common import _convert_vector, _serialize_double_vector, \
_deserialize_double_vector, _dot, _squared_distance
from pyspark.mllib.linalg import SparseVector
from pyspark.mllib.regression import LabeledPoint
from pyspark.tests import PySparkTestCase


@@ -41,16 +42,21 @@ class VectorTests(unittest.TestCase):
def test_serialize(self):
sv = SparseVector(4, {1: 1, 3: 2})
dv = array([1., 2., 3., 4.])
lst = [1, 2, 3, 4]
self.assertTrue(sv is _convert_vector(sv))
self.assertTrue(dv is _convert_vector(dv))
self.assertTrue(array_equal(dv, _convert_vector(lst)))
self.assertEquals(sv,
_deserialize_double_vector(_serialize_double_vector(sv)))
self.assertTrue(array_equal(dv,
_deserialize_double_vector(_serialize_double_vector(dv))))
self.assertTrue(array_equal(dv,
_deserialize_double_vector(_serialize_double_vector(lst))))

def test_dot(self):
sv = SparseVector(4, {1: 1, 3: 2})
dv = array([1., 2., 3., 4.])
lst = [1, 2, 3, 4]
mat = array([[1., 2., 3., 4.],
[1., 2., 3., 4.],
[1., 2., 3., 4.],
@@ -59,10 +65,109 @@ def test_dot(self):
self.assertTrue(array_equal(array([3., 6., 9., 12.]), _dot(sv, mat)))
self.assertEquals(30.0, _dot(dv, dv))
self.assertTrue(array_equal(array([10., 20., 30., 40.]), _dot(dv, mat)))
self.assertEquals(30.0, _dot(lst, dv))
self.assertTrue(array_equal(array([10., 20., 30., 40.]), _dot(lst, mat)))

def test_squared_distance(self):
sv = SparseVector(4, {1: 1, 3: 2})
dv = array([1., 2., 3., 4.])
lst = [4, 3, 2, 1]
self.assertEquals(15.0, _squared_distance(sv, dv))
self.assertEquals(25.0, _squared_distance(sv, lst))
self.assertEquals(20.0, _squared_distance(dv, lst))
self.assertEquals(15.0, _squared_distance(dv, sv))
self.assertEquals(25.0, _squared_distance(lst, sv))
self.assertEquals(20.0, _squared_distance(lst, dv))
self.assertEquals(0.0, _squared_distance(sv, sv))
self.assertEquals(0.0, _squared_distance(dv, dv))
self.assertEquals(0.0, _squared_distance(lst, lst))


class ListTests(PySparkTestCase):
"""
Test MLlib algorithms on plain lists, to make sure they're passed through
as NumPy arrays.
"""

def test_clustering(self):
from pyspark.mllib.clustering import KMeans
data = [
[0, 1.1],
[0, 1.2],
[1.1, 0],
[1.2, 0],
]
clusters = KMeans.train(self.sc.parallelize(data), 2, initializationMode="k-means||")
self.assertEquals(clusters.predict(data[0]), clusters.predict(data[1]))
self.assertEquals(clusters.predict(data[2]), clusters.predict(data[3]))

def test_classification(self):
from pyspark.mllib.classification import LogisticRegressionWithSGD, SVMWithSGD, NaiveBayes
data = [
LabeledPoint(0.0, [1, 0]),
LabeledPoint(1.0, [0, 1]),
LabeledPoint(0.0, [2, 0]),
LabeledPoint(1.0, [0, 2])
]
rdd = self.sc.parallelize(data)
features = [p.features.tolist() for p in data]

lr_model = LogisticRegressionWithSGD.train(rdd)
self.assertTrue(lr_model.predict(features[0]) <= 0)
self.assertTrue(lr_model.predict(features[1]) > 0)
self.assertTrue(lr_model.predict(features[2]) <= 0)
self.assertTrue(lr_model.predict(features[3]) > 0)

svm_model = SVMWithSGD.train(rdd)
self.assertTrue(svm_model.predict(features[0]) <= 0)
self.assertTrue(svm_model.predict(features[1]) > 0)
self.assertTrue(svm_model.predict(features[2]) <= 0)
self.assertTrue(svm_model.predict(features[3]) > 0)

nb_model = NaiveBayes.train(rdd)
self.assertTrue(nb_model.predict(features[0]) <= 0)
self.assertTrue(nb_model.predict(features[1]) > 0)
self.assertTrue(nb_model.predict(features[2]) <= 0)
self.assertTrue(nb_model.predict(features[3]) > 0)

def test_regression(self):
from pyspark.mllib.regression import LinearRegressionWithSGD, LassoWithSGD, \
RidgeRegressionWithSGD
data = [
LabeledPoint(-1.0, [0, -1]),
LabeledPoint(1.0, [0, 1]),
LabeledPoint(-1.0, [0, -2]),
LabeledPoint(1.0, [0, 2])
]
rdd = self.sc.parallelize(data)
features = [p.features.tolist() for p in data]

lr_model = LinearRegressionWithSGD.train(rdd)
self.assertTrue(lr_model.predict(features[0]) <= 0)
self.assertTrue(lr_model.predict(features[1]) > 0)
self.assertTrue(lr_model.predict(features[2]) <= 0)
self.assertTrue(lr_model.predict(features[3]) > 0)

lasso_model = LassoWithSGD.train(rdd)
self.assertTrue(lasso_model.predict(features[0]) <= 0)
self.assertTrue(lasso_model.predict(features[1]) > 0)
self.assertTrue(lasso_model.predict(features[2]) <= 0)
self.assertTrue(lasso_model.predict(features[3]) > 0)

rr_model = RidgeRegressionWithSGD.train(rdd)
self.assertTrue(rr_model.predict(features[0]) <= 0)
self.assertTrue(rr_model.predict(features[1]) > 0)
self.assertTrue(rr_model.predict(features[2]) <= 0)
self.assertTrue(rr_model.predict(features[3]) > 0)


@unittest.skipIf(not _have_scipy, "SciPy not installed")
class SciPyTests(PySparkTestCase):
"""
Test both vector operations and MLlib algorithms with SciPy sparse matrices,
if SciPy is available.
"""

def test_serialize(self):
from scipy.sparse import lil_matrix
lil = lil_matrix((4, 1))
@@ -132,63 +237,61 @@ def test_clustering(self):
def test_classification(self):
from pyspark.mllib.classification import LogisticRegressionWithSGD, SVMWithSGD, NaiveBayes
data = [
-            self.scipy_matrix(3, {0: 0.0, 2: 0.0}),
-            self.scipy_matrix(3, {0: 1.0, 2: 1.0}),
-            self.scipy_matrix(3, {0: 0.0, 2: 0.0}),
-            self.scipy_matrix(3, {0: 1.0, 2: 2.0})
+            LabeledPoint(0.0, self.scipy_matrix(2, {0: 1.0})),
+            LabeledPoint(1.0, self.scipy_matrix(2, {1: 1.0})),
+            LabeledPoint(0.0, self.scipy_matrix(2, {0: 2.0})),
+            LabeledPoint(1.0, self.scipy_matrix(2, {1: 2.0}))
]
rdd = self.sc.parallelize(data)
-        features = [
-            self.scipy_matrix(2, {1: 0.0}),
-            self.scipy_matrix(2, {1: 1.0}),
-            self.scipy_matrix(2, {1: 2.0}),
-        ]
+        features = [p.features for p in data]

lr_model = LogisticRegressionWithSGD.train(rdd)
self.assertTrue(lr_model.predict(features[0]) <= 0)
self.assertTrue(lr_model.predict(features[1]) > 0)
-        self.assertTrue(lr_model.predict(features[2]) > 0)
+        self.assertTrue(lr_model.predict(features[2]) <= 0)
+        self.assertTrue(lr_model.predict(features[3]) > 0)

svm_model = SVMWithSGD.train(rdd)
self.assertTrue(svm_model.predict(features[0]) <= 0)
self.assertTrue(svm_model.predict(features[1]) > 0)
-        self.assertTrue(svm_model.predict(features[2]) > 0)
+        self.assertTrue(svm_model.predict(features[2]) <= 0)
+        self.assertTrue(svm_model.predict(features[3]) > 0)

nb_model = NaiveBayes.train(rdd)
self.assertTrue(nb_model.predict(features[0]) <= 0)
self.assertTrue(nb_model.predict(features[1]) > 0)
-        self.assertTrue(nb_model.predict(features[2]) > 0)
+        self.assertTrue(nb_model.predict(features[2]) <= 0)
+        self.assertTrue(nb_model.predict(features[3]) > 0)

def test_regression(self):
from pyspark.mllib.regression import LinearRegressionWithSGD, LassoWithSGD, \
RidgeRegressionWithSGD
data = [
-            self.scipy_matrix(3, {0: -1.0, 2: -1.0}),
-            self.scipy_matrix(3, {0: 1.0, 2: 1.0}),
-            self.scipy_matrix(3, {0: -1.0, 2: -2.0}),
-            self.scipy_matrix(3, {0: 1.0, 2: 2.0})
+            LabeledPoint(-1.0, self.scipy_matrix(2, {1: -1.0})),
+            LabeledPoint(1.0, self.scipy_matrix(2, {1: 1.0})),
+            LabeledPoint(-1.0, self.scipy_matrix(2, {1: -2.0})),
+            LabeledPoint(1.0, self.scipy_matrix(2, {1: 2.0}))
]
rdd = self.sc.parallelize(data)
-        features = [
-            self.scipy_matrix(2, {1: -1.0}),
-            self.scipy_matrix(2, {1: 1.0}),
-            self.scipy_matrix(2, {1: 2.0}),
-        ]
+        features = [p.features for p in data]

lr_model = LinearRegressionWithSGD.train(rdd)
self.assertTrue(lr_model.predict(features[0]) <= 0)
self.assertTrue(lr_model.predict(features[1]) > 0)
-        self.assertTrue(lr_model.predict(features[2]) > 0)
+        self.assertTrue(lr_model.predict(features[2]) <= 0)
+        self.assertTrue(lr_model.predict(features[3]) > 0)

lasso_model = LassoWithSGD.train(rdd)
self.assertTrue(lasso_model.predict(features[0]) <= 0)
self.assertTrue(lasso_model.predict(features[1]) > 0)
-        self.assertTrue(lasso_model.predict(features[2]) > 0)
+        self.assertTrue(lasso_model.predict(features[2]) <= 0)
+        self.assertTrue(lasso_model.predict(features[3]) > 0)

rr_model = RidgeRegressionWithSGD.train(rdd)
self.assertTrue(rr_model.predict(features[0]) <= 0)
self.assertTrue(rr_model.predict(features[1]) > 0)
-        self.assertTrue(rr_model.predict(features[2]) > 0)
+        self.assertTrue(rr_model.predict(features[2]) <= 0)
+        self.assertTrue(rr_model.predict(features[3]) > 0)


if __name__ == "__main__":
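
The ListTests class added above passes plain lists straight into train() and predict(). A usage sketch of the same pattern outside the test harness (the SparkContext setup is assumed here; in the tests it comes from PySparkTestCase):

    from pyspark import SparkContext
    from pyspark.mllib.clustering import KMeans

    sc = SparkContext("local", "list-input-example")
    # Plain Python lists, no NumPy required on the caller's side.
    data = [[0, 1.1], [0, 1.2], [1.1, 0], [1.2, 0]]
    clusters = KMeans.train(sc.parallelize(data), 2, initializationMode="k-means||")
    # test_clustering asserts the first two points land in the same cluster.
    print(clusters.predict(data[0]) == clusters.predict(data[1]))
    sc.stop()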
1 change: 1 addition & 0 deletions python/run-tests
@@ -60,6 +60,7 @@ run_test "pyspark/mllib/clustering.py"
run_test "pyspark/mllib/linalg.py"
run_test "pyspark/mllib/recommendation.py"
run_test "pyspark/mllib/regression.py"
run_test "pyspark/mllib/tests.py"

if [[ $FAILED == 0 ]]; then
echo -en "\033[32m" # Green
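
With tests.py registered, the new suite runs as part of ./python/run-tests alongside the existing doctest modules. Assuming run_test wraps bin/pyspark with SPARK_TESTING=1, as elsewhere in this era of the script, the module can also be run on its own:

    # whole Python suite, including the new MLlib tests
    ./python/run-tests

    # or just this module, the way run_test invokes it (paths relative to the Spark root)
    SPARK_TESTING=1 ./bin/pyspark python/pyspark/mllib/tests.py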
