diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 1c284627a29d9..320cb5457b21c 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -41,7 +41,9 @@ import org.opensearch.index.ShardIndexingPressureMemoryManager; import org.opensearch.index.ShardIndexingPressureSettings; import org.opensearch.index.ShardIndexingPressureStore; -import org.opensearch.search.backpressure.SearchBackpressureSettings; +import org.opensearch.search.backpressure.settings.NodeDuressSettings; +import org.opensearch.search.backpressure.settings.SearchBackpressureSettings; +import org.opensearch.search.backpressure.settings.SearchShardTaskSettings; import org.opensearch.tasks.TaskManager; import org.opensearch.tasks.TaskResourceTrackingService; import org.opensearch.watcher.ResourceWatcherService; @@ -583,19 +585,21 @@ public void apply(Settings value, Settings current, Settings previous) { IndexingPressure.MAX_INDEXING_BYTES, TaskResourceTrackingService.TASK_RESOURCE_TRACKING_ENABLED, TaskManager.TASK_RESOURCE_CONSUMERS_ENABLED, + + // Settings related to search backpressure SearchBackpressureSettings.SETTING_ENABLED, SearchBackpressureSettings.SETTING_ENFORCED, - SearchBackpressureSettings.SETTING_NODE_DURESS_NUM_CONSECUTIVE_BREACHES, - SearchBackpressureSettings.SETTING_NODE_DURESS_CPU_THRESHOLD, - SearchBackpressureSettings.SETTING_NODE_DURESS_HEAP_THRESHOLD, - SearchBackpressureSettings.SETTING_SEARCH_HEAP_THRESHOLD, - SearchBackpressureSettings.SETTING_SEARCH_TASK_HEAP_THRESHOLD, - SearchBackpressureSettings.SETTING_SEARCH_TASK_HEAP_VARIANCE_THRESHOLD, - SearchBackpressureSettings.SETTING_SEARCH_TASK_CPU_TIME_THRESHOLD, - SearchBackpressureSettings.SETTING_SEARCH_TASK_ELAPSED_TIME_THRESHOLD, SearchBackpressureSettings.SETTING_CANCELLATION_RATIO, 
SearchBackpressureSettings.SETTING_CANCELLATION_RATE, - SearchBackpressureSettings.SETTING_CANCELLATION_BURST + SearchBackpressureSettings.SETTING_CANCELLATION_BURST, + NodeDuressSettings.SETTING_NUM_SUCCESSIVE_BREACHES, + NodeDuressSettings.SETTING_CPU_THRESHOLD, + NodeDuressSettings.SETTING_HEAP_THRESHOLD, + SearchShardTaskSettings.SETTING_TOTAL_HEAP_THRESHOLD, + SearchShardTaskSettings.SETTING_HEAP_THRESHOLD, + SearchShardTaskSettings.SETTING_HEAP_VARIANCE_THRESHOLD, + SearchShardTaskSettings.SETTING_CPU_TIME_THRESHOLD, + SearchShardTaskSettings.SETTING_ELAPSED_TIME_THRESHOLD ) ) ); diff --git a/server/src/main/java/org/opensearch/common/Streak.java b/server/src/main/java/org/opensearch/common/util/Streak.java similarity index 63% rename from server/src/main/java/org/opensearch/common/Streak.java rename to server/src/main/java/org/opensearch/common/util/Streak.java index 647009f9cb092..5f6ad3021659e 100644 --- a/server/src/main/java/org/opensearch/common/Streak.java +++ b/server/src/main/java/org/opensearch/common/util/Streak.java @@ -6,28 +6,28 @@ * compatible open source license. */ -package org.opensearch.common; +package org.opensearch.common.util; import java.util.concurrent.atomic.AtomicInteger; /** - * Streak is a data structure that keeps track of the number of consecutive successful events. + * Streak is a data structure that keeps track of the number of successive successful events. 
* * @opensearch.internal */ public class Streak { - private final AtomicInteger consecutiveSuccessfulEvents = new AtomicInteger(); + private final AtomicInteger successiveSuccessfulEvents = new AtomicInteger(); public int record(boolean isSuccessful) { if (isSuccessful) { - return consecutiveSuccessfulEvents.incrementAndGet(); + return successiveSuccessfulEvents.incrementAndGet(); } else { - consecutiveSuccessfulEvents.set(0); + successiveSuccessfulEvents.set(0); return 0; } } public int length() { - return consecutiveSuccessfulEvents.get(); + return successiveSuccessfulEvents.get(); } } diff --git a/server/src/main/java/org/opensearch/common/util/TokenBucket.java b/server/src/main/java/org/opensearch/common/util/TokenBucket.java index 8535db421a9e6..e47f152d71363 100644 --- a/server/src/main/java/org/opensearch/common/util/TokenBucket.java +++ b/server/src/main/java/org/opensearch/common/util/TokenBucket.java @@ -8,6 +8,8 @@ package org.opensearch.common.util; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.LongSupplier; /** @@ -36,9 +38,10 @@ public class TokenBucket { */ private final double burst; - private double tokens; - - private long lastRefilledAt; + /** + * Defines the current state of the token bucket. + */ + private final AtomicReference state; public TokenBucket(LongSupplier clock, double rate, double burst) { this(clock, rate, burst, burst); @@ -56,18 +59,7 @@ public TokenBucket(LongSupplier clock, double rate, double burst, double initial this.clock = clock; this.rate = rate; this.burst = burst; - this.tokens = Math.min(initialTokens, burst); - this.lastRefilledAt = clock.getAsLong(); - } - - /** - * Refills the token bucket. 
- */ - private void refill() { - long now = clock.getAsLong(); - double incr = (now - lastRefilledAt) * rate; - tokens = Math.min(tokens + incr, burst); - lastRefilledAt = now; + this.state = new AtomicReference<>(new State(Math.min(initialTokens, burst), clock.getAsLong())); } /** @@ -79,19 +71,54 @@ public boolean request(double n) { throw new IllegalArgumentException("requested tokens must be greater than zero"); } - synchronized (this) { - refill(); - - if (tokens >= n) { - tokens -= n; - return true; + // Refill tokens + State currentState, updatedState; + do { + currentState = state.get(); + long now = clock.getAsLong(); + double incr = (now - currentState.lastRefilledAt) * rate; + updatedState = new State(Math.min(currentState.tokens + incr, burst), now); + } while (state.compareAndSet(currentState, updatedState) == false); + + // Deduct tokens + do { + currentState = state.get(); + if (currentState.tokens < n) { + return false; } + updatedState = new State(currentState.tokens - n, currentState.lastRefilledAt); + } while (state.compareAndSet(currentState, updatedState) == false); - return false; - } + return true; } public boolean request() { return request(1.0); } + + /** + * Represents an immutable token bucket state. 
+ */ + private static class State { + final double tokens; + final long lastRefilledAt; // kept as long: a double would round large clock readings (e.g. System.nanoTime()) + + public State(double tokens, long lastRefilledAt) { + this.tokens = tokens; + this.lastRefilledAt = lastRefilledAt; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + State state = (State) o; + return Double.compare(state.tokens, tokens) == 0 && state.lastRefilledAt == lastRefilledAt; + } + + @Override + public int hashCode() { + return Objects.hash(tokens, lastRefilledAt); + } + } } diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 1bbd174cd06cf..961760f22f8ed 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -43,7 +43,7 @@ import org.opensearch.indices.replication.SegmentReplicationTargetService; import org.opensearch.indices.replication.SegmentReplicationSourceService; import org.opensearch.search.backpressure.SearchBackpressureService; -import org.opensearch.search.backpressure.SearchBackpressureSettings; +import org.opensearch.search.backpressure.settings.SearchBackpressureSettings; import org.opensearch.tasks.TaskResourceTrackingService; import org.opensearch.threadpool.RunnableTaskExecutionListener; import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory; @@ -1112,6 +1112,7 @@ public Node start() throws NodeValidationException { injector.getInstance(SearchService.class).start(); injector.getInstance(FsHealthService.class).start(); nodeService.getMonitorService().start(); + nodeService.getSearchBackpressureService().start(); final ClusterService clusterService = injector.getInstance(ClusterService.class); @@ -1267,6 +1268,7 @@ private Node stop() { injector.getInstance(NodeConnectionsService.class).stop(); injector.getInstance(FsHealthService.class).stop(); nodeService.getMonitorService().stop(); 
+ nodeService.getSearchBackpressureService().stop(); injector.getInstance(GatewayService.class).stop(); injector.getInstance(SearchService.class).stop(); injector.getInstance(TransportService.class).stop(); @@ -1326,6 +1328,7 @@ public synchronized void close() throws IOException { toClose.add(injector.getInstance(Discovery.class)); toClose.add(() -> stopWatch.stop().start("monitor")); toClose.add(nodeService.getMonitorService()); + toClose.add(nodeService.getSearchBackpressureService()); toClose.add(() -> stopWatch.stop().start("fsHealth")); toClose.add(injector.getInstance(FsHealthService.class)); toClose.add(() -> stopWatch.stop().start("gateway")); diff --git a/server/src/main/java/org/opensearch/node/NodeService.java b/server/src/main/java/org/opensearch/node/NodeService.java index ff102c5c65922..f4e037db66ca2 100644 --- a/server/src/main/java/org/opensearch/node/NodeService.java +++ b/server/src/main/java/org/opensearch/node/NodeService.java @@ -207,6 +207,10 @@ public MonitorService getMonitorService() { return monitorService; } + public SearchBackpressureService getSearchBackpressureService() { + return searchBackpressureService; + } + @Override public void close() throws IOException { IOUtils.close(indicesService); diff --git a/server/src/main/java/org/opensearch/search/backpressure/SearchBackpressureService.java b/server/src/main/java/org/opensearch/search/backpressure/SearchBackpressureService.java index e855cbdf75673..885846a177d60 100644 --- a/server/src/main/java/org/opensearch/search/backpressure/SearchBackpressureService.java +++ b/server/src/main/java/org/opensearch/search/backpressure/SearchBackpressureService.java @@ -12,18 +12,22 @@ import org.apache.logging.log4j.Logger; import org.opensearch.ExceptionsHelper; import org.opensearch.action.search.SearchShardTask; +import org.opensearch.common.component.AbstractLifecycleComponent; import org.opensearch.common.util.TokenBucket; import org.opensearch.monitor.jvm.JvmStats; import 
org.opensearch.monitor.process.ProcessProbe; -import org.opensearch.search.backpressure.trackers.NodeResourceUsageTracker; +import org.opensearch.search.backpressure.settings.SearchBackpressureSettings; +import org.opensearch.search.backpressure.trackers.NodeDuressTracker; import org.opensearch.search.backpressure.trackers.TaskResourceUsageTracker; import org.opensearch.tasks.CancellableTask; import org.opensearch.tasks.Task; import org.opensearch.tasks.TaskCancellation; import org.opensearch.tasks.TaskResourceTrackingService; import org.opensearch.tasks.TaskResourceTrackingService.TaskCompletionListener; +import org.opensearch.threadpool.Scheduler; import org.opensearch.threadpool.ThreadPool; +import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; @@ -39,14 +43,20 @@ * * @opensearch.internal */ -public class SearchBackpressureService implements Runnable, TaskCompletionListener, SearchBackpressureSettings.Listener { +public class SearchBackpressureService extends AbstractLifecycleComponent + implements + TaskCompletionListener, + SearchBackpressureSettings.Listener { private static final Logger logger = LogManager.getLogger(SearchBackpressureService.class); + private volatile Scheduler.Cancellable scheduledFuture; + private final SearchBackpressureSettings settings; private final TaskResourceTrackingService taskResourceTrackingService; + private final ThreadPool threadPool; private final LongSupplier timeNanosSupplier; - private final List nodeResourceUsageTrackers; + private final List nodeDuressTrackers; private final List taskResourceUsageTrackers; private final AtomicReference taskCancellationRateLimiter = new AtomicReference<>(); @@ -67,11 +77,11 @@ public SearchBackpressureService( threadPool, System::nanoTime, List.of( - new NodeResourceUsageTracker( - () -> ProcessProbe.getInstance().getProcessCpuPercent() / 100.0 >= settings.getNodeDuressCpuThreshold() + new NodeDuressTracker( + () -> 
ProcessProbe.getInstance().getProcessCpuPercent() / 100.0 >= settings.getNodeDuressSettings().getCpuThreshold() ), - new NodeResourceUsageTracker( - () -> JvmStats.jvmStats().getMem().getHeapUsedPercent() / 100.0 >= settings.getNodeDuressHeapThreshold() + new NodeDuressTracker( + () -> JvmStats.jvmStats().getMem().getHeapUsedPercent() / 100.0 >= settings.getNodeDuressSettings().getHeapThreshold() ) ), Collections.emptyList() @@ -83,15 +93,16 @@ public SearchBackpressureService( TaskResourceTrackingService taskResourceTrackingService, ThreadPool threadPool, LongSupplier timeNanosSupplier, - List nodeResourceUsageTrackers, + List nodeDuressTrackers, List taskResourceUsageTrackers ) { this.settings = settings; this.settings.setListener(this); this.taskResourceTrackingService = taskResourceTrackingService; this.taskResourceTrackingService.addTaskCompletionListener(this); + this.threadPool = threadPool; this.timeNanosSupplier = timeNanosSupplier; - this.nodeResourceUsageTrackers = nodeResourceUsageTrackers; + this.nodeDuressTrackers = nodeDuressTrackers; this.taskResourceUsageTrackers = taskResourceUsageTrackers; this.taskCancellationRateLimiter.set( @@ -101,20 +112,9 @@ public SearchBackpressureService( this.taskCancellationRatioLimiter.set( new TokenBucket(state::getCompletionCount, getSettings().getCancellationRatio(), getSettings().getCancellationBurst()) ); - - threadPool.scheduleWithFixedDelay(this, getSettings().getInterval(), ThreadPool.Names.GENERIC); - } - - @Override - public void run() { - try { - doRun(); - } catch (Exception e) { - logger.debug("failure in search search backpressure", e); - } } - public void doRun() { + void doRun() { if (getSettings().isEnabled() == false) { return; } @@ -166,10 +166,10 @@ public void doRun() { */ boolean isNodeInDuress() { boolean isNodeInDuress = false; - int numConsecutiveBreaches = getSettings().getNodeDuressNumConsecutiveBreaches(); + int numSuccessiveBreaches = 
getSettings().getNodeDuressSettings().getNumSuccessiveBreaches(); - for (NodeResourceUsageTracker tracker : nodeResourceUsageTrackers) { - if (tracker.check() >= numConsecutiveBreaches) { + for (NodeDuressTracker tracker : nodeDuressTrackers) { + if (tracker.check() >= numSuccessiveBreaches) { isNodeInDuress = true; // not breaking the loop so that each tracker's streak gets updated. } } @@ -182,7 +182,7 @@ boolean isNodeInDuress() { */ boolean isHeapUsageDominatedBySearch(List searchShardTasks) { long runningTasksHeapUsage = searchShardTasks.stream().mapToLong(task -> task.getTotalResourceStats().getMemoryInBytes()).sum(); - long searchTasksHeapThreshold = getSettings().getSearchHeapThresholdBytes(); + long searchTasksHeapThreshold = getSettings().getSearchShardTaskSettings().getTotalHeapThresholdBytes(); if (runningTasksHeapUsage < searchTasksHeapThreshold) { logger.debug("heap usage not dominated by search requests [{}/{}]", runningTasksHeapUsage, searchTasksHeapThreshold); return false; @@ -216,7 +216,7 @@ TaskCancellation getTaskCancellation(CancellableTask task) { Optional reason = tracker.cancellationReason(task); if (reason.isPresent()) { reasons.add(reason.get()); - callbacks.add(() -> tracker.update(task)); + callbacks.add(tracker::incrementCancellations); } } @@ -234,6 +234,14 @@ List getTaskCancellations(List tasks) { .collect(Collectors.toUnmodifiableList()); } + SearchBackpressureSettings getSettings() { + return settings; + } + + SearchBackpressureState getState() { + return state; + } + @Override public void onTaskCompleted(Task task) { if (getSettings().isEnabled() == false) { @@ -280,11 +288,24 @@ public void onCancellationBurstChanged() { onCancellationRateChanged(); } - public SearchBackpressureSettings getSettings() { - return settings; + @Override + protected void doStart() { + scheduledFuture = threadPool.scheduleWithFixedDelay(() -> { + try { + doRun(); + } catch (Exception e) { + logger.debug("failure in search backpressure", e); + } 
+ }, getSettings().getInterval(), ThreadPool.Names.GENERIC); } - public SearchBackpressureState getState() { - return state; + @Override + protected void doStop() { + if (scheduledFuture != null) { + scheduledFuture.cancel(); + } } + + @Override + protected void doClose() throws IOException {} } diff --git a/server/src/main/java/org/opensearch/search/backpressure/SearchBackpressureSettings.java b/server/src/main/java/org/opensearch/search/backpressure/SearchBackpressureSettings.java deleted file mode 100644 index 477bd6cdad48f..0000000000000 --- a/server/src/main/java/org/opensearch/search/backpressure/SearchBackpressureSettings.java +++ /dev/null @@ -1,419 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.search.backpressure; - -import org.apache.lucene.util.SetOnce; -import org.opensearch.common.settings.ClusterSettings; -import org.opensearch.common.settings.Setting; -import org.opensearch.common.settings.Settings; -import org.opensearch.common.unit.TimeValue; -import org.opensearch.monitor.jvm.JvmStats; - -import java.util.concurrent.TimeUnit; - -/** - * Settings related to search backpressure and cancellation of in-flight requests. - * - * @opensearch.internal - */ -public class SearchBackpressureSettings { - private static final long HEAP_SIZE_BYTES = JvmStats.jvmStats().getMem().getHeapMax().getBytes(); - - /** - * Default values for each setting. 
- */ - private static class Defaults { - private static final long INTERVAL = 1000; - - private static final boolean ENABLED = true; - private static final boolean ENFORCED = false; - - private static final int NODE_DURESS_NUM_CONSECUTIVE_BREACHES = 3; - private static final double NODE_DURESS_CPU_THRESHOLD = 0.9; - private static final double NODE_DURESS_HEAP_THRESHOLD = 0.7; - - private static final double SEARCH_HEAP_THRESHOLD = 0.05; - private static final double SEARCH_TASK_HEAP_THRESHOLD = 0.005; - private static final double SEARCH_TASK_HEAP_VARIANCE_THRESHOLD = 2.0; - - private static final long SEARCH_TASK_CPU_TIME_THRESHOLD = 15; - private static final long SEARCH_TASK_ELAPSED_TIME_THRESHOLD = 30000; - - private static final double CANCELLATION_RATIO = 0.1; - private static final double CANCELLATION_RATE = 0.003; - private static final double CANCELLATION_BURST = 10.0; - } - - /** - * Callback listeners. - */ - public interface Listener { - void onCancellationRatioChanged(); - - void onCancellationRateChanged(); - - void onCancellationBurstChanged(); - } - - private final SetOnce listener = new SetOnce<>(); - - public void setListener(Listener listener) { - this.listener.set(listener); - } - - // Static settings - - /** - * Defines the interval (in millis) at which the SearchBackpressureService monitors and cancels tasks. - */ - private final TimeValue interval; - public static final Setting SETTING_INTERVAL = Setting.longSetting( - "search_backpressure.interval", - Defaults.INTERVAL, - 1, - Setting.Property.NodeScope - ); - - // Dynamic settings - - /** - * Defines whether search backpressure is enabled or not. - */ - private volatile boolean enabled; - public static final Setting SETTING_ENABLED = Setting.boolSetting( - "search_backpressure.enabled", - Defaults.ENABLED, - Setting.Property.Dynamic, - Setting.Property.NodeScope - ); - - /** - * Defines whether in-flight cancellation of tasks is enabled or not. 
- */ - private volatile boolean enforced; - public static final Setting SETTING_ENFORCED = Setting.boolSetting( - "search_backpressure.enforced", - Defaults.ENFORCED, - Setting.Property.Dynamic, - Setting.Property.NodeScope - ); - - /** - * Defines the number of consecutive limit breaches after the node is marked "in duress". - */ - private volatile int nodeDuressNumConsecutiveBreaches; - public static final Setting SETTING_NODE_DURESS_NUM_CONSECUTIVE_BREACHES = Setting.intSetting( - "search_backpressure.node_duress.num_consecutive_breaches", - Defaults.NODE_DURESS_NUM_CONSECUTIVE_BREACHES, - 1, - Setting.Property.Dynamic, - Setting.Property.NodeScope - ); - - /** - * Defines the CPU usage threshold (in percentage) for a node to be considered "in duress". - */ - private volatile double nodeDuressCpuThreshold; - public static final Setting SETTING_NODE_DURESS_CPU_THRESHOLD = Setting.doubleSetting( - "search_backpressure.node_duress.cpu_threshold", - Defaults.NODE_DURESS_CPU_THRESHOLD, - 0.0, - 1.0, - Setting.Property.Dynamic, - Setting.Property.NodeScope - ); - - /** - * Defines the heap usage threshold (in percentage) for a node to be considered "in duress". - */ - private volatile double nodeDuressHeapThreshold; - public static final Setting SETTING_NODE_DURESS_HEAP_THRESHOLD = Setting.doubleSetting( - "search_backpressure.node_duress.heap_threshold", - Defaults.NODE_DURESS_HEAP_THRESHOLD, - 0.0, - 1.0, - Setting.Property.Dynamic, - Setting.Property.NodeScope - ); - - /** - * Defines the heap usage threshold (in percentage) for the sum of heap usages across all search tasks - * before in-flight cancellation is applied. 
- */ - private volatile double searchHeapThreshold; - public static final Setting SETTING_SEARCH_HEAP_THRESHOLD = Setting.doubleSetting( - "search_backpressure.search_heap_threshold", - Defaults.SEARCH_HEAP_THRESHOLD, - 0.0, - 1.0, - Setting.Property.Dynamic, - Setting.Property.NodeScope - ); - - /** - * Defines the heap usage threshold (in percentage) for an individual task before it is considered for cancellation. - */ - private volatile double searchTaskHeapThreshold; - public static final Setting SETTING_SEARCH_TASK_HEAP_THRESHOLD = Setting.doubleSetting( - "search_backpressure.search_task_heap_threshold", - Defaults.SEARCH_TASK_HEAP_THRESHOLD, - 0.0, - 1.0, - Setting.Property.Dynamic, - Setting.Property.NodeScope - ); - - /** - * Defines the heap usage variance for an individual task before it is considered for cancellation. - * A task is considered for cancellation when taskHeapUsage is greater than or equal to heapUsageMovingAverage * variance. - */ - private volatile double searchTaskHeapVarianceThreshold; - public static final Setting SETTING_SEARCH_TASK_HEAP_VARIANCE_THRESHOLD = Setting.doubleSetting( - "search_backpressure.search_task_heap_variance", - Defaults.SEARCH_TASK_HEAP_VARIANCE_THRESHOLD, - 0.0, - Setting.Property.Dynamic, - Setting.Property.NodeScope - ); - - /** - * Defines the CPU usage threshold (in millis) for an individual task before it is considered for cancellation. - */ - private volatile long searchTaskCpuTimeThreshold; - public static final Setting SETTING_SEARCH_TASK_CPU_TIME_THRESHOLD = Setting.longSetting( - "search_backpressure.search_task_cpu_time_threshold", - Defaults.SEARCH_TASK_CPU_TIME_THRESHOLD, - 0, - Setting.Property.Dynamic, - Setting.Property.NodeScope - ); - - /** - * Defines the elapsed time threshold (in millis) for an individual task before it is considered for cancellation. 
- */ - private volatile long searchTaskElapsedTimeThreshold; - public static final Setting SETTING_SEARCH_TASK_ELAPSED_TIME_THRESHOLD = Setting.longSetting( - "search_backpressure.search_task_elapsed_time_threshold", - Defaults.SEARCH_TASK_ELAPSED_TIME_THRESHOLD, - 0, - Setting.Property.Dynamic, - Setting.Property.NodeScope - ); - - /** - * Defines the percentage of tasks to cancel relative to the number of successful task completions. - * In other words, it is the number of tokens added to the bucket on each successful task completion. - */ - private volatile double cancellationRatio; - public static final Setting SETTING_CANCELLATION_RATIO = Setting.doubleSetting( - "search_backpressure.cancellation_ratio", - Defaults.CANCELLATION_RATIO, - 0.0, - 1.0, - Setting.Property.Dynamic, - Setting.Property.NodeScope - ); - - /** - * Defines the number of tasks to cancel per unit time (in millis). - * In other words, it is the number of tokens added to the bucket each millisecond. - */ - private volatile double cancellationRate; - public static final Setting SETTING_CANCELLATION_RATE = Setting.doubleSetting( - "search_backpressure.cancellation_rate", - Defaults.CANCELLATION_RATE, - 0.0, - Setting.Property.Dynamic, - Setting.Property.NodeScope - ); - - /** - * Defines the maximum number of tasks that can be cancelled before being rate-limited. 
- */ - private volatile double cancellationBurst; - public static final Setting SETTING_CANCELLATION_BURST = Setting.doubleSetting( - "search_backpressure.cancellation_burst", - Defaults.CANCELLATION_BURST, - 1.0, - Setting.Property.Dynamic, - Setting.Property.NodeScope - ); - - public SearchBackpressureSettings(Settings settings, ClusterSettings clusterSettings) { - interval = new TimeValue(SETTING_INTERVAL.get(settings)); - - enabled = SETTING_ENABLED.get(settings); - clusterSettings.addSettingsUpdateConsumer(SETTING_ENABLED, this::setEnabled); - - enforced = SETTING_ENFORCED.get(settings); - clusterSettings.addSettingsUpdateConsumer(SETTING_ENFORCED, this::setEnforced); - - nodeDuressNumConsecutiveBreaches = SETTING_NODE_DURESS_NUM_CONSECUTIVE_BREACHES.get(settings); - clusterSettings.addSettingsUpdateConsumer(SETTING_NODE_DURESS_NUM_CONSECUTIVE_BREACHES, this::setNodeDuressNumConsecutiveBreaches); - - nodeDuressCpuThreshold = SETTING_NODE_DURESS_CPU_THRESHOLD.get(settings); - clusterSettings.addSettingsUpdateConsumer(SETTING_NODE_DURESS_CPU_THRESHOLD, this::setNodeDuressCpuThreshold); - - nodeDuressHeapThreshold = SETTING_NODE_DURESS_HEAP_THRESHOLD.get(settings); - clusterSettings.addSettingsUpdateConsumer(SETTING_NODE_DURESS_HEAP_THRESHOLD, this::setNodeDuressHeapThreshold); - - searchHeapThreshold = SETTING_SEARCH_HEAP_THRESHOLD.get(settings); - clusterSettings.addSettingsUpdateConsumer(SETTING_SEARCH_HEAP_THRESHOLD, this::setSearchHeapThreshold); - - searchTaskHeapThreshold = SETTING_SEARCH_TASK_HEAP_THRESHOLD.get(settings); - clusterSettings.addSettingsUpdateConsumer(SETTING_SEARCH_TASK_HEAP_THRESHOLD, this::setSearchTaskHeapThreshold); - - searchTaskHeapVarianceThreshold = SETTING_SEARCH_TASK_HEAP_VARIANCE_THRESHOLD.get(settings); - clusterSettings.addSettingsUpdateConsumer(SETTING_SEARCH_TASK_HEAP_VARIANCE_THRESHOLD, this::setSearchTaskHeapVarianceThreshold); - - searchTaskCpuTimeThreshold = SETTING_SEARCH_TASK_CPU_TIME_THRESHOLD.get(settings); - 
clusterSettings.addSettingsUpdateConsumer(SETTING_SEARCH_TASK_CPU_TIME_THRESHOLD, this::setSearchTaskCpuTimeThreshold); - - searchTaskElapsedTimeThreshold = SETTING_SEARCH_TASK_ELAPSED_TIME_THRESHOLD.get(settings); - clusterSettings.addSettingsUpdateConsumer(SETTING_SEARCH_TASK_ELAPSED_TIME_THRESHOLD, this::setSearchTaskElapsedTimeThreshold); - - cancellationRatio = SETTING_CANCELLATION_RATIO.get(settings); - clusterSettings.addSettingsUpdateConsumer(SETTING_CANCELLATION_RATIO, this::setCancellationRatio); - - cancellationRate = SETTING_CANCELLATION_RATE.get(settings); - clusterSettings.addSettingsUpdateConsumer(SETTING_CANCELLATION_RATE, this::setCancellationRate); - - cancellationBurst = SETTING_CANCELLATION_BURST.get(settings); - clusterSettings.addSettingsUpdateConsumer(SETTING_CANCELLATION_BURST, this::setCancellationBurst); - } - - public TimeValue getInterval() { - return interval; - } - - public boolean isEnabled() { - return enabled; - } - - public void setEnabled(boolean enabled) { - this.enabled = enabled; - } - - public boolean isEnforced() { - return enforced; - } - - public void setEnforced(boolean enforced) { - this.enforced = enforced; - } - - public int getNodeDuressNumConsecutiveBreaches() { - return nodeDuressNumConsecutiveBreaches; - } - - public void setNodeDuressNumConsecutiveBreaches(int nodeDuressNumConsecutiveBreaches) { - this.nodeDuressNumConsecutiveBreaches = nodeDuressNumConsecutiveBreaches; - } - - public double getNodeDuressCpuThreshold() { - return nodeDuressCpuThreshold; - } - - public void setNodeDuressCpuThreshold(double nodeDuressCpuThreshold) { - this.nodeDuressCpuThreshold = nodeDuressCpuThreshold; - } - - public double getNodeDuressHeapThreshold() { - return nodeDuressHeapThreshold; - } - - public void setNodeDuressHeapThreshold(double nodeDuressHeapThreshold) { - this.nodeDuressHeapThreshold = nodeDuressHeapThreshold; - } - - public double getSearchHeapThreshold() { - return searchHeapThreshold; - } - - public long 
getSearchHeapThresholdBytes() { - return (long) (HEAP_SIZE_BYTES * getSearchHeapThreshold()); - } - - public void setSearchHeapThreshold(double searchHeapThreshold) { - this.searchHeapThreshold = searchHeapThreshold; - } - - public double getSearchTaskHeapThreshold() { - return searchTaskHeapThreshold; - } - - public long getSearchTaskHeapThresholdBytes() { - return (long) (HEAP_SIZE_BYTES * getSearchTaskHeapThreshold()); - } - - public void setSearchTaskHeapThreshold(double searchTaskHeapThreshold) { - this.searchTaskHeapThreshold = searchTaskHeapThreshold; - } - - public double getSearchTaskHeapVarianceThreshold() { - return searchTaskHeapVarianceThreshold; - } - - public void setSearchTaskHeapVarianceThreshold(double searchTaskHeapVarianceThreshold) { - this.searchTaskHeapVarianceThreshold = searchTaskHeapVarianceThreshold; - } - - public long getSearchTaskCpuTimeThreshold() { - return searchTaskCpuTimeThreshold; - } - - public void setSearchTaskCpuTimeThreshold(long searchTaskCpuTimeThreshold) { - this.searchTaskCpuTimeThreshold = searchTaskCpuTimeThreshold; - } - - public long getSearchTaskElapsedTimeThreshold() { - return searchTaskElapsedTimeThreshold; - } - - public void setSearchTaskElapsedTimeThreshold(long searchTaskElapsedTimeThreshold) { - this.searchTaskElapsedTimeThreshold = searchTaskElapsedTimeThreshold; - } - - public double getCancellationRatio() { - return cancellationRatio; - } - - public void setCancellationRatio(double cancellationRatio) { - this.cancellationRatio = cancellationRatio; - if (listener.get() != null) { - listener.get().onCancellationRatioChanged(); - } - } - - public double getCancellationRate() { - return cancellationRate; - } - - public double getCancellationRateNanos() { - return getCancellationRate() / TimeUnit.MILLISECONDS.toNanos(1); // rate per nanoseconds - } - - public void setCancellationRate(double cancellationRate) { - this.cancellationRate = cancellationRate; - if (listener.get() != null) { - 
listener.get().onCancellationRateChanged(); - } - } - - public double getCancellationBurst() { - return cancellationBurst; - } - - public void setCancellationBurst(double cancellationBurst) { - this.cancellationBurst = cancellationBurst; - if (listener.get() != null) { - listener.get().onCancellationBurstChanged(); - } - } -} diff --git a/server/src/main/java/org/opensearch/search/backpressure/SearchBackpressureState.java b/server/src/main/java/org/opensearch/search/backpressure/SearchBackpressureState.java index 6e1de4f4e6fc5..a62231ec29ede 100644 --- a/server/src/main/java/org/opensearch/search/backpressure/SearchBackpressureState.java +++ b/server/src/main/java/org/opensearch/search/backpressure/SearchBackpressureState.java @@ -35,7 +35,7 @@ public long getCompletionCount() { return completionCount.get(); } - public long incrementCompletionCount() { + long incrementCompletionCount() { return completionCount.incrementAndGet(); } @@ -43,7 +43,7 @@ public long getCancellationCount() { return cancellationCount.get(); } - public long incrementCancellationCount() { + long incrementCancellationCount() { return cancellationCount.incrementAndGet(); } @@ -51,7 +51,7 @@ public long getLimitReachedCount() { return limitReachedCount.get(); } - public long incrementLimitReachedCount() { + long incrementLimitReachedCount() { return limitReachedCount.incrementAndGet(); } } diff --git a/server/src/main/java/org/opensearch/search/backpressure/settings/NodeDuressSettings.java b/server/src/main/java/org/opensearch/search/backpressure/settings/NodeDuressSettings.java new file mode 100644 index 0000000000000..09c1e4fcef46c --- /dev/null +++ b/server/src/main/java/org/opensearch/search/backpressure/settings/NodeDuressSettings.java @@ -0,0 +1,99 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.search.backpressure.settings; + +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; + +/** + * Defines the settings for a node to be considered in duress. + * + * @opensearch.internal + */ +public class NodeDuressSettings { + private static class Defaults { + private static final int NUM_SUCCESSIVE_BREACHES = 3; + private static final double CPU_THRESHOLD = 0.9; + private static final double HEAP_THRESHOLD = 0.7; + } + + /** + * Defines the number of successive limit breaches after the node is marked "in duress". + */ + private volatile int numSuccessiveBreaches; + public static final Setting SETTING_NUM_SUCCESSIVE_BREACHES = Setting.intSetting( + "search_backpressure.node_duress.num_successive_breaches", + Defaults.NUM_SUCCESSIVE_BREACHES, + 1, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + /** + * Defines the CPU usage threshold (in percentage) for a node to be considered "in duress". + */ + private volatile double cpuThreshold; + public static final Setting SETTING_CPU_THRESHOLD = Setting.doubleSetting( + "search_backpressure.node_duress.cpu_threshold", + Defaults.CPU_THRESHOLD, + 0.0, + 1.0, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + /** + * Defines the heap usage threshold (in percentage) for a node to be considered "in duress". 
+ */ + private volatile double heapThreshold; + public static final Setting SETTING_HEAP_THRESHOLD = Setting.doubleSetting( + "search_backpressure.node_duress.heap_threshold", + Defaults.HEAP_THRESHOLD, + 0.0, + 1.0, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + public NodeDuressSettings(Settings settings, ClusterSettings clusterSettings) { + numSuccessiveBreaches = SETTING_NUM_SUCCESSIVE_BREACHES.get(settings); + clusterSettings.addSettingsUpdateConsumer(SETTING_NUM_SUCCESSIVE_BREACHES, this::setNumSuccessiveBreaches); + + cpuThreshold = SETTING_CPU_THRESHOLD.get(settings); + clusterSettings.addSettingsUpdateConsumer(SETTING_CPU_THRESHOLD, this::setCpuThreshold); + + heapThreshold = SETTING_HEAP_THRESHOLD.get(settings); + clusterSettings.addSettingsUpdateConsumer(SETTING_HEAP_THRESHOLD, this::setHeapThreshold); + } + + public int getNumSuccessiveBreaches() { + return numSuccessiveBreaches; + } + + private void setNumSuccessiveBreaches(int numSuccessiveBreaches) { + this.numSuccessiveBreaches = numSuccessiveBreaches; + } + + public double getCpuThreshold() { + return cpuThreshold; + } + + private void setCpuThreshold(double cpuThreshold) { + this.cpuThreshold = cpuThreshold; + } + + public double getHeapThreshold() { + return heapThreshold; + } + + private void setHeapThreshold(double heapThreshold) { + this.heapThreshold = heapThreshold; + } +} diff --git a/server/src/main/java/org/opensearch/search/backpressure/settings/SearchBackpressureSettings.java b/server/src/main/java/org/opensearch/search/backpressure/settings/SearchBackpressureSettings.java new file mode 100644 index 0000000000000..4834808d768f1 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/backpressure/settings/SearchBackpressureSettings.java @@ -0,0 +1,213 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.search.backpressure.settings; + +import org.apache.lucene.util.SetOnce; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; + +import java.util.concurrent.TimeUnit; + +/** + * Settings related to search backpressure and cancellation of in-flight requests. + * + * @opensearch.internal + */ +public class SearchBackpressureSettings { + private static class Defaults { + private static final long INTERVAL = 1000; + + private static final boolean ENABLED = true; + private static final boolean ENFORCED = false; + + private static final double CANCELLATION_RATIO = 0.1; + private static final double CANCELLATION_RATE = 0.003; + private static final double CANCELLATION_BURST = 10.0; + } + + /** + * Defines the interval (in millis) at which the SearchBackpressureService monitors and cancels tasks. + */ + private final TimeValue interval; + public static final Setting SETTING_INTERVAL = Setting.longSetting( + "search_backpressure.interval", + Defaults.INTERVAL, + 1, + Setting.Property.NodeScope + ); + + /** + * Defines whether search backpressure is enabled or not. + */ + private volatile boolean enabled; + public static final Setting SETTING_ENABLED = Setting.boolSetting( + "search_backpressure.enabled", + Defaults.ENABLED, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + /** + * Defines whether in-flight cancellation of tasks is enabled or not. + */ + private volatile boolean enforced; + public static final Setting SETTING_ENFORCED = Setting.boolSetting( + "search_backpressure.enforced", + Defaults.ENFORCED, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + /** + * Defines the percentage of tasks to cancel relative to the number of successful task completions. + * In other words, it is the number of tokens added to the bucket on each successful task completion. 
+ */ + private volatile double cancellationRatio; + public static final Setting SETTING_CANCELLATION_RATIO = Setting.doubleSetting( + "search_backpressure.cancellation_ratio", + Defaults.CANCELLATION_RATIO, + 0.0, + 1.0, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + /** + * Defines the number of tasks to cancel per unit time (in millis). + * In other words, it is the number of tokens added to the bucket each millisecond. + */ + private volatile double cancellationRate; + public static final Setting SETTING_CANCELLATION_RATE = Setting.doubleSetting( + "search_backpressure.cancellation_rate", + Defaults.CANCELLATION_RATE, + 0.0, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + /** + * Defines the maximum number of tasks that can be cancelled before being rate-limited. + */ + private volatile double cancellationBurst; + public static final Setting SETTING_CANCELLATION_BURST = Setting.doubleSetting( + "search_backpressure.cancellation_burst", + Defaults.CANCELLATION_BURST, + 1.0, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + /** + * Callback listeners. 
+ */ + public interface Listener { + void onCancellationRatioChanged(); + + void onCancellationRateChanged(); + + void onCancellationBurstChanged(); + } + + private final SetOnce listener = new SetOnce<>(); + private final NodeDuressSettings nodeDuressSettings; + private final SearchShardTaskSettings searchShardTaskSettings; + + public SearchBackpressureSettings(Settings settings, ClusterSettings clusterSettings) { + this.nodeDuressSettings = new NodeDuressSettings(settings, clusterSettings); + this.searchShardTaskSettings = new SearchShardTaskSettings(settings, clusterSettings); + + interval = new TimeValue(SETTING_INTERVAL.get(settings)); + + enabled = SETTING_ENABLED.get(settings); + clusterSettings.addSettingsUpdateConsumer(SETTING_ENABLED, this::setEnabled); + + enforced = SETTING_ENFORCED.get(settings); + clusterSettings.addSettingsUpdateConsumer(SETTING_ENFORCED, this::setEnforced); + + cancellationRatio = SETTING_CANCELLATION_RATIO.get(settings); + clusterSettings.addSettingsUpdateConsumer(SETTING_CANCELLATION_RATIO, this::setCancellationRatio); + + cancellationRate = SETTING_CANCELLATION_RATE.get(settings); + clusterSettings.addSettingsUpdateConsumer(SETTING_CANCELLATION_RATE, this::setCancellationRate); + + cancellationBurst = SETTING_CANCELLATION_BURST.get(settings); + clusterSettings.addSettingsUpdateConsumer(SETTING_CANCELLATION_BURST, this::setCancellationBurst); + } + + public void setListener(Listener listener) { + this.listener.set(listener); + } + + public NodeDuressSettings getNodeDuressSettings() { + return nodeDuressSettings; + } + + public SearchShardTaskSettings getSearchShardTaskSettings() { + return searchShardTaskSettings; + } + + public TimeValue getInterval() { + return interval; + } + + public boolean isEnabled() { + return enabled; + } + + private void setEnabled(boolean enabled) { + this.enabled = enabled; + } + + public boolean isEnforced() { + return enforced; + } + + private void setEnforced(boolean enforced) { + this.enforced = 
enforced; + } + + public double getCancellationRatio() { + return cancellationRatio; + } + + private void setCancellationRatio(double cancellationRatio) { + this.cancellationRatio = cancellationRatio; + if (listener.get() != null) { + listener.get().onCancellationRatioChanged(); + } + } + + public double getCancellationRate() { + return cancellationRate; + } + + public double getCancellationRateNanos() { + return getCancellationRate() / TimeUnit.MILLISECONDS.toNanos(1); // rate per nanoseconds + } + + private void setCancellationRate(double cancellationRate) { + this.cancellationRate = cancellationRate; + if (listener.get() != null) { + listener.get().onCancellationRateChanged(); + } + } + + public double getCancellationBurst() { + return cancellationBurst; + } + + private void setCancellationBurst(double cancellationBurst) { + this.cancellationBurst = cancellationBurst; + if (listener.get() != null) { + listener.get().onCancellationBurstChanged(); + } + } +} diff --git a/server/src/main/java/org/opensearch/search/backpressure/settings/SearchShardTaskSettings.java b/server/src/main/java/org/opensearch/search/backpressure/settings/SearchShardTaskSettings.java new file mode 100644 index 0000000000000..1126dad78f554 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/backpressure/settings/SearchShardTaskSettings.java @@ -0,0 +1,160 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.backpressure.settings; + +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.monitor.jvm.JvmStats; + +/** + * Defines the settings related to the cancellation of SearchShardTasks. 
+ * + * @opensearch.internal + */ +public class SearchShardTaskSettings { + private static final long HEAP_SIZE_BYTES = JvmStats.jvmStats().getMem().getHeapMax().getBytes(); + + private static class Defaults { + private static final double TOTAL_HEAP_THRESHOLD = 0.05; + private static final double HEAP_THRESHOLD = 0.005; + private static final double HEAP_VARIANCE_THRESHOLD = 2.0; + private static final long CPU_TIME_THRESHOLD = 15; + private static final long ELAPSED_TIME_THRESHOLD = 30000; + } + + /** + * Defines the heap usage threshold (in percentage) for the sum of heap usages across all search shard tasks + * before in-flight cancellation is applied. + */ + private volatile double totalHeapThreshold; + public static final Setting SETTING_TOTAL_HEAP_THRESHOLD = Setting.doubleSetting( + "search_backpressure.search_shard_task.total_heap_threshold", + Defaults.TOTAL_HEAP_THRESHOLD, + 0.0, + 1.0, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + /** + * Defines the heap usage threshold (in percentage) for an individual task before it is considered for cancellation. + */ + private volatile double heapThreshold; + public static final Setting SETTING_HEAP_THRESHOLD = Setting.doubleSetting( + "search_backpressure.search_shard_task.heap_threshold", + Defaults.HEAP_THRESHOLD, + 0.0, + 1.0, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + /** + * Defines the heap usage variance for an individual task before it is considered for cancellation. + * A task is considered for cancellation when taskHeapUsage is greater than or equal to heapUsageMovingAverage * variance. 
+ */ + private volatile double heapVarianceThreshold; + public static final Setting SETTING_HEAP_VARIANCE_THRESHOLD = Setting.doubleSetting( + "search_backpressure.search_shard_task.heap_variance", + Defaults.HEAP_VARIANCE_THRESHOLD, + 0.0, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + /** + * Defines the CPU usage threshold (in millis) for an individual task before it is considered for cancellation. + */ + private volatile long cpuTimeThreshold; + public static final Setting SETTING_CPU_TIME_THRESHOLD = Setting.longSetting( + "search_backpressure.search_shard_task.cpu_time_threshold", + Defaults.CPU_TIME_THRESHOLD, + 0, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + /** + * Defines the elapsed time threshold (in millis) for an individual task before it is considered for cancellation. + */ + private volatile long elapsedTimeThreshold; + public static final Setting SETTING_ELAPSED_TIME_THRESHOLD = Setting.longSetting( + "search_backpressure.search_shard_task.elapsed_time_threshold", + Defaults.ELAPSED_TIME_THRESHOLD, + 0, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + public SearchShardTaskSettings(Settings settings, ClusterSettings clusterSettings) { + totalHeapThreshold = SETTING_TOTAL_HEAP_THRESHOLD.get(settings); + clusterSettings.addSettingsUpdateConsumer(SETTING_TOTAL_HEAP_THRESHOLD, this::setTotalHeapThreshold); + + heapThreshold = SETTING_HEAP_THRESHOLD.get(settings); + clusterSettings.addSettingsUpdateConsumer(SETTING_HEAP_THRESHOLD, this::setHeapThreshold); + + heapVarianceThreshold = SETTING_HEAP_VARIANCE_THRESHOLD.get(settings); + clusterSettings.addSettingsUpdateConsumer(SETTING_HEAP_VARIANCE_THRESHOLD, this::setHeapVarianceThreshold); + + cpuTimeThreshold = SETTING_CPU_TIME_THRESHOLD.get(settings); + clusterSettings.addSettingsUpdateConsumer(SETTING_CPU_TIME_THRESHOLD, this::setCpuTimeThreshold); + + elapsedTimeThreshold = SETTING_ELAPSED_TIME_THRESHOLD.get(settings); + 
clusterSettings.addSettingsUpdateConsumer(SETTING_ELAPSED_TIME_THRESHOLD, this::setElapsedTimeThreshold); + } + + public double getTotalHeapThreshold() { + return totalHeapThreshold; + } + + public long getTotalHeapThresholdBytes() { + return (long) (HEAP_SIZE_BYTES * getTotalHeapThreshold()); + } + + private void setTotalHeapThreshold(double totalHeapThreshold) { + this.totalHeapThreshold = totalHeapThreshold; + } + + public double getHeapThreshold() { + return heapThreshold; + } + + public long getHeapThresholdBytes() { + return (long) (HEAP_SIZE_BYTES * getHeapThreshold()); + } + + private void setHeapThreshold(double heapThreshold) { + this.heapThreshold = heapThreshold; + } + + public double getHeapVarianceThreshold() { + return heapVarianceThreshold; + } + + private void setHeapVarianceThreshold(double heapVarianceThreshold) { + this.heapVarianceThreshold = heapVarianceThreshold; + } + + public long getCpuTimeThreshold() { + return cpuTimeThreshold; + } + + private void setCpuTimeThreshold(long cpuTimeThreshold) { + this.cpuTimeThreshold = cpuTimeThreshold; + } + + public long getElapsedTimeThreshold() { + return elapsedTimeThreshold; + } + + private void setElapsedTimeThreshold(long elapsedTimeThreshold) { + this.elapsedTimeThreshold = elapsedTimeThreshold; + } +} diff --git a/server/src/main/java/org/opensearch/search/backpressure/settings/package-info.java b/server/src/main/java/org/opensearch/search/backpressure/settings/package-info.java new file mode 100644 index 0000000000000..a853a139b096b --- /dev/null +++ b/server/src/main/java/org/opensearch/search/backpressure/settings/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * This package contains settings for search backpressure. 
+ */ +package org.opensearch.search.backpressure.settings; diff --git a/server/src/main/java/org/opensearch/search/backpressure/trackers/NodeResourceUsageTracker.java b/server/src/main/java/org/opensearch/search/backpressure/trackers/NodeDuressTracker.java similarity index 79% rename from server/src/main/java/org/opensearch/search/backpressure/trackers/NodeResourceUsageTracker.java rename to server/src/main/java/org/opensearch/search/backpressure/trackers/NodeDuressTracker.java index 3d36aa83dee58..8e35c724a8fef 100644 --- a/server/src/main/java/org/opensearch/search/backpressure/trackers/NodeResourceUsageTracker.java +++ b/server/src/main/java/org/opensearch/search/backpressure/trackers/NodeDuressTracker.java @@ -8,16 +8,16 @@ package org.opensearch.search.backpressure.trackers; -import org.opensearch.common.Streak; +import org.opensearch.common.util.Streak; import java.util.function.BooleanSupplier; /** - * NodeResourceUsageTracker is used to check if the node is in duress. + * NodeDuressTracker is used to check if the node is in duress. * * @opensearch.internal */ -public class NodeResourceUsageTracker { +public class NodeDuressTracker { /** * Tracks the number of consecutive breaches. 
*/ @@ -28,7 +28,7 @@ public class NodeResourceUsageTracker { */ private final BooleanSupplier isNodeInDuress; - public NodeResourceUsageTracker(BooleanSupplier isNodeInDuress) { + public NodeDuressTracker(BooleanSupplier isNodeInDuress) { this.isNodeInDuress = isNodeInDuress; } diff --git a/server/src/test/java/org/opensearch/common/StreakTests.java b/server/src/test/java/org/opensearch/common/StreakTests.java index 57009b2a09b03..80080ad3e4027 100644 --- a/server/src/test/java/org/opensearch/common/StreakTests.java +++ b/server/src/test/java/org/opensearch/common/StreakTests.java @@ -8,6 +8,7 @@ package org.opensearch.common; +import org.opensearch.common.util.Streak; import org.opensearch.test.OpenSearchTestCase; public class StreakTests extends OpenSearchTestCase { @@ -18,7 +19,7 @@ public void testStreak() { // Streak starts with zero. assertEquals(0, streak.length()); - // Streak increases on consecutive successful events. + // Streak increases on successive successful events. streak.record(true); assertEquals(1, streak.length()); streak.record(true); diff --git a/server/src/test/java/org/opensearch/search/backpressure/SearchBackpressureServiceTests.java b/server/src/test/java/org/opensearch/search/backpressure/SearchBackpressureServiceTests.java index 841b9d1596174..ac5a8229718ba 100644 --- a/server/src/test/java/org/opensearch/search/backpressure/SearchBackpressureServiceTests.java +++ b/server/src/test/java/org/opensearch/search/backpressure/SearchBackpressureServiceTests.java @@ -11,7 +11,9 @@ import org.opensearch.action.search.SearchShardTask; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; -import org.opensearch.search.backpressure.trackers.NodeResourceUsageTracker; +import org.opensearch.search.backpressure.settings.SearchBackpressureSettings; +import org.opensearch.search.backpressure.settings.SearchShardTaskSettings; +import org.opensearch.search.backpressure.trackers.NodeDuressTracker; import 
org.opensearch.search.backpressure.trackers.TaskResourceUsageTracker; import org.opensearch.tasks.CancellableTask; import org.opensearch.tasks.Task; @@ -47,8 +49,8 @@ public void testIsNodeInDuress() { AtomicReference cpuUsage = new AtomicReference<>(); AtomicReference heapUsage = new AtomicReference<>(); - NodeResourceUsageTracker cpuUsageTracker = new NodeResourceUsageTracker(() -> cpuUsage.get() >= 0.5); - NodeResourceUsageTracker heapUsageTracker = new NodeResourceUsageTracker(() -> heapUsage.get() >= 0.5); + NodeDuressTracker cpuUsageTracker = new NodeDuressTracker(() -> cpuUsage.get() >= 0.5); + NodeDuressTracker heapUsageTracker = new NodeDuressTracker(() -> heapUsage.get() >= 0.5); SearchBackpressureSettings settings = new SearchBackpressureSettings( Settings.EMPTY, @@ -118,7 +120,7 @@ public void testInFlightCancellation() { ThreadPool mockThreadPool = mock(ThreadPool.class); AtomicLong mockTime = new AtomicLong(0); LongSupplier mockTimeNanosSupplier = mockTime::get; - NodeResourceUsageTracker mockNodeResourceUsageTracker = new NodeResourceUsageTracker(() -> true); + NodeDuressTracker mockNodeDuressTracker = new NodeDuressTracker(() -> true); TaskResourceUsageTracker mockTaskResourceUsageTracker = new TaskResourceUsageTracker() { @Override @@ -157,17 +159,19 @@ public Optional cancellationReason(Task task) { mockTaskResourceTrackingService, mockThreadPool, mockTimeNanosSupplier, - List.of(mockNodeResourceUsageTracker), + List.of(mockNodeDuressTracker), List.of(mockTaskResourceUsageTracker) ); // Run two iterations so that node is marked 'in duress' from the third iteration onwards. - service.run(); - service.run(); + service.doRun(); + service.doRun(); // Mocking 'settings' with predictable searchHeapThresholdBytes so that cancellation logic doesn't get skipped. 
long taskHeapUsageBytes = 500; - when(settings.getSearchHeapThresholdBytes()).thenReturn(taskHeapUsageBytes); + SearchShardTaskSettings shardTaskSettings = mock(SearchShardTaskSettings.class); + when(shardTaskSettings.getTotalHeapThresholdBytes()).thenReturn(taskHeapUsageBytes); + when(settings.getSearchShardTaskSettings()).thenReturn(shardTaskSettings); // Create a mix of low and high resource usage tasks (60 low + 15 high resource usage tasks). Map activeTasks = new HashMap<>(); @@ -181,12 +185,12 @@ public Optional cancellationReason(Task task) { doReturn(activeTasks).when(mockTaskResourceTrackingService).getResourceAwareTasks(); // There are 15 tasks eligible for cancellation but only 10 will be cancelled (burst limit). - service.run(); + service.doRun(); assertEquals(10, service.getState().getCancellationCount()); assertEquals(1, service.getState().getLimitReachedCount()); // If the clock or completed task count haven't made sufficient progress, we'll continue to be rate-limited. - service.run(); + service.doRun(); assertEquals(10, service.getState().getCancellationCount()); assertEquals(2, service.getState().getLimitReachedCount()); @@ -195,14 +199,14 @@ public Optional cancellationReason(Task task) { for (int i = 0; i < 20; i++) { service.onTaskCompleted(createMockTaskWithResourceStats(SearchShardTask.class, 100, taskHeapUsageBytes)); } - service.run(); + service.doRun(); assertEquals(12, service.getState().getCancellationCount()); assertEquals(3, service.getState().getLimitReachedCount()); // Fast-forward the clock by one second to replenish some tokens. // This will add 3 tokens (time delta * rate) to 'rateLimitPerTime'. 
mockTime.addAndGet(TimeUnit.SECONDS.toNanos(1)); - service.run(); + service.doRun(); assertEquals(15, service.getState().getCancellationCount()); assertEquals(3, service.getState().getLimitReachedCount()); // no more tasks to cancel; limit not reached } diff --git a/server/src/test/java/org/opensearch/search/backpressure/trackers/NodeResourceUsageTrackerTests.java b/server/src/test/java/org/opensearch/search/backpressure/trackers/NodeDuressTrackerTests.java similarity index 79% rename from server/src/test/java/org/opensearch/search/backpressure/trackers/NodeResourceUsageTrackerTests.java rename to server/src/test/java/org/opensearch/search/backpressure/trackers/NodeDuressTrackerTests.java index 96715c9f01aac..472ba95566523 100644 --- a/server/src/test/java/org/opensearch/search/backpressure/trackers/NodeResourceUsageTrackerTests.java +++ b/server/src/test/java/org/opensearch/search/backpressure/trackers/NodeDuressTrackerTests.java @@ -12,11 +12,11 @@ import java.util.concurrent.atomic.AtomicReference; -public class NodeResourceUsageTrackerTests extends OpenSearchTestCase { +public class NodeDuressTrackerTests extends OpenSearchTestCase { - public void testNodeResourceUsageTracker() { + public void testNodeDuressTracker() { AtomicReference cpuUsage = new AtomicReference<>(0.0); - NodeResourceUsageTracker tracker = new NodeResourceUsageTracker(() -> cpuUsage.get() >= 0.5); + NodeDuressTracker tracker = new NodeDuressTracker(() -> cpuUsage.get() >= 0.5); // Node not in duress. assertEquals(0, tracker.check());