
[SPARK-8977][Streaming] Defines the RateEstimator interface, and implements the RateController #7600


Closed

Changes from all commits (20 commits)
4721c7d
[SPARK-8975][Streaming] Add a mechanism to send a new rate from the d…
huitseeker Jul 13, 2015
d15de42
[SPARK-8975][Streaming] Adds Ratelimiter unit tests w.r.t. spark.stre…
huitseeker Jul 15, 2015
6369b30
Merge pull request #15 from huitseeker/SPARK-8975
dragos Jul 17, 2015
cd1397d
Add a test for the propagation of a new rate limit from driver to rec…
dragos Jul 17, 2015
261a051
- removed field to hold the current rate limit in rate limiter
dragos Jul 20, 2015
0c51959
Added a few tests that measure the receiver’s rate.
dragos Jul 20, 2015
210f495
Revert "Added a few tests that measure the receiver’s rate."
dragos Jul 21, 2015
162d9e5
Use Reflection for accessing truly private `executor` method and
dragos Jul 21, 2015
8941cf9
Renames and other nitpicks.
dragos Jul 22, 2015
d32ca36
[SPARK-8977][Streaming] Defines the RateEstimator interface, and impl…
huitseeker Jul 14, 2015
34a389d
Various style changes and a first test for the rate controller.
dragos Jul 22, 2015
238cfc6
Merge remote-tracking branch 'upstream/master' into topic/streaming-b…
dragos Jul 23, 2015
b425d32
Removed DeveloperAPI, removed rateEstimator field, removed Noop rate
dragos Jul 23, 2015
e57c66b
Added a couple of tests for the full scenario from driver to receivers,
dragos Jul 23, 2015
715437a
Review comments and added a `reset` call in ReceiverTrackerTest.
dragos Jul 24, 2015
e9fb45e
- Add a test for checkpointing
dragos Jul 24, 2015
475e346
Latest round of reviews.
dragos Jul 28, 2015
a2eb3b9
Merge remote-tracking branch 'upstream/master' into topic/streaming-b…
dragos Jul 28, 2015
5125e60
Fix style.
dragos Jul 28, 2015
f168c94
Latest review round.
dragos Jul 29, 2015
streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
@@ -21,7 +21,9 @@ import scala.reflect.ClassTag

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDDOperationScope
-import org.apache.spark.streaming.{Time, Duration, StreamingContext}
+import org.apache.spark.streaming.{Duration, StreamingContext, Time}
+import org.apache.spark.streaming.scheduler.RateController
+import org.apache.spark.streaming.scheduler.rate.RateEstimator
import org.apache.spark.util.Utils

/**
@@ -47,6 +49,9 @@ abstract class InputDStream[T: ClassTag] (@transient ssc_ : StreamingContext)
/** This is a unique identifier for the input stream. */
val id = ssc.getNewInputStreamId()

// Keep track of the freshest rate for this stream using the rateEstimator
protected[streaming] val rateController: Option[RateController] = None

/** A human-readable name of this InputDStream */
private[streaming] def name: String = {
// e.g. FlumePollingDStream -> "Flume polling stream"
streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
@@ -21,10 +21,11 @@ import scala.reflect.ClassTag

import org.apache.spark.rdd.{BlockRDD, RDD}
import org.apache.spark.storage.BlockId
-import org.apache.spark.streaming._
+import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD
import org.apache.spark.streaming.receiver.Receiver
-import org.apache.spark.streaming.scheduler.StreamInputInfo
+import org.apache.spark.streaming.scheduler.{RateController, StreamInputInfo}
+import org.apache.spark.streaming.scheduler.rate.RateEstimator
import org.apache.spark.streaming.util.WriteAheadLogUtils

/**
@@ -40,6 +41,17 @@ import org.apache.spark.streaming.util.WriteAheadLogUtils
abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingContext)
extends InputDStream[T](ssc_) {

/**
* Asynchronously maintains & sends new rate limits to the receiver through the receiver tracker.
*/
override protected[streaming] val rateController: Option[RateController] = {
if (RateController.isBackPressureEnabled(ssc.conf)) {
RateEstimator.create(ssc.conf).map { new ReceiverRateController(id, _) }
} else {
None
}
}

/**
* Gets the receiver object that will be sent to the worker nodes
* to receive data. This method needs to be defined by any specific implementation
@@ -110,4 +122,14 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingContext)
}
Some(blockRDD)
}

/**
* A RateController that sends the new rate to receivers, via the receiver tracker.
*/
private[streaming] class ReceiverRateController(id: Int, estimator: RateEstimator)
extends RateController(id, estimator) {
override def publish(rate: Long): Unit =
ssc.scheduler.receiverTracker.sendRateUpdate(id, rate)
}
}

streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -66,6 +66,12 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
}
eventLoop.start()

// attach rate controllers of input streams to receive batch completion updates
for {
inputDStream <- ssc.graph.getInputStreams
rateController <- inputDStream.rateController
} ssc.addStreamingListener(rateController)

listenerBus.start(ssc.sparkContext)
receiverTracker = new ReceiverTracker(ssc)
inputInfoTracker = new InputInfoTracker(ssc)
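
The for-comprehension added above iterates over two layers: every registered input stream, and each stream's optional rate controller. A desugared sketch of the same logic (assuming an `ssc: StreamingContext` in scope, as in JobScheduler):

// Attach each input stream's RateController (when one is defined) as a
// StreamingListener, so it receives batch-completion events.
ssc.graph.getInputStreams.foreach { inputDStream =>
  inputDStream.rateController.foreach(ssc.addStreamingListener)
}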
streaming/src/main/scala/org/apache/spark/streaming/scheduler/RateController.scala (new file)
@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.streaming.scheduler

import java.io.ObjectInputStream
import java.util.concurrent.atomic.AtomicLong

import scala.concurrent.{ExecutionContext, Future}

import org.apache.spark.SparkConf
import org.apache.spark.streaming.scheduler.rate.RateEstimator
import org.apache.spark.util.{ThreadUtils, Utils}

/**
* A StreamingListener that receives batch completion updates, and maintains
* an estimate of the speed at which this stream should ingest messages,
* given an estimate computation from a `RateEstimator`.
*/
private[streaming] abstract class RateController(val streamUID: Int, rateEstimator: RateEstimator)
extends StreamingListener with Serializable {

init()

protected def publish(rate: Long): Unit

@transient
implicit private var executionContext: ExecutionContext = _

@transient
private var rateLimit: AtomicLong = _

/**
* An initialization method called both from the constructor and from serialization code.
*/
private def init() {
executionContext = ExecutionContext.fromExecutorService(
ThreadUtils.newDaemonSingleThreadExecutor("stream-rate-update"))
rateLimit = new AtomicLong(-1L)
}

private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException {
ois.defaultReadObject()
init()
}

/**
* Compute the new rate limit and publish it asynchronously.
*/
private def computeAndPublish(time: Long, elems: Long, workDelay: Long, waitDelay: Long): Unit =
Future[Unit] {
val newRate = rateEstimator.compute(time, elems, workDelay, waitDelay)
newRate.foreach { s =>
rateLimit.set(s.toLong)
publish(getLatestRate())
}
}

def getLatestRate(): Long = rateLimit.get()

override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
val elements = batchCompleted.batchInfo.streamIdToInputInfo

for {
processingEnd <- batchCompleted.batchInfo.processingEndTime;
workDelay <- batchCompleted.batchInfo.processingDelay;
waitDelay <- batchCompleted.batchInfo.schedulingDelay;
elems <- elements.get(streamUID).map(_.numRecords)
} computeAndPublish(processingEnd, elems, workDelay, waitDelay)
}
}

object RateController {
def isBackPressureEnabled(conf: SparkConf): Boolean =
conf.getBoolean("spark.streaming.backpressure.enable", false)
}
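
For reference, a hypothetical subclass (not part of this PR) showing the extension point: `publish` is the only abstract member. Since RateController is private[streaming], a concrete subclass must live under that package; the PR's real implementation, ReceiverRateController in ReceiverInputDStream above, forwards the rate to the receiver tracker instead of logging it.

package org.apache.spark.streaming.scheduler

import org.apache.spark.streaming.scheduler.rate.RateEstimator

// Hypothetical sketch: a RateController whose publish() only logs the rate.
private[streaming] class LoggingRateController(streamUID: Int, estimator: RateEstimator)
  extends RateController(streamUID, estimator) {

  // Invoked asynchronously from computeAndPublish() each time the estimator
  // yields a new rate for this stream.
  override def publish(rate: Long): Unit =
    println(s"Stream $streamUID: new ingestion rate bound = $rate records/s")
}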
streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala (new file)
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.streaming.scheduler.rate

import org.apache.spark.SparkConf
import org.apache.spark.SparkException

/**
* A component that estimates the rate at which an InputDStream should ingest
* elements, based on updates at every batch completion.
*/
private[streaming] trait RateEstimator extends Serializable {

/**
* Computes the number of elements the stream attached to this `RateEstimator`
* should ingest per second, given an update on the size and completion
* times of the latest batch.
*
* @param time The timestamp of the current batch interval that just finished
* @param elements The number of elements that were processed in this batch
* @param processingDelay The time in ms it took for the job to complete
* @param schedulingDelay The time in ms that the job spent in the scheduling queue
*/
def compute(
time: Long,
elements: Long,
processingDelay: Long,
schedulingDelay: Long): Option[Double]
}

object RateEstimator {

/**
* Return a new RateEstimator based on the value of `spark.streaming.backpressure.rateEstimator`.
*
* @return None if there is no configured estimator, otherwise an instance of RateEstimator
* @throws IllegalArgumentException if there is a configured RateEstimator that doesn't match any
* known estimators.
*/
def create(conf: SparkConf): Option[RateEstimator] =
conf.getOption("spark.streaming.backpressure.rateEstimator").map { estimator =>
throw new IllegalArgumentException(s"Unknown rate estimator: $estimator")
}
}

Review comment (Contributor), on `create`:
Why should this return an Option? Since there is a separate conf that enables/disables the RateController, this should always return an instance of RateEstimator, even if it's the default. But I do understand that since this PR does not have a default one, this is probably better. So I am okay to merge this PR as is, as long as the PID estimator PR updates this code to not use Option. Does that make sense?

Reply (Contributor Author):
ok
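
Putting the two new configuration keys together, a sketch of the behavior as of this PR (both helpers are private[streaming], so this only compiles inside Spark's streaming scheduler packages):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.scheduler.RateController
import org.apache.spark.streaming.scheduler.rate.RateEstimator

val conf = new SparkConf()

// Backpressure is off by default.
assert(!RateController.isBackPressureEnabled(conf))
conf.set("spark.streaming.backpressure.enable", "true")
assert(RateController.isBackPressureEnabled(conf))

// With no estimator configured, create() returns None, so even with
// backpressure enabled a ReceiverInputDStream attaches no RateController.
assert(RateEstimator.create(conf).isEmpty)

// Any configured estimator name currently throws, since no implementations
// exist yet (a default is expected with the follow-up PID estimator PR):
// conf.set("spark.streaming.backpressure.rateEstimator", "pid")
// RateEstimator.create(conf)  // IllegalArgumentException: Unknown rate estimator: pid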
streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -30,8 +30,10 @@ import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

import org.apache.spark.streaming.dstream.{DStream, FileInputDStream}
import org.apache.spark.streaming.scheduler.{RateLimitInputDStream, ConstantEstimator, SingletonTestRateReceiver}
import org.apache.spark.util.{Clock, ManualClock, Utils}

/**
@@ -391,6 +393,32 @@ class CheckpointSuite extends TestSuiteBase {
testCheckpointedOperation(input, operation, output, 7)
}

test("recovery maintains rate controller") {
ssc = new StreamingContext(conf, batchDuration)
ssc.checkpoint(checkpointDir)

val dstream = new RateLimitInputDStream(ssc) {
override val rateController =
Some(new ReceiverRateController(id, new ConstantEstimator(200.0)))
}
SingletonTestRateReceiver.reset()

val output = new TestOutputStreamWithPartitions(dstream.checkpoint(batchDuration * 2))
output.register()
runStreams(ssc, 5, 5)

SingletonTestRateReceiver.reset()
ssc = new StreamingContext(checkpointDir)
ssc.start()
val outputNew = advanceTimeWithRealDelay(ssc, 2)

eventually(timeout(5.seconds)) {
assert(dstream.getCurrentRateLimit === Some(200))
}
ssc.stop()
ssc = null
}

// This tests whether file input stream remembers what files were seen before
// the master failure and uses them again to process a large window operation.
// It also tests whether batches, whose processing was incomplete due to the
streaming/src/test/scala/org/apache/spark/streaming/scheduler/RateControllerSuite.scala (new file)
@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.streaming.scheduler

import scala.collection.mutable
import scala.reflect.ClassTag
import scala.util.control.NonFatal

import org.scalatest.Matchers._
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

import org.apache.spark.streaming._
import org.apache.spark.streaming.scheduler.rate.RateEstimator

class RateControllerSuite extends TestSuiteBase {

override def useManualClock: Boolean = false

test("rate controller publishes updates") {
val ssc = new StreamingContext(conf, batchDuration)
withStreamingContext(ssc) { ssc =>
val dstream = new RateLimitInputDStream(ssc)
dstream.register()
ssc.start()

eventually(timeout(10.seconds)) {
assert(dstream.publishCalls > 0)
}
}
}

test("publish rates reach receivers") {
val ssc = new StreamingContext(conf, batchDuration)
withStreamingContext(ssc) { ssc =>
val dstream = new RateLimitInputDStream(ssc) {
override val rateController =
Some(new ReceiverRateController(id, new ConstantEstimator(200.0)))
}
dstream.register()
SingletonTestRateReceiver.reset()
ssc.start()

eventually(timeout(10.seconds)) {
assert(dstream.getCurrentRateLimit === Some(200))
}
}
}

test("multiple publish rates reach receivers") {
val ssc = new StreamingContext(conf, batchDuration)
withStreamingContext(ssc) { ssc =>
val rates = Seq(100L, 200L, 300L)

val dstream = new RateLimitInputDStream(ssc) {
override val rateController =
Some(new ReceiverRateController(id, new ConstantEstimator(rates.map(_.toDouble): _*)))
}
SingletonTestRateReceiver.reset()
dstream.register()

val observedRates = mutable.HashSet.empty[Long]
ssc.start()

eventually(timeout(20.seconds)) {
dstream.getCurrentRateLimit.foreach(observedRates += _)
// Long.MaxValue (essentially, no rate limit) is the initial rate limit for any Receiver
observedRates should contain theSameElementsAs (rates :+ Long.MaxValue)
}
}
}
}

private[streaming] class ConstantEstimator(rates: Double*) extends RateEstimator {
private var idx: Int = 0

private def nextRate(): Double = {
val rate = rates(idx)
idx = (idx + 1) % rates.size
rate
}

def compute(
time: Long,
elements: Long,
processingDelay: Long,
schedulingDelay: Long): Option[Double] = Some(nextRate())
}