
Commit 8efa266

fixed PEP 8 violation

1 parent fa75d71

3 files changed: +31, -30 lines

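The changes below are pure style cleanups. As a quick reference, here is a minimal, self-contained illustration (not taken from the patch) of the PEP 8 conventions the commit enforces: spaces after commas, two spaces before inline comments, and blank lines around definitions.

# Illustrative only -- the PEP 8 conventions this commit enforces.


def make_input():
    # E231: write range(1, 5), not range(1,5) -- a space after each comma.
    return [list(range(1, 5)), list(range(5, 9))]


def configure():
    # E261: at least two spaces before an inline comment.
    timeout = 10  # seconds
    return timeout


# E302: two blank lines between top-level definitions. The patch also
# adds a single blank line before nested defs, as with add_shuffle_key.
if __name__ == "__main__":
    print(make_input(), configure())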

python/pyspark/streaming/context.py

-5 lines

@@ -19,12 +19,7 @@
 from signal import signal, SIGTERM, SIGINT
 from tempfile import NamedTemporaryFile
 
-from pyspark.conf import SparkConf
-from pyspark.files import SparkFiles
-from pyspark.java_gateway import launch_gateway
 from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer
-from pyspark.storagelevel import *
-from pyspark.rdd import RDD
 from pyspark.context import SparkContext
 from pyspark.streaming.dstream import DStream
 
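The five deleted imports are not referenced anywhere in context.py. A rough standard-library sketch of how such unused imports can be detected (an illustration, not the checker the author used):

import ast
import sys


def unused_imports(path):
    # Collect every name bound by an import and every name actually used;
    # the difference is a candidate list of unused imports.
    tree = ast.parse(open(path).read())
    imported = set()
    for node in ast.walk(tree):
        if isinstance(node, ast.Import):
            imported.update(a.asname or a.name.split(".")[0] for a in node.names)
        elif isinstance(node, ast.ImportFrom):
            imported.update(a.asname or a.name for a in node.names)
    used = {n.id for n in ast.walk(tree) if isinstance(n, ast.Name)}
    return sorted(imported - used - {"*"})  # "*" covers star imports


if __name__ == "__main__":
    for name in unused_imports(sys.argv[1]):
        print("possibly unused:", name)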

python/pyspark/streaming/dstream.py

+12, -7 lines

@@ -49,7 +49,7 @@ def _sum(self):
 
     def print_(self, label=None):
         """
-        Since print is reserved name for python, we cannot define a print method function.
+        Since print is a reserved name in Python, we cannot define a "print" method.
         This function prints the serialized data in the RDDs of this DStream, because Scala
         and Java cannot deserialize pickled Python objects. Use DStream.pyprint() instead.
@@ -159,8 +159,8 @@ def partitionBy(self, numPartitions, partitionFunc=None):
         # form the hash buckets in Python, transferring O(numPartitions) objects
         # to Java. Each object is a (splitNumber, [objects]) pair.
         outputSerializer = self.ctx._unbatched_serializer
-        def add_shuffle_key(split, iterator):
 
+        def add_shuffle_key(split, iterator):
             buckets = defaultdict(list)
 
             for (k, v) in iterator:
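The comment in this hunk is the rationale for add_shuffle_key: hash each key into one of numPartitions buckets so that only O(numPartitions) composite objects cross the Python-to-JVM boundary instead of one object per record. A minimal standalone sketch of that bucketing step (illustrative names, not the Spark implementation):

from collections import defaultdict


def bucket_by_key(pairs, num_partitions, partition_func=hash):
    # Group (key, value) pairs into one bucket per target partition;
    # each (splitNumber, [objects]) pair then ships to Java as a unit.
    buckets = defaultdict(list)
    for k, v in pairs:
        buckets[partition_func(k) % num_partitions].append((k, v))
    return list(buckets.items())


print(bucket_by_key([("a", 1), ("b", 2), ("a", 3)], num_partitions=2))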
@@ -205,6 +205,11 @@ def getNumPartitions(self):
 
     def foreachRDD(self, func):
         """
+        Apply a user-defined function to each RDD in this DStream.
+        This Python implementation could be expensive because it uses a callback
+        server to apply the function to every RDD in the DStream.
+        This is an output operator, so this DStream will be registered as an
+        output stream and materialized there.
         """
         from utils import RDDFunction
         wrapped_func = RDDFunction(self.ctx, self._jrdd_deserializer, func)
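A usage sketch of the behavior the new docstring describes. Judging from takeAndPrint and get_output elsewhere in this diff, the callback receives each batch's RDD plus its batch time; ssc, lines, and report_batch are illustrative names, and a live StreamingContext is assumed:

def report_batch(rdd, time):
    # Invoked once per batch, with that batch's RDD and its time.
    print("batch at %s has %d element(s)" % (time, rdd.count()))

lines.foreachRDD(report_batch)  # `lines` stands for any DStream on `ssc`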
@@ -214,7 +219,6 @@ def pyprint(self):
         """
         Print the first ten elements of each RDD generated in this DStream. This is an output
         operator, so this DStream will be registered as an output stream and materialized there.
-
         """
         def takeAndPrint(rdd, time):
             taken = rdd.take(11)
@@ -235,14 +239,15 @@ def takeAndPrint(rdd, time):
         # jdstream = self.ctx._jvm.PythonTransformedDStream(self._jdstream.dstream(), wrapped_func).toJavaDStream
         # return DStream(jdstream, self._ssc, ...) ## DO NOT KNOW HOW
 
-    def _test_output(self, buff):
+    def _test_output(self, result):
         """
-        This function is only for testcase.
-        Store data in dstream to buffer to valify the result in tesecase
+        This function is only for test cases.
+        Store the data in this DStream into result to verify it in a test case.
         """
         def get_output(rdd, time):
             taken = rdd.collect()
-            buff.append(taken)
+            result.append(taken)
+
         self.foreachRDD(get_output)
 
 
python/pyspark/streaming_tests.py

+19, -18 lines

@@ -23,18 +23,10 @@
 to focus on streaming test cases
 
 """
-from fileinput import input
-from glob import glob
 from itertools import chain
 import os
-import re
-import shutil
-import subprocess
-import sys
-import tempfile
 import time
 import unittest
-import zipfile
 import operator
 
 from pyspark.context import SparkContext
@@ -44,12 +36,14 @@
 
 SPARK_HOME = os.environ["SPARK_HOME"]
 
+
 class StreamOutput:
     """
     A class to store the output from a stream.
     """
     result = list()
 
+
 class PySparkStreamingTestCase(unittest.TestCase):
     def setUp(self):
         class_name = self.__class__.__name__
@@ -69,6 +63,7 @@ def tearDownClass(cls):
         time.sleep(5)
         SparkContext._gateway._shutdown_callback_server()
 
+
 class TestBasicOperationsSuite(PySparkStreamingTestCase):
     """
     Input and output of this TestBasicOperationsSuite is the equivalent to
@@ -77,7 +72,7 @@ class TestBasicOperationsSuite(PySparkStreamingTestCase):
     def setUp(self):
         PySparkStreamingTestCase.setUp(self)
         StreamOutput.result = list()
-        self.timeout = 10 # seconds
+        self.timeout = 10  # seconds
 
     def tearDown(self):
         PySparkStreamingTestCase.tearDown(self)
@@ -88,7 +83,8 @@ def tearDownClass(cls):
 
     def test_map(self):
         """Basic operation test for DStream.map"""
-        test_input = [range(1,5), range(5,9), range(9, 13)]
+        test_input = [range(1, 5), range(5, 9), range(9, 13)]
+
         def test_func(dstream):
             return dstream.map(lambda x: str(x))
         expected_output = map(lambda x: map(lambda y: str(y), x), test_input)
@@ -97,17 +93,19 @@ def test_func(dstream):
 
     def test_flatMap(self):
         """Basic operation test for DStream.flatMap"""
-        test_input = [range(1,5), range(5,9), range(9, 13)]
+        test_input = [range(1, 5), range(5, 9), range(9, 13)]
+
         def test_func(dstream):
             return dstream.flatMap(lambda x: (x, x * 2))
         expected_output = map(lambda x: list(chain.from_iterable((map(lambda y: [y, y * 2], x)))),
-                             test_input)
+                              test_input)
         output = self._run_stream(test_input, test_func, expected_output)
         self.assertEqual(expected_output, output)
 
     def test_filter(self):
         """Basic operation test for DStream.filter"""
-        test_input = [range(1,5), range(5,9), range(9, 13)]
+        test_input = [range(1, 5), range(5, 9), range(9, 13)]
+
         def test_func(dstream):
             return dstream.filter(lambda x: x % 2 == 0)
         expected_output = map(lambda x: filter(lambda y: y % 2 == 0, x), test_input)
@@ -116,7 +114,8 @@ def test_func(dstream):
 
     def test_count(self):
         """Basic operation test for DStream.count"""
-        test_input = [[], [1], range(1, 3), range(1,4), range(1,5)]
+        test_input = [[], [1], range(1, 3), range(1, 4), range(1, 5)]
+
         def test_func(dstream):
             return dstream.count()
         expected_output = map(lambda x: [len(x)], test_input)
@@ -125,7 +124,8 @@ def test_func(dstream):
 
     def test_reduce(self):
         """Basic operation test for DStream.reduce"""
-        test_input = [range(1,5), range(5,9), range(9, 13)]
+        test_input = [range(1, 5), range(5, 9), range(9, 13)]
+
         def test_func(dstream):
             return dstream.reduce(operator.add)
         expected_output = map(lambda x: [reduce(operator.add, x)], test_input)
@@ -135,19 +135,20 @@ def test_func(dstream):
     def test_reduceByKey(self):
         """Basic operation test for DStream.reduceByKey"""
         test_input = [["a", "a", "b"], ["", ""], []]
+
         def test_func(dstream):
             return dstream.map(lambda x: (x, 1)).reduceByKey(operator.add)
-        expected_output = [[("a", 2), ("b", 1)],[("", 2)], []]
+        expected_output = [[("a", 2), ("b", 1)], [("", 2)], []]
         output = self._run_stream(test_input, test_func, expected_output)
         self.assertEqual(expected_output, output)
 
     def _run_stream(self, test_input, test_func, expected_output):
         """Start stream and return the output"""
         # Generate input stream with user-defined input
         test_input_stream = self.ssc._testInputStream(test_input)
-        # Applyed test function to stream
+        # Apply the test function to the stream
         test_stream = test_func(test_input_stream)
-        # Add job to get outpuf from stream
+        # Add a job to get output from the stream
         test_stream._test_output(StreamOutput.result)
         self.ssc.start()
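The final hunk ends at self.ssc.start(); the rest of _run_stream lies outside the diff. A hypothetical sketch of the kind of polling loop such a harness needs, built around the timeout and the shared StreamOutput.result list seen above (not the author's actual code):

import time


def wait_for_output(result, expected_len, timeout):
    # Poll the shared result list until enough batches have arrived
    # or the timeout elapses, then return what was collected.
    deadline = time.time() + timeout
    while len(result) < expected_len and time.time() < deadline:
        time.sleep(0.01)
    return result[:expected_len]


print(wait_for_output([[2, 4], [6, 8]], expected_len=2, timeout=1))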