File tree Expand file tree Collapse file tree 2 files changed +9
-12
lines changed Expand file tree Collapse file tree 2 files changed +9
-12
lines changed Original file line number Diff line number Diff line change @@ -174,18 +174,19 @@ def take(self, n):
174
174
"""
175
175
Return the first `n` RDDs in the stream (will start and stop).
176
176
"""
177
- rdds = []
177
+ results = []
178
178
179
179
def take (_ , rdd ):
180
- if rdd and len (rdds ) < n :
181
- rdds .append (rdd )
180
+ if rdd and len (results ) < n :
181
+ results .extend (rdd .take (n - len (results )))
182
+
182
183
self .foreachRDD (take )
183
184
184
185
self ._ssc .start ()
185
- while len (rdds ) < n :
186
+ while len (results ) < n :
186
187
time .sleep (0.01 )
187
188
self ._ssc .stop (False , True )
188
- return rdds
189
+ return results
189
190
190
191
def collect (self ):
191
192
"""
Original file line number Diff line number Diff line change @@ -87,16 +87,12 @@ class TestBasicOperations(PySparkStreamingTestCase):
87
87
def test_take (self ):
88
88
input = [range (i ) for i in range (3 )]
89
89
dstream = self .ssc .queueStream (input )
90
- rdds = dstream .take (3 )
91
- self .assertEqual (3 , len (rdds ))
92
- for d , rdd in zip (input , rdds ):
93
- self .assertEqual (d , rdd .collect ())
90
+ self .assertEqual ([0 , 0 , 1 ], dstream .take (3 ))
94
91
95
92
def test_first (self ):
96
93
input = [range (10 )]
97
94
dstream = self .ssc .queueStream (input )
98
- rdd = dstream .first ()
99
- self .assertEqual (range (10 ), rdd .collect ())
95
+ self .assertEqual (0 , dstream )
100
96
101
97
def test_map (self ):
102
98
"""Basic operation test for DStream.map."""
@@ -385,7 +381,7 @@ def func(rdds):
385
381
386
382
dstream = self .ssc .transform ([dstream1 , dstream2 , dstream3 ], func )
387
383
388
- self .assertEqual ([2 , 3 , 1 ], dstream .first (). collect ( ))
384
+ self .assertEqual ([2 , 3 , 1 ], dstream .take ( 3 ))
389
385
390
386
391
387
if __name__ == "__main__" :
You can’t perform that action at this time.
0 commit comments