Commit 93b74f8

Better explanation of historicalError.
1 parent 7975b0c commit 93b74f8

1 file changed: +12 -3 lines changed

streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/PIDRateEstimator.scala

Lines changed: 12 additions & 3 deletions
@@ -72,14 +72,23 @@ private[streaming] class PIDRateEstimator(
       // in elements/second
       val error = latestRate - processingRate
 
-      // in elements/second
-      val sumError = schedulingDelay.toDouble * processingRate / batchIntervalMillis
+      // The error integral, based on schedulingDelay as an indicator for accumulated errors
+      // a scheduling delay s corresponds to s * processingRate overflowing elements. Those
+      // are elements that couldn't be processed in previous batches, leading to this delay.
+      // We assume the processingRate didn't change too much.
+      // from the number of overflowing elements we can calculate the rate at which they would be
+      // processed by dividing it by the batch interval. This rate is our "historical" error,
+      // or integral part, since if we subtracted this rate from the previous "calculated rate",
+      // there wouldn't have been any overflowing elements, and the scheduling delay would have
+      // been zero.
+      // (in elements/second)
+      val historicalError = schedulingDelay.toDouble * processingRate / batchIntervalMillis
 
       // in elements/(second ^ 2)
       val dError = (error - latestError) / delaySinceUpdate
 
       val newRate = (latestRate + proportional * error +
-                                  integral * sumError +
+                                  integral * historicalError +
                                   derivative * dError).max(0.0)
       latestTime = time
       if (firstRun) {
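
The reasoning in the new comment can be checked with a small worked example. The sketch below is illustrative only: the object `HistoricalErrorSketch`, its helper methods, and the numeric values are hypothetical and not part of Spark; only the `historicalError` expression is taken from the diff. It assumes that the scheduling delay and the batch interval are both in milliseconds and that `processingRate` is in elements/second, so the result comes out in elements/second as the comment states.

```scala
object HistoricalErrorSketch {

  // Same expression as in the diff, wrapped in a hypothetical helper.
  // Assumption: schedulingDelayMs and batchIntervalMillis share the same unit (ms),
  // so their ratio is dimensionless and the result is in elements/second.
  def historicalError(
      schedulingDelayMs: Long,
      processingRate: Double,
      batchIntervalMillis: Long): Double =
    schedulingDelayMs.toDouble * processingRate / batchIntervalMillis

  // The two-step reasoning from the comment, spelled out:
  //   1. a delay of s seconds corresponds to s * processingRate overflowing elements;
  //   2. dividing that backlog by the batch interval (in seconds) gives a rate.
  def historicalErrorStepwise(
      schedulingDelayMs: Long,
      processingRate: Double,
      batchIntervalMillis: Long): Double = {
    val overflowingElements = (schedulingDelayMs / 1000.0) * processingRate
    overflowingElements / (batchIntervalMillis / 1000.0)
  }

  def main(args: Array[String]): Unit = {
    // Illustrative values only.
    val processingRate = 1000.0    // elements/second
    val batchIntervalMillis = 1000L
    val schedulingDelayMs = 500L

    val direct = historicalError(schedulingDelayMs, processingRate, batchIntervalMillis)
    val stepwise = historicalErrorStepwise(schedulingDelayMs, processingRate, batchIntervalMillis)

    println(s"historicalError (direct)   = $direct elements/second")
    println(s"historicalError (stepwise) = $stepwise elements/second")
  }
}
```

With these values, half a second of delay at 1000 elements/second corresponds to a backlog of 500 elements, which spread over a one-second batch interval is 500 elements/second. Both methods return the same number because the millisecond-to-second conversions cancel, which is why the expression in the diff can divide by `batchIntervalMillis` directly.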
