@@ -253,53 +253,6 @@ def _test_func(self, input, func, expected, numSlices=None, sort=False):
         self.assertEqual(expected, result)
 
 
-class TestTransform(PySparkStreamingTestCase):
-    def setUp(self):
-        PySparkStreamingTestCase.setUp(self)
-        self.timeout = 10
-
-    def test_transform(self):
-        input = [range(1, 5), range(5, 9), range(9, 13)]
-
-        def func(stream):
-            return stream.transform(lambda r: r and r.map(str))
-
-        expected = map(lambda x: map(str, x), input)
-        output = self._test_func(input, func, expected)
-        self.assertEqual(expected, output)
-
-    def _test_func(self, input, func, expected, numSlices=None):
-        """
-        Start the stream and return the result.
-        @param input: dataset for the test. This should be a list of lists.
-        @param func: wrapped function. This function should return a PythonDStream object.
-        @param expected: expected output for this test case.
-        @param numSlices: the number of slices in the RDDs in the DStream.
-        """
-        # Generate an input stream from the user-defined input.
-        input_stream = self.ssc._makeStream(input, numSlices)
-        # Apply the test function to the stream.
-        stream = func(input_stream)
-        result = stream.collect()
-        self.ssc.start()
-
-        start_time = time.time()
-        # Loop until the expected number of results arrives from the stream.
-        while True:
-            current_time = time.time()
-            # Check for timeout.
-            if (current_time - start_time) > self.timeout:
-                break
-            # StreamingContext.awaitTermination is not used to wait because
-            # calling the py4j server every 50 milliseconds raises an error.
-            time.sleep(0.05)
-            # Check whether the output has the same length as the expected output.
-            if len(expected) == len(result):
-                break
-
-        return result
-
-
 class TestStreamingContext(unittest.TestCase):
     """
     Should we have conf property in SparkContext?
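For context on the helper this diff removes: `_test_func` polls the collected output in a sleep-and-check loop rather than blocking on `StreamingContext.awaitTermination`, because (per the removed comment) hitting the py4j gateway every 50 milliseconds raised errors. Below is a minimal, Spark-free sketch of that same poll-with-timeout pattern; `collect_results` is a hypothetical callable standing in for the DStream output collection, and the parameter defaults mirror the removed code's `timeout = 10` and `time.sleep(0.05)`:

```python
import time


def wait_for_output(collect_results, expected_count, timeout=10.0, interval=0.05):
    """Poll collect_results() until it returns expected_count items or the
    timeout expires.

    Mirrors the removed _test_func loop: periodic polling instead of
    blocking on StreamingContext.awaitTermination.
    """
    start_time = time.time()
    while (time.time() - start_time) <= timeout:
        result = collect_results()
        # Stop as soon as the output reaches the expected length.
        if len(result) >= expected_count:
            return result
        time.sleep(interval)
    # Timed out: return whatever arrived so the caller's assertion can report it.
    return collect_results()
```

Usage would look like `output = wait_for_output(lambda: results, len(expected))` followed by an equality assertion, which is essentially what `test_transform` did through `_test_func`.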