Skip to content

Commit

Permalink
Added resource usage trackers for in-flight cancellation of SearchSha…
Browse files Browse the repository at this point in the history
…rdTask

1. CpuUsageTracker: cancels tasks if they consume too much CPU
2. ElapsedTimeTracker: cancels tasks if they consume too much time
3. HeapUsageTracker: cancels tasks if they consume too much heap

Signed-off-by: Ketan Verma <ketan9495@gmail.com>
  • Loading branch information
ketanv3 committed Oct 20, 2022
1 parent 3af46ae commit 7719d71
Show file tree
Hide file tree
Showing 14 changed files with 558 additions and 83 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Apply reproducible builds configuration for OpenSearch plugins through gradle plugin ([#4746](https://github.com/opensearch-project/OpenSearch/pull/4746))
- Add groupId value propagation tests for ZIP publication task ([#4772](https://github.com/opensearch-project/OpenSearch/pull/4772))
- Add support for GeoJson Point type in GeoPoint field ([#4597](https://github.com/opensearch-project/OpenSearch/pull/4597))
- Added resource usage trackers for in-flight cancellation of SearchShardTask ([#4805](https://github.com/opensearch-project/OpenSearch/pull/4805))

### Dependencies
- Bumps `log4j-core` from 2.18.0 to 2.19.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -596,11 +596,12 @@ public void apply(Settings value, Settings current, Settings previous) {
NodeDuressSettings.SETTING_NUM_SUCCESSIVE_BREACHES,
NodeDuressSettings.SETTING_CPU_THRESHOLD,
NodeDuressSettings.SETTING_HEAP_THRESHOLD,
SearchShardTaskSettings.SETTING_TOTAL_HEAP_THRESHOLD,
SearchShardTaskSettings.SETTING_HEAP_THRESHOLD,
SearchShardTaskSettings.SETTING_TOTAL_HEAP_PERCENT_THRESHOLD,
SearchShardTaskSettings.SETTING_HEAP_PERCENT_THRESHOLD,
SearchShardTaskSettings.SETTING_HEAP_VARIANCE_THRESHOLD,
SearchShardTaskSettings.SETTING_CPU_TIME_THRESHOLD,
SearchShardTaskSettings.SETTING_ELAPSED_TIME_THRESHOLD
SearchShardTaskSettings.SETTING_HEAP_MOVING_AVERAGE_WINDOW_SIZE,
SearchShardTaskSettings.SETTING_CPU_TIME_MILLIS_THRESHOLD,
SearchShardTaskSettings.SETTING_ELAPSED_TIME_MILLIS_THRESHOLD
)
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,9 @@ public boolean request() {
*/
private static class State {
final double tokens;
final double lastRefilledAt;
final long lastRefilledAt;

public State(double tokens, double lastRefilledAt) {
public State(double tokens, long lastRefilledAt) {
this.tokens = tokens;
this.lastRefilledAt = lastRefilledAt;
}
Expand All @@ -113,7 +113,7 @@ public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
State state = (State) o;
return Double.compare(state.tokens, tokens) == 0 && Double.compare(state.lastRefilledAt, lastRefilledAt) == 0;
return Double.compare(state.tokens, tokens) == 0 && lastRefilledAt == state.lastRefilledAt;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
import org.opensearch.monitor.jvm.JvmStats;
import org.opensearch.monitor.process.ProcessProbe;
import org.opensearch.search.backpressure.settings.SearchBackpressureSettings;
import org.opensearch.search.backpressure.trackers.CpuUsageTracker;
import org.opensearch.search.backpressure.trackers.ElapsedTimeTracker;
import org.opensearch.search.backpressure.trackers.HeapUsageTracker;
import org.opensearch.search.backpressure.trackers.NodeDuressTracker;
import org.opensearch.search.backpressure.trackers.TaskResourceUsageTracker;
import org.opensearch.tasks.CancellableTask;
Expand All @@ -29,7 +32,6 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -84,7 +86,7 @@ public SearchBackpressureService(
() -> JvmStats.jvmStats().getMem().getHeapUsedPercent() / 100.0 >= settings.getNodeDuressSettings().getHeapThreshold()
)
),
Collections.emptyList()
List.of(new CpuUsageTracker(settings), new HeapUsageTracker(settings), new ElapsedTimeTracker(settings, System::nanoTime))
);
}

Expand All @@ -97,7 +99,7 @@ public SearchBackpressureService(
List<TaskResourceUsageTracker> taskResourceUsageTrackers
) {
this.settings = settings;
this.settings.setListener(this);
this.settings.addListener(this);
this.taskResourceTrackingService = taskResourceTrackingService;
this.taskResourceTrackingService.addTaskCompletionListener(this);
this.threadPool = threadPool;
Expand Down Expand Up @@ -181,10 +183,10 @@ boolean isNodeInDuress() {
* Returns true if the increase in heap usage is due to search requests.
*/
boolean isHeapUsageDominatedBySearch(List<CancellableTask> searchShardTasks) {
long runningTasksHeapUsage = searchShardTasks.stream().mapToLong(task -> task.getTotalResourceStats().getMemoryInBytes()).sum();
long searchTasksHeapThreshold = getSettings().getSearchShardTaskSettings().getTotalHeapThresholdBytes();
if (runningTasksHeapUsage < searchTasksHeapThreshold) {
logger.debug("heap usage not dominated by search requests [{}/{}]", runningTasksHeapUsage, searchTasksHeapThreshold);
long usage = searchShardTasks.stream().mapToLong(task -> task.getTotalResourceStats().getMemoryInBytes()).sum();
long threshold = getSettings().getSearchShardTaskSettings().getTotalHeapBytesThreshold();
if (usage < threshold) {
logger.debug("heap usage not dominated by search requests [{}/{}]", usage, threshold);
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,16 @@

package org.opensearch.search.backpressure.settings;

import org.apache.lucene.util.SetOnce;
import org.opensearch.ExceptionsHelper;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/**
* Settings related to search backpressure and cancellation of in-flight requests.
Expand All @@ -23,7 +26,7 @@
*/
public class SearchBackpressureSettings {
private static class Defaults {
private static final long INTERVAL = 1000;
private static final long INTERVAL_MILLIS = 1000;

private static final boolean ENABLED = true;
private static final boolean ENFORCED = false;
Expand All @@ -37,9 +40,9 @@ private static class Defaults {
* Defines the interval (in millis) at which the SearchBackpressureService monitors and cancels tasks.
*/
private final TimeValue interval;
public static final Setting<Long> SETTING_INTERVAL = Setting.longSetting(
"search_backpressure.interval",
Defaults.INTERVAL,
public static final Setting<Long> SETTING_INTERVAL_MILLIS = Setting.longSetting(
"search_backpressure.interval_millis",
Defaults.INTERVAL_MILLIS,
1,
Setting.Property.NodeScope
);
Expand Down Expand Up @@ -116,15 +119,19 @@ public interface Listener {
void onCancellationBurstChanged();
}

private final SetOnce<Listener> listener = new SetOnce<>();
private final List<Listener> listeners = new ArrayList<>();
private final Settings settings;
private final ClusterSettings clusterSettings;
private final NodeDuressSettings nodeDuressSettings;
private final SearchShardTaskSettings searchShardTaskSettings;

public SearchBackpressureSettings(Settings settings, ClusterSettings clusterSettings) {
this.settings = settings;
this.clusterSettings = clusterSettings;
this.nodeDuressSettings = new NodeDuressSettings(settings, clusterSettings);
this.searchShardTaskSettings = new SearchShardTaskSettings(settings, clusterSettings);

interval = new TimeValue(SETTING_INTERVAL.get(settings));
interval = new TimeValue(SETTING_INTERVAL_MILLIS.get(settings));

enabled = SETTING_ENABLED.get(settings);
clusterSettings.addSettingsUpdateConsumer(SETTING_ENABLED, this::setEnabled);
Expand All @@ -142,8 +149,16 @@ public SearchBackpressureSettings(Settings settings, ClusterSettings clusterSett
clusterSettings.addSettingsUpdateConsumer(SETTING_CANCELLATION_BURST, this::setCancellationBurst);
}

public void setListener(Listener listener) {
this.listener.set(listener);
public void addListener(Listener listener) {
listeners.add(listener);
}

public Settings getSettings() {
return settings;
}

public ClusterSettings getClusterSettings() {
return clusterSettings;
}

public NodeDuressSettings getNodeDuressSettings() {
Expand Down Expand Up @@ -180,9 +195,7 @@ public double getCancellationRatio() {

private void setCancellationRatio(double cancellationRatio) {
this.cancellationRatio = cancellationRatio;
if (listener.get() != null) {
listener.get().onCancellationRatioChanged();
}
notifyListeners(Listener::onCancellationRatioChanged);
}

public double getCancellationRate() {
Expand All @@ -195,9 +208,7 @@ public double getCancellationRateNanos() {

private void setCancellationRate(double cancellationRate) {
this.cancellationRate = cancellationRate;
if (listener.get() != null) {
listener.get().onCancellationRateChanged();
}
notifyListeners(Listener::onCancellationRateChanged);
}

public double getCancellationBurst() {
Expand All @@ -206,8 +217,20 @@ public double getCancellationBurst() {

private void setCancellationBurst(double cancellationBurst) {
this.cancellationBurst = cancellationBurst;
if (listener.get() != null) {
listener.get().onCancellationBurstChanged();
notifyListeners(Listener::onCancellationBurstChanged);
}

private void notifyListeners(Consumer<Listener> consumer) {
List<Exception> exceptions = new ArrayList<>();

for (Listener listener : listeners) {
try {
consumer.accept(listener);
} catch (Exception e) {
exceptions.add(e);
}
}

ExceptionsHelper.maybeThrowRuntimeAndSuppress(exceptions);
}
}
Loading

0 comments on commit 7719d71

Please sign in to comment.