Skip to content

Commit

Permalink
Added in-flight cancellation of SearchShardTask based on resource con…
Browse files Browse the repository at this point in the history
…sumption

This feature aims to identify and cancel resource intensive SearchShardTasks if they have breached certain
thresholds. This will help in terminating problematic queries which can put nodes in duress and degrade the
cluster performance.

Signed-off-by: Ketan Verma <ketan9495@gmail.com>
  • Loading branch information
ketanv3 committed Sep 24, 2022
1 parent 2c27dfd commit ef01ba3
Show file tree
Hide file tree
Showing 25 changed files with 1,649 additions and 7 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Add BWC version 2.3.1 ([#4513](https://github.com/opensearch-project/OpenSearch/pull/4513))
- [Segment Replication] Add snapshot and restore tests for segment replication feature ([#3993](https://github.com/opensearch-project/OpenSearch/pull/3993))
- Added missing javadocs for `:example-plugins` modules ([#4540](https://github.com/opensearch-project/OpenSearch/pull/4540))
- Added in-flight cancellation of SearchShardTask based on resource consumption ([#4565](https://github.com/opensearch-project/OpenSearch/pull/4565))

### Dependencies
- Bumps `log4j-core` from 2.18.0 to 2.19.0

Expand Down Expand Up @@ -101,4 +103,4 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)


[Unreleased]: https://github.com/opensearch-project/OpenSearch/compare/2.2.0...HEAD
[2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.2.0...2.x
[2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.2.0...2.x
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@
import org.opensearch.script.ScriptMetadata;
import org.opensearch.snapshots.SnapshotsInfoService;
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskResourceTrackingService;
import org.opensearch.tasks.TaskResultsService;

import java.util.ArrayList;
Expand Down Expand Up @@ -406,7 +405,6 @@ protected void configure() {
bind(NodeMappingRefreshAction.class).asEagerSingleton();
bind(MappingUpdatedAction.class).asEagerSingleton();
bind(TaskResultsService.class).asEagerSingleton();
bind(TaskResourceTrackingService.class).asEagerSingleton();
bind(AllocationDeciders.class).toInstance(allocationDeciders);
bind(ShardsAllocator.class).toInstance(shardsAllocator);
}
Expand Down
31 changes: 31 additions & 0 deletions server/src/main/java/org/opensearch/common/Streak.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common;

import java.util.concurrent.atomic.AtomicInteger;

/**
* Streak is a data structure that keeps track of the number of consecutive successful events.
*/
public class Streak {
private final AtomicInteger consecutiveSuccessfulEvents = new AtomicInteger();

public int record(boolean isSuccessful) {
if (isSuccessful) {
return consecutiveSuccessfulEvents.incrementAndGet();
} else {
consecutiveSuccessfulEvents.set(0);
return 0;
}
}

public int length() {
return consecutiveSuccessfulEvents.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.opensearch.index.ShardIndexingPressureMemoryManager;
import org.opensearch.index.ShardIndexingPressureSettings;
import org.opensearch.index.ShardIndexingPressureStore;
import org.opensearch.search.backpressure.SearchBackpressureSettings;
import org.opensearch.tasks.TaskManager;
import org.opensearch.tasks.TaskResourceTrackingService;
import org.opensearch.watcher.ResourceWatcherService;
Expand Down Expand Up @@ -580,7 +581,17 @@ public void apply(Settings value, Settings current, Settings previous) {
ShardIndexingPressureMemoryManager.MAX_OUTSTANDING_REQUESTS,
IndexingPressure.MAX_INDEXING_BYTES,
TaskResourceTrackingService.TASK_RESOURCE_TRACKING_ENABLED,
TaskManager.TASK_RESOURCE_CONSUMERS_ENABLED
TaskManager.TASK_RESOURCE_CONSUMERS_ENABLED,
SearchBackpressureSettings.SETTING_ENABLED,
SearchBackpressureSettings.SETTING_ENFORCED,
SearchBackpressureSettings.SETTING_NODE_DURESS_NUM_CONSECUTIVE_BREACHES,
SearchBackpressureSettings.SETTING_NODE_DURESS_CPU_THRESHOLD,
SearchBackpressureSettings.SETTING_NODE_DURESS_HEAP_THRESHOLD,
SearchBackpressureSettings.SETTING_SEARCH_HEAP_THRESHOLD,
SearchBackpressureSettings.SETTING_SEARCH_TASK_HEAP_THRESHOLD,
SearchBackpressureSettings.SETTING_SEARCH_TASK_HEAP_VARIANCE_THRESHOLD,
SearchBackpressureSettings.SETTING_SEARCH_TASK_CPU_TIME_THRESHOLD,
SearchBackpressureSettings.SETTING_SEARCH_TASK_ELAPSED_TIME_THRESHOLD
)
)
);
Expand Down
55 changes: 55 additions & 0 deletions server/src/main/java/org/opensearch/common/util/MovingAverage.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.util;

/**
* MovingAverage is used to calculate the moving average of last 'n' observations.
*/
public class MovingAverage {
private final int windowSize;
private final long[] observations;

private long count = 0;
private long sum = 0;
private double average = 0;

public MovingAverage(int windowSize) {
if (windowSize <= 0) {
throw new IllegalArgumentException("window size must be greater than zero");
}

this.windowSize = windowSize;
this.observations = new long[windowSize];
}

/**
* Records a new observation and evicts the n-th last observation.
*/
public synchronized double record(long value) {
long delta = value - observations[(int) (count % observations.length)];
observations[(int) (count % observations.length)] = value;

count++;
sum += delta;
average = (double) sum / Math.min(count, observations.length);
return average;
}

public double getAverage() {
return average;
}

public long getCount() {
return count;
}

public boolean isReady() {
return count >= windowSize;
}
}
95 changes: 95 additions & 0 deletions server/src/main/java/org/opensearch/common/util/TokenBucket.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.util;

import java.util.function.LongSupplier;

/**
* TokenBucket is used to limit the number of operations at a constant rate while allowing for short bursts.
*/
public class TokenBucket {
/**
* Defines a monotonically increasing counter.
*
* Usage examples:
* 1. clock = System::nanoTime can be used to perform rate-limiting per unit time
* 2. clock = AtomicLong::get can be used to perform rate-limiting per unit number of operations
*/
private final LongSupplier clock;

/**
* Defines the number of tokens added to the bucket per clock cycle.
*/
private final double rate;

/**
* Defines the capacity and the maximum number of operations that can be performed per clock cycle before
* the bucket runs out of tokens.
*/
private final double burst;

private double tokens;

private long lastRefilledAt;

public TokenBucket(LongSupplier clock, double rate, double burst) {
this(clock, rate, burst, burst);
}

public TokenBucket(LongSupplier clock, double rate, double burst, double initialTokens) {
if (rate <= 0.0) {
throw new IllegalArgumentException("rate must be greater than zero");
}

if (burst <= 0.0) {
throw new IllegalArgumentException("burst must be greater than zero");
}

this.clock = clock;
this.rate = rate;
this.burst = burst;
this.tokens = initialTokens;
this.lastRefilledAt = clock.getAsLong();
}

/**
* Refills the token bucket.
*/
private void refill() {
long now = clock.getAsLong();
double incr = (now - lastRefilledAt) * rate;
tokens = Math.min(tokens + incr, burst);
lastRefilledAt = now;
}

/**
* If there are enough tokens in the bucket, it requests/deducts 'n' tokens and returns true.
* Otherwise, returns false and leaves the bucket untouched.
*/
public boolean request(double n) {
if (n <= 0) {
throw new IllegalArgumentException("requested tokens must be greater than zero");
}

synchronized (this) {
refill();

if (tokens >= n) {
tokens -= n;
return true;
}

return false;
}
}

public boolean request() {
return request(1.0);
}
}
24 changes: 23 additions & 1 deletion server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
import org.opensearch.indices.replication.SegmentReplicationSourceFactory;
import org.opensearch.indices.replication.SegmentReplicationTargetService;
import org.opensearch.indices.replication.SegmentReplicationSourceService;
import org.opensearch.search.backpressure.SearchBackpressureService;
import org.opensearch.search.backpressure.SearchBackpressureSettings;
import org.opensearch.tasks.TaskResourceTrackingService;
import org.opensearch.threadpool.RunnableTaskExecutionListener;
import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory;
Expand Down Expand Up @@ -780,6 +782,23 @@ protected Node(
// development. Then we can deprecate Getter and Setter for IndexingPressureService in ClusterService (#478).
clusterService.setIndexingPressureService(indexingPressureService);

final TaskResourceTrackingService taskResourceTrackingService = new TaskResourceTrackingService(
settings,
clusterService.getClusterSettings(),
threadPool
);

final SearchBackpressureSettings searchBackpressureSettings = new SearchBackpressureSettings(
settings,
clusterService.getClusterSettings()
);

final SearchBackpressureService searchBackpressureService = new SearchBackpressureService(
searchBackpressureSettings,
taskResourceTrackingService,
threadPool
);

final RecoverySettings recoverySettings = new RecoverySettings(settings, settingsModule.getClusterSettings());
RepositoriesModule repositoriesModule = new RepositoriesModule(
this.environment,
Expand Down Expand Up @@ -867,7 +886,8 @@ protected Node(
responseCollectorService,
searchTransportService,
indexingPressureService,
searchModule.getValuesSourceRegistry().getUsageService()
searchModule.getValuesSourceRegistry().getUsageService(),
searchBackpressureService
);

final SearchService searchService = newSearchService(
Expand Down Expand Up @@ -925,6 +945,8 @@ protected Node(
b.bind(AnalysisRegistry.class).toInstance(analysisModule.getAnalysisRegistry());
b.bind(IngestService.class).toInstance(ingestService);
b.bind(IndexingPressureService.class).toInstance(indexingPressureService);
b.bind(TaskResourceTrackingService.class).toInstance(taskResourceTrackingService);
b.bind(SearchBackpressureService.class).toInstance(searchBackpressureService);
b.bind(UsageService.class).toInstance(usageService);
b.bind(AggregationUsageService.class).toInstance(searchModule.getValuesSourceRegistry().getUsageService());
b.bind(NamedWriteableRegistry.class).toInstance(namedWriteableRegistry);
Expand Down
6 changes: 5 additions & 1 deletion server/src/main/java/org/opensearch/node/NodeService.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.opensearch.plugins.PluginsService;
import org.opensearch.script.ScriptService;
import org.opensearch.search.aggregations.support.AggregationUsageService;
import org.opensearch.search.backpressure.SearchBackpressureService;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

Expand Down Expand Up @@ -81,6 +82,7 @@ public class NodeService implements Closeable {
private final SearchTransportService searchTransportService;
private final IndexingPressureService indexingPressureService;
private final AggregationUsageService aggregationUsageService;
private final SearchBackpressureService searchBackpressureService;

private final Discovery discovery;

Expand All @@ -101,7 +103,8 @@ public class NodeService implements Closeable {
ResponseCollectorService responseCollectorService,
SearchTransportService searchTransportService,
IndexingPressureService indexingPressureService,
AggregationUsageService aggregationUsageService
AggregationUsageService aggregationUsageService,
SearchBackpressureService searchBackpressureService
) {
this.settings = settings;
this.threadPool = threadPool;
Expand All @@ -119,6 +122,7 @@ public class NodeService implements Closeable {
this.searchTransportService = searchTransportService;
this.indexingPressureService = indexingPressureService;
this.aggregationUsageService = aggregationUsageService;
this.searchBackpressureService = searchBackpressureService;
clusterService.addStateApplier(ingestService);
}

Expand Down
Loading

0 comments on commit ef01ba3

Please sign in to comment.