1
+ #
2
+ # Licensed to the Apache Software Foundation (ASF) under one or more
3
+ # contributor license agreements. See the NOTICE file distributed with
4
+ # this work for additional information regarding copyright ownership.
5
+ # The ASF licenses this file to You under the Apache License, Version 2.0
6
+ # (the "License"); you may not use this file except in compliance with
7
+ # the License. You may obtain a copy of the License at
8
+ #
9
+ # http://www.apache.org/licenses/LICENSE-2.0
10
+ #
11
+ # Unless required by applicable law or agreed to in writing, software
12
+ # distributed under the License is distributed on an "AS IS" BASIS,
13
+ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
+ # See the License for the specific language governing permissions and
15
+ # limitations under the License.
16
+ #
17
+
1
18
from collections import defaultdict
2
19
from itertools import chain , ifilter , imap
3
20
import operator
@@ -20,11 +37,13 @@ def __init__(self, jdstream, ssc, jrdd_deserializer):
20
37
21
38
def count (self ):
22
39
"""
40
+ Return a new DStream which contains the number of elements in this DStream.
23
41
"""
24
42
return self ._mapPartitions (lambda i : [sum (1 for _ in i )])._sum ()
25
43
26
44
def _sum (self ):
27
45
"""
46
+ Add up the elements in this DStream.
28
47
"""
29
48
return self ._mapPartitions (lambda x : [sum (x )]).reduce (operator .add )
30
49
@@ -41,7 +60,7 @@ def print_(self):
41
60
42
61
def filter (self , f ):
43
62
"""
44
- Return DStream containing only the elements that satisfy predicate.
63
+ Return a new DStream containing only the elements that satisfy the predicate.
45
64
"""
46
65
def func (iterator ): return ifilter (f , iterator )
47
66
return self ._mapPartitions (func )
@@ -56,7 +75,7 @@ def func(s, iterator): return chain.from_iterable(imap(f, iterator))
56
75
57
76
def map (self , f ):
58
77
"""
59
- Return DStream by applying a function to each element of DStream.
78
+ Return a new DStream by applying a function to each element of this DStream.
60
79
"""
61
80
def func (iterator ): return imap (f , iterator )
62
81
return self ._mapPartitions (func )
@@ -71,12 +90,14 @@ def func(s, iterator): return f(iterator)
71
90
def _mapPartitionsWithIndex (self , f , preservesPartitioning = False ):
72
91
"""
73
92
Return a new DStream by applying a function to each partition of this DStream,
74
- While tracking the index of the original partition.
93
+ while tracking the index of the original partition.
75
94
"""
76
95
return PipelinedDStream (self , f , preservesPartitioning )
77
96
78
97
def reduce (self , func ):
79
98
"""
99
+ Return a new DStream by reducing the elements of this DStream using the specified
100
+ commutative and associative binary operator.
80
101
"""
81
102
return self .map (lambda x : (None , x )).reduceByKey (func , 1 ).map (lambda x : x [1 ])
82
103
@@ -267,4 +288,3 @@ def _jdstream(self):
267
288
268
289
def _is_pipelinable (self ):
269
290
return not (self .is_cached )
270
-
0 commit comments