Skip to content

[SPARK-11295] [PYSPARK] Add packages to JUnit output for Python tests #10850

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions python/pyspark/ml/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,7 @@ def test_fit_maximize_metric(self):


if __name__ == "__main__":
from pyspark.ml.tests import *
if xmlrunner:
unittest.main(testRunner=xmlrunner.XMLTestRunner(output='target/test-reports'))
else:
Expand Down
26 changes: 15 additions & 11 deletions python/pyspark/mllib/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,21 +77,24 @@
pass

ser = PickleSerializer()
sc = SparkContext('local[4]', "MLlib tests")


class MLlibTestCase(unittest.TestCase):
def setUp(self):
self.sc = sc
self.sc = SparkContext('local[4]', "MLlib tests")

def tearDown(self):
self.sc.stop()


class MLLibStreamingTestCase(unittest.TestCase):
def setUp(self):
self.sc = sc
self.sc = SparkContext('local[4]', "MLlib tests")
self.ssc = StreamingContext(self.sc, 1.0)

def tearDown(self):
self.ssc.stop(False)
self.sc.stop()

@staticmethod
def _eventually(condition, timeout=30.0, catch_assertions=False):
Expand Down Expand Up @@ -423,7 +426,7 @@ def test_bisecting_kmeans(self):
from pyspark.mllib.clustering import BisectingKMeans
data = array([0.0, 0.0, 1.0, 1.0, 9.0, 8.0, 8.0, 9.0]).reshape(4, 2)
bskm = BisectingKMeans()
model = bskm.train(sc.parallelize(data, 2), k=4)
model = bskm.train(self.sc.parallelize(data, 2), k=4)
p = array([0.0, 0.0])
rdd_p = self.sc.parallelize([p])
self.assertEqual(model.predict(p), model.predict(rdd_p).first())
Expand Down Expand Up @@ -1166,7 +1169,7 @@ def test_predictOn_model(self):
clusterWeights=[1.0, 1.0, 1.0, 1.0])

predict_data = [[[1.5, 1.5]], [[-1.5, 1.5]], [[-1.5, -1.5]], [[1.5, -1.5]]]
predict_data = [sc.parallelize(batch, 1) for batch in predict_data]
predict_data = [self.sc.parallelize(batch, 1) for batch in predict_data]
predict_stream = self.ssc.queueStream(predict_data)
predict_val = stkm.predictOn(predict_stream)

Expand Down Expand Up @@ -1197,7 +1200,7 @@ def test_trainOn_predictOn(self):
# classification based in the initial model would have been 0
# proving that the model is updated.
batches = [[[-0.5], [0.6], [0.8]], [[0.2], [-0.1], [0.3]]]
batches = [sc.parallelize(batch) for batch in batches]
batches = [self.sc.parallelize(batch) for batch in batches]
input_stream = self.ssc.queueStream(batches)
predict_results = []

Expand Down Expand Up @@ -1230,7 +1233,7 @@ def test_dim(self):
self.assertEqual(len(point.features), 3)

linear_data = LinearDataGenerator.generateLinearRDD(
sc=sc, nexamples=6, nfeatures=2, eps=0.1,
sc=self.sc, nexamples=6, nfeatures=2, eps=0.1,
nParts=2, intercept=0.0).collect()
self.assertEqual(len(linear_data), 6)
for point in linear_data:
Expand Down Expand Up @@ -1406,7 +1409,7 @@ def test_parameter_accuracy(self):
for i in range(10):
batch = LinearDataGenerator.generateLinearInput(
0.0, [10.0, 10.0], xMean, xVariance, 100, 42 + i, 0.1)
batches.append(sc.parallelize(batch))
batches.append(self.sc.parallelize(batch))

input_stream = self.ssc.queueStream(batches)
slr.trainOn(input_stream)
Expand All @@ -1430,7 +1433,7 @@ def test_parameter_convergence(self):
for i in range(10):
batch = LinearDataGenerator.generateLinearInput(
0.0, [10.0], [0.0], [1.0 / 3.0], 100, 42 + i, 0.1)
batches.append(sc.parallelize(batch))
batches.append(self.sc.parallelize(batch))

model_weights = []
input_stream = self.ssc.queueStream(batches)
Expand Down Expand Up @@ -1463,7 +1466,7 @@ def test_prediction(self):
0.0, [10.0, 10.0], [0.0, 0.0], [1.0 / 3.0, 1.0 / 3.0],
100, 42 + i, 0.1)
batches.append(
sc.parallelize(batch).map(lambda lp: (lp.label, lp.features)))
self.sc.parallelize(batch).map(lambda lp: (lp.label, lp.features)))

input_stream = self.ssc.queueStream(batches)
output_stream = slr.predictOnValues(input_stream)
Expand Down Expand Up @@ -1494,7 +1497,7 @@ def test_train_prediction(self):
for i in range(10):
batch = LinearDataGenerator.generateLinearInput(
0.0, [10.0], [0.0], [1.0 / 3.0], 100, 42 + i, 0.1)
batches.append(sc.parallelize(batch))
batches.append(self.sc.parallelize(batch))

predict_batches = [
b.map(lambda lp: (lp.label, lp.features)) for b in batches]
Expand Down Expand Up @@ -1580,6 +1583,7 @@ def test_als_ratings_id_long_error(self):


if __name__ == "__main__":
from pyspark.mllib.tests import *
if not _have_scipy:
print("NOTE: Skipping SciPy tests as it does not seem to be installed")
if xmlrunner:
Expand Down
1 change: 1 addition & 0 deletions python/pyspark/sql/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -1259,6 +1259,7 @@ def test_collect_functions(self):


if __name__ == "__main__":
from pyspark.sql.tests import *
if xmlrunner:
unittest.main(testRunner=xmlrunner.XMLTestRunner(output='target/test-reports'))
else:
Expand Down
1 change: 1 addition & 0 deletions python/pyspark/streaming/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -1635,6 +1635,7 @@ def search_kinesis_asl_assembly_jar():
are_kinesis_tests_enabled = os.environ.get(kinesis_test_environ_var) == '1'

if __name__ == "__main__":
from pyspark.streaming.tests import *
kafka_assembly_jar = search_kafka_assembly_jar()
flume_assembly_jar = search_flume_assembly_jar()
mqtt_assembly_jar = search_mqtt_assembly_jar()
Expand Down
1 change: 1 addition & 0 deletions python/pyspark/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -2008,6 +2008,7 @@ def test_statcounter_array(self):


if __name__ == "__main__":
from pyspark.tests import *
if not _have_scipy:
print("NOTE: Skipping SciPy tests as it does not seem to be installed")
if not _have_numpy:
Expand Down