Skip to content

Commit 5726440

Browse files
zsxwing authored and tdas committed
[SPARK-8630] [STREAMING] Prevent from checkpointing QueueInputDStream
This PR throws an exception in `QueueInputDStream.writeObject` so that the application fails when `StreamingContext.start` is called, rather than failing later while recovering the QueueInputDStream.

Author: zsxwing <zsxwing@gmail.com>

Closes apache#7016 from zsxwing/queueStream-checkpoint and squashes the following commits:

89a3d73 [zsxwing] Fix JavaAPISuite.testQueueStream
cc40fd7 [zsxwing] Prevent from checkpointing QueueInputDStream
1 parent ca7e460 commit 5726440

File tree

5 files changed

+56
-8
lines changed

5 files changed

+56
-8
lines changed

streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -477,6 +477,10 @@ class StreamingContext private[streaming] (
477477
/**
478478
* Create an input stream from a queue of RDDs. In each batch,
479479
* it will process either one or all of the RDDs returned by the queue.
480+
*
481+
* NOTE: Arbitrary RDDs can be added to `queueStream`, there is no way to recover data of
482+
* those RDDs, so `queueStream` doesn't support checkpointing.
483+
*
480484
* @param queue Queue of RDDs
481485
* @param oneAtATime Whether only one RDD should be consumed from the queue in every interval
482486
* @tparam T Type of objects in the RDD
@@ -491,6 +495,10 @@ class StreamingContext private[streaming] (
491495
/**
492496
* Create an input stream from a queue of RDDs. In each batch,
493497
* it will process either one or all of the RDDs returned by the queue.
498+
*
499+
* NOTE: Arbitrary RDDs can be added to `queueStream`, there is no way to recover data of
500+
* those RDDs, so `queueStream` doesn't support checkpointing.
501+
*
494502
* @param queue Queue of RDDs
495503
* @param oneAtATime Whether only one RDD should be consumed from the queue in every interval
496504
* @param defaultRDD Default RDD is returned by the DStream when the queue is empty.

streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -419,7 +419,11 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
419419
* Create an input stream from a queue of RDDs. In each batch,
420420
* it will process either one or all of the RDDs returned by the queue.
421421
*
422-
* NOTE: changes to the queue after the stream is created will not be recognized.
422+
* NOTE:
423+
* 1. Changes to the queue after the stream is created will not be recognized.
424+
* 2. Arbitrary RDDs can be added to `queueStream`, there is no way to recover data of
425+
* those RDDs, so `queueStream` doesn't support checkpointing.
426+
*
423427
* @param queue Queue of RDDs
424428
* @tparam T Type of objects in the RDD
425429
*/
@@ -435,7 +439,11 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
435439
* Create an input stream from a queue of RDDs. In each batch,
436440
* it will process either one or all of the RDDs returned by the queue.
437441
*
438-
* NOTE: changes to the queue after the stream is created will not be recognized.
442+
* NOTE:
443+
* 1. Changes to the queue after the stream is created will not be recognized.
444+
* 2. Arbitrary RDDs can be added to `queueStream`, there is no way to recover data of
445+
* those RDDs, so `queueStream` doesn't support checkpointing.
446+
*
439447
* @param queue Queue of RDDs
440448
* @param oneAtATime Whether only one RDD should be consumed from the queue in every interval
441449
* @tparam T Type of objects in the RDD
@@ -455,7 +463,11 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
455463
* Create an input stream from a queue of RDDs. In each batch,
456464
* it will process either one or all of the RDDs returned by the queue.
457465
*
458-
* NOTE: changes to the queue after the stream is created will not be recognized.
466+
* NOTE:
467+
* 1. Changes to the queue after the stream is created will not be recognized.
468+
* 2. Arbitrary RDDs can be added to `queueStream`, there is no way to recover data of
469+
* those RDDs, so `queueStream` doesn't support checkpointing.
470+
*
459471
* @param queue Queue of RDDs
460472
* @param oneAtATime Whether only one RDD should be consumed from the queue in every interval
461473
* @param defaultRDD Default RDD is returned by the DStream when the queue is empty

streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,14 @@
1717

1818
package org.apache.spark.streaming.dstream
1919

20-
import org.apache.spark.rdd.RDD
21-
import org.apache.spark.rdd.UnionRDD
22-
import scala.collection.mutable.Queue
23-
import scala.collection.mutable.ArrayBuffer
24-
import org.apache.spark.streaming.{Time, StreamingContext}
20+
import java.io.{NotSerializableException, ObjectOutputStream}
21+
22+
import scala.collection.mutable.{ArrayBuffer, Queue}
2523
import scala.reflect.ClassTag
2624

25+
import org.apache.spark.rdd.{RDD, UnionRDD}
26+
import org.apache.spark.streaming.{Time, StreamingContext}
27+
2728
private[streaming]
2829
class QueueInputDStream[T: ClassTag](
2930
@transient ssc: StreamingContext,
@@ -36,6 +37,10 @@ class QueueInputDStream[T: ClassTag](
3637

3738
override def stop() { }
3839

40+
private def writeObject(oos: ObjectOutputStream): Unit = {
41+
throw new NotSerializableException("queueStream doesn't support checkpointing")
42+
}
43+
3944
override def compute(validTime: Time): Option[RDD[T]] = {
4045
val buffer = new ArrayBuffer[RDD[T]]()
4146
if (oneAtATime && queue.size > 0) {

streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -364,6 +364,14 @@ private void testReduceByWindow(boolean withInverse) {
364364
@SuppressWarnings("unchecked")
365365
@Test
366366
public void testQueueStream() {
367+
ssc.stop();
368+
// Create a new JavaStreamingContext without checkpointing
369+
SparkConf conf = new SparkConf()
370+
.setMaster("local[2]")
371+
.setAppName("test")
372+
.set("spark.streaming.clock", "org.apache.spark.util.ManualClock");
373+
ssc = new JavaStreamingContext(conf, new Duration(1000));
374+
367375
List<List<Integer>> expected = Arrays.asList(
368376
Arrays.asList(1,2,3),
369377
Arrays.asList(4,5,6),

streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ package org.apache.spark.streaming
2020
import java.io.{File, NotSerializableException}
2121
import java.util.concurrent.atomic.AtomicInteger
2222

23+
import scala.collection.mutable.Queue
24+
2325
import org.apache.commons.io.FileUtils
2426
import org.scalatest.concurrent.Eventually._
2527
import org.scalatest.concurrent.Timeouts
@@ -665,6 +667,19 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
665667
transformed.foreachRDD { rdd => rdd.collect() } }
666668
}
667669

670+
test("queueStream doesn't support checkpointing") {
671+
val checkpointDir = Utils.createTempDir()
672+
ssc = new StreamingContext(master, appName, batchDuration)
673+
val rdd = ssc.sparkContext.parallelize(1 to 10)
674+
ssc.queueStream[Int](Queue(rdd)).print()
675+
ssc.checkpoint(checkpointDir.getAbsolutePath)
676+
val e = intercept[NotSerializableException] {
677+
ssc.start()
678+
}
679+
// StreamingContext.validate changes the message, so use "contains" here
680+
assert(e.getMessage.contains("queueStream doesn't support checkpointing"))
681+
}
682+
668683
def addInputStream(s: StreamingContext): DStream[Int] = {
669684
val input = (1 to 100).map(i => 1 to i)
670685
val inputStream = new TestInputStream(s, input, 1)

0 commit comments

Comments
 (0)