Skip to content
This repository was archived by the owner on May 9, 2024. It is now read-only.

[SPARK-8834] Add backpressure-based dynamic throttling to Spark Streaming #14

Closed
wants to merge 4 commits into from

Conversation

huitseeker
Copy link

This supersedes #13, #11, #9 .

Review on Reviewable

@huitseeker huitseeker changed the title [SI-8834] Add backpressure-based dynamic throttling to Spark Streaming [SPARK-8834] Add backpressure-based dynamic throttling to Spark Streaming Jul 10, 2015
- RateEstimator introduces API for computing max rate of message ingestion from batch updates
- RateController sends batch completion updates to an estimator, and maintains a max speed estimate

- the RateController test is in ignore because of long running time
@huitseeker huitseeker force-pushed the DirectBackPressure2 branch from fdb5cf2 to 33e3098 Compare July 10, 2015 13:28
@@ -77,6 +77,10 @@ private[streaming] class ReceiverSupervisorImpl(
case CleanupOldBlocks(threshTime) =>
logDebug("Received delete old batch signal")
cleanupOldBlocks(threshTime)
case RateLimitUpdate(eps) => {
blockGenerator.updateRate(eps.toInt)
logDebug(s"Received update for $streamId : $elemsPerBlock")
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

elemsPerBlock is not defined. I guess it should be eps here.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Crap, and I thought I was helping by adding one logging commit :-/

@dragos
Copy link

dragos commented Jul 10, 2015

Code looks in good shape fine, but needs to compile too 😄. I'll send an email update on how I think we should proceed (I'd like to break this in separate PRs, based on TD's suggestions).

if (ratePerSec > 0) {
protected def maxMessagesPerPartition: Option[Long] = {
val effectiveRatePerSec =
rateController.map(_.getLatestRate()).filter(_ > 0).fold(ratePerSec) {_ min _}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I get a compilation error on this line, but I'm not sure why.

[error] /home/ubuntu/dev/spark/spark-ops/release_spark/target/spark/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala:80: wrong number of parameters; expected = 1
[error]       rateController.map(_.getLatestRate()).filter(_ > 0).fold(ratePerSec) {_ min _}
[error]                                                                               ^
[error] one error found
[error] Compile failed at Jul 10, 2015 2:55:16 PM [1.198s]

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Couldn't reproduce, but I think I know why, wait a sec.

@huitseeker huitseeker force-pushed the DirectBackPressure2 branch 4 times, most recently from f533773 to c21d7c4 Compare July 13, 2015 08:17
@typesafe-tools
Copy link
Collaborator

Refer to this link for build results (access rights to CI server needed):
https://ci.typesafe.com/job/ghprb-spark-multi-conf/38/

Build Log
last 10 lines

[...truncated 13 lines...]
 > git config core.sparsecheckout # timeout=10
 > git checkout -f 5459aaca3aa86e59e7afd9cfa093327ffdc24d4e
First time build. Skipping changelog.
Triggering ghprb-spark-multi-conf » Spark-Ora-JDK7-PV,2.10
Configuration ghprb-spark-multi-conf » Spark-Ora-JDK7-PV,2.10 is still in the queue: Waiting for next available executor on Spark JDK-7 PV (i-f485c427)
Configuration ghprb-spark-multi-conf » Spark-Ora-JDK7-PV,2.10 is still in the queue: Waiting for next available executor on Spark-Ora-JDK7-PV
ghprb-spark-multi-conf » Spark-Ora-JDK7-PV,2.10 completed with result FAILURE
Notifying upstream projects of job completion
Setting status of c21d7c419422b042964333db37048dfac67add74 to FAILURE with url http://ci.typesafe.com/job/ghprb-spark-multi-conf/38/ and message: Merged build finished.

Test FAILed.

This RateController maintains a prescriptive max speed estimate
from a (possibly custom) rate estimator.

Dynamically rate-limits this DStream if it is a ReceiverInputDStream
As with the RateLimiter, the global configuration parameter is taken as an absolute limit.
@huitseeker
Copy link
Author

retest this please

@huitseeker huitseeker force-pushed the DirectBackPressure2 branch from c21d7c4 to a62a31e Compare July 14, 2015 09:45
@typesafe-tools
Copy link
Collaborator

Refer to this link for build results (access rights to CI server needed):
https://ci.typesafe.com/job/ghprb-spark-multi-conf/41/

Build Log
last 10 lines

[...truncated 16 lines...]
 > git rev-parse refs/remotes/origin/origin/pr/14/merge^{commit} # timeout=10
Checking out Revision 8ca8eb6d3210439bd58fcc08a1c01d22c575dad2 (refs/remotes/origin/pr/14/merge)
 > git config core.sparsecheckout # timeout=10
 > git checkout -f 8ca8eb6d3210439bd58fcc08a1c01d22c575dad2
First time build. Skipping changelog.
Triggering ghprb-spark-multi-conf » Spark-Ora-JDK7-PV,2.10
ghprb-spark-multi-conf » Spark-Ora-JDK7-PV,2.10 completed with result FAILURE
Notifying upstream projects of job completion
Setting status of a62a31e2f725f240732ae2f78e2d05a388c3100c to FAILURE with url http://ci.typesafe.com/job/ghprb-spark-multi-conf/41/ and message: Merged build finished.

Test FAILed.

@huitseeker
Copy link
Author

completely unrelated flaky test in spark SQL's logical plans failing.

@@ -77,6 +77,10 @@ private[streaming] class ReceiverSupervisorImpl(
case CleanupOldBlocks(threshTime) =>
logDebug("Received delete old batch signal")
cleanupOldBlocks(threshTime)
case RateLimitUpdate(eps) => {
blockGenerator.updateRate(eps.toInt)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the coercion from long to int?

@dragos dragos closed this Sep 9, 2015
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants