[SPARK-8979] Add a PID based rate estimator #7648
Test build #38377 has finished for PR 7648 at commit
Force-pushed from 70d875b to 2cd7139.
Test build #38381 has finished for PR 7648 at commit
 * the speed of ingestion of elements into Spark Streaming.
 *
 * @param batchDurationMillis the batch duration, in milliseconds
 * @param proportional how much the correction should depend on the current
@dragos @huitseeker Could you change the documentation to make it easier to understand in the context of streaming? For example, it's not clear what "error" means in the context of streaming. These docs need to be expanded to explain how the processing and scheduling delays are used to estimate the rate, and how the proportional and integral parameters control the processing. Also, what are the recommended ranges for these parameters, and how do you control them?
Currently they are not user-configurable; should we do that? I think it's too much control for most people, but on the other hand these are the only knobs we have, short of writing another rate estimator altogether.
Force-pushed from 2cd7139 to 491e371.
I pushed an expanded doc, but it would be good if @huitseeker had a look too, since this is mainly his brainchild.
Jenkins, test this please.
Test build #38861 has finished for PR 7648 at commit
@dragos Please update this PR with the latest master. That would help with reviewing it.
/**
 * Implements a proportional-integral-derivative (PID) controller which acts on
 * the speed of ingestion of elements into Spark Streaming. A PID controller works
 * by calculating an '''error''' between a measured output and a desired setpoint. In the
What is the set point?
@huitseeker wrote one and can add it himself; it wasn't on Google Docs but in a local document that I can't find right now.
Force-pushed from 491e371 to cf3b5e3.
Force-pushed from cf3b5e3 to 26cfd78.
val processingRate = elements.toDouble / processingDelay * 1000

// in elements/second
val error = latestRate - processingRate
Carrying over the conversation from the previous thread, which got lost due to the rebase:
Could you make the names more semantically meaningful? How about: error --> changeInRate?
@tdas
tdas added a note 14 hours ago
Why is the latestRate considered as the set point (that's my assumption, since the error is calculated between the observed value and the set point, according to PID theory)? @huitseeker
@dragos
dragos added a note 2 hours ago
Since @huitseeker seems to be away, I'll answer this. The latestRate is what we considered the desired value at the previous batch update. With the new information we got for the last batch interval, we compute a current rate and compare it to what we asked for; that constitutes our error that needs correction.
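In streaming terms, the exchange above can be sketched roughly as follows. This is a minimal sketch based only on the two lines quoted in the diff; the object name, method names, and the `processingDelayMs` parameter name are hypothetical and are not part of the PR.

```scala
// Hypothetical helper for illustration only; not the PR's code.
object RateErrorSketch {
  // Rate actually achieved for the last batch, in elements/second.
  // processingDelayMs is the time the batch took to process, in milliseconds.
  def processingRate(elements: Long, processingDelayMs: Long): Double =
    elements.toDouble / processingDelayMs * 1000

  // Error in elements/second: the rate asked for at the previous update
  // (latestRate, acting as the set point) minus the rate actually achieved.
  def error(latestRate: Double, elements: Long, processingDelayMs: Long): Double =
    latestRate - processingRate(elements, processingDelayMs)
}
```

Under this reading, a positive error means the previous update asked for more elements per second than the last batch actually processed.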
Here I'd prefer to keep this as error, as I think most people reading this code would have more trouble mapping things to PID terminology than to Spark Streaming terminology, and all PID docs will mention error and correction.
But that is exactly the problem: people who are reading the streaming code (like me) are more likely to know streaming than PID concepts, and if the code does not make it clear in terms of streaming, it is super hard to relate to. So I am fine with keeping this as error as long as there is an explanation in streaming terms along with it, just like what you added for historicalError.
I'll add the explanation.
Test build #39054 has finished for PR 7648 at commit
Test build #39052 has finished for PR 7648 at commit
Test build #39047 has finished for PR 7648 at commit
def create(conf: SparkConf, batchInterval: Duration): RateEstimator =
  conf.get("spark.streaming.backpressure.rateEstimator", "pid") match {
    case "pid" =>
      val proportional = conf.getDouble("spark.streraming.backpressure.pid.proportional", -1.0)
typo, in all the parameters!
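One way to keep a misspelling like `streraming` from being copied into every key is to spell the shared prefix once. The sketch below is only an illustration: it uses a plain `Map` as a stand-in for `SparkConf`, and the `PidConfSketch` object, its `Prefix` constant, and its `getDouble` helper are hypothetical names, not the PR's code.

```scala
// Hypothetical stand-in for a configuration lookup; a plain Map replaces SparkConf here.
object PidConfSketch {
  // The prefix is spelled once, so a typo cannot creep into individual parameter keys.
  val Prefix = "spark.streaming.backpressure.pid."

  // Looks up Prefix + name and parses it as a Double, falling back to the default.
  def getDouble(settings: Map[String, String], name: String, default: Double): Double =
    settings.get(Prefix + name).map(_.toDouble).getOrElse(default)
}
```

With this shape, a value stored under a misspelled key is simply not found and the default is returned, which is the kind of silent failure the review comment is guarding against.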
 */
private[streaming] class PIDRateEstimator(
    batchIntervalMillis: Long,
    proportional: Double = -1D,
Let's actually convert all the parameters to be positive. It would be more intuitive to say that the proportional parameter can be set between 0 and 1, where 1 is the most aggressive and 0 the least aggressive.
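To illustrate what a positive proportional gain between 0 and 1 would mean, here is a minimal sketch. It assumes that only the proportional term is applied and that the correction is subtracted from the previously requested rate; the integral term is left out, and the `ProportionalSketch` object and `nextRate` method are hypothetical names rather than the PR's code.

```scala
// Hypothetical proportional-only update for illustration; not the PR's code.
object ProportionalSketch {
  // latestRate: rate requested at the previous update (elements/second).
  // processingRate: rate actually achieved in the last batch (elements/second).
  // proportional: positive gain, assumed here to lie between 0 and 1.
  def nextRate(latestRate: Double, processingRate: Double, proportional: Double): Double = {
    val error = latestRate - processingRate
    // With a positive gain, the correction is subtracted from the previous rate.
    latestRate - proportional * error
  }
}
```

Under these assumptions, a gain of 1 moves the requested rate all the way to the measured processing rate (most aggressive), while a gain of 0 leaves the previous rate unchanged (least aggressive).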
I think I addressed all comments. In case I missed something, @nraychaudhuri will take over next week. I'm travelling today and won't be able to make further changes, but it's my firm belief this is in pretty good shape now. I hope it can get in.
  s"Specified batch interval $batchIntervalMillis in PIDRateEstimator is invalid.")
require(
  proportional >= 0,
  s"Proportional term $proportional in PIDRateEstimator should be >= 0.")
Did you see my earlier comment about > 1 not being practically feasible?
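If the upper bound were enforced as well, the check could look something like the sketch below. Whether the PR ends up rejecting values above 1 is not settled in this thread, so the `[0, 1]` range, the `GainValidationSketch` object, and the `validateProportional` method are assumptions made for illustration.

```scala
// Hypothetical validation for illustration; the [0, 1] range is an assumption.
object GainValidationSketch {
  // Returns the gain unchanged if it lies in [0, 1]; otherwise require throws
  // an IllegalArgumentException carrying the message below.
  def validateProportional(proportional: Double): Double = {
    require(
      proportional >= 0 && proportional <= 1,
      s"Proportional term $proportional should be in the range [0, 1].")
    proportional
  }
}
```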
Test build #39203 has finished for PR 7648 at commit
@tdas I will try to take care of this by early next week.
Since we are nearing feature freeze today, I am going to merge this in. There are a few nits, but none of them are blockers, and we can figure out those details next week.
I am merging this. Thanks @dragos and @huitseeker
Based on #7600
/cc @tdas