-
Notifications
You must be signed in to change notification settings - Fork 28.6k
[SPARK-8977][Streaming] Defines the RateEstimator interface, and impements the RateController #7600
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Closed
dragos
wants to merge
20
commits into
apache:master
from
lightbend:topic/streaming-bp/rate-controller
Closed
Changes from all commits
Commits
Show all changes
20 commits
Select commit
Hold shift + click to select a range
4721c7d
[SPARK-8975][Streaming] Add a mechanism to send a new rate from the d…
huitseeker d15de42
[SPARK-8975][Streaming] Adds Ratelimiter unit tests w.r.t. spark.stre…
huitseeker 6369b30
Merge pull request #15 from huitseeker/SPARK-8975
dragos cd1397d
Add a test for the propagation of a new rate limit from driver to rec…
dragos 261a051
- removed field to hold the current rate limit in rate limiter
dragos 0c51959
Added a few tests that measure the receiver’s rate.
dragos 210f495
Revert "Added a few tests that measure the receiver’s rate."
dragos 162d9e5
Use Reflection for accessing truly private `executor` method and
dragos 8941cf9
Renames and other nitpicks.
dragos d32ca36
[SPARK-8977][Streaming] Defines the RateEstimator interface, and impl…
huitseeker 34a389d
Various style changes and a first test for the rate controller.
dragos 238cfc6
Merge remote-tracking branch 'upstream/master' into topic/streaming-b…
dragos b425d32
Removed DeveloperAPI, removed rateEstimator field, removed Noop rate
dragos e57c66b
Added a couple of tests for the full scenario from driver to receivers,
dragos 715437a
Review comments and added a `reset` call in ReceiverTrackerTest.
dragos e9fb45e
- Add a test for checkpointing
dragos 475e346
Latest round of reviews.
dragos a2eb3b9
Merge remote-tracking branch 'upstream/master' into topic/streaming-b…
dragos 5125e60
Fix style.
dragos f168c94
Latest review round.
dragos File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
90 changes: 90 additions & 0 deletions
90
streaming/src/main/scala/org/apache/spark/streaming/scheduler/RateController.scala
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,90 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.spark.streaming.scheduler | ||
|
||
import java.io.ObjectInputStream | ||
import java.util.concurrent.atomic.AtomicLong | ||
|
||
import scala.concurrent.{ExecutionContext, Future} | ||
|
||
import org.apache.spark.SparkConf | ||
import org.apache.spark.streaming.scheduler.rate.RateEstimator | ||
import org.apache.spark.util.{ThreadUtils, Utils} | ||
|
||
/** | ||
* A StreamingListener that receives batch completion updates, and maintains | ||
* an estimate of the speed at which this stream should ingest messages, | ||
* given an estimate computation from a `RateEstimator` | ||
*/ | ||
private[streaming] abstract class RateController(val streamUID: Int, rateEstimator: RateEstimator) | ||
extends StreamingListener with Serializable { | ||
|
||
init() | ||
|
||
protected def publish(rate: Long): Unit | ||
|
||
@transient | ||
implicit private var executionContext: ExecutionContext = _ | ||
|
||
@transient | ||
private var rateLimit: AtomicLong = _ | ||
|
||
/** | ||
* An initialization method called both from the constructor and Serialization code. | ||
*/ | ||
private def init() { | ||
executionContext = ExecutionContext.fromExecutorService( | ||
ThreadUtils.newDaemonSingleThreadExecutor("stream-rate-update")) | ||
rateLimit = new AtomicLong(-1L) | ||
} | ||
|
||
private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException { | ||
ois.defaultReadObject() | ||
init() | ||
} | ||
|
||
/** | ||
* Compute the new rate limit and publish it asynchronously. | ||
*/ | ||
private def computeAndPublish(time: Long, elems: Long, workDelay: Long, waitDelay: Long): Unit = | ||
Future[Unit] { | ||
val newRate = rateEstimator.compute(time, elems, workDelay, waitDelay) | ||
newRate.foreach { s => | ||
rateLimit.set(s.toLong) | ||
publish(getLatestRate()) | ||
} | ||
} | ||
|
||
def getLatestRate(): Long = rateLimit.get() | ||
|
||
override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { | ||
val elements = batchCompleted.batchInfo.streamIdToInputInfo | ||
|
||
for { | ||
processingEnd <- batchCompleted.batchInfo.processingEndTime; | ||
workDelay <- batchCompleted.batchInfo.processingDelay; | ||
waitDelay <- batchCompleted.batchInfo.schedulingDelay; | ||
elems <- elements.get(streamUID).map(_.numRecords) | ||
} computeAndPublish(processingEnd, elems, workDelay, waitDelay) | ||
} | ||
} | ||
|
||
object RateController { | ||
def isBackPressureEnabled(conf: SparkConf): Boolean = | ||
conf.getBoolean("spark.streaming.backpressure.enable", false) | ||
} |
59 changes: 59 additions & 0 deletions
59
streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,59 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.spark.streaming.scheduler.rate | ||
|
||
import org.apache.spark.SparkConf | ||
import org.apache.spark.SparkException | ||
|
||
/** | ||
* A component that estimates the rate at wich an InputDStream should ingest | ||
* elements, based on updates at every batch completion. | ||
*/ | ||
private[streaming] trait RateEstimator extends Serializable { | ||
|
||
/** | ||
* Computes the number of elements the stream attached to this `RateEstimator` | ||
* should ingest per second, given an update on the size and completion | ||
* times of the latest batch. | ||
* | ||
* @param time The timetamp of the current batch interval that just finished | ||
* @param elements The number of elements that were processed in this batch | ||
* @param processingDelay The time in ms that took for the job to complete | ||
* @param schedulingDelay The time in ms that the job spent in the scheduling queue | ||
*/ | ||
def compute( | ||
time: Long, | ||
elements: Long, | ||
processingDelay: Long, | ||
schedulingDelay: Long): Option[Double] | ||
} | ||
|
||
object RateEstimator { | ||
|
||
/** | ||
* Return a new RateEstimator based on the value of `spark.streaming.RateEstimator`. | ||
* | ||
* @return None if there is no configured estimator, otherwise an instance of RateEstimator | ||
* @throws IllegalArgumentException if there is a configured RateEstimator that doesn't match any | ||
* known estimators. | ||
*/ | ||
def create(conf: SparkConf): Option[RateEstimator] = | ||
conf.getOption("spark.streaming.backpressure.rateEstimator").map { estimator => | ||
throw new IllegalArgumentException(s"Unkown rate estimator: $estimator") | ||
} | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
103 changes: 103 additions & 0 deletions
103
streaming/src/test/scala/org/apache/spark/streaming/scheduler/RateControllerSuite.scala
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,103 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.spark.streaming.scheduler | ||
|
||
import scala.collection.mutable | ||
import scala.reflect.ClassTag | ||
import scala.util.control.NonFatal | ||
|
||
import org.scalatest.Matchers._ | ||
import org.scalatest.concurrent.Eventually._ | ||
import org.scalatest.time.SpanSugar._ | ||
|
||
import org.apache.spark.streaming._ | ||
import org.apache.spark.streaming.scheduler.rate.RateEstimator | ||
|
||
class RateControllerSuite extends TestSuiteBase { | ||
|
||
override def useManualClock: Boolean = false | ||
|
||
test("rate controller publishes updates") { | ||
val ssc = new StreamingContext(conf, batchDuration) | ||
withStreamingContext(ssc) { ssc => | ||
val dstream = new RateLimitInputDStream(ssc) | ||
dstream.register() | ||
ssc.start() | ||
|
||
eventually(timeout(10.seconds)) { | ||
assert(dstream.publishCalls > 0) | ||
} | ||
} | ||
} | ||
|
||
test("publish rates reach receivers") { | ||
val ssc = new StreamingContext(conf, batchDuration) | ||
withStreamingContext(ssc) { ssc => | ||
val dstream = new RateLimitInputDStream(ssc) { | ||
override val rateController = | ||
Some(new ReceiverRateController(id, new ConstantEstimator(200.0))) | ||
} | ||
dstream.register() | ||
SingletonTestRateReceiver.reset() | ||
ssc.start() | ||
|
||
eventually(timeout(10.seconds)) { | ||
assert(dstream.getCurrentRateLimit === Some(200)) | ||
} | ||
} | ||
} | ||
|
||
test("multiple publish rates reach receivers") { | ||
val ssc = new StreamingContext(conf, batchDuration) | ||
withStreamingContext(ssc) { ssc => | ||
val rates = Seq(100L, 200L, 300L) | ||
|
||
val dstream = new RateLimitInputDStream(ssc) { | ||
override val rateController = | ||
Some(new ReceiverRateController(id, new ConstantEstimator(rates.map(_.toDouble): _*))) | ||
} | ||
SingletonTestRateReceiver.reset() | ||
dstream.register() | ||
|
||
val observedRates = mutable.HashSet.empty[Long] | ||
ssc.start() | ||
|
||
eventually(timeout(20.seconds)) { | ||
dstream.getCurrentRateLimit.foreach(observedRates += _) | ||
// Long.MaxValue (essentially, no rate limit) is the initial rate limit for any Receiver | ||
observedRates should contain theSameElementsAs (rates :+ Long.MaxValue) | ||
} | ||
} | ||
} | ||
} | ||
|
||
private[streaming] class ConstantEstimator(rates: Double*) extends RateEstimator { | ||
private var idx: Int = 0 | ||
|
||
private def nextRate(): Double = { | ||
val rate = rates(idx) | ||
idx = (idx + 1) % rates.size | ||
rate | ||
} | ||
|
||
def compute( | ||
time: Long, | ||
elements: Long, | ||
processingDelay: Long, | ||
schedulingDelay: Long): Option[Double] = Some(nextRate()) | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why should this return an option? Since there is a separate conf that enables/disables RateController, this should always return an instance of RateEstimator even if its the default. But I do understand that since this PR does not have a default one this is probably better. So I am okay to merge this PR as is, as long as the PID estimator PR updates this code to not use Option. Does that make sense?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok