[SPARK-8834] Add backpressure-based dynamic throttling to Spark Streaming #14
Conversation
- RateEstimator introduces an API for computing the max rate of message ingestion from batch updates
- RateController sends batch completion updates to an estimator, and maintains a max speed estimate
- the RateController test is in ignore because of its long running time
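For context, a rough sketch of what such a RateEstimator / RateController pair might look like (all names and signatures here are illustrative assumptions, not the exact code in this patch):

```scala
// Illustrative sketch only; the actual traits in this PR may differ.
// A RateEstimator computes a new ingestion bound from per-batch statistics.
trait RateEstimator extends Serializable {
  def compute(
      time: Long,             // batch completion time (ms)
      numElements: Long,      // elements processed in the batch
      processingDelay: Long,  // time spent processing the batch (ms)
      schedulingDelay: Long   // time the batch waited to be scheduled (ms)
  ): Option[Double]           // new rate in elements/sec, if one can be estimated
}

// A RateController feeds batch completions to the estimator and keeps the
// latest estimate so the stream (or its receiver) can be throttled.
abstract class RateController(estimator: RateEstimator) {
  @volatile private var latestRate: Long = -1L

  def getLatestRate(): Long = latestRate

  // Hypothetical hook: how a new rate gets pushed out to receivers.
  protected def publish(rate: Long): Unit

  def onBatchCompleted(time: Long, elems: Long, procDelay: Long, schedDelay: Long): Unit =
    estimator.compute(time, elems, procDelay, schedDelay).foreach { rate =>
      latestRate = rate.toLong
      publish(latestRate)
    }
}
```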
Force-pushed from fdb5cf2 to 33e3098 (compare)
@@ -77,6 +77,10 @@ private[streaming] class ReceiverSupervisorImpl(
      case CleanupOldBlocks(threshTime) =>
        logDebug("Received delete old batch signal")
        cleanupOldBlocks(threshTime)
      case RateLimitUpdate(eps) => {
        blockGenerator.updateRate(eps.toInt)
        logDebug(s"Received update for $streamId : $elemsPerBlock")
`elemsPerBlock` is not defined. I guess it should be `eps` here.
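A corrected version of the clause might look like this (a sketch assuming the suggested rename; only the log line changes):

```scala
case RateLimitUpdate(eps) =>
  blockGenerator.updateRate(eps.toInt)
  logDebug(s"Received update for $streamId : $eps")
```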
Crap, and I thought I was helping by adding one logging commit :-/
Code looks
if (ratePerSec > 0) {
protected def maxMessagesPerPartition: Option[Long] = {
  val effectiveRatePerSec =
    rateController.map(_.getLatestRate()).filter(_ > 0).fold(ratePerSec) {_ min _}
I get a compilation error on this line, but I'm not sure why.
[error] /home/ubuntu/dev/spark/spark-ops/release_spark/target/spark/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala:80: wrong number of parameters; expected = 1
[error] rateController.map(_.getLatestRate()).filter(_ > 0).fold(ratePerSec) {_ min _}
[error] ^
[error] one error found
[error] Compile failed at Jul 10, 2015 2:55:16 PM [1.198s]
Couldn't reproduce, but I think I know why, wait a sec.
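For reference, one plausible explanation (my assumption; the thread never states the cause): `Option.fold(ifEmpty)(f)` expects a one-argument function in its second parameter list, so the two-argument `{_ min _}` triggers the "wrong number of parameters; expected = 1" error unless the compiler first widens the Option into a collection. A formulation that avoids the ambiguity could look like this:

```scala
// Sketch of a possible workaround, not necessarily the fix that landed here:
// take the minimum of the dynamic estimate and the configured rate, falling
// back to the configured rate when no positive estimate is available.
val effectiveRatePerSec =
  rateController.map(_.getLatestRate()).filter(_ > 0) match {
    case Some(dynamicRate) => dynamicRate min ratePerSec
    case None              => ratePerSec
  }
```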
Force-pushed from f533773 to c21d7c4 (compare)
Refer to this link for build results (access rights to CI server needed): Build Log
Test FAILed.
This RateController maintains a prescriptive max speed estimate from a (possibly custom) rate estimator, and dynamically rate-limits this DStream if it is a ReceiverInputDStream.
As with the RateLimiter, the global configuration parameter is taken as an absolute limit.
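A minimal sketch of that contract (class and field names are illustrative assumptions, not necessarily what this PR uses): the statically configured maximum stays an absolute ceiling, and dynamic updates from the RateController can only set the effective rate at or below it.

```scala
import com.google.common.util.concurrent.{RateLimiter => GuavaRateLimiter}

// Illustrative sketch of a Guava-backed throttle honouring a hard cap.
private[streaming] class ThrottleSketch(maxRateLimit: Int) {
  // Start unconstrained if no maximum is configured.
  private val rateLimiter = GuavaRateLimiter.create(
    if (maxRateLimit > 0) maxRateLimit.toDouble else Double.MaxValue)

  // Dynamic updates may lower the rate, but never exceed the configured maximum.
  def updateRate(newRate: Int): Unit =
    if (newRate > 0) {
      if (maxRateLimit > 0) rateLimiter.setRate((newRate min maxRateLimit).toDouble)
      else rateLimiter.setRate(newRate.toDouble)
    }

  // Block the producer until the limiter grants a permit for one element.
  def waitToPush(): Unit = rateLimiter.acquire()
}
```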
retest this please
Force-pushed from c21d7c4 to a62a31e (compare)
Refer to this link for build results (access rights to CI server needed): Build Log
Test FAILed.
Completely unrelated flaky test in Spark SQL's logical plans failing.
@@ -77,6 +77,10 @@ private[streaming] class ReceiverSupervisorImpl(
      case CleanupOldBlocks(threshTime) =>
        logDebug("Received delete old batch signal")
        cleanupOldBlocks(threshTime)
      case RateLimitUpdate(eps) => {
        blockGenerator.updateRate(eps.toInt)
Why the coercion from Long to Int?
This supersedes #13, #11, #9.