from itertools import chain, ifilter, imap
import operator
+ from datetime import datetime

from pyspark import RDD
from pyspark.storagelevel import StorageLevel
@@ -54,17 +55,6 @@ def sum(self):
        """
        return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)

-    def print_(self, label=None):
-        """
-        Since print is reserved name for python, we cannot define a "print" method function.
-        This function prints serialized data in RDD in DStream because Scala and Java cannot
-        deserialized pickled python object. Please use DStream.pyprint() to print results.
-
-        Call DStream.print() and this function will print byte array in the DStream
-        """
-        # a hack to call print function in DStream
-        getattr(self._jdstream, "print")(label)
-
    def filter(self, f):
        """
        Return a new DStream containing only the elements that satisfy predicate.
@@ -154,19 +144,15 @@ def foreachRDD(self, func):
        jfunc = RDDFunction(self.ctx, func, self._jrdd_deserializer)
        self.ctx._jvm.PythonForeachDStream(self._jdstream.dstream(), jfunc)

-    def pyprint(self):
+    def pprint(self):
        """
        Print the first ten elements of each RDD generated in this DStream. This is an output
        operator, so this DStream will be registered as an output stream and there materialized.
        """
        def takeAndPrint(rdd, time):
-            """
-            Closure to take element from RDD and print first 10 elements.
-            This closure is called by py4j callback server.
-            """
            taken = rdd.take(11)
            print "-------------------------------------------"
-            print "Time: %s" % (str(time))
+            print "Time: %s" % datetime.fromtimestamp(time / 1000.0)
            print "-------------------------------------------"
            for record in taken[:10]:
                print record
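
The divide-by-1000.0 above reflects that the batch time delivered through the py4j callback is an epoch timestamp in milliseconds, while datetime.fromtimestamp() expects seconds. A standalone sketch of the conversion (the literal timestamp is an illustrative value, not from this patch):

from datetime import datetime

jvm_time_ms = 1409692500000   # batch time in epoch milliseconds, as delivered by the JVM
print "Time: %s" % datetime.fromtimestamp(jvm_time_ms / 1000.0)   # fromtimestamp() wants seconds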
@@ -176,6 +162,20 @@ def takeAndPrint(rdd, time):

        self.foreachRDD(takeAndPrint)

+    def collect(self):
+        """
+        Collect each RDD into the returned list.
+
+        :return: list, which will have the collected items.
+        """
+        result = []
+
+        def get_output(rdd, time):
+            r = rdd.collect()
+            result.append(r)
+        self.foreachRDD(get_output)
+        return result
+
    def mapValues(self, f):
        """
        Pass each value in the key-value pair RDD through a map function
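
The collect() helper added in this hunk only registers a foreachRDD callback, so the returned list starts empty and is appended to (one list per batch) while the StreamingContext runs. A hedged usage sketch; ssc, the socket source, and the sleep are assumptions for illustration, not part of this patch:

import time

lines = ssc.socketTextStream("localhost", 9999)
errors = lines.filter(lambda line: "ERROR" in line)

result = errors.collect()   # empty list now; appended to as each batch is processed
ssc.start()
time.sleep(10)              # let a few batches run
ssc.stop()
print result                # one collected list per batch, e.g. [[...], [...]]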
@@ -196,9 +196,9 @@ def flatMapValues(self, f):

    def glom(self):
        """
-        Return a new DStream in which RDD is generated by applying glom() to RDD of
-        this DStream. Applying glom() to an RDD coalesces all elements within each partition into
-        an list.
+        Return a new DStream in which RDD is generated by applying glom()
+        to RDD of this DStream. Applying glom() to an RDD coalesces all
+        elements within each partition into a list.
        """
        def func(iterator):
            yield list(iterator)
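
The rewrapped docstring is easier to picture with the batch RDD behaviour it mirrors; a quick illustrative sketch on a plain RDD (sc is an assumed SparkContext, not part of this patch):

rdd = sc.parallelize([1, 2, 3, 4, 5, 6], 3)
print rdd.glom().collect()   # [[1, 2], [3, 4], [5, 6]] -- one list per partition
# DStream.glom() applies the same per-partition coalescing to every RDD in the stream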
@@ -228,11 +228,11 @@ def checkpoint(self, interval):
        Mark this DStream for checkpointing. It will be saved to a file inside the
        checkpoint directory set with L{SparkContext.setCheckpointDir()}

-        @param interval: Time interval after which generated RDD will be checkpointed
-               interval has to be pyspark.streaming.duration.Duration
+        @param interval: time in seconds after which the generated RDD will
+               be checkpointed
        """
        self.is_checkpointed = True
-        self._jdstream.checkpoint(interval._jduration)
+        self._jdstream.checkpoint(self._ssc._jduration(interval))
        return self

    def groupByKey(self, numPartitions=None):
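
After this change the caller passes a plain number of seconds and self._ssc._jduration(interval) wraps it into a JVM Duration, instead of requiring a Duration object up front. An illustrative before/after (stream is an assumed DStream):

# before this patch: stream.checkpoint(some_duration), where some_duration is a
#                    pyspark.streaming.duration.Duration object
# after this patch:  plain seconds; the StreamingContext converts them to a JVM Duration
stream.checkpoint(30)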
@@ -245,7 +245,6 @@ def groupByKey(self, numPartitions=None):
        Note: If you are grouping in order to perform an aggregation (such as a
        sum or average) over each key, using reduceByKey will provide much
        better performance.
-
        """
        return self.transform(lambda rdd: rdd.groupByKey(numPartitions))
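
The note about reduceByKey comes down to map-side combining: reduceByKey merges values within each partition before the shuffle, while groupByKey ships every value across the network first. An illustrative comparison for a per-key sum (pairs is an assumed DStream of (key, value) tuples, not part of this patch):

# groupByKey: every value is shuffled across the network, then summed on the reduce side
sums_slow = pairs.groupByKey().mapValues(lambda values: sum(values))
# reduceByKey: values are combined within each partition first, so less data is shuffled
sums_fast = pairs.reduceByKey(lambda a, b: a + b)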
@@ -288,15 +287,6 @@ def saveAsPickleFile(rdd, time):

        return self.foreachRDD(saveAsPickleFile)

-    def collect(self):
-        result = []
-
-        def get_output(rdd, time):
-            r = rdd.collect()
-            result.append(r)
-        self.foreachRDD(get_output)
-        return result
-
    def transform(self, func):
        return TransformedDStream(self, lambda a, t: func(a), True)