
Commit 35933e1

broke something
1 parent 9767712 commit 35933e1


5 files changed: +114 -4 lines changed

python/pyspark/streaming/context.py

Lines changed: 6 additions & 4 deletions
@@ -169,21 +169,23 @@ def _testInputStream(self, test_inputs, numSlices=None):
         jinput_stream = self._jvm.PythonTestInputStream(self._jssc,
                                                          jtempFiles,
                                                          numSlices).asJavaDStream()
-        return DStream(jinput_stream, self, PickleSerializer())
-
+        return DStream(jinput_stream, self, BatchedSerializer(PickleSerializer()))
 
     def _testInputStream2(self, test_inputs, numSlices=None):
         """
         This is inpired by QueStream implementation. Give list of RDD and generate DStream
         which contain the RDD.
         """
         test_rdds = list()
+        test_rdd_deserializers = list()
         for test_input in test_inputs:
             test_rdd = self._sc.parallelize(test_input, numSlices)
-            print test_rdd.glom().collect()
             test_rdds.append(test_rdd._jrdd)
+            test_rdd_deserializers.append(test_rdd._jrdd_deserializer)
 
         jtest_rdds = ListConverter().convert(test_rdds, SparkContext._gateway._gateway_client)
         jinput_stream = self._jvm.PythonTestInputStream2(self._jssc, jtest_rdds).asJavaDStream()
 
-        return DStream(jinput_stream, self, BatchedSerializer(PickleSerializer()))
+        dstream = DStream(jinput_stream, self, test_rdd_deserializers[0])
+        dstream._test_switch_dserializer(test_rdd_deserializers)
+        return dstream
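
This change stops pinning the test input DStream to BatchedSerializer(PickleSerializer()) and instead records the deserializer each parallelized test input actually carries, handing the resulting FIFO list to the DStream. The sketch below illustrates why those deserializers can differ per input; it assumes an interactive PySpark shell with a live SparkContext bound to sc, and _jrdd_deserializer is the same private attribute the patch reads.

    # Minimal sketch (Python 2, matching the codebase): parallelize() may batch its
    # output differently depending on input size and numSlices, so each test RDD can
    # carry a different deserializer rather than a fixed BatchedSerializer(PickleSerializer()).
    test_inputs = [[1, 2, 3], range(100)]
    test_rdd_deserializers = []
    for test_input in test_inputs:
        rdd = sc.parallelize(test_input, 2)            # assumes a live SparkContext `sc`
        test_rdd_deserializers.append(rdd._jrdd_deserializer)
    print test_rdd_deserializers                       # consumed FIFO, one entry per batch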

python/pyspark/streaming/dstream.py

Lines changed: 20 additions & 0 deletions
@@ -17,6 +17,7 @@
 
 from collections import defaultdict
 from itertools import chain, ifilter, imap
+import time
 import operator
 
 from pyspark.serializers import NoOpSerializer,\
@@ -289,6 +290,25 @@ def get_output(rdd, time):
 
         self.foreachRDD(get_output)
 
+    def _test_switch_dserializer(self, serializer_que):
+        """
+        Deserializer is dynamically changed based on numSlice and the number of
+        input. This function choose deserializer. Currently this is just FIFO.
+        """
+
+        jrdd_deserializer = self._jrdd_deserializer
+
+        def switch(rdd, jtime):
+            try:
+                print serializer_que
+                jrdd_deserializer = serializer_que.pop(0)
+                print jrdd_deserializer
+            except Exception as e:
+                print e
+
+        self.foreachRDD(switch)
+
+
 
 # TODO: implement groupByKey
 # TODO: impelment union
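
One detail about the switch() callback added above: Python 2 has no nonlocal, so jrdd_deserializer = serializer_que.pop(0) inside the nested function only binds a new local; the jrdd_deserializer read in the enclosing scope is never updated. A standalone sketch of the usual workaround, using a one-element list as a mutable cell (make_switcher, current, and the sample deserializer names are hypothetical, for illustration only):

    def make_switcher(serializer_que):
        current = ["initial"]                 # mutable cell: nested code can mutate it

        def switch(rdd, jtime):
            # pop the next deserializer FIFO, as the patch does, but store it where
            # the enclosing scope can actually observe the change
            current[0] = serializer_que.pop(0)

        return switch, current

    switch, current = make_switcher(["deser_a", "deser_b"])
    switch(None, None)
    print current[0]                          # deser_a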

python/pyspark/streaming_tests.py

Lines changed: 2 additions & 0 deletions
@@ -118,6 +118,8 @@ def test_count(self):
         test_input = [[], [1], range(1, 3), range(1, 4), range(1, 5)]
 
         def test_func(dstream):
+            print "count"
+            dstream.count().pyprint()
             return dstream.count()
         expected_output = map(lambda x: [len(x)], test_input)
         output = self._run_stream(test_input, test_func, expected_output)

python/pyspark/worker.py

Lines changed: 11 additions & 0 deletions
@@ -23,6 +23,7 @@
 import time
 import socket
 import traceback
+import itertools
 # CloudPickler needs to be imported so that depicklers are registered using the
 # copy_reg module.
 from pyspark.accumulators import _accumulatorRegistry
@@ -76,6 +77,16 @@ def main(infile, outfile):
         (func, deserializer, serializer) = command
         init_time = time.time()
         iterator = deserializer.load_stream(infile)
+        print "deserializer in worker: %s" % str(deserializer)
+        iterator, walk = itertools.tee(iterator)
+        if isinstance(walk, int):
+            print "this is int"
+            print walk
+        else:
+            try:
+                print list(walk)
+            except:
+                print list(walk)
         serializer.dump_stream(func(split_index, iterator), outfile)
     except Exception:
         try:
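
This hunk tees the deserialized record iterator purely to print what arrives at the worker. itertools.tee() returns two independent iterators over one stream (so the isinstance(walk, int) branch can never be taken), but everything consumed from one branch is buffered until the other catches up, and list(walk) materializes the whole partition; this is debug-only behaviour. A minimal standalone sketch of the pattern:

    import itertools

    def records():
        # stand-in for deserializer.load_stream(infile)
        for i in range(5):
            yield i

    iterator, walk = itertools.tee(records())
    print list(walk)       # [0, 1, 2, 3, 4] -- the inspected copy
    print list(iterator)   # [0, 1, 2, 3, 4] -- still fully available downstream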

streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala

Lines changed: 75 additions & 0 deletions
@@ -134,3 +134,78 @@ class PythonTransformedDStream(
 }
 */
 
+<<<<<<< HEAD
+=======
+/**
+ * This is a input stream just for the unitest. This is equivalent to a checkpointable,
+ * replayable, reliable message queue like Kafka. It requires a sequence as input, and
+ * returns the i_th element at the i_th batch under manual clock.
+ */
+class PythonTestInputStream(ssc_ : JavaStreamingContext, inputFiles: JArrayList[String], numPartitions: Int)
+  extends InputDStream[Array[Byte]](JavaStreamingContext.toStreamingContext(ssc_)){
+
+  def start() {}
+
+  def stop() {}
+
+  def compute(validTime: Time): Option[RDD[Array[Byte]]] = {
+    logInfo("Computing RDD for time " + validTime)
+    inputFiles.foreach(logInfo(_))
+    // make a temporary file
+    // make empty RDD
+    val prefix = "spark"
+    val suffix = ".tmp"
+    val tempFile = File.createTempFile(prefix, suffix)
+    val index = ((validTime - zeroTime) / slideDuration - 1).toInt
+    logInfo("Index: " + index)
+
+    val selectedInputFile: String = {
+      if (inputFiles.isEmpty){
+        tempFile.getAbsolutePath
+      }else if (index < inputFiles.size()) {
+        inputFiles.get(index)
+      } else {
+        tempFile.getAbsolutePath
+      }
+    }
+    val rdd = PythonRDD.readRDDFromFile(JavaSparkContext.fromSparkContext(ssc_.sparkContext), selectedInputFile, numPartitions).rdd
+    logInfo("Created RDD " + rdd.id + " with " + selectedInputFile)
+    Some(rdd)
+  }
+
+  val asJavaDStream = JavaDStream.fromDStream(this)
+}
+
+/**
+ * This is a input stream just for the unitest. This is equivalent to a checkpointable,
+ * replayable, reliable message queue like Kafka. It requires a sequence as input, and
+ * returns the i_th element at the i_th batch under manual clock.
+ * This implementation is close to QueStream
+ */
+
+class PythonTestInputStream2(ssc_ : JavaStreamingContext, inputRDDs: JArrayList[JavaRDD[Array[Byte]]])
+  extends InputDStream[Array[Byte]](JavaStreamingContext.toStreamingContext(ssc_)) {
+
+  def start() {}
+
+  def stop() {}
+
+  def compute(validTime: Time): Option[RDD[Array[Byte]]] = {
+    val emptyRDD = ssc.sparkContext.emptyRDD[Array[Byte]]
+    val index = ((validTime - zeroTime) / slideDuration - 1).toInt
+    val selectedRDD = {
+      if (inputRDDs.isEmpty) {
+        emptyRDD
+      } else if (index < inputRDDs.size()) {
+        inputRDDs.get(index).rdd
+      } else {
+        emptyRDD
+      }
+    }
+
+    Some(selectedRDD)
+  }
+
+  val asJavaDStream = JavaDStream.fromDStream(this)
+}
+>>>>>>> broke something
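
Both Scala test input streams select their output the same way: batch i under the manual clock maps to input i via index = ((validTime - zeroTime) / slideDuration - 1), with an empty fallback before the first batch and after the inputs run out. The same selection logic in plain Python, as a sketch (select_input and its millisecond arguments are hypothetical stand-ins for the Scala fields):

    def select_input(inputs, valid_time, zero_time, slide_duration, empty=None):
        # first batch fires one slide after zero_time, so it maps to index 0
        index = (valid_time - zero_time) // slide_duration - 1
        if not inputs:
            return empty
        elif index < len(inputs):
            return inputs[index]
        return empty

    # batches at t = 1000, 2000, 3000 ms yield "a", "b", "c"; later batches fall back to empty
    print select_input(["a", "b", "c"], valid_time=2000, zero_time=0, slide_duration=1000)  # "b"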
