Commit 84a021f

Ken Takagiwa authored and committed
clean up code
1 parent bd20e17 commit 84a021f

File tree

8 files changed: +89 -128 lines changed


python/pyspark/streaming/context.py

Lines changed: 21 additions & 20 deletions
@@ -22,15 +22,15 @@
 from pyspark.storagelevel import *
 from pyspark.rdd import RDD
 from pyspark.context import SparkContext
+from pyspark.streaming.dstream import DStream
 
 from py4j.java_collections import ListConverter
 
-from pyspark.streaming.dstream import DStream
 
 class StreamingContext(object):
     """
     Main entry point for Spark Streaming functionality. A StreamingContext represents the
-    connection to a Spark cluster, and can be used to create L{RDD}s and
+    connection to a Spark cluster, and can be used to create L{DStream}s and
     broadcast variables on that cluster.
     """
 
@@ -71,34 +71,35 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
     def _initialize_context(self, jspark_context, jduration):
         return self._jvm.JavaStreamingContext(jspark_context, jduration)
 
-    def actorStream(self, props, name, storageLevel, supervisorStrategy):
-        raise NotImplementedError
-
-    def addStreamingListener(self, streamingListener):
-        raise NotImplementedError
+    def start(self):
+        """
+        Start the execution of the streams.
+        """
+        self._jssc.start()
 
     def awaitTermination(self, timeout=None):
+        """
+        Wait for the execution to stop.
+        """
         if timeout:
             self._jssc.awaitTermination(timeout)
         else:
             self._jssc.awaitTermination()
 
     # start from simple one. storageLevel is not passed for now.
     def socketTextStream(self, hostname, port):
+        """
+        Create an input from TCP source hostname:port. Data is received using
+        a TCP socket and the received bytes are interpreted as UTF8 encoded '\n' delimited
+        lines.
+        """
         return DStream(self._jssc.socketTextStream(hostname, port), self, UTF8Deserializer())
 
-    def start(self):
-        self._jssc.start()
-
-    def stop(self, stopSparkContext=True):
-        raise NotImplementedError
-
     def textFileStream(self, directory):
+        """
+        Create an input stream that monitors a Hadoop-compatible file system
+        for new files and reads them as text files. Files must be written to the
+        monitored directory by "moving" them from another location within the same
+        file system. File names starting with . are ignored.
+        """
         return DStream(self._jssc.textFileStream(directory), self, UTF8Deserializer())
-
-    def transform(self, seq):
-        raise NotImplementedError
-
-    def union(self, seq):
-        raise NotImplementedError
-
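For orientation, here is a minimal driver sketch against the API as reshaped by this hunk: socketTextStream, start and awaitTermination come straight from the diff, while the constructor arguments, the batch-interval parameter name and the Seconds helper are assumptions about this work-in-progress branch, not something shown above.

# Sketch only: constructor signature, the duration= argument and Seconds are assumed.
from pyspark.streaming.context import StreamingContext
from pyspark.streaming.duration import Seconds

ssc = StreamingContext(master="local[2]", appName="NetworkWordCount",
                       duration=Seconds(1))        # assumed batch-interval argument
lines = ssc.socketTextStream("localhost", 9999)    # DStream of UTF8 '\n'-delimited lines
lines.pyprint()                                    # output operator defined in dstream.py below
ssc.start()                                        # start the execution of the streams
ssc.awaitTermination()                             # block until the context stops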

python/pyspark/streaming/dstream.py

Lines changed: 45 additions & 40 deletions
@@ -2,8 +2,6 @@
 from itertools import chain, ifilter, imap
 import operator
 
-import logging
-
 from pyspark.serializers import NoOpSerializer,\
     BatchedSerializer, CloudPickleSerializer, pack_long
 from pyspark.rdd import _JavaStackTrace
@@ -25,64 +23,86 @@ def count(self):
 
         """
         #TODO make sure count implementation, this is different from what pyspark does
-        return self.mapPartitions(lambda i: [sum(1 for _ in i)]).map(lambda x: (None, 1))
+        return self._mapPartitions(lambda i: [sum(1 for _ in i)]).map(lambda x: (None, 1))
 
     def _sum(self):
         """
         """
-        return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
+        return self._mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
 
     def print_(self):
         """
+        Since print is a reserved name in python, we cannot make a print method function.
+        This function prints serialized data in RDD in DStream because Scala and Java cannot
+        deserialize pickled python objects. Please use DStream.pyprint() instead to print results.
+
+        Call DStream.print().
         """
-        # print is a reserved name of Python. We cannot give print to function name
+        # hack to call print function in DStream
         getattr(self._jdstream, "print")()
 
     def pyprint(self):
         """
+        Print the first ten elements of each RDD generated in this DStream. This is an output
+        operator, so this DStream will be registered as an output stream and there materialized.
+
         """
         self._jdstream.pyprint()
 
     def filter(self, f):
         """
+        Return a new DStream containing only the elements that satisfy the predicate.
         """
         def func(iterator): return ifilter(f, iterator)
-        return self.mapPartitions(func)
+        return self._mapPartitions(func)
 
     def flatMap(self, f, preservesPartitioning=False):
         """
+        Pass each value in the key-value pair DStream through a flatMap function
+        without changing the keys: this also retains the original RDD's partitioning.
         """
         def func(s, iterator): return chain.from_iterable(imap(f, iterator))
-        return self.mapPartitionsWithIndex(func, preservesPartitioning)
+        return self._mapPartitionsWithIndex(func, preservesPartitioning)
 
-    def map(self, f, preservesPartitioning=False):
+    def map(self, f):
         """
+        Return a new DStream by applying a function to each element of the DStream.
         """
         def func(iterator): return imap(f, iterator)
-        return self.mapPartitions(func)
-        #return PipelinedDStream(self, func, preservesPartitioning)
+        return self._mapPartitions(func)
 
-    def mapPartitions(self, f):
+    def _mapPartitions(self, f):
         """
+        Return a new DStream by applying a function to each partition of this DStream.
         """
         def func(s, iterator): return f(iterator)
-        return self.mapPartitionsWithIndex(func)
+        return self._mapPartitionsWithIndex(func)
 
-    def mapPartitionsWithIndex(self, f, preservesPartitioning=False):
+    def _mapPartitionsWithIndex(self, f, preservesPartitioning=False):
         """
-
+        Return a new DStream by applying a function to each partition of this DStream,
+        while tracking the index of the original partition.
         """
         return PipelinedDStream(self, f, preservesPartitioning)
 
-    def reduce(self, func, numPartitions=None):
+
+    def reduceByKey(self, func, numPartitions=None):
         """
+        Merge the values for each key using an associative reduce function.
+
+        This will also perform the merging locally on each mapper before
+        sending results to the reducer, similarly to a "combiner" in MapReduce.
 
+        Output will be hash-partitioned with C{numPartitions} partitions, or
+        the default parallelism level if C{numPartitions} is not specified.
         """
         return self.combineByKey(lambda x:x, func, func, numPartitions)
 
     def combineByKey(self, createCombiner, mergeValue, mergeCombiners,
                      numPartitions = None):
         """
+        Count the number of elements for each key, and return the result to the
+        master as a dictionary
         """
         if numPartitions is None:
             numPartitions = self._defaultReducePartitions()
@@ -148,42 +168,27 @@ def add_shuffle_key(split, iterator):
         dstream._partitionFunc = partitionFunc
         return dstream
 
-    def mapPartitionsWithIndex(self, f, preservesPartitioning=False):
-        """
-
-        """
-        return PipelinedDStream(self, f, preservesPartitioning)
-
     def _defaultReducePartitions(self):
         """
+        Returns the default number of partitions to use during reduce tasks (e.g., groupBy).
+        If spark.default.parallelism is set, then we'll use the value from SparkContext
+        defaultParallelism, otherwise we'll use the number of partitions in this RDD.
 
+        This mirrors the behavior of the Scala Partitioner#defaultPartitioner, intended to reduce
+        the likelihood of OOMs. Once PySpark adopts Partitioner-based APIs, this behavior will
+        be inherent.
         """
-        # hard code to avoid the error
         if self.ctx._conf.contains("spark.default.parallelism"):
             return self.ctx.defaultParallelism
         else:
             return self.getNumPartitions()
 
-        return self._jdstream.partitions().size()
-
-    def _defaultReducePartitions(self):
+    def getNumPartitions(self):
         """
-
+        Return the number of partitions in RDD
         """
-        # hard code to avoid the error
-        if self.ctx._conf.contains("spark.default.parallelism"):
-            return self.ctx.defaultParallelism
-        else:
-            return self.getNumPartitions()
-
-    def getNumPartitions(self):
-        """
-        Returns the number of partitions in RDD
-        >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
-        >>> rdd.getNumPartitions()
-        2
-        """
-        return self._jdstream.partitions().size()
+        # TODO: remove hardcoding. RDD has NumPartitions but DStream does not have.
+        return 2
 
 
 class PipelinedDStream(DStream):
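With mapPartitions and mapPartitionsWithIndex renamed to underscore-prefixed helpers, the public surface of this class is map, flatMap, filter, reduceByKey and the print operators. A word-count style sketch of how they compose; ssc and the socket source are carried over from the StreamingContext sketch above, so they are assumptions here rather than part of this diff.

# Sketch only: `ssc` is the StreamingContext from the earlier example.
lines = ssc.socketTextStream("localhost", 9999)
words = lines.flatMap(lambda line: line.split(" "))   # one record per word
pairs = words.map(lambda word: (word, 1))             # (word, 1) key-value pairs
counts = pairs.reduceByKey(lambda a, b: a + b)        # merged per batch via combineByKey
counts.pyprint()                                      # output operator; prints each batch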

python/pyspark/streaming/duration.py

Lines changed: 1 addition & 0 deletions
@@ -17,6 +17,7 @@
 
 from pyspark.streaming import utils
 
+
 class Duration(object):
     """
     Duration for Spark Streaming application. Used to set duration

python/pyspark/streaming/pyprint.py

Lines changed: 8 additions & 1 deletion
@@ -21,16 +21,22 @@
 
 from pyspark.serializers import PickleSerializer
 
+
 def collect(binary_file_path):
+    """
+    Read a pickled file written by Spark Streaming
+    """
     dse = PickleSerializer()
     with open(binary_file_path, 'rb') as tempFile:
         for item in dse.load_stream(tempFile):
             yield item
+
+
 def main():
     try:
         binary_file_path = sys.argv[1]
     except:
-        print "Missed FilePath in argement"
+        print "Missed FilePath in arguments"
 
     if not binary_file_path:
         return
@@ -43,5 +49,6 @@ def main():
             print "..."
             break
 
+
 if __name__ =="__main__":
     exit(main())
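The collect() generator above can also be used on its own to inspect a batch file dumped by the Scala pyprint() path; a minimal sketch, with a placeholder path (the file name is not something this commit produces).

# Sketch only: the path below is a placeholder; collect() comes from the module above.
from pyspark.streaming.pyprint import collect

for i, record in enumerate(collect("/tmp/streaming_batch.pkl")):
    print record            # Python 2 print statement, matching this module
    if i >= 9:              # mirror pyprint's "first ten elements, then ..." behaviour
        print "..."
        break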

streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala

Lines changed: 1 addition & 1 deletion
@@ -59,7 +59,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
    * operator, so this PythonDStream will be registered as an output stream and there materialized.
    * This function is for PythonAPI.
    */
-
+  //TODO move this function to PythonDStream
   def pyprint() = dstream.pyprint()
 
   /**
/**

streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala

Lines changed: 3 additions & 1 deletion
@@ -71,7 +71,9 @@ DStream[Array[Byte]](prev.ssc){
       case Some(rdd)=>Some(rdd)
         val pairwiseRDD = new PairwiseRDD(rdd)
         /*
-         * This is equivalent to following python code
+         * Since the python operation is executed by Scala after StreamingContext.start,
+         * what PairwiseDStream does is equivalent to the following python code in pySpark.
+         *
          * with _JavaStackTrace(self.context) as st:
         *     pairRDD = self.ctx._jvm.PairwiseRDD(keyed._jrdd.rdd()).asJavaPairRDD()
         *     partitioner = self.ctx._jvm.PythonPartitioner(numPartitions,

streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonTransformedDStream.scala

Lines changed: 0 additions & 54 deletions
This file was deleted.
