
Commit 0a8bbbb

Ken Takagiwa authored and committed
clean up codes
1 parent bab31c1 commit 0a8bbbb

4 files changed: +11 -15 lines


examples/src/main/python/streaming/network_wordcount.py

Lines changed: 1 addition & 6 deletions
@@ -11,20 +11,15 @@
         exit(-1)
     conf = SparkConf()
     conf.setAppName("PythonStreamingNetworkWordCount")
-    conf.set("spark.default.parallelism", 1)
     ssc = StreamingContext(conf=conf, duration=Seconds(1))
 
     lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
     fm_lines = lines.flatMap(lambda x: x.split(" "))
-    filtered_lines = fm_lines.filter(lambda line: "Spark" in line)
     mapped_lines = fm_lines.map(lambda x: (x, 1))
-    reduced_lines = mapped_lines.reduce(add)
-    counted_lines = reduced_lines.count()
+    reduced_lines = mapped_lines.reduceByKey(add)
 
     fm_lines.pyprint()
-    filtered_lines.pyprint()
     mapped_lines.pyprint()
     reduced_lines.pyprint()
-    counted_lines.pyprint()
     ssc.start()
     ssc.awaitTermination()

examples/src/main/python/streaming/wordcount.py

Lines changed: 1 addition & 1 deletion
@@ -21,7 +21,7 @@
     fm_lines = lines.flatMap(lambda x: x.split(" "))
     filtered_lines = fm_lines.filter(lambda line: "Spark" in line)
     mapped_lines = fm_lines.map(lambda x: (x, 1))
-    reduced_lines = mapped_lines.reduce(add)
+    reduced_lines = mapped_lines.reduceByKey(add)
 
     fm_lines.pyprint()
     filtered_lines.pyprint()
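Both example scripts switch from reduce(add) to reduceByKey(add) on the (word, 1) pairs: reduce folds every element of the stream into a single value, while reduceByKey merges values per key, which is what a word count needs. A plain-Python sketch of the difference (illustrative only, not PySpark API calls):

# Illustration only: why reduceByKey(add) fits (word, 1) pairs.
from operator import add

pairs = [("spark", 1), ("streaming", 1), ("spark", 1)]

# reduceByKey-style: merge values per key with an associative function.
counts = {}
for word, n in pairs:
    counts[word] = add(counts[word], n) if word in counts else n
print(counts)  # {'spark': 2, 'streaming': 1}

# reduce(add)-style would fold all elements into one value; on tuples
# that is concatenation, not a per-word count.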

python/pyspark/streaming/dstream.py

Lines changed: 7 additions & 7 deletions
@@ -22,13 +22,15 @@ def count(self):
         """
 
         """
-        #TODO make sure count implementation, thiis different from what pyspark does
-        return self._mapPartitions(lambda i: [sum(1 for _ in i)]).map(lambda x: (None, 1))
+        pass
+        #TODO: make sure count implementation, thiis different from what pyspark does
+        #return self._mapPartitions(lambda i: [sum(1 for _ in i)]).map(lambda x: (None, 1))
 
     def _sum(self):
         """
         """
-        return self._mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
+        pass
+        #return self._mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
 
     def print_(self):
         """
@@ -85,7 +87,6 @@ def _mapPartitionsWithIndex(self, f, preservesPartitioning=False):
         """
         return PipelinedDStream(self, f, preservesPartitioning)
 
-
     def reduceByKey(self, func, numPartitions=None):
         """
         Merge the value for each key using an associative reduce function.
@@ -121,7 +122,7 @@ def combineLocally(iterator):
                 else:
                     combiners[k] = mergeValue(combiners[k], v)
             return combiners.iteritems()
-        locally_combined = self.mapPartitions(combineLocally)
+        locally_combined = self._mapPartitions(combineLocally)
         shuffled = locally_combined.partitionBy(numPartitions)
         def _mergeCombiners(iterator):
             combiners = {}
@@ -131,12 +132,11 @@ def _mergeCombiners(iterator):
                 else:
                     combiners[k] = mergeCombiners(combiners[k], v)
             return combiners.iteritems()
-        return shuffled.mapPartitions(_mergeCombiners)
+        return shuffled._mapPartitions(_mergeCombiners)
 
     def partitionBy(self, numPartitions, partitionFunc=None):
         """
         Return a copy of the DStream partitioned using the specified partitioner.
-
         """
         if numPartitions is None:
             numPartitions = self.ctx._defaultReducePartitions()
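For context, reduceByKey in dstream.py is built on a combine-locally / shuffle / merge-combiners pattern, and this commit routes both per-partition passes through the internal _mapPartitions helper. A minimal plain-Python model of those two passes (the partitionBy shuffle between them is omitted; the names below are illustrative, not the PySpark API):

from operator import add

def combine_locally(pairs, merge_value=add):
    # First pass: pre-aggregate (k, v) pairs within one partition.
    combiners = {}
    for k, v in pairs:
        combiners[k] = merge_value(combiners[k], v) if k in combiners else v
    return list(combiners.items())

def merge_combiners(partials, merge_combiner=add):
    # Second pass: merge the per-partition partial results after the shuffle.
    combiners = {}
    for k, v in partials:
        combiners[k] = merge_combiner(combiners[k], v) if k in combiners else v
    return dict(combiners)

partition_a = [("spark", 1), ("spark", 1)]
partition_b = [("streaming", 1), ("spark", 1)]
partials = combine_locally(partition_a) + combine_locally(partition_b)
print(merge_combiners(partials))  # {'spark': 3, 'streaming': 1}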

streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala

Lines changed: 2 additions & 1 deletion
@@ -623,7 +623,7 @@ abstract class DStream[T: ClassTag] (
     new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register()
   }
 
-  //TODO move pyprint to PythonDStream and executed by py4j call back function
+  //TODO: move pyprint to PythonDStream and executed by py4j call back function
   /**
    * Print the first ten elements of each PythonRDD generated in this PythonDStream. This is an output
    * operator, so this PythonDStream will be registered as an output stream and there materialized.
@@ -647,6 +647,7 @@ abstract class DStream[T: ClassTag] (
 
     // pythonExec should be passed from python. Move pyprint to PythonDStream
     val pythonExec = new ProcessBuilder().environment().get("PYSPARK_PYTHON")
+
     val sparkHome = new ProcessBuilder().environment().get("SPARK_HOME")
     // Call python script to deserialize and print result in stdout
     val pb = new ProcessBuilder(pythonExec, sparkHome + "/python/pyspark/streaming/pyprint.py", tempFile.getAbsolutePath)
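The Scala hunks only touch a TODO comment and spacing, but the surrounding code documents the interim design: pyprint dumps the first elements of each RDD to a temp file and launches a Python process to deserialize and print them. Purely as a hypothetical sketch of such a helper (assuming the temp file holds a plain stream of pickled records; the real pyprint.py may use PySpark's serializers and a different framing):

# Hypothetical helper in the spirit of pyprint.py; the file format is assumed.
import pickle
import sys

def print_first(path, limit=10):
    printed = 0
    with open(path, "rb") as f:
        while printed < limit:
            try:
                record = pickle.load(f)  # assumes concatenated pickled records
            except EOFError:
                break
            print(record)
            printed += 1

if __name__ == "__main__":
    print_first(sys.argv[1])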
