Dynamic Batch Interval Adjustment #16993

Closed · wants to merge 2 commits

@@ -24,15 +24,13 @@ import scala.collection.mutable.HashMap
import scala.language.implicitConversions
import scala.reflect.ClassTag
import scala.util.matching.Regex

import org.apache.spark.{SparkContext, SparkException}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.io.SparkHadoopWriterUtils
import org.apache.spark.rdd.{BlockRDD, RDD, RDDOperationScope}
import org.apache.spark.rdd.{BlockRDD, PairRDDFunctions, RDD, RDDOperationScope}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext.rddToFileName
import org.apache.spark.streaming.scheduler.Job
import org.apache.spark.streaming.scheduler.{BatchController, Job}
import org.apache.spark.streaming.ui.UIUtils
import org.apache.spark.util.{CallSite, Utils}

@@ -69,13 +67,13 @@ abstract class DStream[T: ClassTag] (
// Methods that should be implemented by subclasses of DStream
// =======================================================================

/** Time interval after which the DStream generates an RDD */
/** Time interval after which the DStream generates a RDD */
def slideDuration: Duration

/** List of parent DStreams on which this DStream depends on */
def dependencies: List[DStream[_]]

/** Method that generates an RDD for the given time */
/** Method that generates a RDD for the given time */
def compute(validTime: Time): Option[RDD[T]]

// =======================================================================
@@ -311,7 +309,9 @@ abstract class DStream[T: ClassTag] (
private[streaming] def isTimeValid(time: Time): Boolean = {
if (!isInitialized) {
throw new SparkException (this + " has not been initialized")
} else if (time <= zeroTime || ! (time - zeroTime).isMultipleOf(slideDuration)) {
} else if (time <= zeroTime ||
! ((time - zeroTime).isMultipleOf(slideDuration) ||
BatchController.getBatchIntervalEnabled())) {
logInfo(s"Time $time is invalid as zeroTime is $zeroTime" +
s" , slideDuration is $slideDuration and difference is ${time - zeroTime}")
false
@@ -338,7 +338,7 @@ abstract class DStream[T: ClassTag] (
// scheduler, since we may need to write output to an existing directory during checkpoint
// recovery; see SPARK-4835 for more details. We need to have this call here because
// compute() might cause Spark jobs to be launched.
SparkHadoopWriterUtils.disableOutputSpecValidation.withValue(true) {
PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
compute(time)
}
}
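
Note: the relaxed isTimeValid check above is what makes a changing interval workable. Once the batch interval is adjusted at runtime, later batch times are no longer exact multiples of the original slideDuration, so the strict isMultipleOf check would start rejecting valid batch times unless the BatchController flag short-circuits it. A minimal standalone illustration of that arithmetic, using only the public Time/Duration API (not part of the patch):

```scala
import org.apache.spark.streaming.{Duration, Time}

object SlideCheckDemo {
  def main(args: Array[String]): Unit = {
    val zeroTime = Time(0L)
    val slide = Duration(2000)            // original 2 s batch interval

    val beforeChange = Time(4000)         // batch generated while the interval was still 2 s
    val afterChange = Time(4000 + 3000)   // next batch after the interval grew to 3 s

    // true: accepted by the original check
    println((beforeChange - zeroTime).isMultipleOf(slide))
    // false: would be rejected without the BatchController.getBatchIntervalEnabled() bypass
    println((afterChange - zeroTime).isMultipleOf(slide))
  }
}
```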
@@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.streaming.scheduler

import java.util.concurrent.atomic.AtomicLong

import scala.concurrent.{ExecutionContext, Future}

import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.streaming.scheduler.batch.BatchEstimator
import org.apache.spark.util.ThreadUtils

abstract class BatchController(val streamUID: Int, batchEstimator: BatchEstimator)
extends StreamingListener with Serializable with Logging {

init()

protected def publish(rate: Long): Unit

@transient
implicit private var executionContext: ExecutionContext = _

@transient
private var batchInterval: AtomicLong = _

var totalDelay: Long = -1L
var schedulerDelay: Long = -1L
var numRecords: Long = -1L
var processingDelay: Long = -1L
var currentBatchInterval: Long = -1L

/**
* An initialization method called both from the constructor and Serialization code.
*/
private def init() {
executionContext = ExecutionContext.fromExecutorService(
ThreadUtils.newDaemonSingleThreadExecutor("stream-batch-interval-update"))
batchInterval = new AtomicLong(-1L)
}

/**
* Compute the new batch interval and publish it asynchronously.
*/
private def computeAndPublish(processTime: Long, currentInterval: Long): Unit =
Future[Unit] {
val newBatchInterval = batchEstimator.compute(processTime, currentInterval)
logInfo(s"Estimated new batch interval: $newBatchInterval")
newBatchInterval.foreach { newInterval =>
batchInterval.set(newInterval)
logInfo(s"Batch interval set to ${batchInterval.get()}")
publish(getLatestBatchInterval())
}
}

def getLatestBatchInterval(): Long = batchInterval.get()


/**
* Compute a new batch interval once a batch has completed.
* @param batchCompleted information about the completed batch
*/
override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
totalDelay = batchCompleted.batchInfo.totalDelay.get
schedulerDelay = batchCompleted.batchInfo.schedulingDelay.get
numRecords = batchCompleted.batchInfo.numRecords
processingDelay = batchCompleted.batchInfo.processingDelay.get
currentBatchInterval = batchCompleted.batchInfo.batchInterval

logInfo(s"processingDelay $processingDelay | batchInterval $currentBatchInterval " +
s"| totalDelay $totalDelay | schedulerDelay $schedulerDelay | numRecords $numRecords")

for {
processingTime <- batchCompleted.batchInfo.processingDelay
batchInterval <- Option(batchCompleted.batchInfo.batchInterval)
} computeAndPublish(processingTime, batchInterval)

logInfo("Received onBatchCompleted event, scheduled batch interval computation.")
}
}
/**
* Companion object exposing the dynamic batch interval configuration.
*/
object BatchController {
// Whether dynamic batch interval adjustment is enabled.
@volatile var isEnabled: Boolean = false

def isDynamicBatchIntervalEnabled(conf: SparkConf): Boolean =
conf.getBoolean("spark.streaming.dynamicBatchInterval.enabled", false)

def getBatchIntervalEnabled(): Boolean = isEnabled
def setBatchIntervalEnabled(enabled: Boolean): Unit = {
isEnabled = enabled
}
}
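
The controller above delegates the actual sizing decision to a BatchEstimator, which is not part of the hunks shown on this page; only its call sites are visible (the compute call in computeAndPublish above, and BatchEstimator.create(ssc.conf, ssc.graph.batchDuration) in JobGenerator below). The following is a hypothetical sketch of what such an estimator could look like, inferred purely from those call sites; the trait shape and the proportional policy are assumptions, not the PR's actual implementation:

```scala
package org.apache.spark.streaming.scheduler.batch

import org.apache.spark.SparkConf
import org.apache.spark.streaming.Duration

// Assumed shape, inferred from the call sites in this diff.
trait BatchEstimator extends Serializable {
  /** Return Some(newIntervalMs) when the interval should change, None to keep the current one. */
  def compute(processingTimeMs: Long, currentIntervalMs: Long): Option[Long]
}

object BatchEstimator {
  /** Naive proportional policy: keep the interval slightly above the observed processing time. */
  def create(conf: SparkConf, batchDuration: Duration): BatchEstimator = new BatchEstimator {
    private val minIntervalMs = batchDuration.milliseconds

    override def compute(processingTimeMs: Long, currentIntervalMs: Long): Option[Long] = {
      // Target 20% headroom over the last processing time, never below the configured duration.
      val target = math.max(minIntervalMs, (processingTimeMs * 1.2).toLong)
      if (target != currentIntervalMs) Some(target) else None
    }
  }
}
```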
@@ -23,6 +23,7 @@ import org.apache.spark.streaming.Time
/**
* :: DeveloperApi ::
* Class having information on completed batches.
* @param batchInterval The batch interval of this batch, in milliseconds
* @param batchTime Time of the batch
* @param streamIdToInputInfo A map of input stream id to its input info
* @param submissionTime Clock time of when jobs of this batch was submitted to
@@ -33,6 +34,7 @@ import org.apache.spark.streaming.Time
*/
@DeveloperApi
case class BatchInfo(
batchInterval: Long,
batchTime: Time,
streamIdToInputInfo: Map[Int, StreamInputInfo],
submissionTime: Long,
@@ -17,11 +17,12 @@

package org.apache.spark.streaming.scheduler

import scala.util.{Failure, Success, Try}

import org.apache.spark.internal.Logging
import org.apache.spark.streaming.scheduler.batch.BatchEstimator

import scala.util.{Failure, Success, Try}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Checkpoint, CheckpointWriter, Time}
import org.apache.spark.streaming.{Checkpoint, CheckpointWriter, Duration, Time}
import org.apache.spark.streaming.api.python.PythonDStream
import org.apache.spark.streaming.util.RecurringTimer
import org.apache.spark.util.{Clock, EventLoop, ManualClock, Utils}
@@ -34,6 +35,8 @@ private[scheduler] case class DoCheckpoint(
time: Time, clearCheckpointDataLater: Boolean) extends JobGeneratorEvent
private[scheduler] case class ClearCheckpointData(time: Time) extends JobGeneratorEvent

private[scheduler] case class UpdateBatchInterval(newBatchInterval: Long) extends JobGeneratorEvent

/**
* This class generates jobs from DStreams as well as drives checkpointing and cleaning
* up DStream metadata.
@@ -77,6 +80,15 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
// last batch whose completion,checkpointing and metadata cleanup has been completed
private var lastProcessedBatch: Time = null

protected[streaming] val batchController: Option[BatchController] = {
if (BatchController.isDynamicBatchIntervalEnabled(ssc.conf)) {
logInfo(s" ##### init batchController....")
Some(new ReceiverBatchController(
ssc.getNewInputStreamId(), BatchEstimator.create(ssc.conf, ssc.graph.batchDuration)))
} else {
None
}
}
/** Start generation of jobs */
def start(): Unit = synchronized {
if (eventLoop != null) return // generator has already been started
@@ -185,6 +197,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
case DoCheckpoint(time, clearCheckpointDataLater) =>
doCheckpoint(time, clearCheckpointDataLater)
case ClearCheckpointData(time) => clearCheckpointData(time)
case UpdateBatchInterval(newBatchInterval) => updateBatchInterval(newBatchInterval)
}
}

@@ -231,7 +244,8 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
// added but not allocated, are dangling in the queue after recovering, we have to allocate
// those blocks to the next batch, which is the batch they were supposed to go.
jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
jobScheduler.submitJobSet(JobSet(time, graph.generateJobs(time)))
jobScheduler.submitJobSet(JobSet(
graph.batchDuration.milliseconds, time, graph.generateJobs(time)))
}

// Restart the timer
@@ -250,7 +264,8 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
} match {
case Success(jobs) =>
val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
jobScheduler.submitJobSet(JobSet(
graph.batchDuration.milliseconds, time, jobs, streamIdToInputInfos))
case Failure(e) =>
jobScheduler.reportError("Error generating jobs for time " + time, e)
PythonDStream.stopStreamingContextIfPythonProcessIsDead(e)
@@ -289,6 +304,21 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
markBatchFullyProcessed(time)
}


/**
* Update the batch interval of the DStream graph and the job generation timer.
*/
private def updateBatchInterval(newBatchInterval: Long) {
this.synchronized {
logInfo(s"Updating batch interval: batchDuration is currently ${graph.batchDuration} " +
s"and timer period is ${timer.period}")
graph.batchDuration = new Duration(newBatchInterval)
timer.period = newBatchInterval
logInfo(s"Updated batch interval: batchDuration is now ${graph.batchDuration} " +
s"and timer period is ${timer.period}")
}
}

/** Perform checkpoint for the given `time`. */
private def doCheckpoint(time: Time, clearCheckpointDataLater: Boolean) {
if (shouldCheckpoint && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {
@@ -303,4 +333,12 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
private def markBatchFullyProcessed(time: Time) {
lastProcessedBatch = time
}

private[streaming] class ReceiverBatchController(id: Int, estimator: BatchEstimator)
extends BatchController(id, estimator) with Logging {
override def publish(newBatchInterval: Long): Unit = {
logInfo(s"Publishing new batch interval $newBatchInterval")
eventLoop.post(UpdateBatchInterval(newBatchInterval))
}
}
}
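
For context, turning the feature on only requires the configuration key read by BatchController.isDynamicBatchIntervalEnabled. A sketch of the expected user-facing setup; the master, application name, and socket source are illustrative placeholders:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DynamicBatchIntervalDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")                                        // illustrative
      .setAppName("dynamic-batch-interval-demo")                    // illustrative
      .set("spark.streaming.dynamicBatchInterval.enabled", "true")

    // The initial batch duration is supplied as usual; with the flag on, the
    // JobGenerator creates a ReceiverBatchController that can later adjust
    // graph.batchDuration and the timer period as batches complete.
    val ssc = new StreamingContext(conf, Seconds(2))

    ssc.socketTextStream("localhost", 9999).count().print()        // illustrative source
    ssc.start()
    ssc.awaitTermination()
  }
}
```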
@@ -26,8 +26,7 @@ import org.apache.commons.lang3.SerializationUtils

import org.apache.spark.ExecutorAllocationClient
import org.apache.spark.internal.Logging
import org.apache.spark.internal.io.SparkHadoopWriterUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.{PairRDDFunctions, RDD}
import org.apache.spark.streaming._
import org.apache.spark.streaming.api.python.PythonDStream
import org.apache.spark.streaming.ui.UIUtils
@@ -83,6 +82,11 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
rateController <- inputDStream.rateController
} ssc.addStreamingListener(rateController)

// attach the job generator's batch controller so it receives batch completion updates
for {
batchController <- jobGenerator.batchController
} ssc.addStreamingListener(batchController)

listenerBus.start()
receiverTracker = new ReceiverTracker(ssc)
inputInfoTracker = new InputInfoTracker(ssc)
@@ -201,20 +205,18 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
listenerBus.post(StreamingListenerOutputOperationCompleted(job.toOutputOperationInfo))
logInfo("Finished job " + job.id + " from job set of time " + jobSet.time)
if (jobSet.hasCompleted) {
jobSets.remove(jobSet.time)
jobGenerator.onBatchCompletion(jobSet.time)
logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format(
jobSet.totalDelay / 1000.0, jobSet.time.toString,
jobSet.processingDelay / 1000.0
))
listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo))
}
job.result match {
case Failure(e) =>
reportError("Error running job " + job, e)
case _ =>
if (jobSet.hasCompleted) {
jobSets.remove(jobSet.time)
jobGenerator.onBatchCompletion(jobSet.time)
logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format(
jobSet.totalDelay / 1000.0, jobSet.time.toString,
jobSet.processingDelay / 1000.0
))
}
}
}

@@ -253,7 +255,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
// Disable checks for existing output directories in jobs launched by the streaming
// scheduler, since we may need to write output to an existing directory during checkpoint
// recovery; see SPARK-4835 for more details.
SparkHadoopWriterUtils.disableOutputSpecValidation.withValue(true) {
PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
job.run()
}
_eventLoop = eventLoop
@@ -27,6 +27,7 @@ import org.apache.spark.streaming.Time
*/
private[streaming]
case class JobSet(
batchInterval: Long,
time: Time,
jobs: Seq[Job],
streamIdToInputInfo: Map[Int, StreamInputInfo] = Map.empty) {
@@ -62,6 +63,7 @@ case class JobSet(

def toBatchInfo: BatchInfo = {
BatchInfo(
batchInterval,
time,
streamIdToInputInfo,
submissionTime,