@@ -922,6 +922,15 @@ public String getType()
private int initRetryCounter = 0;
private volatile DateTime firstRunTime;
private volatile DateTime earlyStopTime = null;

/**
* When non-null, indicates a pending scale-during-rollover operation.
* The value is the target task count to scale to.
* Set in {@link #checkTaskDuration()} when the autoscaler recommends scaling during task rollover.
* Applied in {@link #maybeApplyPendingScaleRollover()} when all actively reading tasks have stopped.
*/
private volatile Integer pendingRolloverTaskCount = null;

protected volatile RecordSupplier<PartitionIdType, SequenceOffsetType, RecordType> recordSupplier;
private volatile boolean started = false;
private volatile boolean stopped = false;
@@ -1762,6 +1771,8 @@ public void runInternal()

checkTaskDuration();

maybeApplyPendingScaleRollover();

checkPendingCompletionTasks();

Contributor

Should this be called after checkPendingCompletionTasks()?

Contributor Author

maybeApplyPendingScaleRollover is gated only on activelyReadingTaskGroups being empty, and it’s safe even if there are still publishing tasks. Applying the taskCount change earlier (before pending-completion groups are pruned or killed) ensures that the new allocation is staged as soon as it’s safe and avoids extra cycles where the old taskCount is kept despite there being no active readers.

Contributor

Let's update the maybeApplyPendingScaleRollover() function comments then?

* This method is called after {@link #checkPendingCompletionTasks()} to check if a pending
   * scale rollover can be applied. The scale is only applied when:
   * <ul>
   *   <li>A pending rollover was set up in {@link #checkTaskDuration()} (Phase 1)</li>
   *   <li>All actively reading task groups have stopped (moved to pendingCompletionTaskGroups)</li>
   * </ul>
   * <p>
   * By deferring the taskCount change until all old tasks have stopped, we avoid
   * partition allocation mismatches that would cause {@link #discoverTasks()} to kill
   * publishing tasks on the next cycle.

Contributor Author

That's a good catch, thanks!

Contributor Author

Done
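
For readers skimming this thread: a minimal, self-contained sketch of the two-phase gating being discussed (hypothetical names, not the Druid supervisor code). Phase 1 records the recommended count; Phase 2 applies it only once no groups are actively reading, regardless of how many are still publishing.

```java
// Standalone illustration of the deferred "scale during rollover" gating.
public class RolloverScaleSketch
{
  private Integer pendingRolloverTaskCount = null; // set in Phase 1, cleared in Phase 2
  private int taskCount;
  private int activelyReadingGroups;  // analogue of activelyReadingTaskGroups.size()
  private int publishingGroups;       // analogue of pendingCompletionTaskGroups; deliberately not consulted

  RolloverScaleSketch(int taskCount, int readingGroups, int publishingGroups)
  {
    this.taskCount = taskCount;
    this.activelyReadingGroups = readingGroups;
    this.publishingGroups = publishingGroups;
  }

  // Phase 1: remember the recommendation, do not touch taskCount yet.
  void phaseOneRecord(int recommendedTaskCount)
  {
    if (recommendedTaskCount > 0 && recommendedTaskCount != taskCount) {
      pendingRolloverTaskCount = recommendedTaskCount;
    }
  }

  // Phase 2: apply only when nothing is actively reading; publishing groups are ignored.
  void phaseTwoMaybeApply()
  {
    if (pendingRolloverTaskCount == null || activelyReadingGroups > 0) {
      return;
    }
    taskCount = pendingRolloverTaskCount;
    pendingRolloverTaskCount = null;
  }

  public static void main(String[] args)
  {
    RolloverScaleSketch s = new RolloverScaleSketch(6, 2, 3);
    s.phaseOneRecord(4);
    s.phaseTwoMaybeApply();          // deferred: readers are still active
    System.out.println(s.taskCount); // 6
    s.activelyReadingGroups = 0;     // all readers stopped; some groups still publishing
    s.phaseTwoMaybeApply();          // applied now, before publishing groups are pruned
    System.out.println(s.taskCount); // 4
  }
}
```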

checkCurrentTaskState();
@@ -3392,6 +3403,57 @@ private void checkTaskDuration() throws ExecutionException, InterruptedException
}
});

// Phase 1 of scale-during-rollover: detect, set up, and continue draining.
// The taskCount change and re-allocation happen in Phase 2 after all tasks have stopped.
// We respect maxAllowedStops to avoid worker capacity exhaustion - rollover may take multiple cycles.
Integer targetRolloverTaskCount = pendingRolloverTaskCount;
if (taskAutoScaler != null && taskAutoScaler.shouldScaleDuringTaskRollover() && targetRolloverTaskCount == null && !futures.isEmpty()) {
// Detect new rollover opportunity
int rolloverTaskCount = taskAutoScaler.computeTaskCountForRollover();
if (rolloverTaskCount > 0 && rolloverTaskCount != getIoConfig().getTaskCount()) {
log.info(
"Autoscaler recommends scaling to [%d] tasks during rollover for supervisor[%s]. "
+ "Setting up a pending rollover - will apply after all tasks stop.",
rolloverTaskCount, supervisorId
);
targetRolloverTaskCount = rolloverTaskCount;
pendingRolloverTaskCount = rolloverTaskCount;
}
}

// Stop remaining active groups for rollover while respecting maxAllowedStops to avoid
// worker capacity exhaustion. Publishing tasks continue consuming worker slots,
// so stopping all at once could leave no capacity for new tasks.
// By invariant, having targetRolloverTaskCount != null means the autoscaler is able to scale during rollover.
if (targetRolloverTaskCount != null) {
int numPendingCompletionTaskGroups = pendingCompletionTaskGroups.values().stream()
.mapToInt(List::size).sum();
int availableStops = ioConfig.getMaxAllowedStops() - numPendingCompletionTaskGroups - numStoppedTasks.get();
int stoppedForRollover = 0;

for (Entry<Integer, TaskGroup> entry : activelyReadingTaskGroups.entrySet()) {
Integer groupId = entry.getKey();
if (!futureGroupIds.contains(groupId)) {
if (stoppedForRollover >= availableStops) {
log.info(
"Deferring stop of taskGroup[%d] to next cycle - maxAllowedStops[%d] reached. "
+ "Pending rollover to [%d] tasks. Publishing tasks: [%d], stopped this cycle: [%d].",
groupId, ioConfig.getMaxAllowedStops(), targetRolloverTaskCount,
numPendingCompletionTaskGroups, numStoppedTasks.get()
);
break;
}
log.info(
"Stopping taskGroup[%d] for rollover to [%d] tasks.",
groupId, targetRolloverTaskCount
);
futureGroupIds.add(groupId);
futures.add(checkpointTaskGroup(entry.getValue(), true));
stoppedForRollover++;
}
}
}

List<Either<Throwable, Map<PartitionIdType, SequenceOffsetType>>> results = coalesceAndAwait(futures);
for (int j = 0; j < results.size(); j++) {
Integer groupId = futureGroupIds.get(j);
@@ -3442,38 +3504,59 @@ private void checkTaskDuration() throws ExecutionException, InterruptedException
// remove this task group from the list of current task groups now that it has been handled
activelyReadingTaskGroups.remove(groupId);
}

maybeScaleDuringTaskRollover();
}


/**
* Scales up or down the number of tasks during a task rollover, if applicable.
* This method is called after {@link #checkTaskDuration()} and before
* {@link #checkPendingCompletionTasks()} to check if a pending scale rollover can be applied.
* The scale is only applied when:
* <ul>
* <li>A pending rollover was set up in {@link #checkTaskDuration()} (Phase 1), AND</li>
* <li>All actively reading task groups have stopped (moved to pendingCompletionTaskGroups)</li>
* </ul>
* <p>
* This method is invoked to determine whether a task count adjustment is needed
* during a task rollover based on the recommendations from the task auto-scaler.
*/

@VisibleForTesting
void maybeScaleDuringTaskRollover()
* This method is gated only on activelyReadingTaskGroups being empty, and it’s safe even
* if there are still publishing tasks. Applying the {@code taskCount} change before
* {@link #checkPendingCompletionTasks()} ensures the new allocation is staged as soon as it’s safe
* and avoids extra cycles where the old {@code taskCount} persists despite no active readers.
*/
void maybeApplyPendingScaleRollover()
{
if (taskAutoScaler != null && activelyReadingTaskGroups.isEmpty()) {
int rolloverTaskCount = taskAutoScaler.computeTaskCountForRollover();
if (rolloverTaskCount > 0 && rolloverTaskCount != getIoConfig().getTaskCount()) {
log.info("Autoscaler recommends scaling down to [%d] tasks during rollover", rolloverTaskCount);
changeTaskCountInIOConfig(rolloverTaskCount);
// Here force reset the supervisor state to be re-calculated on the next iteration of runInternal() call.
// This seems the best way to inject task amount recalculation during the rollover.
clearAllocationInfo();

ServiceMetricEvent.Builder event = ServiceMetricEvent
.builder()
.setDimension(DruidMetrics.SUPERVISOR_ID, supervisorId)
.setDimension(DruidMetrics.DATASOURCE, dataSource)
.setDimension(DruidMetrics.STREAM, getIoConfig().getStream());

emitter.emit(event.setMetric(AUTOSCALER_REQUIRED_TASKS_METRIC, rolloverTaskCount));
}
// By invariant, a non-null pendingRolloverTaskCount means the autoscaler is able to scale during rollover.
if (pendingRolloverTaskCount == null) {
return;
}

if (!activelyReadingTaskGroups.isEmpty()) {
log.debug(
"Pending rollover to [%d] tasks deferred - [%d] task groups still actively reading.",
pendingRolloverTaskCount, activelyReadingTaskGroups.size()
);
return;
}

int targetTaskCount = pendingRolloverTaskCount;
log.info(
"Applying pending scale rollover: changing taskCount from [%d] to [%d]. "
+ "All actively reading tasks have stopped.",
getIoConfig().getTaskCount(), targetTaskCount
);
changeTaskCountInIOConfig(pendingRolloverTaskCount);
clearAllocationInfo();

pendingRolloverTaskCount = null;

ServiceMetricEvent.Builder event = ServiceMetricEvent.builder()
.setDimension(DruidMetrics.SUPERVISOR_ID, supervisorId)
.setDimension(DruidMetrics.DATASOURCE, dataSource)
.setDimension(DruidMetrics.STREAM, getIoConfig().getStream());

emitter.emit(event.setMetric(AUTOSCALER_REQUIRED_TASKS_METRIC, targetTaskCount));

// Note: createNewTasks() will not create tasks this cycle because partitionGroups is now empty.
// On the next runInternal() cycle, updatePartitionDataFromStream() will rebuild partitionGroups
// with the new taskCount, and createNewTasks() will create the correctly-allocated tasks.
}

private DateTime computeEarliestTaskStartTime(TaskGroup group)
@@ -3502,8 +3585,8 @@ private ListenableFuture<Map<PartitionIdType, SequenceOffsetType>> checkpointTas
if (task.status.isSuccess()) {
// If any task in this group has already completed, stop the rest of the tasks in the group and return.
// This will cause us to create a new set of tasks next cycle that will start from the sequences in
// metadata store (which will have advanced if we succeeded in publishing and will remain the same if
// publishing failed and we need to re-ingest)
// the metadata store (which will have advanced if we succeeded in publishing and will remain the same if
// publishing failed, and we need to re-ingest)
stateManager.recordCompletedTaskState(TaskState.SUCCESS);
return Futures.transform(
stopTasksInGroup(taskGroup, "task[%s] succeeded in the taskGroup", task.status.getId()),
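A standalone sketch of the stop-budget arithmetic used in checkTaskDuration() above (simplified types and hypothetical names; not the supervisor implementation). The number of groups stopped for rollover in one cycle is capped so that groups already pending completion and tasks already stopped this cycle still fit under maxAllowedStops.

```java
import java.util.ArrayList;
import java.util.List;

public class StopBudgetSketch
{
  // Returns the group ids that may be stopped this cycle; the rest are deferred.
  static List<Integer> groupsToStop(
      List<Integer> activeGroupIds,
      int maxAllowedStops,
      int pendingCompletionGroups,
      int alreadyStoppedThisCycle
  )
  {
    int availableStops = maxAllowedStops - pendingCompletionGroups - alreadyStoppedThisCycle;
    List<Integer> toStop = new ArrayList<>();
    for (Integer groupId : activeGroupIds) {
      if (toStop.size() >= availableStops) {
        break; // budget exhausted, defer the remaining groups to the next cycle
      }
      toStop.add(groupId);
    }
    return toStop;
  }

  public static void main(String[] args)
  {
    // maxAllowedStops=5, 2 groups already publishing, 1 task already stopped this cycle:
    // only 2 of the 4 remaining active groups are stopped now.
    System.out.println(groupsToStop(List.of(0, 1, 2, 3), 5, 2, 1)); // [0, 1]
  }
}
```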
@@ -19,6 +19,7 @@

package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;

import com.google.common.annotations.VisibleForTesting;
import it.unimi.dsi.fastutil.ints.IntArraySet;
import it.unimi.dsi.fastutil.ints.IntSet;
import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMeters;
@@ -48,8 +49,8 @@
* around the current PPT, then converting those to task counts. This allows non-divisor task counts
* while keeping changes gradual (no large jumps).
* <p>
* Scale-up and scale-down are both evaluated proactively.
* Future versions may perform scale-down on task rollover only.
* Scale-up is applied proactively, while scale-down is only enacted on task rollover
* when the rollover task count differs from the current count.
*/
public class CostBasedAutoScaler implements SupervisorTaskAutoScaler
{
@@ -158,6 +159,12 @@ public int computeTaskCountForRollover()
}
}

@Override
public boolean shouldScaleDuringTaskRollover()
{
return config.isScaleDownOnTaskRolloverOnly();
}

public int computeTaskCountForScaleAction()
{
lastKnownMetrics = collectMetrics();
@@ -194,10 +201,12 @@ public CostBasedAutoScalerConfig getConfig()
* Returns -1 (no scaling needed) in the following cases:
* <ul>
* <li>Metrics are not available</li>
* <li>Partition or task counts are invalid</li>
* <li>No valid task counts remain after applying constraints</li>
* <li>Current task count already optimal</li>
* </ul>
*
* @return optimal task count for scale-up, or -1 if no scaling action needed
* @return optimal task count (scale up or down), or -1 if no scaling action needed
*/
int computeOptimalTaskCount(CostMetrics metrics)
{
@@ -314,7 +323,8 @@ static int[] computeValidTaskCounts(

/**
* Computes extra allowed increase in partitions-per-task in scenarios when the average per-partition lag
* is above the configured threshold. By default, it is {@code EXTRA_SCALING_ACTIVATION_LAG_THRESHOLD}.
* is above the configured threshold. By default, it is
* {@code EXTRA_SCALING_LAG_PER_PARTITION_THRESHOLD}.
* Generally, one of the autoscaler priorities is to keep the lag as close to zero as possible.
*/
static int computeExtraMaxPartitionsPerTaskIncrease(
@@ -376,12 +386,13 @@ static double extractPollIdleRatio(Map<String, Map<String, Object>> taskStats)
}

/**
* Extracts the average 15-minute moving average processing rate from task stats.
* Extracts the average processing rate from task stats, preferring the 15-minute moving average
* and falling back to 5-minute or 1-minute if needed.
* This rate represents the historical throughput (records per second) for each task,
* averaged across all tasks.
*
* @param taskStats the stats map from supervisor.getStats()
* @return the average 15-minute processing rate across all tasks in records/second,
* @return the average processing rate across all tasks in records/second,
* or -1 if no valid metrics are available
*/
static double extractMovingAverage(Map<String, Map<String, Object>> taskStats)
@@ -464,4 +475,12 @@ private CostMetrics collectMetrics()
);
}

/**
* Sets lastKnownMetrics for testing purposes.
*/
@VisibleForTesting
void setLastKnownMetrics(CostMetrics metrics)
{
this.lastKnownMetrics = metrics;
}
}
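
A standalone sketch of the PPT-to-task-count conversion alluded to in the class Javadoc above (the ceiling division and the one-step candidate window are assumptions for illustration, not the CostBasedAutoScaler implementation).

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class PptCandidateSketch
{
  // Derive candidate task counts from partitions-per-task (PPT) values around the current PPT.
  // Ceiling division permits non-divisor task counts while keeping changes gradual.
  static List<Integer> candidateTaskCounts(int partitionCount, int currentTaskCount, int maxTaskCount)
  {
    int currentPpt = (int) Math.ceil(partitionCount / (double) currentTaskCount);
    Set<Integer> candidates = new LinkedHashSet<>();
    for (int ppt = Math.max(1, currentPpt - 1); ppt <= currentPpt + 1; ppt++) {
      int taskCount = (int) Math.ceil(partitionCount / (double) ppt);
      if (taskCount >= 1 && taskCount <= maxTaskCount) {
        candidates.add(taskCount);
      }
    }
    return new ArrayList<>(candidates);
  }

  public static void main(String[] args)
  {
    // 10 partitions, currently 3 tasks (PPT=4): PPT candidates {3,4,5} map to task counts {4,3,2}.
    System.out.println(candidateTaskCounts(10, 3, 10)); // [4, 3, 2]
  }
}
```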
@@ -157,9 +157,9 @@ public void reset()
}

/**
* This method computes current consumer lag. Gets the total lag of all partitions and fill in the lagMetricsQueue
* This method computes current consumer lag. Gets the total lag of all partitions and fills the lagMetricsQueue.
*
* @return a Runnbale object to compute and collect lag.
* @return a Runnable object to compute and collect lag.
*/
private Runnable computeAndCollectLag()
{
@@ -196,13 +196,13 @@ private Runnable computeAndCollectLag()
* This method determines whether to do scale actions based on collected lag points.
* The current algorithm of scale is straightforward:
* <ul>
* <li>First, compute the proportion of lag points higher/lower than {@code scaleOutThreshold/scaleInThreshold},
* getting {@code scaleInThreshold/scaleOutThreshold},.
* <li>Secondly, compare {@code scaleInThreshold/scaleOutThreshold} with
* {@code triggerScaleOutFractionThreshold/triggerScaleInFractionThreshold}.
* <li>First, compute the proportion of lag points higher than {@code scaleOutThreshold} and
* lower than {@code scaleInThreshold}.
* <li>Secondly, compare those proportions with
* {@code triggerScaleOutFractionThreshold} and {@code triggerScaleInFractionThreshold}.
* <ul><li>P.S. Scale out action has a higher priority than scale in action.</ul>
* <li>Finally, if {@code scaleOutThreshold/scaleInThreshold}, is higher than
* {@code triggerScaleOutFractionThreshold/triggerScaleInFractionThreshold}, scale out/in action would be triggered.
* <li>Finally, if the scale-out/scale-in proportions are higher than their respective trigger thresholds,
* scale out/in actions are triggered.
* </ul>
*
* @param lags the lag metrics of Stream (Kafka/Kinesis)
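A standalone sketch of the decision rule the corrected Javadoc above describes (the threshold names come from the Javadoc; whether the comparisons are strict or inclusive is an assumption, and this is an illustration rather than the LagBasedAutoScaler implementation).

```java
import java.util.List;

public class LagDecisionSketch
{
  enum Action { SCALE_OUT, SCALE_IN, NONE }

  // Compute the fraction of lag samples beyond each threshold and compare against the
  // trigger fractions; scale-out is checked first, so it has priority over scale-in.
  static Action decide(
      List<Long> lags,
      long scaleOutThreshold,
      long scaleInThreshold,
      double triggerScaleOutFractionThreshold,
      double triggerScaleInFractionThreshold
  )
  {
    if (lags.isEmpty()) {
      return Action.NONE;
    }
    double aboveOut = lags.stream().filter(l -> l > scaleOutThreshold).count() / (double) lags.size();
    double belowIn = lags.stream().filter(l -> l < scaleInThreshold).count() / (double) lags.size();

    if (aboveOut >= triggerScaleOutFractionThreshold) {
      return Action.SCALE_OUT;
    }
    if (belowIn >= triggerScaleInFractionThreshold) {
      return Action.SCALE_IN;
    }
    return Action.NONE;
  }

  public static void main(String[] args)
  {
    // 3 of 4 samples exceed the scale-out threshold (0.75 >= 0.3), so the result is SCALE_OUT.
    System.out.println(decide(List.of(9_000_000L, 8_000_000L, 7_000_000L, 100L), 6_000_000L, 1_000_000L, 0.3, 0.9));
  }
}
```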