Added resource usage trackers for in-flight cancellation of SearchShardTask #4805

Merged
Changes from 1 commit
Refactoring changes
Signed-off-by: Ketan Verma <ketan9495@gmail.com>
ketanv3 committed Oct 26, 2022
commit 962cc05791408838143e22491c31c4eeb3cb5ed3
@@ -215,7 +215,7 @@ TaskCancellation getTaskCancellation(CancellableTask task) {
List<Runnable> callbacks = new ArrayList<>();

for (TaskResourceUsageTracker tracker : taskResourceUsageTrackers) {
Optional<TaskCancellation.Reason> reason = tracker.cancellationReason(task);
Optional<TaskCancellation.Reason> reason = tracker.checkAndMaybeGetCancellationReason(task);
Collaborator
@Bukhtawar Bukhtawar Oct 28, 2022

Thanks for this, but in general we should think about decoupling tracking from the action taken once thresholds have been breached. Today it might be search cancellation, but I do envision this as an action that modifies the threadpool size/queue in a manner that creates backpressure.
We can think about that refactor as a fast follow-up, as it will help us add more actions.

Collaborator

Also, cancellation isn't truly backpressure :) it's load shedding.

Contributor Author

That makes sense. Trackers can recommend actions once thresholds are met, and cancellation of tasks can be one such action. This will, however, influence how dissimilar actions from different trackers are grouped and compared with each other in the SearchBackpressureService. For example, we need to aggregate the cancellation scores from each tracker before we start cancelling tasks. With generic actions, this might become really complicated.

Let's do a detailed design of this first and refactor as a follow-up. Enhancement: #4985

if (reason.isPresent()) {
reasons.add(reason.get());
callbacks.add(tracker::incrementCancellations);
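
To make the score-aggregation point from the thread above concrete, here is a minimal sketch of how the reasons reported by each tracker for a single task could be folded into one score. It mirrors the loop shown in this hunk, but the helper method and the summation are illustrative assumptions, not the actual SearchBackpressureService code.

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Illustrative sketch only: combine the reasons reported by every tracker for one
// task into a single cancellation score. Names mirror the diff above; the real
// SearchBackpressureService internals may differ.
int getCombinedCancellationScore(CancellableTask task, List<TaskResourceUsageTracker> taskResourceUsageTrackers) {
    List<TaskCancellation.Reason> reasons = new ArrayList<>();

    for (TaskResourceUsageTracker tracker : taskResourceUsageTrackers) {
        Optional<TaskCancellation.Reason> reason = tracker.checkAndMaybeGetCancellationReason(task);
        reason.ifPresent(reasons::add);
    }

    // A task flagged by several trackers accumulates a higher combined score and is
    // a better cancellation candidate than a task flagged by only one tracker.
    return reasons.stream().mapToInt(TaskCancellation.Reason::getCancellationScore).sum();
}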
@@ -17,14 +17,14 @@
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

import static org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackerType.CPU_USAGE_TRACKER;

/**
* CpuUsageTracker evaluates if the task has consumed too many CPU cycles than allowed.
*
* @opensearch.internal
*/
public class CpuUsageTracker extends TaskResourceUsageTracker {
public static final String NAME = "cpu_usage_tracker";

private final LongSupplier cpuTimeNanosThresholdSupplier;

public CpuUsageTracker(SearchBackpressureSettings settings) {
@@ -33,14 +33,11 @@ public CpuUsageTracker(SearchBackpressureSettings settings) {

@Override
public String name() {
return NAME;
return CPU_USAGE_TRACKER.getName();
}

@Override
public void update(Task task) {}

@Override
public Optional<TaskCancellation.Reason> cancellationReason(Task task) {
public Optional<TaskCancellation.Reason> checkAndMaybeGetCancellationReason(Task task) {
long usage = task.getTotalResourceStats().getCpuTimeInNanos();
long threshold = cpuTimeNanosThresholdSupplier.getAsLong();

@@ -17,14 +17,14 @@
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

import static org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackerType.ELAPSED_TIME_TRACKER;

/**
* ElapsedTimeTracker evaluates if the task has been running for more time than allowed.
*
* @opensearch.internal
*/
public class ElapsedTimeTracker extends TaskResourceUsageTracker {
public static final String NAME = "elapsed_time_tracker";

private final LongSupplier timeNanosSupplier;
private final LongSupplier elapsedTimeNanosThresholdSupplier;

@@ -35,14 +35,11 @@ public ElapsedTimeTracker(SearchBackpressureSettings settings, LongSupplier time

@Override
public String name() {
return NAME;
return ELAPSED_TIME_TRACKER.getName();
}

@Override
public void update(Task task) {}

@Override
public Optional<TaskCancellation.Reason> cancellationReason(Task task) {
public Optional<TaskCancellation.Reason> checkAndMaybeGetCancellationReason(Task task) {
long usage = timeNanosSupplier.getAsLong() - task.getStartTimeNanos();
long threshold = elapsedTimeNanosThresholdSupplier.getAsLong();

@@ -20,15 +20,15 @@
import java.util.function.DoubleSupplier;
import java.util.function.LongSupplier;

import static org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackerType.HEAP_USAGE_TRACKER;

/**
* HeapUsageTracker evaluates if the task has consumed too much heap than allowed.
* It also compares the task's heap usage against a historical moving average of previously completed tasks.
*
* @opensearch.internal
*/
public class HeapUsageTracker extends TaskResourceUsageTracker implements SearchShardTaskSettings.Listener {
public static final String NAME = "heap_usage_tracker";

private final LongSupplier heapBytesThresholdSupplier;
private final DoubleSupplier heapVarianceThresholdSupplier;
private final AtomicReference<MovingAverage> movingAverageReference;
@@ -44,7 +44,7 @@ public HeapUsageTracker(SearchBackpressureSettings settings) {

@Override
public String name() {
return NAME;
return HEAP_USAGE_TRACKER.getName();
}

@Override
@@ -53,7 +53,7 @@ public void update(Task task) {
}

@Override
public Optional<TaskCancellation.Reason> cancellationReason(Task task) {
public Optional<TaskCancellation.Reason> checkAndMaybeGetCancellationReason(Task task) {
MovingAverage movingAverage = movingAverageReference.get();

// There haven't been enough measurements.
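The body of HeapUsageTracker's eligibility check is elided in the hunk above. Based on its Javadoc and on the HeapUsageTrackerTests further down, a hedged sketch of that logic follows; the MovingAverage accessors, the memory-bytes getter, and the score derivation are assumptions rather than the exact implementation.

// Hedged sketch of HeapUsageTracker.checkAndMaybeGetCancellationReason(Task).
@Override
public Optional<TaskCancellation.Reason> checkAndMaybeGetCancellationReason(Task task) {
    MovingAverage movingAverage = movingAverageReference.get();

    // There haven't been enough completed tasks yet to form a baseline (assumed accessor).
    if (movingAverage == null || movingAverage.isReady() == false) {
        return Optional.empty();
    }

    double currentUsage = task.getTotalResourceStats().getMemoryInBytes(); // assumed getter
    double averageUsage = movingAverage.getAverage();                      // assumed accessor
    long bytesThreshold = heapBytesThresholdSupplier.getAsLong();
    double varianceThreshold = heapVarianceThresholdSupplier.getAsDouble();

    // Per the tests: eligible only when usage exceeds both the absolute threshold
    // and (moving average * allowed variance).
    if (currentUsage < bytesThreshold || currentUsage < averageUsage * varianceThreshold) {
        return Optional.empty();
    }

    // Assumption: a task further above the historical average gets a proportionally
    // higher cancellation score.
    int cancellationScore = (int) (currentUsage / averageUsage);
    return Optional.of(new TaskCancellation.Reason("heap usage exceeded", cancellationScore));
}
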
@@ -41,10 +41,10 @@ public long getCancellations() {
/**
* Notifies the tracker to update its state when a task execution completes.
*/
public abstract void update(Task task);
public void update(Task task) {}

/**
* Returns the cancellation reason for the given task, if it's eligible for cancellation.
*/
public abstract Optional<TaskCancellation.Reason> cancellationReason(Task task);
public abstract Optional<TaskCancellation.Reason> checkAndMaybeGetCancellationReason(Task task);
Collaborator

The function signature still doesn't look right; it doesn't clarify whether cancellation will occur or not.

Contributor Author

I still feel that it's correct: this only returns a TaskCancellation.Reason for a task that is eligible for cancellation. It doesn't say anything about whether the task will actually be cancelled or not.

}
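
To illustrate the contract discussed in the thread above, here is a minimal hypothetical tracker: checkAndMaybeGetCancellationReason reports only eligibility (a reason plus a score), and whether the task is actually cancelled is decided elsewhere. The tracker name, the fixed threshold, and the TaskCancellation.Reason constructor shape are assumptions for illustration.

import java.util.Optional;
import java.util.concurrent.TimeUnit;

// Hypothetical tracker, not part of the PR. Note that update(Task) is not
// overridden: the base class now provides a no-op default.
public class ExampleCpuTimeTracker extends TaskResourceUsageTracker {
    private static final long CPU_TIME_NANOS_THRESHOLD = TimeUnit.SECONDS.toNanos(10);

    @Override
    public String name() {
        return "example_cpu_time_tracker"; // hypothetical name, not a registered tracker type
    }

    @Override
    public Optional<TaskCancellation.Reason> checkAndMaybeGetCancellationReason(Task task) {
        long usage = task.getTotalResourceStats().getCpuTimeInNanos();
        if (usage < CPU_TIME_NANOS_THRESHOLD) {
            return Optional.empty(); // not eligible: no reason, no score
        }
        // Eligible: report a reason and a score; cancellation itself happens elsewhere.
        return Optional.of(new TaskCancellation.Reason("cpu time exceeded threshold", 1));
    }
}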
@@ -0,0 +1,28 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.backpressure.trackers;

/**
* Defines the type of TaskResourceUsageTracker.
*/
public enum TaskResourceUsageTrackerType {
CPU_USAGE_TRACKER("cpu_usage_tracker"),
HEAP_USAGE_TRACKER("heap_usage_tracker"),
ELAPSED_TIME_TRACKER("elapsed_time_tracker");

private final String name;

TaskResourceUsageTrackerType(String name) {
this.name = name;
}

public String getName() {
return name;
}
}
@@ -132,7 +132,7 @@ public String name() {
public void update(Task task) {}

@Override
public Optional<TaskCancellation.Reason> cancellationReason(Task task) {
public Optional<TaskCancellation.Reason> checkAndMaybeGetCancellationReason(Task task) {
if (task.getTotalResourceStats().getCpuTimeInNanos() < 300) {
return Optional.empty();
}
@@ -33,7 +33,7 @@ public void testEligibleForCancellation() {
Task task = createMockTaskWithResourceStats(SearchShardTask.class, 200000000, 200);
CpuUsageTracker tracker = new CpuUsageTracker(mockSettings);

Optional<TaskCancellation.Reason> reason = tracker.cancellationReason(task);
Optional<TaskCancellation.Reason> reason = tracker.checkAndMaybeGetCancellationReason(task);
assertTrue(reason.isPresent());
assertEquals(1, reason.get().getCancellationScore());
assertEquals("cpu usage exceeded [200ms >= 15ms]", reason.get().getMessage());
@@ -43,7 +43,7 @@ public void testNotEligibleForCancellation() {
Task task = createMockTaskWithResourceStats(SearchShardTask.class, 5000000, 200);
CpuUsageTracker tracker = new CpuUsageTracker(mockSettings);

Optional<TaskCancellation.Reason> reason = tracker.cancellationReason(task);
Optional<TaskCancellation.Reason> reason = tracker.checkAndMaybeGetCancellationReason(task);
assertFalse(reason.isPresent());
}
}
@@ -34,7 +34,7 @@ public void testEligibleForCancellation() {
Task task = createMockTaskWithResourceStats(SearchShardTask.class, 1, 1, 0);
ElapsedTimeTracker tracker = new ElapsedTimeTracker(mockSettings, () -> 200000000);

Optional<TaskCancellation.Reason> reason = tracker.cancellationReason(task);
Optional<TaskCancellation.Reason> reason = tracker.checkAndMaybeGetCancellationReason(task);
assertTrue(reason.isPresent());
assertEquals(1, reason.get().getCancellationScore());
assertEquals("elapsed time exceeded [200ms >= 100ms]", reason.get().getMessage());
@@ -44,7 +44,7 @@ public void testNotEligibleForCancellation() {
Task task = createMockTaskWithResourceStats(SearchShardTask.class, 1, 1, 150000000);
ElapsedTimeTracker tracker = new ElapsedTimeTracker(mockSettings, () -> 200000000);

Optional<TaskCancellation.Reason> reason = tracker.cancellationReason(task);
Optional<TaskCancellation.Reason> reason = tracker.checkAndMaybeGetCancellationReason(task);
assertFalse(reason.isPresent());
}
}
@@ -44,7 +44,7 @@ public void testEligibleForCancellation() {

// Task that has heap usage >= heapBytesThreshold and (movingAverage * heapVariance).
task = createMockTaskWithResourceStats(SearchShardTask.class, 1, 200);
Optional<TaskCancellation.Reason> reason = tracker.cancellationReason(task);
Optional<TaskCancellation.Reason> reason = tracker.checkAndMaybeGetCancellationReason(task);
assertTrue(reason.isPresent());
assertEquals(4, reason.get().getCancellationScore());
assertEquals("heap usage exceeded [200b >= 100b]", reason.get().getMessage());
@@ -59,7 +59,7 @@ public void testNotEligibleForCancellation() {
task = createMockTaskWithResourceStats(SearchShardTask.class, 1, 99);

// Not enough observations.
reason = tracker.cancellationReason(task);
reason = tracker.checkAndMaybeGetCancellationReason(task);
assertFalse(reason.isPresent());

// Record enough observations to make the moving average 'ready'.
@@ -68,13 +68,13 @@
}

// Task with heap usage < heapBytesThreshold should not be cancelled.
reason = tracker.cancellationReason(task);
reason = tracker.checkAndMaybeGetCancellationReason(task);
assertFalse(reason.isPresent());

// Task with heap usage between heapBytesThreshold and (movingAverage * heapVariance) should not be cancelled.
double allowedHeapUsage = 99.0 * 2.0;
task = createMockTaskWithResourceStats(SearchShardTask.class, 1, randomLongBetween(99, (long) allowedHeapUsage - 1));
reason = tracker.cancellationReason(task);
reason = tracker.checkAndMaybeGetCancellationReason(task);
assertFalse(reason.isPresent());
}
}
@@ -64,7 +64,7 @@ public String name() {
public void update(Task task) {}

@Override
public Optional<TaskCancellation.Reason> cancellationReason(Task task) {
public Optional<TaskCancellation.Reason> checkAndMaybeGetCancellationReason(Task task) {
return Optional.empty();
}
};