
Commit c5ecfc1

basic function test cases are passed
1 parent 8dcda84 commit c5ecfc1

2 files changed: +1 −71 lines

python/pyspark/worker.py

Lines changed: 0 additions & 10 deletions
@@ -86,16 +86,6 @@ def main(infile, outfile):
         (func, deserializer, serializer) = command
         init_time = time.time()
         iterator = deserializer.load_stream(infile)
-        print "deserializer in worker: %s" % str(deserializer)
-        iterator, walk = itertools.tee(iterator)
-        if isinstance(walk, int):
-            print "this is int"
-            print walk
-        else:
-            try:
-                print list(walk)
-            except:
-                print list(walk)
         serializer.dump_stream(func(split_index, iterator), outfile)
     except Exception:
         try:
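The deleted debug prints were interleaved with the worker's core read-transform-write loop, which survives unchanged: deserialize an iterator of records from infile, apply func to the partition, and serialize the results to outfile. Below is a minimal, self-contained sketch of that pattern; SimplePickleSerializer and the doubling func are hypothetical stand-ins, not PySpark's actual serializers or task functions.

import io
import pickle

# Hypothetical stand-in for PySpark's serializer objects: exposes the same
# load_stream(file) -> iterator and dump_stream(iterator, file) calls that
# worker.py's main loop relies on.
class SimplePickleSerializer(object):
    def load_stream(self, stream):
        while True:
            try:
                yield pickle.load(stream)
            except EOFError:
                return

    def dump_stream(self, iterator, stream):
        for obj in iterator:
            pickle.dump(obj, stream)

def func(split_index, iterator):
    # Example task function: double every record in this partition.
    return (x * 2 for x in iterator)

# Simulate the worker round trip with in-memory "files".
ser = SimplePickleSerializer()
infile, outfile = io.BytesIO(), io.BytesIO()
ser.dump_stream(iter([1, 2, 3]), infile)
infile.seek(0)

iterator = ser.load_stream(infile)            # iterator = deserializer.load_stream(infile)
ser.dump_stream(func(0, iterator), outfile)   # serializer.dump_stream(func(split_index, iterator), outfile)

outfile.seek(0)
print(list(ser.load_stream(outfile)))         # [2, 4, 6]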

streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala

Lines changed: 1 addition & 61 deletions
@@ -206,56 +206,14 @@ class PythonTransformedDStream(
   }
   */
 
-<<<<<<< HEAD
-=======
-/**
- * This is a input stream just for the unitest. This is equivalent to a checkpointable,
- * replayable, reliable message queue like Kafka. It requires a sequence as input, and
- * returns the i_th element at the i_th batch under manual clock.
- */
-class PythonTestInputStream(ssc_ : JavaStreamingContext, inputFiles: JArrayList[String], numPartitions: Int)
-  extends InputDStream[Array[Byte]](JavaStreamingContext.toStreamingContext(ssc_)){
-
-  def start() {}
-
-  def stop() {}
-
-  def compute(validTime: Time): Option[RDD[Array[Byte]]] = {
-    logInfo("Computing RDD for time " + validTime)
-    inputFiles.foreach(logInfo(_))
-    // make a temporary file
-    // make empty RDD
-    val prefix = "spark"
-    val suffix = ".tmp"
-    val tempFile = File.createTempFile(prefix, suffix)
-    val index = ((validTime - zeroTime) / slideDuration - 1).toInt
-    logInfo("Index: " + index)
-
-    val selectedInputFile: String = {
-      if (inputFiles.isEmpty){
-        tempFile.getAbsolutePath
-      }else if (index < inputFiles.size()) {
-        inputFiles.get(index)
-      } else {
-        tempFile.getAbsolutePath
-      }
-    }
-    val rdd = PythonRDD.readRDDFromFile(JavaSparkContext.fromSparkContext(ssc_.sparkContext), selectedInputFile, numPartitions).rdd
-    logInfo("Created RDD " + rdd.id + " with " + selectedInputFile)
-    Some(rdd)
-  }
-
-  val asJavaDStream = JavaDStream.fromDStream(this)
-}
-
 /**
  * This is a input stream just for the unitest. This is equivalent to a checkpointable,
  * replayable, reliable message queue like Kafka. It requires a sequence as input, and
  * returns the i_th element at the i_th batch under manual clock.
  * This implementation is close to QueStream
  */
 
-class PythonTestInputStream2(ssc_ : JavaStreamingContext, inputRDDs: JArrayList[JavaRDD[Array[Byte]]])
+class PythonTestInputStream(ssc_ : JavaStreamingContext, inputRDDs: JArrayList[JavaRDD[Array[Byte]]])
   extends InputDStream[Array[Byte]](JavaStreamingContext.toStreamingContext(ssc_)) {
 
   def start() {}
@@ -280,21 +238,3 @@ class PythonTestInputStream2(ssc_ : JavaStreamingContext, inputRDDs: JArrayList[
 
   val asJavaDStream = JavaDStream.fromDStream(this)
 }
-
-
-class PythonTestInputStream3(ssc_ : JavaStreamingContext)
-  extends InputDStream[Any](JavaStreamingContext.toStreamingContext(ssc_)) {
-
-  def start() {}
-
-  def stop() {}
-
-  def compute(validTime: Time): Option[RDD[Any]] = {
-    val index = ((validTime - zeroTime) / slideDuration - 1).toInt
-    val selectedInput = ArrayBuffer(1, 2, 3).toSeq
-    val rdd :RDD[Any] = ssc.sc.makeRDD(selectedInput, 2)
-    Some(rdd)
-  }
-
-  val asJavaDStream = JavaDStream.fromDStream(this)
-}>>>>>>> broke something
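The comment kept on the surviving class states its contract: it behaves like a replayable message queue, serving the i-th pre-built input at the i-th batch under a manual clock, using the same index expression ((validTime - zeroTime) / slideDuration - 1).toInt that appears in the deleted classes. Below is a small sketch of just that selection logic, written in Python for brevity; the function name and parameters are illustrative and not part of the Scala API.

def select_batch_input(inputs, valid_time, zero_time, slide_duration):
    # Mirrors the Scala expression ((validTime - zeroTime) / slideDuration - 1).toInt:
    # the batch ending at valid_time replays the index-th input, if one exists.
    index = int((valid_time - zero_time) / slide_duration - 1)
    if 0 <= index < len(inputs):
        return inputs[index]
    return None

# With a 1-second batch interval starting at t = 0, the batch at t = 1
# replays inputs[0], the batch at t = 2 replays inputs[1], and so on.
inputs = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
assert select_batch_input(inputs, valid_time=1, zero_time=0, slide_duration=1) == [1, 2, 3]
assert select_batch_input(inputs, valid_time=3, zero_time=0, slide_duration=1) == [7, 8, 9]
assert select_batch_input(inputs, valid_time=4, zero_time=0, slide_duration=1) is None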
