@@ -284,7 +284,7 @@ def transform(self, func):
284
284
on each RDD of 'this' DStream.
285
285
286
286
`func` can have one argument of `rdd`, or have two arguments of
287
- (`time`, `rdd`)
287
+ (`time`, `rdd`)
288
288
"""
289
289
resue = False
290
290
if func .func_code .co_argcount == 1 :
@@ -328,7 +328,8 @@ def _slideDuration(self):
328
328
def union (self , other ):
329
329
"""
330
330
Return a new DStream by unifying data of another DStream with this DStream.
331
- @param other Another DStream having the same interval (i.e., slideDuration)
331
+
332
+ @param other: Another DStream having the same interval (i.e., slideDuration)
332
333
as this DStream.
333
334
"""
334
335
if self ._slideDuration != other ._slideDuration :
@@ -348,47 +349,47 @@ def cogroup(self, other, numPartitions=None):
348
349
349
350
def join (self , other , numPartitions = None ):
350
351
"""
351
- Return a new DStream by applying 'join' between RDDs of `this` DStream and
352
+ Return a new DStream by applying 'join' between RDDs of `this` DStream and
352
353
`other` DStream.
353
354
354
355
Hash partitioning is used to generate the RDDs with `numPartitions`
355
- partitions.
356
+ partitions.
356
357
"""
357
358
if numPartitions is None :
358
359
numPartitions = self .ctx .defaultParallelism
359
360
return self .transformWith (lambda a , b : a .join (b , numPartitions ), other )
360
361
361
362
def leftOuterJoin (self , other , numPartitions = None ):
362
363
"""
363
- Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and
364
+ Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and
364
365
`other` DStream.
365
366
366
367
Hash partitioning is used to generate the RDDs with `numPartitions`
367
- partitions.
368
+ partitions.
368
369
"""
369
370
if numPartitions is None :
370
371
numPartitions = self .ctx .defaultParallelism
371
372
return self .transformWith (lambda a , b : a .leftOuterJoin (b , numPartitions ), other )
372
373
373
374
def rightOuterJoin (self , other , numPartitions = None ):
374
375
"""
375
- Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and
376
+ Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and
376
377
`other` DStream.
377
378
378
379
Hash partitioning is used to generate the RDDs with `numPartitions`
379
- partitions.
380
+ partitions.
380
381
"""
381
382
if numPartitions is None :
382
383
numPartitions = self .ctx .defaultParallelism
383
384
return self .transformWith (lambda a , b : a .rightOuterJoin (b , numPartitions ), other )
384
385
385
386
def fullOuterJoin (self , other , numPartitions = None ):
386
387
"""
387
- Return a new DStream by applying 'full outer join' between RDDs of `this` DStream and
388
+ Return a new DStream by applying 'full outer join' between RDDs of `this` DStream and
388
389
`other` DStream.
389
390
390
391
Hash partitioning is used to generate the RDDs with `numPartitions`
391
- partitions.
392
+ partitions.
392
393
"""
393
394
if numPartitions is None :
394
395
numPartitions = self .ctx .defaultParallelism
@@ -424,9 +425,9 @@ def window(self, windowDuration, slideDuration=None):
424
425
Return a new DStream in which each RDD contains all the elements in seen in a
425
426
sliding window of time over this DStream.
426
427
427
- @param windowDuration width of the window; must be a multiple of this DStream's
428
+ @param windowDuration: width of the window; must be a multiple of this DStream's
428
429
batching interval
429
- @param slideDuration sliding interval of the window (i.e., the interval after which
430
+ @param slideDuration: sliding interval of the window (i.e., the interval after which
430
431
the new DStream will generate RDDs); must be a multiple of this
431
432
DStream's batching interval
432
433
"""
@@ -448,13 +449,13 @@ def reduceByWindow(self, reduceFunc, invReduceFunc, windowDuration, slideDuratio
448
449
2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
449
450
This is more efficient than `invReduceFunc` is None.
450
451
451
- @param reduceFunc associative reduce function
452
- @param invReduceFunc inverse reduce function of `reduceFunc`
453
- @param windowDuration width of the window; must be a multiple of this DStream's
454
- batching interval
455
- @param slideDuration sliding interval of the window (i.e., the interval after which
456
- the new DStream will generate RDDs); must be a multiple of this
457
- DStream's batching interval
452
+ @param reduceFunc: associative reduce function
453
+ @param invReduceFunc: inverse reduce function of `reduceFunc`
454
+ @param windowDuration: width of the window; must be a multiple of this DStream's
455
+ batching interval
456
+ @param slideDuration: sliding interval of the window (i.e., the interval after which
457
+ the new DStream will generate RDDs); must be a multiple of this
458
+ DStream's batching interval
458
459
"""
459
460
keyed = self .map (lambda x : (1 , x ))
460
461
reduced = keyed .reduceByKeyAndWindow (reduceFunc , invReduceFunc ,
@@ -478,12 +479,12 @@ def countByValueAndWindow(self, windowDuration, slideDuration, numPartitions=Non
478
479
Return a new DStream in which each RDD contains the count of distinct elements in
479
480
RDDs in a sliding window over this DStream.
480
481
481
- @param windowDuration width of the window; must be a multiple of this DStream's
482
+ @param windowDuration: width of the window; must be a multiple of this DStream's
482
483
batching interval
483
- @param slideDuration sliding interval of the window (i.e., the interval after which
484
+ @param slideDuration: sliding interval of the window (i.e., the interval after which
484
485
the new DStream will generate RDDs); must be a multiple of this
485
486
DStream's batching interval
486
- @param numPartitions number of partitions of each RDD in the new DStream.
487
+ @param numPartitions: number of partitions of each RDD in the new DStream.
487
488
"""
488
489
keyed = self .map (lambda x : (x , 1 ))
489
490
counted = keyed .reduceByKeyAndWindow (operator .add , operator .sub ,
@@ -495,12 +496,12 @@ def groupByKeyAndWindow(self, windowDuration, slideDuration, numPartitions=None)
495
496
Return a new DStream by applying `groupByKey` over a sliding window.
496
497
Similar to `DStream.groupByKey()`, but applies it over a sliding window.
497
498
498
- @param windowDuration width of the window; must be a multiple of this DStream's
499
+ @param windowDuration: width of the window; must be a multiple of this DStream's
499
500
batching interval
500
- @param slideDuration sliding interval of the window (i.e., the interval after which
501
+ @param slideDuration: sliding interval of the window (i.e., the interval after which
501
502
the new DStream will generate RDDs); must be a multiple of this
502
503
DStream's batching interval
503
- @param numPartitions Number of partitions of each RDD in the new DStream.
504
+ @param numPartitions: Number of partitions of each RDD in the new DStream.
504
505
"""
505
506
ls = self .mapValues (lambda x : [x ])
506
507
grouped = ls .reduceByKeyAndWindow (lambda a , b : a .extend (b ) or a , lambda a , b : a [len (b ):],
@@ -519,15 +520,15 @@ def reduceByKeyAndWindow(self, func, invFunc, windowDuration, slideDuration=None
519
520
`invFunc` can be None, then it will reduce all the RDDs in window, could be slower
520
521
than having `invFunc`.
521
522
522
- @param reduceFunc associative reduce function
523
- @param invReduceFunc inverse function of `reduceFunc`
524
- @param windowDuration width of the window; must be a multiple of this DStream's
523
+ @param reduceFunc: associative reduce function
524
+ @param invReduceFunc: inverse function of `reduceFunc`
525
+ @param windowDuration: width of the window; must be a multiple of this DStream's
525
526
batching interval
526
- @param slideDuration sliding interval of the window (i.e., the interval after which
527
+ @param slideDuration: sliding interval of the window (i.e., the interval after which
527
528
the new DStream will generate RDDs); must be a multiple of this
528
529
DStream's batching interval
529
- @param numPartitions number of partitions of each RDD in the new DStream.
530
- @param filterFunc function to filter expired key-value pairs;
530
+ @param numPartitions: number of partitions of each RDD in the new DStream.
531
+ @param filterFunc: function to filter expired key-value pairs;
531
532
only pairs that satisfy the function are retained
532
533
set this to null if you do not want to filter
533
534
"""
@@ -567,7 +568,7 @@ def updateStateByKey(self, updateFunc, numPartitions=None):
567
568
Return a new "state" DStream where the state for each key is updated by applying
568
569
the given function on the previous state of the key and the new values of the key.
569
570
570
- @param updateFunc State update function ([(k, vs, s)] -> [(k, s)]).
571
+ @param updateFunc: State update function ([(k, vs, s)] -> [(k, s)]).
571
572
If `s` is None, then `k` will be eliminated.
572
573
"""
573
574
if numPartitions is None :
0 commit comments