
[SPARK-8977][Streaming] Defines the RateEstimator interface, and implements the RateController #7600

Closed

Conversation

dragos
Contributor

dragos commented Jul 22, 2015

Based on #7471.

  • add a test that exercises the publish path from driver to receiver
  • remove Serializable from RateController and RateEstimator

huitseeker and others added 10 commits July 15, 2015 23:17
[SPARK-8975][Streaming] Adds a mechanism to send a new rate from the driver to the block generator
- made rate limit a Long and default to Long.MaxValue (consequence of the above)
- removed custom `waitUntil` and replaced it by `eventually`
As I mentioned before, I don’t think this is a great idea:

- such tests are flaky (original test in ReceiverSuite was ignored for
that reason)
- Guava’s code has its own test suite, so we can assume it implements
`setRate` correctly

I noticed one flaky failure in about 10 runs on my machine (receiver got
1 message less than the lower bound, which is within 5% of the nominal rate).
use the listener bus to know when receivers have registered (`onStart`
is called before receivers have registered, leading to flaky behavior).
*/
@DeveloperApi
private [streaming] abstract class RateController(val streamUID: Int, rateEstimator: RateEstimator)
extends StreamingListener with Serializable {
Contributor Author

Probably not worth serializing (same for RateEstimator). If we make them @transient, we should add a test that, after recovery from a checkpoint, the rate controller is still active (non-null and registered with the streaming listener bus).

@tdas what do you think?

Contributor

Yes, agreed. Transient, plus update some existing test case in CheckpointSuite.
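
A minimal sketch of that direction, purely illustrative (the field shape and the test assertion are assumptions, shown as fragments of the input stream and of a CheckpointSuite-style test; as discussed at the end of this thread, the change was ultimately deferred):

```scala
// Sketch only: keep the controller out of the checkpointed DStream graph, and
// re-create / re-register it with the listener bus after recovery.
@transient private[streaming] var rateController: Option[RateController] = None

// A CheckpointSuite-style check would then assert that, after restarting from
// a checkpoint, the recovered stream still has a live controller, e.g.:
//   assert(recoveredStream.rateController.nonEmpty)
```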

@SparkQA

SparkQA commented Jul 22, 2015

Test build #38112 has finished for PR 7600 at commit 8b08492.

  • This patch fails RAT tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

dragos force-pushed the topic/streaming-bp/rate-controller branch from 8b08492 to 34a389d on July 22, 2015 at 21:43
@SparkQA

SparkQA commented Jul 22, 2015

Test build #38115 has finished for PR 7600 at commit 34a389d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

* A rate estimator configured by the user to compute a dynamic ingestion bound for this stream.
* @see `RateEstimator`
*/
protected [streaming] val rateEstimator = newEstimator()
Contributor

Why do you need rateEstimator as a field in DStream? All you need to expose to modules outside the DStream is the RateController, and the RateEstimator can live within the RateController.
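
A sketch of the encapsulation being asked for here, assuming the PR's RateEstimator type is in scope; the controller is a driver-side StreamingListener and the estimator is just a constructor argument, not a DStream field:

```scala
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// Sketch: only Option[RateController] needs to be visible outside the DStream;
// the estimator stays an implementation detail of the controller.
abstract class RateController(val streamUID: Int, rateEstimator: RateEstimator)
  extends StreamingListener {

  // How a new bound reaches the receiver is left to subclasses.
  protected def publish(rate: Long): Unit

  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    // Feed the completed batch's statistics to rateEstimator and publish the
    // resulting bound (elided in this sketch).
  }
}
```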

@SparkQA

SparkQA commented Jul 23, 2015

Test build #38215 has finished for PR 7600 at commit 238cfc6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

estimator, changed logic for initialising rate estimator.
@SparkQA

SparkQA commented Jul 23, 2015

Test build #38228 has finished for PR 7600 at commit b425d32.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dragos
Contributor Author

dragos commented Jul 23, 2015

These tests took a while to debug, so I'll remove the serializable components tonight or tomorrow (it's a pretty localized and isolated change). In the meantime please have a look at the current state.

@SparkQA

SparkQA commented Jul 23, 2015

Test build #38243 has finished for PR 7600 at commit e57c66b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -47,6 +49,14 @@ abstract class InputDStream[T: ClassTag] (@transient ssc_ : StreamingContext)
/** This is an unique identifier for the input stream. */
val id = ssc.getNewInputStreamId()

// Keep track of the freshest rate for this stream using the rateEstimator
protected[streaming] val rateController: Option[RateController] =
RateEstimator.makeEstimator(ssc.conf).map { estimator =>
Contributor

This should change a little. See my other comment below on what the config params should be (near RateEstimator.makeEstimator).

Contributor

And InputDStream should not have a RateController by default, so this should be None. It will be up to each InputDStream implementation to override and define it.
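
A simplified sketch of that suggested default in the base class (RateController is the type added by this PR):

```scala
// In InputDStream: no rate controller unless a subclass explicitly provides one.
protected[streaming] val rateController: Option[RateController] = None
```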

@SparkQA

SparkQA commented Jul 28, 2015

Test build #38696 has finished for PR 7600 at commit 83357bd.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • public static class StructWriter
    • case class CreateStructUnsafe(children: Seq[Expression]) extends Expression
    • case class CreateNamedStructUnsafe(children: Seq[Expression]) extends Expression
    • case class TungstenProject(projectList: Seq[NamedExpression], child: SparkPlan) extends UnaryNode

@@ -41,6 +42,16 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
extends InputDStream[T](ssc_) {

/**
* Asynchronously maintains & sends new rate limits to the receiver through the receiver tracker.
*/
override protected[streaming] val rateController: Option[RateController] =
Contributor

nit: Please use { and } for multiline methods.
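
For example, the override from the diff above could be wrapped with braces along these lines (ReceiverRateController is a placeholder name for whatever controller implementation publishes through the receiver tracker; RateEstimator.makeEstimator is the helper shown earlier in this diff):

```scala
override protected[streaming] val rateController: Option[RateController] = {
  RateEstimator.makeEstimator(ssc.conf).map { estimator =>
    new ReceiverRateController(id, estimator)
  }
}
```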

dragos force-pushed the topic/streaming-bp/rate-controller branch from 83357bd to 5125e60 on July 28, 2015 at 16:38
@dragos
Contributor Author

dragos commented Jul 28, 2015

@tdas I have some comments in my email that don't appear here, so I suppose you changed your mind and deleted them.

I think the review would be smoother if you explained your underlying concerns a bit better, rather than making very specific points. For instance, I don't understand why you insist I should avoid using your own helper methods. I don't think the code would get clearer or less flaky. If anything, reusing these methods makes the code more uniform and easier to understand, plus any bugs that get fixed in those helpers benefit this test suite too.

@SparkQA

SparkQA commented Jul 28, 2015

Test build #38719 has finished for PR 7600 at commit 5125e60.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tdas
Contributor

tdas commented Jul 28, 2015

@dragos I didn't delete any comment! The comments may have been attached to earlier threads which are currently hidden due to outdated diffs.

I know that these TestSuiteBase helper methods are the ones I provided with Spark Streaming. They were written a long time ago, and since then we have learned quite a bit about the shortcomings of those methods (runStreams), when to use them, and when it is cleaner not to use them. The problem with runStreams is that:

  1. It hides part of the lifecycle management of the context. It starts and stops the StreamingContext, which gives you little control when you want to optimize things. For example:
  • Sharing a SparkContext across StreamingContexts is not possible through it, even though doing so improves test times, since we don't have to spin up a SparkContext for every unit test.
  • Tests have unnecessary delays when designed using runStreams, as you can see for yourself. With or without runStreams you have to use eventually to test your condition, so why wait for runStreams to complete (especially when running on a real clock) and then test with eventually? And how do you decide how many batches to run in runStreams? Is the number of batches even important when the test really wants to verify some other condition being eventually true? Also, why go to the complexity of attaching a TestOutputDStream if the test doesn't care about the output? It improves test times as well as simplifies the test if, in these cases, you just test with eventually alone, without abstractions that don't really help.

Look at all the newly updated streaming test suites: none of them use runStreams, for a reason. Hope this clarifies why I am trying to move the code away from runStreams.

@dragos
Contributor Author

dragos commented Jul 28, 2015

On 28 Jul 2015, at 21:42, Tathagata Das notifications@github.com wrote:

> @dragos I didn't delete any comment! The comments may have been attached to earlier threads which are currently hidden due to outdated diffs.

Weird. Probably you attached them to a commit that went away. They seemed to refer to code that was no longer there, yet they were new.

> I know that these TestSuiteBase helper methods are the ones I provided with Spark Streaming. They were written a long time ago and since then we have learned quite a bit about the shortcomings of those methods (runStreams), and when to use them and when it's cleaner not to use them.

How about you deprecate them? It would save time for everyone touching Spark Streaming.

> Tests have unnecessary delays when designed using runStreams. You can see this for yourself. With or without runStreams you have to use eventually to test your condition. So why wait for runStreams to complete (especially when running on a real clock) and then test with eventually?

If you run on a real clock, I don't see what you gain in terms of time. It's not faster, is it?

> And how do you decide how many batches to run in runStreams?

It's quite simple: in my test I have 3 rates I want to observe, so I need at least 3 batches.

> Is it even important to care about the number of batches to run considering the test really wants to verify some other condition being eventually true?

Yes, since I need to make sure I observe the right number of rate updates, with the right values.

> Also why go to the complexity of attaching a TestOutputDStream if the test doesn't care about the output?

The 'complexity' we're talking about is 2 lines of code. I'm happy to be proven wrong, but inlining the useful parts of runStreams will be more than that.

> It will improve test times as well as simplify the test if in these cases you just test with eventually alone, without using abstractions that don't really help.

I think they help, but again, happy to be proven wrong.

How about you rewrite the last test and compare? If you're still convinced it's better, I'll rewrite the rest.

> Look at all the newly updated streaming test suites: none of them use runStreams. For a reason. Hope this clarifies why I am trying to move the code away from runStreams.

} catch {
case NonFatal(_) => () // don't stop if the executor wasn't installed yet
}
Thread.sleep(20)
Contributor

This is going to be a flaky test. There is no guarantee that this 20 ms sleep will be 20 ms; it can very well be 100 ms. And it cannot be guaranteed that the 100, 200, 300 rates will all have been received by the receiver by the time the 4 batches are over through runStreams. This is going to be SUPER FLAKY. The non-flaky way of doing this (sketched below):

  1. Run the StreamingContext normally in real time (no real need for runStreams, just ssc.start()).
  2. Test using eventually { observedRates should contain theSameElementsAs (rates :+ Long.MaxValue) }; no need to run anything in the background at all.
  3. observedRates must be a synchronized hash set.
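
A rough sketch of that pattern, assuming a ScalaTest suite where `ssc` is the StreamingContext under test, `rates` is the sequence of published rates, and the test receiver records every rate limit it sees into `observedRates` (names follow the discussion above, not necessarily the final suite):

```scala
import scala.collection.mutable
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

// Written to by the receiver thread, so access is synchronized on the set itself.
val observedRates = mutable.HashSet.empty[Long]

ssc.start()  // real clock, plain start(); no runStreams

eventually(timeout(10.seconds)) {
  val seen = observedRates.synchronized { observedRates.toSet }
  // Long.MaxValue is the initial (unbounded) limit before any update arrives.
  assert(seen == (rates :+ Long.MaxValue).toSet)
}
```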

Contributor Author

Ok, in the interest of closing this PR I will make the changes you request ASAP.

However, in the interest of understanding, I ran it 100 times without it failing once. If you have evidence of it being SUPER FLAKY in practice, do you mind sharing it here?

I guess the reason I'm not seeing these failures is that even if theoretically the delay could be higher, say 100ms, since the batch interval is 1s, you'd still sample it 10 times. And looking at the implementation of SystemClock, it's still based on Thread.sleep, so any systematic delay (say, system under heavy load) will affect both this test and the batch interval sampling rate.

BTW, eventually is using Thread.sleep as well (15ms between attempts, by default), so I guess the same considerations regarding flakiness should apply.

@tdas
Contributor

tdas commented Jul 28, 2015

  1. We cannot deprecate TestSuiteBase, as it is still perfectly suited for test suites like BasicOperationsSuite where the exact per-batch output needs to be verified. It's suitable for some use cases, just not all of them, especially not those where the output is not important.
  2. I visited the new tests in RateControllerSuite again (a sketch of the suggested checks follows below):
    (a) "rate controller publishes updates": no need for runStreams with an exact number of batches; all you need is eventually { publishCalls > 0 }.
    (b) "publish rates reach receivers": no need for runStreams with an exact number of batches; all you need is eventually { dstream.getCurrentRateLimit === Some(200) }.
    (c) You are trying to run 4 batches and expect there to be 3 observed rates as soon as the 4 batches complete. This CANNOT be guaranteed: Thread.sleep cannot guarantee that you observe the rate limits at the right times so that you see all the different rates. See my comments there; I have suggested how to make this non-flaky. Again, it does not require runStreams with an exact number of batches.
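
As a sketch, assuming ScalaTest's Eventually and the accessors named above (`publishCalls` on the test controller, `getCurrentRateLimit` on the receiver-side stream), (a) and (b) reduce to roughly:

```scala
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

ssc.start()

// (a) "rate controller publishes updates": at least one publish happened.
eventually(timeout(10.seconds)) {
  assert(controller.publishCalls > 0)
}

// (b) "publish rates reach receivers": the published bound is visible receiver-side.
eventually(timeout(10.seconds)) {
  assert(dstream.getCurrentRateLimit == Some(200L))
}
```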


/**
* Dummy receiver implementation
*/
private class DummyReceiver(receiverId: Int, host: Option[String] = None)
private[streaming] class DummyReceiver(receiverId: Int, host: Option[String] = None)
Contributor

Since this receiver and the associated InputDStream are used in multiple test suites, mind renaming this to a more contextually relevant name than Dummy? How about RateTestReceiver and RateTestInputDStream?

Contributor

Also, you can augment these classes to count the number of times rates have been received, so that the MockRateLimitDStream can be removed. I'm trying to minimize the number of new classes we create.

@SparkQA

SparkQA commented Jul 29, 2015

Test build #38833 has finished for PR 7600 at commit f168c94.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dragos
Contributor Author

dragos commented Jul 29, 2015

A summary of my changes and a few notes, in the hope of speeding up the process:

  • tests are a bit faster after moving away from runStreams, so I stand corrected
  • tests still extend TestSuiteBase because withStreamingContext is defined there
  • I moved the sampling of observedRates inside the eventually loop, as I don't know how to do it otherwise without using another thread
  • I didn't make the HashSet synchronized, since there's no background thread anymore

The rest of the changes should match your requests and I don't list them here.

@SparkQA

SparkQA commented Jul 29, 2015

Test build #1226 has finished for PR 7600 at commit f168c94.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 29, 2015

Test build #1225 has finished for PR 7600 at commit f168c94.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class RequestExecutors(
    • class RFormula(override val uid: String) extends Estimator[RFormulaModel] with RFormulaBase
    • public static class StructWriter
    • abstract class InternalRow extends Serializable with SpecializedGetters
    • case class CreateStructUnsafe(children: Seq[Expression]) extends Expression
    • case class CreateNamedStructUnsafe(children: Seq[Expression]) extends Expression
    • case class LastDay(startDate: Expression) extends UnaryExpression with ImplicitCastInputTypes
    • case class NextDay(startDate: Expression, dayOfWeek: Expression)
    • case class TungstenProject(projectList: Seq[NamedExpression], child: SparkPlan) extends UnaryNode

@tdas
Contributor

tdas commented Jul 29, 2015

LGTM. There is still an issue that occurred to me at the last moment, but I am inclined to merge this as is to unblock other PRs: RateController should not be serialized with checkpoints. It does not make sense to recover the RateController. Can you make a PR for this?

Regarding flakiness, it oftentimes occurs only on Jenkins, as things can run very slowly there, which causes all this Thread.sleep-related flakiness. Your current method without a background thread makes it less flaky, but still does not give a guarantee. I thought about the test: the least flaky way to test that multiple rate updates reach the receiver would be to send one rate update, wait for it to be received, then send another update with a different rate, and wait for that one in turn.
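
A sketch of that sequential pattern; `sendRateUpdate` and `getCurrentRateLimit` mirror names used in this discussion, but the exact call path and signatures here are assumptions:

```scala
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

ssc.start()

// Publish one rate and wait until the receiver reflects it...
ssc.scheduler.receiverTracker.sendRateUpdate(inputStream.id, 100L)
eventually(timeout(10.seconds)) {
  assert(inputStream.getCurrentRateLimit == Some(100L))
}

// ...then publish a different rate and wait for that one as well. Each update
// is observed in isolation, so no assumption about batch timing is needed.
ssc.scheduler.receiverTracker.sendRateUpdate(inputStream.id, 200L)
eventually(timeout(10.seconds)) {
  assert(inputStream.getCurrentRateLimit == Some(200L))
}
```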

@tdas
Contributor

tdas commented Jul 29, 2015

Alright, I merged this. Thanks all!

@asfgit asfgit closed this in 819be46 Jul 29, 2015
@dragos
Contributor Author

dragos commented Jul 30, 2015

> LGTM. There is still an issue that occurred to me at the last moment, but I am inclined to merge this as is to unblock other PRs: RateController should not be serialized with checkpoints. It does not make sense to recover the RateController. Can you make a PR for this?

I forgot to mention this. I spent some time on it, but it looked like the gains were not worth it. The problem is deserialization, specifically the RateEstimator instance: at deserialization time there's no ssc available yet (it's null), so there's no easy way to get to a SparkConf in order to instantiate the configured estimator. We could work around that by attaching the controller at a later point, during start or some other lifecycle event, but it seemed to complicate matters quite a bit, for little gain that I can see. Thanks for not blocking the PR over this!
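
For illustration only, a hypothetical shape of that workaround (shown as members of the receiver input stream, deferring controller creation to a later lifecycle point where a SparkConf is available again); none of these hooks are claimed to exist in this form:

```scala
import org.apache.spark.SparkConf

// Hypothetical sketch: leave the field empty during deserialization...
@transient private var rateController: Option[RateController] = None

// ...and attach the controller later, e.g. when the restored context starts,
// once ssc (and therefore its SparkConf) is non-null again.
private[streaming] def attachRateController(conf: SparkConf): Unit = {
  rateController = RateEstimator.makeEstimator(conf).map { estimator =>
    new ReceiverRateController(id, estimator)
  }
}
```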

asfgit pushed a commit that referenced this pull request Jul 31, 2015
Based on #7600

/cc tdas

Author: Iulian Dragos <jaguarul@gmail.com>
Author: François Garillot <francois@garillot.net>

Closes #7648 from dragos/topic/streaming-bp/pid and squashes the following commits:

aa5b097 [Iulian Dragos] Add more comments, made all PID constant parameters positive, a couple more tests.
93b74f8 [Iulian Dragos] Better explanation of historicalError.
7975b0c [Iulian Dragos] Add configuration for PID.
26cfd78 [Iulian Dragos] A couple of variable renames.
d0bdf7c [Iulian Dragos] Update to latest version of the code, various style and name improvements.
d58b845 [François Garillot] [SPARK-8979][Streaming] Implements a PIDRateEstimator