[SPARK-8979] Add a PID based rate estimator #7648

Closed
wants to merge 6 commits into from

Conversation

dragos
Contributor

@dragos dragos commented Jul 24, 2015

Based on #7600

/cc @tdas

@SparkQA

SparkQA commented Jul 24, 2015

Test build #38377 has finished for PR 7648 at commit 70d875b.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dragos dragos force-pushed the topic/streaming-bp/pid branch from 70d875b to 2cd7139 Compare July 24, 2015 17:29
@SparkQA

SparkQA commented Jul 24, 2015

Test build #38381 has finished for PR 7648 at commit 2cd7139.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

* the speed of ingestion of elements into Spark Streaming.
*
* @param batchDurationMillis the batch duration, in milliseconds
* @param proportional how much the correction should depend on the current
Contributor

@dragos @huitseeker Could you change the documentation to make it easier to understand in the context of streaming? For example, it's not clear what "error" means in the context of streaming. So these docs need to be expanded to explain how the processing and scheduling delays are used to estimate the rate, and how the proportional and integral parameters control the processing. Also, what are the recommended ranges for these parameters, and how do you control them?

Contributor Author

Currently they are not user-configurable. Should we make them so? I think it's too much control for most people, but on the other hand these are the only knobs we have, short of writing another rate estimator altogether.

@dragos dragos force-pushed the topic/streaming-bp/pid branch from 2cd7139 to 491e371 Compare July 28, 2015 10:30
@dragos
Contributor Author

dragos commented Jul 28, 2015

I pushed an expanded doc, but it would be good if @huitseeker had a look too, since this is mainly his brainchild.

@tdas
Contributor

tdas commented Jul 29, 2015

Jenkins, test this please.

@SparkQA

SparkQA commented Jul 29, 2015

Test build #38861 has finished for PR 7648 at commit 491e371.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tdas
Contributor

tdas commented Jul 29, 2015

@dragos Please update this PR with the latest master. That would help with reviewing it.

/**
* Implements a proportional-integral-derivative (PID) controller which acts on
* the speed of ingestion of elements into Spark Streaming. A PID controller works
* by calculating an '''error''' between a measured output and a desired setpoint. In the
Contributor

What is the set point?

@tdas
Contributor

tdas commented Jul 30, 2015

@dragos Is there any explanation/derivation of the implementation from the PID concepts written up somewhere? I strongly suggest attaching that to the JIRA.
Never mind, it's already attached.

@dragos
Contributor Author

dragos commented Jul 30, 2015

@huitseeker wrote one and can add it himself. It wasn't on Google Docs but in a local document that I can't find right now.

@dragos dragos force-pushed the topic/streaming-bp/pid branch from 491e371 to cf3b5e3 Compare July 30, 2015 11:37
@dragos dragos force-pushed the topic/streaming-bp/pid branch from cf3b5e3 to 26cfd78 Compare July 30, 2015 11:42
val processingRate = elements.toDouble / processingDelay * 1000

// in elements/second
val error = latestRate - processingRate
Contributor Author

Carrying over conversation from previous thread that got lost due to rebase

Could you make the names more semantically meaningful? How about: error --> changeInRate?
@tdas
tdas added a note 14 hours ago
Why is the latestRate considered as the set point (that's my assumption since the error is calculated between the observed value and the set point, according to PID theory)? @huitseeker
@dragos
dragos added a note 2 hours ago
Since @huitseeker seems to be away, I'll answer this.

The latestRate is what we considered the desired value at the previous batch update. With the new information we got for the last batch interval, we compute a current rate and compare it to what we asked for. That difference constitutes our error, which needs correction.
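
To make the sign of that error concrete, here is a minimal Scala sketch of the two lines quoted from the diff (processingRate and error). The object name `PidErrorSketch`, the method signatures, and the numbers are hypothetical and only for illustration; this is not the PR's actual class.

```scala
// Illustrative sketch only: names and numbers are assumptions, not the PR's code.
object PidErrorSketch {
  /** Achieved processing rate in elements/second; the delay is given in milliseconds. */
  def processingRate(elements: Long, processingDelayMs: Long): Double =
    elements.toDouble / processingDelayMs * 1000

  /** Error in elements/second: the rate requested at the previous update
    * (latestRate, used as the set point) minus the rate actually achieved. */
  def error(latestRate: Double, elements: Long, processingDelayMs: Long): Double =
    latestRate - processingRate(elements, processingDelayMs)

  def main(args: Array[String]): Unit = {
    // Hypothetical batch: 1000 elements/s were requested, 750 elements took 1000 ms.
    val e = error(latestRate = 1000.0, elements = 750L, processingDelayMs = 1000L)
    println(s"error = $e elements/s")
  }
}
```

A positive error means the system processed more slowly than the rate it was asked to ingest, so the next rate should go down. A negative error means there is headroom.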

Contributor Author

Here I'd prefer to keep this as error, as I think most people reading this code would have more trouble mapping things to PID terminology than to Spark Streaming terminology, and all PID docs will mention error and correction.

Contributor

But that is exactly the problem: people who are reading the streaming code (like me) are more likely to know streaming than PID concepts, and if the code does not make it clear in terms of streaming, it is super hard to relate to. So I am fine with keeping this as error as long as there is an explanation in terms of streaming along with it, just like what you added for historicalError.

Contributor Author

I'll add the explanation.

@SparkQA

SparkQA commented Jul 30, 2015

Test build #39054 has finished for PR 7648 at commit 93b74f8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • public static final class FloatPrefixComparator extends PrefixComparator
    • case class UnsafeExternalSort(

@SparkQA

SparkQA commented Jul 30, 2015

Test build #39052 has finished for PR 7648 at commit 7975b0c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 30, 2015

Test build #39047 has finished for PR 7648 at commit 26cfd78.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • public static final class FloatPrefixComparator extends PrefixComparator
    • case class UnsafeExternalSort(

def create(conf: SparkConf, batchInterval: Duration): RateEstimator =
conf.get("spark.streaming.backpressure.rateEstimator", "pid") match {
case "pid" =>
val proportional = conf.getDouble("spark.streraming.backpressure.pid.proportional", -1.0)
Contributor

typo, in all the parameters!
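
A misspelled key is easy to miss because a lookup with a default never fails: it just returns the default. The sketch below illustrates this with a plain `Map[String, String]` standing in for SparkConf, so that it runs without Spark on the classpath. The object name and the `getDouble` helper are hypothetical; the corrected key is simply the quoted one with "streraming" fixed to "streaming".

```scala
// Illustrative sketch only: a Map stands in for SparkConf; the helper is hypothetical.
object BackpressureConfSketch {
  // The key quoted in the diff, with the "streraming" typo corrected.
  val ProportionalKey = "spark.streaming.backpressure.pid.proportional"
  // The key exactly as it appears in the diff under review.
  val MisspelledKey = "spark.streraming.backpressure.pid.proportional"

  /** Returns the setting as a Double, or the default when the key is absent. */
  def getDouble(settings: Map[String, String], key: String, default: Double): Double =
    settings.get(key).map(_.toDouble).getOrElse(default)

  def main(args: Array[String]): Unit = {
    // A user sets the correctly spelled key...
    val userSettings = Map(ProportionalKey -> "0.5")
    // ...but a lookup under the misspelled key silently falls back to the default.
    println(getDouble(userSettings, MisspelledKey, -1.0))
    println(getDouble(userSettings, ProportionalKey, -1.0))
  }
}
```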

*/
private[streaming] class PIDRateEstimator(
batchIntervalMillis: Long,
proportional: Double = -1D,
Contributor

Let's actually convert all the parameters to be positive. It would be more intuitive to say that the proportional parameter can be set between 0 and 1, where 1 is the most aggressive and 0 the least aggressive.
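
As a rough illustration of what a positive proportional parameter would mean, the sketch below applies a proportional-only correction. It is a simplification under stated assumptions: the integral and derivative terms are ignored, the clamp at zero is an assumption, and `correctedRate` is a hypothetical name rather than anything from the PR.

```scala
// Illustrative sketch only: proportional term alone, not the PR's full estimator.
object ProportionalGainSketch {
  /** New rate after subtracting a fraction of the error from the last requested rate.
    * proportional = 0 ignores the error; proportional = 1 corrects it fully.
    * The result is clamped at zero (an assumption made for this sketch). */
  def correctedRate(latestRate: Double, error: Double, proportional: Double): Double = {
    require(proportional >= 0, s"Proportional term $proportional should be >= 0.")
    math.max(latestRate - proportional * error, 0.0)
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical values: 1000 elements/s requested, error of 250 elements/s.
    Seq(0.0, 0.5, 1.0).foreach { p =>
      println(s"proportional = $p -> new rate = ${correctedRate(1000.0, 250.0, p)}")
    }
  }
}
```

With positive gains the sign convention lives in the update formula (the subtraction) rather than in the parameter, which is what makes a 0 to 1 range readable as "how aggressively to correct".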

@dragos
Contributor Author

dragos commented Jul 31, 2015

I think I addressed all comments. In case I missed something, @nraychaudhuri will take over next week. I'm travelling today and won't be able to make further changes, but it's my firm belief this is in pretty good shape now. I hope it can get in.

s"Specified batch interval $batchIntervalMillis in PIDRateEstimator is invalid.")
require(
proportional >= 0,
s"Proportional term $proportional in PIDRateEstimator should be >= 0.")
Contributor

Did you see my earlier comment about > 1 not being practically feasible?

@SparkQA

SparkQA commented Jul 31, 2015

Test build #39203 has finished for PR 7648 at commit aa5b097.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@nraychaudhuri
Contributor

@tdas I will try to take care of this by early next week.

@tdas
Contributor

tdas commented Jul 31, 2015

Since we are nearing feature freeze today, I am going to merge this in. There are a few nits, but none of them are blockers and we can figure out those details next week.

@tdas
Contributor

tdas commented Jul 31, 2015

I am merging this. Thanks @dragos and @huitseeker

@asfgit asfgit closed this in 0a1d2ca Jul 31, 2015
6 participants