-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-23197][STREAMING][TESTS] Fix ReceiverSuite."receiver_life_cycle" to not rely on timing #25862
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-23197][STREAMING][TESTS] Fix ReceiverSuite."receiver_life_cycle" to not rely on timing #25862
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -73,7 +73,7 @@ class ReceiverSuite extends TestSuiteBase with TimeLimits with Serializable { | |
| executorStarted.acquire() | ||
|
|
||
| // Verify that receiver was started | ||
| assert(receiver.onStartCalled) | ||
| assert(receiver.callsRecorder.calls === Seq("onStart")) | ||
| assert(executor.isReceiverStarted) | ||
| assert(receiver.isStarted) | ||
| assert(!receiver.isStopped()) | ||
|
|
@@ -106,19 +106,22 @@ class ReceiverSuite extends TestSuiteBase with TimeLimits with Serializable { | |
| assert(executor.errors.head.eq(exception)) | ||
|
|
||
| // Verify restarting actually stops and starts the receiver | ||
| receiver.restart("restarting", null, 600) | ||
| eventually(timeout(300.milliseconds), interval(10.milliseconds)) { | ||
| // receiver will be stopped async | ||
| assert(receiver.isStopped) | ||
| assert(receiver.onStopCalled) | ||
| } | ||
| eventually(timeout(1.second), interval(10.milliseconds)) { | ||
| // receiver will be started async | ||
| assert(receiver.onStartCalled) | ||
| assert(executor.isReceiverStarted) | ||
| executor.callsRecorder.reset() | ||
| receiver.callsRecorder.reset() | ||
| receiver.restart("restarting", null, 100) | ||
| eventually(timeout(10.seconds), interval(10.milliseconds)) { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So,
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes that was actually 1.3 seconds (300ms + 1s) and it hasn't been failing for high probability so it should be pretty enough. |
||
| // below verification ensures for now receiver is already restarted | ||
| assert(receiver.isStarted) | ||
| assert(!receiver.isStopped) | ||
| assert(receiver.receiving) | ||
|
|
||
| // both receiver supervisor and receiver should be stopped first, and started | ||
| assert(executor.callsRecorder.calls === Seq("onReceiverStop", "onReceiverStart")) | ||
| assert(receiver.callsRecorder.calls === Seq("onStop", "onStart")) | ||
|
|
||
| // check whether the delay between stop and start is respected | ||
| assert(executor.callsRecorder.timestamps.reverse.reduceLeft { _ - _ } >= 100) | ||
| assert(receiver.callsRecorder.timestamps.reverse.reduceLeft { _ - _ } >= 100) | ||
| } | ||
|
|
||
| // Verify that stopping actually stops the thread | ||
|
|
@@ -290,6 +293,9 @@ class ReceiverSuite extends TestSuiteBase with TimeLimits with Serializable { | |
| val arrayBuffers = new ArrayBuffer[ArrayBuffer[_]] | ||
| val errors = new ArrayBuffer[Throwable] | ||
|
|
||
| // tracks calls of "onReceiverStart", "onReceiverStop" | ||
| val callsRecorder = new MethodsCallRecorder() | ||
|
|
||
| /** Check if all data structures are clean */ | ||
| def isAllEmpty: Boolean = { | ||
| singles.isEmpty && byteBuffers.isEmpty && iterators.isEmpty && | ||
|
|
@@ -325,7 +331,15 @@ class ReceiverSuite extends TestSuiteBase with TimeLimits with Serializable { | |
| errors += throwable | ||
| } | ||
|
|
||
| override protected def onReceiverStart(): Boolean = true | ||
| override protected def onReceiverStart(): Boolean = { | ||
| callsRecorder.record() | ||
| true | ||
| } | ||
|
|
||
| override protected def onReceiverStop(message: String, error: Option[Throwable]): Unit = { | ||
| callsRecorder.record() | ||
| super.onReceiverStop(message, error) | ||
| } | ||
|
|
||
| override def createBlockGenerator( | ||
| blockGeneratorListener: BlockGeneratorListener): BlockGenerator = { | ||
|
|
@@ -363,36 +377,55 @@ class ReceiverSuite extends TestSuiteBase with TimeLimits with Serializable { | |
| class FakeReceiver(sendData: Boolean = false) extends Receiver[Int](StorageLevel.MEMORY_ONLY) { | ||
| @volatile var otherThread: Thread = null | ||
| @volatile var receiving = false | ||
| @volatile var onStartCalled = false | ||
| @volatile var onStopCalled = false | ||
|
|
||
| // tracks calls of "onStart", "onStop" | ||
| @transient lazy val callsRecorder = new MethodsCallRecorder() | ||
|
|
||
| def onStart() { | ||
| otherThread = new Thread() { | ||
| override def run() { | ||
| receiving = true | ||
| var count = 0 | ||
| while(!isStopped()) { | ||
| if (sendData) { | ||
| store(count) | ||
| count += 1 | ||
| try { | ||
| var count = 0 | ||
| while(!isStopped()) { | ||
| if (sendData) { | ||
| store(count) | ||
| count += 1 | ||
| } | ||
| Thread.sleep(10) | ||
| } | ||
| Thread.sleep(10) | ||
| } finally { | ||
| receiving = false | ||
| } | ||
| } | ||
| } | ||
| onStartCalled = true | ||
| callsRecorder.record() | ||
| otherThread.start() | ||
| } | ||
|
|
||
| def onStop() { | ||
| onStopCalled = true | ||
| callsRecorder.record() | ||
| otherThread.join() | ||
| } | ||
| } | ||
|
|
||
| class MethodsCallRecorder { | ||
| // tracks calling methods as (timestamp, methodName) | ||
| private val records = new ArrayBuffer[(Long, String)] | ||
|
|
||
| def record(): Unit = records.append((System.currentTimeMillis(), callerMethodName)) | ||
|
|
||
| def reset(): Unit = records.clear() | ||
|
|
||
| def reset() { | ||
| receiving = false | ||
| onStartCalled = false | ||
| onStopCalled = false | ||
| def callsWithTimestamp: scala.collection.immutable.Seq[(Long, String)] = records.toList | ||
|
|
||
| def calls: scala.collection.immutable.Seq[String] = records.map(_._2).toList | ||
|
|
||
| def timestamps: scala.collection.immutable.Seq[Long] = records.map(_._1).toList | ||
|
|
||
| private def callerMethodName: String = { | ||
| val stackTrace = new Throwable().getStackTrace | ||
| // it should return method name of two levels deeper | ||
| stackTrace(2).getMethodName | ||
| } | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This goes down from 600 to 100?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah yes we no longer need so long delay as we don't rely on timing.