Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ class ReceiverSuite extends TestSuiteBase with TimeLimits with Serializable {
executorStarted.acquire()

// Verify that receiver was started
assert(receiver.onStartCalled)
assert(receiver.callsRecorder.calls === Seq("onStart"))
assert(executor.isReceiverStarted)
assert(receiver.isStarted)
assert(!receiver.isStopped())
Expand Down Expand Up @@ -106,19 +106,22 @@ class ReceiverSuite extends TestSuiteBase with TimeLimits with Serializable {
assert(executor.errors.head.eq(exception))

// Verify restarting actually stops and starts the receiver
receiver.restart("restarting", null, 600)
eventually(timeout(300.milliseconds), interval(10.milliseconds)) {
// receiver will be stopped async
assert(receiver.isStopped)
assert(receiver.onStopCalled)
}
eventually(timeout(1.second), interval(10.milliseconds)) {
// receiver will be started async
assert(receiver.onStartCalled)
assert(executor.isReceiverStarted)
executor.callsRecorder.reset()
receiver.callsRecorder.reset()
receiver.restart("restarting", null, 100)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This goes down from 600 to 100?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yes we no longer need so long delay as we don't rely on timing.

eventually(timeout(10.seconds), interval(10.milliseconds)) {
Copy link
Member

@dongjoon-hyun dongjoon-hyun Sep 20, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, 10.seconds is enough? Or, do you need to re-trigger this PR to validate more?
BTW, thank you so much for taking care of this case! This is really an long standing issue.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes that was actually 1.3 seconds (300ms + 1s) and it hasn't been failing for high probability so it should be pretty enough.

// below verification ensures for now receiver is already restarted
assert(receiver.isStarted)
assert(!receiver.isStopped)
assert(receiver.receiving)

// both receiver supervisor and receiver should be stopped first, and started
assert(executor.callsRecorder.calls === Seq("onReceiverStop", "onReceiverStart"))
assert(receiver.callsRecorder.calls === Seq("onStop", "onStart"))

// check whether the delay between stop and start is respected
assert(executor.callsRecorder.timestamps.reverse.reduceLeft { _ - _ } >= 100)
assert(receiver.callsRecorder.timestamps.reverse.reduceLeft { _ - _ } >= 100)
}

// Verify that stopping actually stops the thread
Expand Down Expand Up @@ -290,6 +293,9 @@ class ReceiverSuite extends TestSuiteBase with TimeLimits with Serializable {
val arrayBuffers = new ArrayBuffer[ArrayBuffer[_]]
val errors = new ArrayBuffer[Throwable]

// tracks calls of "onReceiverStart", "onReceiverStop"
val callsRecorder = new MethodsCallRecorder()

/** Check if all data structures are clean */
def isAllEmpty: Boolean = {
singles.isEmpty && byteBuffers.isEmpty && iterators.isEmpty &&
Expand Down Expand Up @@ -325,7 +331,15 @@ class ReceiverSuite extends TestSuiteBase with TimeLimits with Serializable {
errors += throwable
}

override protected def onReceiverStart(): Boolean = true
override protected def onReceiverStart(): Boolean = {
callsRecorder.record()
true
}

override protected def onReceiverStop(message: String, error: Option[Throwable]): Unit = {
callsRecorder.record()
super.onReceiverStop(message, error)
}

override def createBlockGenerator(
blockGeneratorListener: BlockGeneratorListener): BlockGenerator = {
Expand Down Expand Up @@ -363,36 +377,55 @@ class ReceiverSuite extends TestSuiteBase with TimeLimits with Serializable {
class FakeReceiver(sendData: Boolean = false) extends Receiver[Int](StorageLevel.MEMORY_ONLY) {
@volatile var otherThread: Thread = null
@volatile var receiving = false
@volatile var onStartCalled = false
@volatile var onStopCalled = false

// tracks calls of "onStart", "onStop"
@transient lazy val callsRecorder = new MethodsCallRecorder()

def onStart() {
otherThread = new Thread() {
override def run() {
receiving = true
var count = 0
while(!isStopped()) {
if (sendData) {
store(count)
count += 1
try {
var count = 0
while(!isStopped()) {
if (sendData) {
store(count)
count += 1
}
Thread.sleep(10)
}
Thread.sleep(10)
} finally {
receiving = false
}
}
}
onStartCalled = true
callsRecorder.record()
otherThread.start()
}

def onStop() {
onStopCalled = true
callsRecorder.record()
otherThread.join()
}
}

class MethodsCallRecorder {
// tracks calling methods as (timestamp, methodName)
private val records = new ArrayBuffer[(Long, String)]

def record(): Unit = records.append((System.currentTimeMillis(), callerMethodName))

def reset(): Unit = records.clear()

def reset() {
receiving = false
onStartCalled = false
onStopCalled = false
def callsWithTimestamp: scala.collection.immutable.Seq[(Long, String)] = records.toList

def calls: scala.collection.immutable.Seq[String] = records.map(_._2).toList

def timestamps: scala.collection.immutable.Seq[Long] = records.map(_._1).toList

private def callerMethodName: String = {
val stackTrace = new Throwable().getStackTrace
// it should return method name of two levels deeper
stackTrace(2).getMethodName
}
}