-
Notifications
You must be signed in to change notification settings - Fork 28.5k
[SPARK-8977][Streaming] Defines the RateEstimator interface, and impements the RateController #7600
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-8977][Streaming] Defines the RateEstimator interface, and impements the RateController #7600
Conversation
…river to the block generator
…aming.receiver.maxRate
[SPARK-8975][Streaming] Adds a mechanism to send a new rate from the driver to the block generator
- made rate limit a Long and default to Long.MaxValue (consequence of the above) - removed custom `waitUntil` and replaced it by `eventually`
As I mentioned before, I don’t think this is a great idea: - such tests are flaky (original test in ReceiverSuite was ignored for that reason) - Guava’s code has its own test suite, so we can assume it implements `setRate` correctly I noticed one flaky failure in about 10 runs on my machine (receiver got 1 message less than the lower bound, which is within 5% of the nominal rate).
This reverts commit 0c51959.
use the listener bus to know when receivers have registered (`onStart` is called before receivers have registered, leading to flaky behavior).
…ements the ReceiverRateController
*/ | ||
@DeveloperApi | ||
private [streaming] abstract class RateController(val streamUID: Int, rateEstimator: RateEstimator) | ||
extends StreamingListener with Serializable { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably not worth to serialize (same for RateEstimator). If we make them @transient
, we should add a test that after recovery from a checkpoint the rate controller is still active (non-null and registered with the streaming listener bus).
@tdas what do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. Agreed. Transient plus update some existing test case in CheckpointSuite
Test build #38112 has finished for PR 7600 at commit
|
8b08492
to
34a389d
Compare
Test build #38115 has finished for PR 7600 at commit
|
* A rate estimator configured by the user to compute a dynamic ingestion bound for this stream. | ||
* @see `RateEstimator` | ||
*/ | ||
protected [streaming] val rateEstimator = newEstimator() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do you need rateEstimator
as a field in DStream
? All you need expose to modules outside DStream is the RateController. And the RateEstimator can be within the RateController
…p/rate-controller
Test build #38215 has finished for PR 7600 at commit
|
estimator, changed logic for initialising rate estimator.
Test build #38228 has finished for PR 7600 at commit
|
with several rate updates.
These tests took a while to debug, so I'll remove the serializable components tonight or tomorrow (it's a pretty localized and isolated change). In the meantime please have a look at the current state. |
Test build #38243 has finished for PR 7600 at commit
|
@@ -47,6 +49,14 @@ abstract class InputDStream[T: ClassTag] (@transient ssc_ : StreamingContext) | |||
/** This is an unique identifier for the input stream. */ | |||
val id = ssc.getNewInputStreamId() | |||
|
|||
// Keep track of the freshest rate for this stream using the rateEstimator | |||
protected[streaming] val rateController: Option[RateController] = | |||
RateEstimator.makeEstimator(ssc.conf).map { estimator => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should change a little. See comment on what the config params should be in another comment below (near RateEstimator.makeEstimator
)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And InputDStream should not have a RateController by default. So this should be None. Its will be upto implementation of InputDStream to override and define.
Test build #38696 has finished for PR 7600 at commit
|
@@ -41,6 +42,16 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont | |||
extends InputDStream[T](ssc_) { | |||
|
|||
/** | |||
* Asynchronously maintains & sends new rate limits to the receiver through the receiver tracker. | |||
*/ | |||
override protected[streaming] val rateController: Option[RateController] = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Please use {
and }
for multiline methods.
83357bd
to
5125e60
Compare
@tdas I have some comments in my email that don't appear here, so I suppose you changed your mind and deleted it. I think the review would be smoother if you explained a bit better what are your underlying concerns, rather than very specific points. For instance, I don't understand why you insist I should avoid using your own helper methods. I don't think code would get clearer, nor less flaky. If anything, reusing these methods makes the code more uniform and easier to understand, plus any bugs that get fixed in those helpers benefit this test suite too. |
Test build #38719 has finished for PR 7600 at commit
|
@dragos I didnt delete any comment! The comments may have been to earlier threads which are currently hidden due to outdated diffs. I know that these TestSuiteBase are the helper methods that I have provided with Spark Streaming. They were done a long time ago and since then we have learnt quite a bit about the shortcoming of those methods (
Look at all the newly updated streaming testsuitesl, none of them use |
How about you rewrite the last test and compare? If you're still convinced it's better I'll rewrite the rest.
|
} catch { | ||
case NonFatal(_) => () // don't stop if the executor wasn't installed yet | ||
} | ||
Thread.sleep(20) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is going to be a flaky test. There is no guarantee that this 20 ms sleep will be 20 ms. Can very well be 100 ms. And it cannot be guaranteed that will always get a the 100, 200, 300 will be received by receiver by the time 4 batches are over through runStreams
. This going to be SUPER FLAKY. The non-flaky way of doing this
- run streamingContext normally in real time (no real need for
runStreams
, justssc.start()
) - test using
eventually { observedRates should contain theSameElementsAs (rates :+ Long.MaxValue) }
, no need to run anything in the background and all. observedRates
must be a synchronized hashset.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, in the interest of closing this PR I will do the changes you request ASAP.
However, in the interest of understanding, I ran it 100 times without it failing once. If you have evidence of it being SUPER FLAKY in practice, do you mind sharing it here?
I guess the reason I'm not seeing these failures is that even if theoretically the delay could be higher, say 100ms, since the batch interval is 1s, you'd still sample it 10 times. And looking at the implementation of SystemClock
, it's still based on Thread.sleep
, so any systematic delay (say, system under heavy load) will affect both this test and the batch interval sampling rate.
BTW, eventually
is using Thread.sleep
as well (15ms between attempts, by default), so I guess the same considerations regarding flakiness should apply.
|
|
||
/** | ||
* Dummy receiver implementation | ||
*/ | ||
private class DummyReceiver(receiverId: Int, host: Option[String] = None) | ||
private[streaming] class DummyReceiver(receiverId: Int, host: Option[String] = None) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this receiver and associated InputDStream is used in multiple testsuites, mind renaming this to a more contextually relevant name than Dummy
? How about RateTestReceiver
and RateTestInputDStream
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also you can augment these classes to count the number of times rates have been received, so that the MockRateLimitDStream
can be removed. Trying to minimize the number of new classes we create.
Test build #38833 has finished for PR 7600 at commit
|
A summary of my changes and a few notes, in the hope of speeding up the process:
The rest of the changes should match your requests and I don't list them here. |
Test build #1226 has finished for PR 7600 at commit
|
Test build #1225 has finished for PR 7600 at commit
|
LGTM There are still an issues that occurred to me in the last moment, but I am inclined to merge this as is to unblock other PRs - RateController should not be serialized with checkpoints. Does not make sense to recover the RateController. Can you make a PR for this? Regarding flakiness, it often times occurs only in Jenkins as things can run very slowly in Jenkins, and causes all this Thread.sleep related flakiness to occur. But your current method without background thread make it less flaky, but still does not guarantee. I thought about the test. The least flaky way to test that multiple rate update reach the receiver, would be to send one rate update, wait for it to be received, then send another update with different rate, then wait for it. |
Aright I merged this. Thanks all! |
I forgot to mention this. I spent some time on it, but it looked like the gains were not worth it. The problem is deserialization, and the specific |
Based on #7600 /cc tdas Author: Iulian Dragos <jaguarul@gmail.com> Author: François Garillot <francois@garillot.net> Closes #7648 from dragos/topic/streaming-bp/pid and squashes the following commits: aa5b097 [Iulian Dragos] Add more comments, made all PID constant parameters positive, a couple more tests. 93b74f8 [Iulian Dragos] Better explanation of historicalError. 7975b0c [Iulian Dragos] Add configuration for PID. 26cfd78 [Iulian Dragos] A couple of variable renames. d0bdf7c [Iulian Dragos] Update to latest version of the code, various style and name improvements. d58b845 [François Garillot] [SPARK-8979][Streaming] Implements a PIDRateEstimator
Based on #7471.
RateController
andRateEstimator