Commit c18f849

HeartSaVioR authored and Marcelo Vanzin committed
[SPARK-24663][STREAMING][TESTS] StreamingContextSuite: Wait until slow receiver has been initialized, but with hard timeout
### What changes were proposed in this pull request?

This patch fixes the flaky StreamingContextSuite test "stop slow receiver gracefully" by adding a flag that records whether the slow receiver has finished initializing, and by waiting for that flag to become true before stopping the context. Because the receiver is submitted as a job and initialized in an executor, 500ms might not be enough to cover all cases.

### Why are the changes needed?

There have been reports of this test failing. Please refer to [SPARK-24663](https://issues.apache.org/jira/browse/SPARK-24663).

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Modified UT. I artificially delayed the handling of job submission by adding the code below to `DAGScheduler.submitJob`:

```
if (rdd != null && rdd.name != null && rdd.name.startsWith("Receiver")) {
  println(s"Receiver Job! rdd name: ${rdd.name}")
  Thread.sleep(1000)
}
```

With this delay in place, the test "stop slow receiver gracefully" failed on current master and passed with the patch.

Closes #25725 from HeartSaVioR/SPARK-24663.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
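The waiting strategy described above (poll a flag, but give up after a hard timeout) can be sketched without any test framework. This is only an illustration under stated assumptions: `waitUntil` and `WaitForFlagSketch` are hypothetical names, and the patch itself relies on ScalaTest's `eventually` rather than a helper like this one.

```scala
import java.util.concurrent.atomic.AtomicBoolean

object WaitForFlagSketch {
  // Hypothetical helper (not part of Spark or ScalaTest): polls `condition`
  // every `intervalMs` until it holds or `timeoutMs` has elapsed.
  // Returns true if the condition became true in time, false otherwise.
  def waitUntil(timeoutMs: Long, intervalMs: Long)(condition: => Boolean): Boolean = {
    val deadline = System.nanoTime() + timeoutMs * 1000000L
    var satisfied = condition
    // Compare via subtraction so the check stays correct if nanoTime wraps.
    while (!satisfied && System.nanoTime() - deadline < 0) {
      Thread.sleep(intervalMs)
      satisfied = condition
    }
    satisfied
  }

  def main(args: Array[String]): Unit = {
    val initialized = new AtomicBoolean(false)

    // Stand-in for a receiver whose startup takes longer than a fixed 500 ms wait.
    val slowStartup = new Thread(new Runnable {
      override def run(): Unit = {
        Thread.sleep(800)
        initialized.set(true)
      }
    })
    slowStartup.start()

    // Wait as long as needed, up to a hard limit of 10 seconds.
    val ready = waitUntil(timeoutMs = 10000, intervalMs = 10)(initialized.get())
    println(s"initialized in time: $ready")
    slowStartup.join()
  }
}
```

Compared with a fixed sleep, polling returns as soon as the flag flips, while the timeout keeps a receiver that never initializes from hanging the test.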
1 parent b62ef8f commit c18f849

1 file changed: +8 -4 lines changed

streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala

Lines changed: 8 additions & 4 deletions
@@ -346,7 +346,6 @@ class StreamingContextSuite
     logInfo("==================================\n\n\n")
     ssc = new StreamingContext(sc, Milliseconds(100))
     var runningCount = 0
-    SlowTestReceiver.receivedAllRecords = false
     // Create test receiver that sleeps in onStop()
     val totalNumRecords = 15
     val recordsPerSecond = 1
@@ -358,6 +357,9 @@ class StreamingContextSuite
     }
     ssc.start()
     ssc.awaitTerminationOrTimeout(500)
+    eventually(timeout(10.seconds), interval(10.millis)) {
+      assert(SlowTestReceiver.initialized)
+    }
     ssc.stop(stopSparkContext = false, stopGracefully = true)
     logInfo("Running count = " + runningCount)
     assert(runningCount > 0)
@@ -949,6 +951,7 @@ class SlowTestReceiver(totalRecords: Int, recordsPerSecond: Int)
   extends Receiver[Int](StorageLevel.MEMORY_ONLY) with Logging {
 
   var receivingThreadOption: Option[Thread] = None
+  @volatile var receivedAllRecords = false
 
   def onStart() {
     val thread = new Thread() {
@@ -958,25 +961,26 @@ class SlowTestReceiver(totalRecords: Int, recordsPerSecond: Int)
           Thread.sleep(1000 / recordsPerSecond)
           store(i)
         }
-        SlowTestReceiver.receivedAllRecords = true
+        receivedAllRecords = true
         logInfo(s"Received all $totalRecords records")
       }
     }
     receivingThreadOption = Some(thread)
     thread.start()
+    SlowTestReceiver.initialized = true
   }
 
   def onStop() {
     // Simulate slow receiver by waiting for all records to be produced
-    while (!SlowTestReceiver.receivedAllRecords) {
+    while (!receivedAllRecords) {
       Thread.sleep(100)
     }
     // no clean to be done, the receiving thread should stop on it own
   }
 }
 
 object SlowTestReceiver {
-  var receivedAllRecords = false
+  var initialized = false
 }
 
 /** Streaming application for testing DStream and RDD creation sites */
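
The diff moves `receivedAllRecords` from the companion object onto the receiver instance and marks it `@volatile`. The following standalone sketch illustrates that pattern in isolation. `SlowWorkerSketch` and its members are hypothetical names and do not use Spark's `Receiver` API; the sketch only assumes that one thread produces records while another thread waits in `stop()`.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for a slow receiver; not Spark's Receiver API.
class SlowWorkerSketch(totalRecords: Int, delayMs: Long) {
  // Written by the producing thread, read by the thread that calls stop().
  // @volatile makes the write visible to the reader without extra locking.
  @volatile private var producedAll = false
  private val produced = new AtomicInteger(0)

  def producedCount: Int = produced.get()

  def start(): Unit = {
    val thread = new Thread(new Runnable {
      override def run(): Unit = {
        for (_ <- 1 to totalRecords) {
          Thread.sleep(delayMs)
          produced.incrementAndGet()
        }
        producedAll = true
      }
    })
    thread.setDaemon(true)
    thread.start()
  }

  // Simulates a slow stop: blocks until this instance has produced everything.
  def stop(): Unit = {
    while (!producedAll) {
      Thread.sleep(10)
    }
  }
}

object SlowWorkerSketch {
  def main(args: Array[String]): Unit = {
    // Each instance tracks its own completion, so no shared flag needs resetting.
    val first = new SlowWorkerSketch(totalRecords = 3, delayMs = 20)
    val second = new SlowWorkerSketch(totalRecords = 5, delayMs = 20)
    first.start(); second.start()
    first.stop(); second.stop()
    println(s"first=${first.producedCount}, second=${second.producedCount}")
  }
}
```

Because the completion state lives on each instance, nothing has to be reset between runs. This appears consistent with the diff dropping the `SlowTestReceiver.receivedAllRecords = false` reset from the test body.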
