Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] In-flight cancellation of SearchShardTask based on resource consumption #5039

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Update GeoGrid base class access modifier to support extensibility ([#4921](https://github.com/opensearch-project/OpenSearch/pull/4921))
- Build no-jdk distributions as part of release build ([#4902](https://github.com/opensearch-project/OpenSearch/pull/4902))
- Use getParameterCount instead of getParameterTypes ([#4821](https://github.com/opensearch-project/OpenSearch/pull/4821))
- Added in-flight cancellation of SearchShardTask based on resource consumption ([#4565](https://github.com/opensearch-project/OpenSearch/pull/4565))
- Added resource usage trackers for in-flight cancellation of SearchShardTask ([#4805](https://github.com/opensearch-project/OpenSearch/pull/4805))
- Added search backpressure stats API ([#4932](https://github.com/opensearch-project/OpenSearch/pull/4932))

### Dependencies
- Bumps `com.diffplug.spotless` from 6.9.1 to 6.10.0
Expand Down Expand Up @@ -135,4 +138,4 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
### Security

[Unreleased]: https://github.com/opensearch-project/OpenSearch/compare/2.2.0...HEAD
[2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.2.0...2.x
[2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.2.0...2.x
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.opensearch.node.AdaptiveSelectionStats;
import org.opensearch.script.ScriptCacheStats;
import org.opensearch.script.ScriptStats;
import org.opensearch.search.backpressure.stats.SearchBackpressureStats;
import org.opensearch.threadpool.ThreadPoolStats;
import org.opensearch.transport.TransportStats;

Expand Down Expand Up @@ -119,6 +120,9 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
@Nullable
private ShardIndexingPressureStats shardIndexingPressureStats;

@Nullable
private SearchBackpressureStats searchBackpressureStats;

public NodeStats(StreamInput in) throws IOException {
super(in);
timestamp = in.readVLong();
Expand Down Expand Up @@ -156,6 +160,11 @@ public NodeStats(StreamInput in) throws IOException {
shardIndexingPressureStats = null;
}

if (in.getVersion().onOrAfter(Version.V_2_4_0)) {
searchBackpressureStats = in.readOptionalWriteable(SearchBackpressureStats::new);
} else {
searchBackpressureStats = null;
}
}

public NodeStats(
Expand All @@ -176,7 +185,8 @@ public NodeStats(
@Nullable AdaptiveSelectionStats adaptiveSelectionStats,
@Nullable ScriptCacheStats scriptCacheStats,
@Nullable IndexingPressureStats indexingPressureStats,
@Nullable ShardIndexingPressureStats shardIndexingPressureStats
@Nullable ShardIndexingPressureStats shardIndexingPressureStats,
@Nullable SearchBackpressureStats searchBackpressureStats
) {
super(node);
this.timestamp = timestamp;
Expand All @@ -196,6 +206,7 @@ public NodeStats(
this.scriptCacheStats = scriptCacheStats;
this.indexingPressureStats = indexingPressureStats;
this.shardIndexingPressureStats = shardIndexingPressureStats;
this.searchBackpressureStats = searchBackpressureStats;
}

public long getTimestamp() {
Expand Down Expand Up @@ -305,6 +316,11 @@ public ShardIndexingPressureStats getShardIndexingPressureStats() {
return shardIndexingPressureStats;
}

@Nullable
public SearchBackpressureStats getSearchBackpressureStats() {
return searchBackpressureStats;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
Expand Down Expand Up @@ -336,6 +352,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_1_2_0)) {
out.writeOptionalWriteable(shardIndexingPressureStats);
}
if (out.getVersion().onOrAfter(Version.V_2_4_0)) {
out.writeOptionalWriteable(searchBackpressureStats);
}
}

@Override
Expand Down Expand Up @@ -408,6 +427,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (getShardIndexingPressureStats() != null) {
getShardIndexingPressureStats().toXContent(builder, params);
}
if (getSearchBackpressureStats() != null) {
getSearchBackpressureStats().toXContent(builder, params);
}
return builder;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,8 @@ public enum Metric {
ADAPTIVE_SELECTION("adaptive_selection"),
SCRIPT_CACHE("script_cache"),
INDEXING_PRESSURE("indexing_pressure"),
SHARD_INDEXING_PRESSURE("shard_indexing_pressure");
SHARD_INDEXING_PRESSURE("shard_indexing_pressure"),
SEARCH_BACKPRESSURE("search_backpressure");

private String metricName;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest) {
NodesStatsRequest.Metric.ADAPTIVE_SELECTION.containedIn(metrics),
NodesStatsRequest.Metric.SCRIPT_CACHE.containedIn(metrics),
NodesStatsRequest.Metric.INDEXING_PRESSURE.containedIn(metrics),
NodesStatsRequest.Metric.SHARD_INDEXING_PRESSURE.containedIn(metrics)
NodesStatsRequest.Metric.SHARD_INDEXING_PRESSURE.containedIn(metrics),
NodesStatsRequest.Metric.SEARCH_BACKPRESSURE.containedIn(metrics)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq
false,
false,
false,
false,
false
);
List<ShardStats> shardsStats = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@
import org.opensearch.script.ScriptMetadata;
import org.opensearch.snapshots.SnapshotsInfoService;
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskResourceTrackingService;
import org.opensearch.tasks.TaskResultsService;

import java.util.ArrayList;
Expand Down Expand Up @@ -402,7 +401,6 @@ protected void configure() {
bind(NodeMappingRefreshAction.class).asEagerSingleton();
bind(MappingUpdatedAction.class).asEagerSingleton();
bind(TaskResultsService.class).asEagerSingleton();
bind(TaskResourceTrackingService.class).asEagerSingleton();
bind(AllocationDeciders.class).toInstance(allocationDeciders);
bind(ShardsAllocator.class).toInstance(shardsAllocator);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@
import org.opensearch.index.ShardIndexingPressureMemoryManager;
import org.opensearch.index.ShardIndexingPressureSettings;
import org.opensearch.index.ShardIndexingPressureStore;
import org.opensearch.search.backpressure.settings.NodeDuressSettings;
import org.opensearch.search.backpressure.settings.SearchBackpressureSettings;
import org.opensearch.search.backpressure.settings.SearchShardTaskSettings;
import org.opensearch.search.backpressure.trackers.CpuUsageTracker;
import org.opensearch.search.backpressure.trackers.ElapsedTimeTracker;
import org.opensearch.search.backpressure.trackers.HeapUsageTracker;
import org.opensearch.tasks.TaskManager;
import org.opensearch.tasks.TaskResourceTrackingService;
import org.opensearch.watcher.ResourceWatcherService;
Expand Down Expand Up @@ -582,7 +588,22 @@ public void apply(Settings value, Settings current, Settings previous) {
ShardIndexingPressureMemoryManager.MAX_OUTSTANDING_REQUESTS,
IndexingPressure.MAX_INDEXING_BYTES,
TaskResourceTrackingService.TASK_RESOURCE_TRACKING_ENABLED,
TaskManager.TASK_RESOURCE_CONSUMERS_ENABLED
TaskManager.TASK_RESOURCE_CONSUMERS_ENABLED,

// Settings related to search backpressure
SearchBackpressureSettings.SETTING_MODE,
SearchBackpressureSettings.SETTING_CANCELLATION_RATIO,
SearchBackpressureSettings.SETTING_CANCELLATION_RATE,
SearchBackpressureSettings.SETTING_CANCELLATION_BURST,
NodeDuressSettings.SETTING_NUM_SUCCESSIVE_BREACHES,
NodeDuressSettings.SETTING_CPU_THRESHOLD,
NodeDuressSettings.SETTING_HEAP_THRESHOLD,
SearchShardTaskSettings.SETTING_TOTAL_HEAP_PERCENT_THRESHOLD,
HeapUsageTracker.SETTING_HEAP_PERCENT_THRESHOLD,
HeapUsageTracker.SETTING_HEAP_VARIANCE_THRESHOLD,
HeapUsageTracker.SETTING_HEAP_MOVING_AVERAGE_WINDOW_SIZE,
CpuUsageTracker.SETTING_CPU_TIME_MILLIS_THRESHOLD,
ElapsedTimeTracker.SETTING_ELAPSED_TIME_MILLIS_THRESHOLD
)
)
);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.util;

/**
* MovingAverage is used to calculate the moving average of last 'n' observations.
*
* @opensearch.internal
*/
public class MovingAverage {
private final int windowSize;
private final long[] observations;

private long count = 0;
private long sum = 0;
private double average = 0;

public MovingAverage(int windowSize) {
if (windowSize <= 0) {
throw new IllegalArgumentException("window size must be greater than zero");
}

this.windowSize = windowSize;
this.observations = new long[windowSize];
}

/**
* Records a new observation and evicts the n-th last observation.
*/
public synchronized double record(long value) {
long delta = value - observations[(int) (count % observations.length)];
observations[(int) (count % observations.length)] = value;

count++;
sum += delta;
average = (double) sum / Math.min(count, observations.length);
return average;
}

public double getAverage() {
return average;
}

public long getCount() {
return count;
}

public boolean isReady() {
return count >= windowSize;
}
}
33 changes: 33 additions & 0 deletions server/src/main/java/org/opensearch/common/util/Streak.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.util;

import java.util.concurrent.atomic.AtomicInteger;

/**
* Streak is a data structure that keeps track of the number of successive successful events.
*
* @opensearch.internal
*/
public class Streak {
private final AtomicInteger successiveSuccessfulEvents = new AtomicInteger();

public int record(boolean isSuccessful) {
if (isSuccessful) {
return successiveSuccessfulEvents.incrementAndGet();
} else {
successiveSuccessfulEvents.set(0);
return 0;
}
}

public int length() {
return successiveSuccessfulEvents.get();
}
}
124 changes: 124 additions & 0 deletions server/src/main/java/org/opensearch/common/util/TokenBucket.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.util;

import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongSupplier;

/**
* TokenBucket is used to limit the number of operations at a constant rate while allowing for short bursts.
*
* @opensearch.internal
*/
public class TokenBucket {
/**
* Defines a monotonically increasing counter.
*
* Usage examples:
* 1. clock = System::nanoTime can be used to perform rate-limiting per unit time
* 2. clock = AtomicLong::get can be used to perform rate-limiting per unit number of operations
*/
private final LongSupplier clock;

/**
* Defines the number of tokens added to the bucket per clock cycle.
*/
private final double rate;

/**
* Defines the capacity and the maximum number of operations that can be performed per clock cycle before
* the bucket runs out of tokens.
*/
private final double burst;

/**
* Defines the current state of the token bucket.
*/
private final AtomicReference<State> state;

public TokenBucket(LongSupplier clock, double rate, double burst) {
this(clock, rate, burst, burst);
}

public TokenBucket(LongSupplier clock, double rate, double burst, double initialTokens) {
if (rate <= 0.0) {
throw new IllegalArgumentException("rate must be greater than zero");
}

if (burst <= 0.0) {
throw new IllegalArgumentException("burst must be greater than zero");
}

this.clock = clock;
this.rate = rate;
this.burst = burst;
this.state = new AtomicReference<>(new State(Math.min(initialTokens, burst), clock.getAsLong()));
}

/**
* If there are enough tokens in the bucket, it requests/deducts 'n' tokens and returns true.
* Otherwise, returns false and leaves the bucket untouched.
*/
public boolean request(double n) {
if (n <= 0) {
throw new IllegalArgumentException("requested tokens must be greater than zero");
}

// Refill tokens
State currentState, updatedState;
do {
currentState = state.get();
long now = clock.getAsLong();
double incr = (now - currentState.lastRefilledAt) * rate;
updatedState = new State(Math.min(currentState.tokens + incr, burst), now);
} while (state.compareAndSet(currentState, updatedState) == false);

// Deduct tokens
do {
currentState = state.get();
if (currentState.tokens < n) {
return false;
}
updatedState = new State(currentState.tokens - n, currentState.lastRefilledAt);
} while (state.compareAndSet(currentState, updatedState) == false);

return true;
}

public boolean request() {
return request(1.0);
}

/**
* Represents an immutable token bucket state.
*/
private static class State {
final double tokens;
final long lastRefilledAt;

public State(double tokens, long lastRefilledAt) {
this.tokens = tokens;
this.lastRefilledAt = lastRefilledAt;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
State state = (State) o;
return Double.compare(state.tokens, tokens) == 0 && lastRefilledAt == state.lastRefilledAt;
}

@Override
public int hashCode() {
return Objects.hash(tokens, lastRefilledAt);
}
}
}
Loading