
Commit 6197a11

Ken Takagiwa
authored and committed
clean up code
1 parent eb4bf48 commit 6197a11

File tree

3 files changed: +20 -22 lines changed


python/pyspark/streaming/dstream.py

Lines changed: 9 additions & 10 deletions
@@ -56,7 +56,7 @@ def _sum(self):
         """
         Add up the elements in this DStream.
         """
-        return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
+        return self._mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
 
     def print_(self, label=None):
         """
@@ -65,6 +65,7 @@ def print_(self, label=None):
         deserialized pickled python object. Please use DStream.pyprint() to print results.
 
         Call DStream.print() and this function will print byte array in the DStream
+
         """
         # a hack to call print function in DStream
         getattr(self._jdstream, "print")(label)
@@ -74,7 +75,7 @@ def filter(self, f):
         Return a new DStream containing only the elements that satisfy predicate.
         """
         def func(iterator): return ifilter(f, iterator)
-        return self.mapPartitions(func)
+        return self._mapPartitions(func)
 
     def flatMap(self, f, preservesPartitioning=False):
         """
@@ -85,7 +86,7 @@ def func(s, iterator):
             return chain.from_iterable(imap(f, iterator))
         return self._mapPartitionsWithIndex(func, preservesPartitioning)
 
-    def map(self, f, preservesPartitioning=False):
+    def map(self, f):
         """
         Return a new DStream by applying a function to each element of DStream.
         """
@@ -217,13 +218,11 @@ def _defaultReducePartitions(self):
         return 2
 
     def getNumPartitions(self):
-        """
-        Returns the number of partitions in RDD
-        >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
-        >>> rdd.getNumPartitions()
-        2
-        """
-        return self._jdstream.partitions().size()
+        """
+        Return the number of partitions in RDD
+        """
+        # TODO: remove hardcoding. RDD has NumPartitions but DStream does not have.
+        return 2
 
     def foreachRDD(self, func):
         """

streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala

Lines changed: 1 addition & 1 deletion
@@ -59,7 +59,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
    * operator, so this PythonDStream will be registered as an output stream and there materialized.
    * This function is for PythonAPI.
    */
-
+  //TODO move this function to PythonDStream
   def pyprint() = dstream.pyprint()
 
   /**
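
Aside (an assumption, since the Python-side pyprint is not shown in this commit): the Java pyprint() above exists so the Python wrapper can reach it over Py4J, the same way print_ in dstream.py calls getattr(self._jdstream, "print"). A hypothetical sketch of the delegation from the Python DStream class:

    def pyprint(self):
        # Hypothetical: delegate to the Java-side pyprint() declared in
        # JavaDStreamLike; _jdstream is the Py4J handle to the Java DStream.
        self._jdstream.pyprint()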

streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala

Lines changed: 10 additions & 11 deletions
@@ -620,37 +620,36 @@ abstract class DStream[T: ClassTag] (
     new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register()
   }
 
-  //TODO move pyprint to PythonDStream
+  //TODO move pyprint to PythonDStream and executed by py4j call back function
   /**
    * Print the first ten elements of each PythonRDD generated in this PythonDStream. This is an output
    * operator, so this PythonDStream will be registered as an output stream and there materialized.
    * Since serialized Python object is readable by Python, pyprint writes out binary data to
    * temporary file and run python script to deserialized and print the first ten elements
+   *
+   * Currently call python script directly. We should avoid this
    */
   private[streaming] def pyprint() {
     def foreachFunc = (rdd: RDD[T], time: Time) => {
       val iter = rdd.take(11).iterator
 
-      // make a temporary file
+      // Generate a temporary file
       val prefix = "spark"
       val suffix = ".tmp"
       val tempFile = File.createTempFile(prefix, suffix)
       val tempFileStream = new DataOutputStream(new FileOutputStream(tempFile.getAbsolutePath))
-      //write out serialized python object
+      // Write out serialized python object to temporary file
       PythonRDD.writeIteratorToStream(iter, tempFileStream)
       tempFileStream.close()
 
-      // This value has to be passed from python
-      // Python currently does not do cluster deployment. But what happened
+      // pythonExec should be passed from python. Move pyprint to PythonDStream
       val pythonExec = new ProcessBuilder().environment().get("PYSPARK_PYTHON")
       val sparkHome = new ProcessBuilder().environment().get("SPARK_HOME")
-      //val pb = new ProcessBuilder(Seq(pythonExec, sparkHome + "/python/pyspark/streaming/pyprint.py", tempFile.getAbsolutePath())) // why this fails to compile???
-      //absolute path to the python script is needed to change because we do not use pysparkstreaming
+      // Call python script to deserialize and print result in stdout
       val pb = new ProcessBuilder(pythonExec, sparkHome + "/python/pyspark/streaming/pyprint.py", tempFile.getAbsolutePath)
       val workerEnv = pb.environment()
 
-      //envVars also need to be pass
-      //workerEnv.putAll(envVars)
+      // envVars also should be pass from python
       val pythonPath = sparkHome + "/python/" + File.pathSeparator + workerEnv.get("PYTHONPATH")
       workerEnv.put("PYTHONPATH", pythonPath)
       val worker = pb.start()
@@ -662,7 +661,7 @@ abstract class DStream[T: ClassTag] (
       println ("Time: " + time)
       println ("-------------------------------------------")
 
-      //print value from python std out
+      // Print values which is from python std out
       var line = ""
       breakable {
         while (true) {
@@ -671,7 +670,7 @@ abstract class DStream[T: ClassTag] (
           println(line)
         }
       }
-      //delete temporary file
+      // Delete temporary file
       tempFile.delete()
       println()
 
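Aside (not part of the commit): pyprint() above takes the first eleven elements (rdd.take(11)), writes them to a temporary file with PythonRDD.writeIteratorToStream, then launches PYSPARK_PYTHON on python/pyspark/streaming/pyprint.py with the file path as an argument and echoes that script's stdout. The script itself is not in this diff; a rough sketch of what such a reader could look like, assuming the file holds the length-prefixed pickled records that pyspark.serializers.PickleSerializer.load_stream can consume:

import sys
from itertools import islice

from pyspark.serializers import PickleSerializer

# The temporary file written by DStream.pyprint() is passed as the first argument.
path = sys.argv[1]

with open(path, "rb") as stream:
    # load_stream yields one deserialized object per length-prefixed frame.
    for obj in islice(PickleSerializer().load_stream(stream), 10):
        print(obj)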