Commit b983f0f

committed: address comments
1 parent 847f9b9 commit b983f0f

File tree: 10 files changed, +61 -53 lines

bin/pyspark

Lines changed: 1 addition & 5 deletions

@@ -87,11 +87,7 @@ export PYSPARK_SUBMIT_ARGS
 if [[ -n "$SPARK_TESTING" ]]; then
   unset YARN_CONF_DIR
   unset HADOOP_CONF_DIR
-  if [[ -n "$PYSPARK_DOC_TEST" ]]; then
-    exec "$PYSPARK_PYTHON" -m doctest $1
-  else
-    exec "$PYSPARK_PYTHON" $1
-  fi
+  exec "$PYSPARK_PYTHON" $1
   exit
 fi
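With this change the test wrapper always execs "$PYSPARK_PYTHON" $1; the separate "python -m doctest" path is no longer needed because the modules that relied on it gain their own doctest entry points (see the accumulators.py, serializers.py, and streaming/util.py hunks below). A minimal sketch of that pattern, assuming nothing beyond what those hunks add:

# Running the file directly (which is what the test wrapper now does for
# every test file) executes the module's own doctests, replacing the old
# "python -m doctest <file>" invocation.
if __name__ == "__main__":
    import doctest
    doctest.testmod()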

core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala

Lines changed: 1 addition & 1 deletion

@@ -293,7 +293,7 @@ private class PythonException(msg: String, cause: Exception) extends RuntimeExce
  * Form an RDD[(Array[Byte], Array[Byte])] from key-value pairs returned from Python.
  * This is used by PySpark's shuffle operations.
  */
-private[spark] class PairwiseRDD(prev: RDD[Array[Byte]]) extends
+private class PairwiseRDD(prev: RDD[Array[Byte]]) extends
   RDD[(Long, Array[Byte])](prev) {
   override def getPartitions = prev.partitions
   override def compute(split: Partition, context: TaskContext) =

examples/src/main/python/streaming/network_wordcount.py

Lines changed: 1 addition & 1 deletion

@@ -14,7 +14,7 @@
 counts = lines.flatMap(lambda line: line.split(" "))\
               .map(lambda word: (word, 1))\
               .reduceByKey(lambda a, b: a+b)
-counts.pyprint()
+counts.pprint()

 ssc.start()
 ssc.awaitTermination()

examples/src/main/python/streaming/wordcount.py

Lines changed: 1 addition & 1 deletion

@@ -15,7 +15,7 @@
 counts = lines.flatMap(lambda line: line.split(" "))\
               .map(lambda x: (x, 1))\
               .reduceByKey(lambda a, b: a+b)
-counts.pyprint()
+counts.pprint()

 ssc.start()
 ssc.awaitTermination()

python/pyspark/accumulators.py

Lines changed: 5 additions & 0 deletions

@@ -256,3 +256,8 @@ def _start_update_server():
     thread.daemon = True
     thread.start()
     return server
+
+
+if __name__ == "__main__":
+    import doctest
+    doctest.testmod()

python/pyspark/serializers.py

Lines changed: 5 additions & 0 deletions

@@ -526,3 +526,8 @@ def write_int(value, stream):
 def write_with_length(obj, stream):
     write_int(len(obj), stream)
     stream.write(obj)
+
+
+if __name__ == "__main__":
+    import doctest
+    doctest.testmod()

python/pyspark/streaming/dstream.py

Lines changed: 1 addition & 1 deletion

@@ -410,7 +410,7 @@ def fullOuterJoin(self, other, numPartitions=None):
         return self.transformWith(lambda a, b: a.fullOuterJoin(b, numPartitions), other)

     def _jtime(self, timestamp):
-        """ convert datetime or unix_timestamp into Time
+        """ Convert datetime or unix_timestamp into Time
         """
         if isinstance(timestamp, datetime):
             timestamp = time.mktime(timestamp.timetuple())

python/pyspark/streaming/tests.py

Lines changed: 0 additions & 6 deletions

@@ -29,7 +29,6 @@

 from pyspark.context import SparkContext
 from pyspark.streaming.context import StreamingContext
-from pyspark.streaming.duration import Seconds


 class PySparkStreamingTestCase(unittest.TestCase):
@@ -46,11 +45,6 @@ def setUp(self):
     def tearDown(self):
         self.ssc.stop()

-    @classmethod
-    def tearDownClass(cls):
-        # Make sure tp shutdown the callback server
-        SparkContext._gateway._shutdown_callback_server()
-
     def _test_func(self, input, func, expected, sort=False):
         """
         @param input: dataset for the test. This should be list of lists.

python/pyspark/streaming/util.py

Lines changed: 5 additions & 0 deletions

@@ -64,3 +64,8 @@ def rddToFileName(prefix, suffix, time):
         return prefix + "-" + str(time)
     else:
         return prefix + "-" + str(time) + "." + suffix
+
+
+if __name__ == "__main__":
+    import doctest
+    doctest.testmod()
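For context: doctest.testmod() collects the interactive ">>>" examples from a module's docstrings and runs them when the file is executed directly, printing nothing when every example passes. A minimal, self-contained illustration of the pattern these hunks add (the add() function here is a hypothetical example, not pyspark code):

def add(a, b):
    """Return the sum of a and b.

    >>> add(2, 3)
    5
    """
    return a + b

if __name__ == "__main__":
    # Scan this module's docstrings for ">>>" examples and run them.
    import doctest
    doctest.testmod()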

python/run-tests

Lines changed: 41 additions & 38 deletions

@@ -48,6 +48,39 @@ function run_test() {
     fi
 }

+function run_core_tests() {
+    run_test "pyspark/conf.py"
+    run_test "pyspark/context.py"
+    run_test "pyspark/broadcast.py"
+    run_test "pyspark/accumulators.py"
+    run_test "pyspark/serializers.py"
+    run_test "pyspark/shuffle.py"
+    run_test "pyspark/rdd.py"
+    run_test "pyspark/tests.py"
+}
+
+function run_sql_tests() {
+    run_test "pyspark/sql.py"
+}
+
+function run_mllib_tests() {
+    run_test "pyspark/mllib/util.py"
+    run_test "pyspark/mllib/linalg.py"
+    run_test "pyspark/mllib/classification.py"
+    run_test "pyspark/mllib/clustering.py"
+    run_test "pyspark/mllib/random.py"
+    run_test "pyspark/mllib/recommendation.py"
+    run_test "pyspark/mllib/regression.py"
+    run_test "pyspark/mllib/stat.py"
+    run_test "pyspark/mllib/tree.py"
+    run_test "pyspark/mllib/tests.py"
+}
+
+function run_streaming_tests() {
+    run_test "pyspark/streaming/util.py"
+    run_test "pyspark/streaming/tests.py"
+}
+
 echo "Running PySpark tests. Output is in python/unit-tests.log."

 export PYSPARK_PYTHON="python"
@@ -60,51 +93,21 @@ fi
 echo "Testing with Python version:"
 $PYSPARK_PYTHON --version

-run_test "pyspark/rdd.py"
-run_test "pyspark/context.py"
-run_test "pyspark/conf.py"
-run_test "pyspark/sql.py"
-# These tests are included in the module-level docs, and so must
-# be handled on a higher level rather than within the python file.
-export PYSPARK_DOC_TEST=1
-run_test "pyspark/broadcast.py"
-run_test "pyspark/accumulators.py"
-run_test "pyspark/serializers.py"
-unset PYSPARK_DOC_TEST
-run_test "pyspark/shuffle.py"
-run_test "pyspark/tests.py"
-run_test "pyspark/mllib/classification.py"
-run_test "pyspark/mllib/clustering.py"
-run_test "pyspark/mllib/linalg.py"
-run_test "pyspark/mllib/random.py"
-run_test "pyspark/mllib/recommendation.py"
-run_test "pyspark/mllib/regression.py"
-run_test "pyspark/mllib/stat.py"
-run_test "pyspark/mllib/tests.py"
-run_test "pyspark/mllib/tree.py"
-run_test "pyspark/mllib/util.py"
-run_test "pyspark/streaming/tests.py"
+#run_core_tests
+#run_sql_tests
+#run_mllib_tests
+run_streaming_tests

 # Try to test with PyPy
 if [ $(which pypy) ]; then
     export PYSPARK_PYTHON="pypy"
     echo "Testing with PyPy version:"
     $PYSPARK_PYTHON --version

-    run_test "pyspark/rdd.py"
-    run_test "pyspark/context.py"
-    run_test "pyspark/conf.py"
-    run_test "pyspark/sql.py"
-    # These tests are included in the module-level docs, and so must
-    # be handled on a higher level rather than within the python file.
-    export PYSPARK_DOC_TEST=1
-    run_test "pyspark/broadcast.py"
-    run_test "pyspark/accumulators.py"
-    run_test "pyspark/serializers.py"
-    unset PYSPARK_DOC_TEST
-    run_test "pyspark/shuffle.py"
-    run_test "pyspark/tests.py"
-    run_test "pyspark/streaming/tests.py"
+    run_core_tests
+    run_sql_tests
+    run_mllib_tests
+    run_streaming_tests
 fi

 if [[ $FAILED == 0 ]]; then
