@@ -71,8 +71,9 @@ class TestBasicOperationsSuite(PySparkStreamingTestCase):
71
71
"""
72
72
def setUp (self ):
73
73
PySparkStreamingTestCase .setUp (self )
74
- StreamOutput .result = list ()
75
74
self .timeout = 10 # seconds
75
+ self .numInputPartitions = 2
76
+ self .result = list ()
76
77
77
78
def tearDown (self ):
78
79
PySparkStreamingTestCase .tearDown (self )
@@ -137,6 +138,8 @@ def test_reduceByKey(self):
137
138
test_input = [["a" , "a" , "b" ], ["" , "" ], []]
138
139
139
140
def test_func (dstream ):
141
+ print "reduceByKey"
142
+ dstream .map (lambda x : (x , 1 )).pyprint ()
140
143
return dstream .map (lambda x : (x , 1 )).reduceByKey (operator .add )
141
144
expected_output = [[("a" , 2 ), ("b" , 1 )], [("" , 2 )], []]
142
145
output = self ._run_stream (test_input , test_func , expected_output )
@@ -168,9 +171,8 @@ def test_glom(self):
168
171
numSlices = 2
169
172
170
173
def test_func (dstream ):
171
- dstream .pyprint ()
172
174
return dstream .glom ()
173
- expected_output = [[[1 ,2 ], [3 ,4 ]],[[5 ,6 ], [7 ,8 ]],[[9 ,10 ], [11 ,12 ]]]
175
+ expected_output = [[[1 ,2 ], [3 ,4 ]], [[5 ,6 ], [7 ,8 ]], [[9 ,10 ], [11 ,12 ]]]
174
176
output = self ._run_stream (test_input , test_func , expected_output , numSlices )
175
177
self .assertEqual (expected_output , output )
176
178
@@ -180,20 +182,21 @@ def test_mapPartitions(self):
180
182
numSlices = 2
181
183
182
184
def test_func (dstream ):
183
- dstream . pyprint ( )
184
- return dstream .mapPartitions (lambda x : reduce ( operator . add , x ) )
185
- expected_output = [[3 , 7 ],[11 , 15 ],[19 , 23 ]]
185
+ def f ( iterator ): yield sum ( iterator )
186
+ return dstream .mapPartitions (f )
187
+ expected_output = [[3 , 7 ], [11 , 15 ], [19 , 23 ]]
186
188
output = self ._run_stream (test_input , test_func , expected_output , numSlices )
187
189
self .assertEqual (expected_output , output )
188
190
189
191
def _run_stream (self , test_input , test_func , expected_output , numSlices = None ):
190
192
"""Start stream and return the output"""
191
193
# Generate input stream with user-defined input
192
- test_input_stream = self .ssc ._testInputStream (test_input , numSlices )
194
+ numSlices = numSlices or self .numInputPartitions
195
+ test_input_stream = self .ssc ._testInputStream2 (test_input , numSlices )
193
196
# Apply test function to stream
194
197
test_stream = test_func (test_input_stream )
195
198
# Add job to get output from stream
196
- test_stream ._test_output (StreamOutput .result )
199
+ test_stream ._test_output (self .result )
197
200
self .ssc .start ()
198
201
199
202
start_time = time .time ()
@@ -205,9 +208,9 @@ def _run_stream(self, test_input, test_func, expected_output, numSlices=None):
205
208
break
206
209
self .ssc .awaitTermination (50 )
207
210
# check if the output is the same length of expexted output
208
- if len (expected_output ) == len (StreamOutput .result ):
211
+ if len (expected_output ) == len (self .result ):
209
212
break
210
- return StreamOutput .result
213
+ return self .result
211
214
212
215
if __name__ == "__main__" :
213
216
unittest .main ()
0 commit comments