Added in-flight cancellation of SearchShardTask based on resource consumption #4575

Merged
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -31,6 +31,8 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Add a new node role 'search' which is dedicated to provide search capability ([#4689](https://github.com/opensearch-project/OpenSearch/pull/4689))
- Introduce experimental searchable snapshot API ([#4680](https://github.com/opensearch-project/OpenSearch/pull/4680))
- Recommissioning of zone. REST layer support. ([#4624](https://github.com/opensearch-project/OpenSearch/pull/4604))
- Added in-flight cancellation of SearchShardTask based on resource consumption ([#4565](https://github.com/opensearch-project/OpenSearch/pull/4565))

### Dependencies
- Bumps `log4j-core` from 2.18.0 to 2.19.0
- Bumps `reactor-netty-http` from 1.0.18 to 1.0.23
@@ -96,7 +96,6 @@
import org.opensearch.script.ScriptMetadata;
import org.opensearch.snapshots.SnapshotsInfoService;
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskResourceTrackingService;
import org.opensearch.tasks.TaskResultsService;

import java.util.ArrayList;
@@ -420,7 +419,6 @@ protected void configure() {
bind(NodeMappingRefreshAction.class).asEagerSingleton();
bind(MappingUpdatedAction.class).asEagerSingleton();
bind(TaskResultsService.class).asEagerSingleton();
bind(TaskResourceTrackingService.class).asEagerSingleton();
bind(AllocationDeciders.class).toInstance(allocationDeciders);
bind(ShardsAllocator.class).toInstance(shardsAllocator);
}
@@ -41,6 +41,9 @@
import org.opensearch.index.ShardIndexingPressureMemoryManager;
import org.opensearch.index.ShardIndexingPressureSettings;
import org.opensearch.index.ShardIndexingPressureStore;
import org.opensearch.search.backpressure.settings.NodeDuressSettings;
import org.opensearch.search.backpressure.settings.SearchBackpressureSettings;
import org.opensearch.search.backpressure.settings.SearchShardTaskSettings;
import org.opensearch.tasks.TaskManager;
import org.opensearch.tasks.TaskResourceTrackingService;
import org.opensearch.watcher.ResourceWatcherService;
@@ -581,7 +584,22 @@ public void apply(Settings value, Settings current, Settings previous) {
ShardIndexingPressureMemoryManager.MAX_OUTSTANDING_REQUESTS,
IndexingPressure.MAX_INDEXING_BYTES,
TaskResourceTrackingService.TASK_RESOURCE_TRACKING_ENABLED,
TaskManager.TASK_RESOURCE_CONSUMERS_ENABLED
TaskManager.TASK_RESOURCE_CONSUMERS_ENABLED,

// Settings related to search backpressure
SearchBackpressureSettings.SETTING_ENABLED,
SearchBackpressureSettings.SETTING_ENFORCED,
SearchBackpressureSettings.SETTING_CANCELLATION_RATIO,
SearchBackpressureSettings.SETTING_CANCELLATION_RATE,
SearchBackpressureSettings.SETTING_CANCELLATION_BURST,
NodeDuressSettings.SETTING_NUM_SUCCESSIVE_BREACHES,
NodeDuressSettings.SETTING_CPU_THRESHOLD,
NodeDuressSettings.SETTING_HEAP_THRESHOLD,
SearchShardTaskSettings.SETTING_TOTAL_HEAP_THRESHOLD,
SearchShardTaskSettings.SETTING_HEAP_THRESHOLD,
SearchShardTaskSettings.SETTING_HEAP_VARIANCE_THRESHOLD,
SearchShardTaskSettings.SETTING_CPU_TIME_THRESHOLD,
SearchShardTaskSettings.SETTING_ELAPSED_TIME_THRESHOLD
)
)
);
57 changes: 57 additions & 0 deletions server/src/main/java/org/opensearch/common/util/MovingAverage.java
@@ -0,0 +1,57 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.util;

/**
* MovingAverage is used to calculate the moving average of last 'n' observations.
*
* @opensearch.internal
*/
public class MovingAverage {
private final int windowSize;
private final long[] observations;

private long count = 0;
private long sum = 0;
private double average = 0;

public MovingAverage(int windowSize) {
if (windowSize <= 0) {
throw new IllegalArgumentException("window size must be greater than zero");
}

this.windowSize = windowSize;
this.observations = new long[windowSize];
}

/**
* Records a new observation and evicts the oldest observation in the window.
*/
public synchronized double record(long value) {
long delta = value - observations[(int) (count % observations.length)];
observations[(int) (count % observations.length)] = value;

count++;
sum += delta;
average = (double) sum / Math.min(count, observations.length);
return average;
}
Comment on lines +36 to +44
Collaborator


Can we use a queue and see if we can avoid the synchronized block by using CAS, if possible?

Contributor Author


I actually plan to benchmark CAS vs synchronized approaches before committing to either one of them, especially since the operations are pretty simple and quick to execute.

If there are major gains with CAS then it makes sense to go with it. Otherwise, I would prefer keeping it simple and readable.

Contributor Author (@ketanv3, Oct 2, 2022)


I did not observe any major gains with the queue + CAS approach as followed in indexing back-pressure (here). With higher thread contention, it actually performed slightly worse.

# JMH version: 1.35
# VM version: JDK 17.0.3, OpenJDK 64-Bit Server VM, 17.0.3+7

Benchmark (1 thread - no contention)           Mode  Cnt   Score   Error  Units
MovingAverageBenchmark.timeMovingAverage       avgt    5  25.669 ± 1.884  ns/op
MovingAverageBenchmark.timeMovingAverageQueue  avgt    5  25.213 ± 0.383  ns/op

Benchmark (4 threads - low contention)         Mode  Cnt    Score   Error  Units
MovingAverageBenchmark.timeMovingAverage       avgt    5  217.714 ± 6.676  ns/op
MovingAverageBenchmark.timeMovingAverageQueue  avgt    5  223.088 ± 3.651  ns/op

Benchmark (16 threads - high contention)       Mode  Cnt    Score   Error   Units
MovingAverageBenchmark.timeMovingAverage       avgt    5  785.830 ± 13.446  ns/op
MovingAverageBenchmark.timeMovingAverageQueue  avgt    5  792.442 ± 64.234  ns/op
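
A minimal JMH harness along these lines reproduces this kind of measurement (the window size, thread count, and recorded value below are assumptions, not the exact benchmark source):

```java
import java.util.concurrent.TimeUnit;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Threads;
import org.opensearch.common.util.MovingAverage;

@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@State(Scope.Benchmark) // one shared instance so all benchmark threads contend on it
public class MovingAverageBenchmark {
    private MovingAverage movingAverage;

    @Setup
    public void setup() {
        movingAverage = new MovingAverage(100); // assumed window size
    }

    @Benchmark
    @Threads(16) // 1 / 4 / 16 for no / low / high contention
    public double timeMovingAverage() {
        return movingAverage.record(42); // arbitrary observation value
    }
}
```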

It is also worth noting that the current implementation of the moving average in shard indexing back-pressure has subtle race-condition bugs. Two or more concurrent threads could reach this point and remove too many elements from the queue, leading to incorrect results or even an NPE in the worst case. Another problem is the lack of causal ordering – an older thread may overwrite a newer thread's average at this point.

Comment on lines +35 to +44
Collaborator


Can we use CAS here with a do-while loop?

Contributor Author (@ketanv3, Oct 10, 2022)


Using CAS backed by a queue or a ring-buffer, it may be possible to track the moving average similar to how it's done in indexing backpressure (though that implementation has subtle race-condition bugs, which I have highlighted in a comment above).

To implement this correctly, we need to ensure that the last 'n' items, the running sum, and the count of inserted items are updated atomically, which may not be possible with CAS alone. We would need to treat the entire state (even the backing queue/buffer) as immutable and create a new copy with every update (a similar concept to CopyOnWriteArrayList).

Our use-case is write-heavy (on task completion) with infrequent reads (on each search backpressure service iteration), so creating copies may be very expensive, especially for larger window sizes. I'm still inclined to use the current approach.
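
For illustration, a rough copy-on-write sketch of that idea is shown below (class and field names are assumptions, not code from this PR); the Arrays.copyOf on every record() is exactly the per-update cost in question:

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicReference;

public class CopyOnWriteMovingAverage {
    // The whole state (window, count, sum) is immutable and swapped atomically via CAS.
    private static final class State {
        final long[] observations;
        final long count;
        final long sum;

        State(long[] observations, long count, long sum) {
            this.observations = observations;
            this.count = count;
            this.sum = sum;
        }
    }

    private final AtomicReference<State> state;

    public CopyOnWriteMovingAverage(int windowSize) {
        this.state = new AtomicReference<>(new State(new long[windowSize], 0, 0));
    }

    public double record(long value) {
        State current, updated;
        do {
            current = state.get();
            int index = (int) (current.count % current.observations.length);
            // Cloning the backing buffer on every update is what makes this approach
            // expensive for larger window sizes and adds GC pressure.
            long[] copy = Arrays.copyOf(current.observations, current.observations.length);
            long delta = value - copy[index];
            copy[index] = value;
            updated = new State(copy, current.count + 1, current.sum + delta);
        } while (state.compareAndSet(current, updated) == false);

        return (double) updated.sum / Math.min(updated.count, updated.observations.length);
    }
}
```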

Contributor Author


Benchmark results comparing the existing approach vs. CAS backed by an immutable ring-buffer:

Using synchronized (current approach):

|            | 10 observations | 100 observations | 1000 observations |
|------------|-----------------|------------------|-------------------|
| 1 thread   | 14.123 ns/op    | 14.258 ns/op     | 14.544 ns/op      |
| 4 threads  | 368.087 ns/op   | 364.879 ns/op    | 378.378 ns/op     |
| 16 threads | 1400.703 ns/op  | 1506.456 ns/op   | 1809.835 ns/op    |

Using compare-and-set backed by an immutable ring-buffer (implementation reference):

|            | 10 observations | 100 observations | 1000 observations |
|------------|-----------------|------------------|-------------------|
| 1 thread   | 27.438 ns/op    | 112.920 ns/op    | 1082.057 ns/op    |
| 4 threads  | 798.966 ns/op   | 1077.040 ns/op   | 4675.375 ns/op    |
| 16 threads | 5820.276 ns/op  | 8374.579 ns/op   | 36474.605 ns/op   |

Key observations:

  • The synchronized approach doesn't have to clone the backing buffer, so its cost doesn't grow with the window size. This is a major drawback of the CAS approach, as the repeated array copy is expensive and creates a lot of GC overhead too.
  • Even with smaller window sizes, the performance of CAS degrades as the number of threads (and the contention) grows.
  • The synchronized approach performed between 2x and 20x better across these measurements.


public double getAverage() {
return average;
}

public long getCount() {
return count;
}

public boolean isReady() {
return count >= windowSize;
}
}
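
A brief usage sketch of MovingAverage (the window size, values, and wrapper class below are assumptions, not code from this PR):

```java
import org.opensearch.common.util.MovingAverage;

public class MovingAverageUsageExample {
    public static void main(String[] args) {
        // Rolling window over the last 100 observations (size chosen arbitrarily for this sketch).
        MovingAverage taskHeapUsage = new MovingAverage(100);

        long observedHeapBytes = 1_048_576; // made-up heap usage of a completed task
        double average = taskHeapUsage.record(observedHeapBytes);

        // Only compare against the average once the window has seen at least 'windowSize' observations.
        if (taskHeapUsage.isReady() && observedHeapBytes > 2 * average) {
            System.out.println("task used noticeably more heap than the rolling average");
        }
    }
}
```

isReady() guards against acting on an average computed from a partially filled window.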
33 changes: 33 additions & 0 deletions server/src/main/java/org/opensearch/common/util/Streak.java
@@ -0,0 +1,33 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.util;

import java.util.concurrent.atomic.AtomicInteger;

/**
* Streak is a data structure that keeps track of the number of successive successful events.
*
* @opensearch.internal
*/
public class Streak {
private final AtomicInteger successiveSuccessfulEvents = new AtomicInteger();

public int record(boolean isSuccessful) {
if (isSuccessful) {
return successiveSuccessfulEvents.incrementAndGet();
} else {
successiveSuccessfulEvents.set(0);
return 0;
}
}

public int length() {
return successiveSuccessfulEvents.get();
}
}
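
A brief usage sketch of Streak (the thresholds and wrapper class below are assumptions, not the service's actual wiring):

```java
import org.opensearch.common.util.Streak;

public class StreakUsageExample {
    public static void main(String[] args) {
        Streak cpuBreaches = new Streak();
        int numSuccessiveBreaches = 3;  // assumed threshold
        double cpuThreshold = 0.90;     // assumed threshold

        // Record one observation per monitoring iteration; a single non-breach resets the streak.
        double currentCpuUsage = 0.95;  // made-up observation
        cpuBreaches.record(currentCpuUsage > cpuThreshold);

        boolean nodeInDuress = cpuBreaches.length() >= numSuccessiveBreaches;
        System.out.println("node in duress: " + nodeInDuress);
    }
}
```

Requiring several successive breaches filters out short-lived CPU or heap spikes before declaring the node in duress.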
124 changes: 124 additions & 0 deletions server/src/main/java/org/opensearch/common/util/TokenBucket.java
@@ -0,0 +1,124 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.util;

import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongSupplier;

/**
* TokenBucket is used to limit the number of operations at a constant rate while allowing for short bursts.
*
* @opensearch.internal
*/
public class TokenBucket {
/**
* Defines a monotonically increasing counter.
*
* Usage examples:
* 1. clock = System::nanoTime can be used to perform rate-limiting per unit time
* 2. clock = AtomicLong::get can be used to perform rate-limiting per unit number of operations
*/
private final LongSupplier clock;

/**
* Defines the number of tokens added to the bucket per clock cycle.
*/
private final double rate;

/**
* Defines the capacity and the maximum number of operations that can be performed per clock cycle before
* the bucket runs out of tokens.
*/
private final double burst;

/**
* Defines the current state of the token bucket.
*/
private final AtomicReference<State> state;

public TokenBucket(LongSupplier clock, double rate, double burst) {
this(clock, rate, burst, burst);
}

public TokenBucket(LongSupplier clock, double rate, double burst, double initialTokens) {
if (rate <= 0.0) {
throw new IllegalArgumentException("rate must be greater than zero");
}

if (burst <= 0.0) {
throw new IllegalArgumentException("burst must be greater than zero");
}

this.clock = clock;
this.rate = rate;
this.burst = burst;
this.state = new AtomicReference<>(new State(Math.min(initialTokens, burst), clock.getAsLong()));
}

/**
* If there are enough tokens in the bucket, it requests/deducts 'n' tokens and returns true.
* Otherwise, returns false and leaves the bucket untouched.
*/
public boolean request(double n) {
if (n <= 0) {
throw new IllegalArgumentException("requested tokens must be greater than zero");
}

// Refill tokens
State currentState, updatedState;
do {
currentState = state.get();
long now = clock.getAsLong();
double incr = (now - currentState.lastRefilledAt) * rate;
updatedState = new State(Math.min(currentState.tokens + incr, burst), now);
} while (state.compareAndSet(currentState, updatedState) == false);

// Deduct tokens
do {
currentState = state.get();
if (currentState.tokens < n) {
return false;
}
updatedState = new State(currentState.tokens - n, currentState.lastRefilledAt);
} while (state.compareAndSet(currentState, updatedState) == false);

return true;
}

public boolean request() {
return request(1.0);
}

/**
* Represents an immutable token bucket state.
*/
private static class State {
final double tokens;
final double lastRefilledAt;

public State(double tokens, double lastRefilledAt) {
this.tokens = tokens;
this.lastRefilledAt = lastRefilledAt;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
State state = (State) o;
return Double.compare(state.tokens, tokens) == 0 && Double.compare(state.lastRefilledAt, lastRefilledAt) == 0;
}

@Override
public int hashCode() {
return Objects.hash(tokens, lastRefilledAt);
}
}
}
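
A brief usage sketch of TokenBucket (rates, burst values, and wiring below are assumptions, not the service's actual code); the same class supports both time-based and workload-based rate limiting by swapping the clock supplier:

```java
import java.util.concurrent.atomic.AtomicLong;

import org.opensearch.common.util.TokenBucket;

public class TokenBucketUsageExample {
    public static void main(String[] args) {
        // Wall-clock based: 0.003 tokens/ms is roughly 3 cancellations per second, burst of 10 (assumed values).
        TokenBucket timeBasedBucket = new TokenBucket(System::currentTimeMillis, 0.003, 10.0);

        // Workload based: the "clock" is a counter of completed tasks, so 0.01 tokens/task
        // allows roughly one cancellation per 100 completed tasks (assumed values).
        AtomicLong completedTaskCount = new AtomicLong();
        TokenBucket workloadBasedBucket = new TokenBucket(completedTaskCount::get, 0.01, 10.0);

        completedTaskCount.incrementAndGet(); // a task just completed

        // Cancel only when both limits allow it; note that a token is consumed from the
        // first bucket even if the second one declines (acceptable for a sketch).
        if (timeBasedBucket.request() && workloadBasedBucket.request()) {
            System.out.println("cancellation allowed");
        }
    }
}
```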
27 changes: 26 additions & 1 deletion server/src/main/java/org/opensearch/node/Node.java
@@ -43,6 +43,8 @@
import org.opensearch.indices.replication.SegmentReplicationSourceFactory;
import org.opensearch.indices.replication.SegmentReplicationTargetService;
import org.opensearch.indices.replication.SegmentReplicationSourceService;
import org.opensearch.search.backpressure.SearchBackpressureService;
import org.opensearch.search.backpressure.settings.SearchBackpressureSettings;
import org.opensearch.tasks.TaskResourceTrackingService;
import org.opensearch.threadpool.RunnableTaskExecutionListener;
import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory;
@@ -796,6 +798,23 @@ protected Node(
// development. Then we can deprecate Getter and Setter for IndexingPressureService in ClusterService (#478).
clusterService.setIndexingPressureService(indexingPressureService);

final TaskResourceTrackingService taskResourceTrackingService = new TaskResourceTrackingService(
settings,
clusterService.getClusterSettings(),
threadPool
);

final SearchBackpressureSettings searchBackpressureSettings = new SearchBackpressureSettings(
settings,
clusterService.getClusterSettings()
);

final SearchBackpressureService searchBackpressureService = new SearchBackpressureService(
searchBackpressureSettings,
taskResourceTrackingService,
threadPool
);

final RecoverySettings recoverySettings = new RecoverySettings(settings, settingsModule.getClusterSettings());
RepositoriesModule repositoriesModule = new RepositoriesModule(
this.environment,
@@ -883,7 +902,8 @@ protected Node(
responseCollectorService,
searchTransportService,
indexingPressureService,
searchModule.getValuesSourceRegistry().getUsageService()
searchModule.getValuesSourceRegistry().getUsageService(),
searchBackpressureService
);

final SearchService searchService = newSearchService(
@@ -941,6 +961,8 @@ protected Node(
b.bind(AnalysisRegistry.class).toInstance(analysisModule.getAnalysisRegistry());
b.bind(IngestService.class).toInstance(ingestService);
b.bind(IndexingPressureService.class).toInstance(indexingPressureService);
b.bind(TaskResourceTrackingService.class).toInstance(taskResourceTrackingService);
b.bind(SearchBackpressureService.class).toInstance(searchBackpressureService);
b.bind(UsageService.class).toInstance(usageService);
b.bind(AggregationUsageService.class).toInstance(searchModule.getValuesSourceRegistry().getUsageService());
b.bind(NamedWriteableRegistry.class).toInstance(namedWriteableRegistry);
@@ -1104,6 +1126,7 @@ public Node start() throws NodeValidationException {
injector.getInstance(SearchService.class).start();
injector.getInstance(FsHealthService.class).start();
nodeService.getMonitorService().start();
nodeService.getSearchBackpressureService().start();

final ClusterService clusterService = injector.getInstance(ClusterService.class);

@@ -1259,6 +1282,7 @@ private Node stop() {
injector.getInstance(NodeConnectionsService.class).stop();
injector.getInstance(FsHealthService.class).stop();
nodeService.getMonitorService().stop();
nodeService.getSearchBackpressureService().stop();
injector.getInstance(GatewayService.class).stop();
injector.getInstance(SearchService.class).stop();
injector.getInstance(TransportService.class).stop();
@@ -1318,6 +1342,7 @@ public synchronized void close() throws IOException {
toClose.add(injector.getInstance(Discovery.class));
toClose.add(() -> stopWatch.stop().start("monitor"));
toClose.add(nodeService.getMonitorService());
toClose.add(nodeService.getSearchBackpressureService());
toClose.add(() -> stopWatch.stop().start("fsHealth"));
toClose.add(injector.getInstance(FsHealthService.class));
toClose.add(() -> stopWatch.stop().start("gateway"));
10 changes: 9 additions & 1 deletion server/src/main/java/org/opensearch/node/NodeService.java
@@ -53,6 +53,7 @@
import org.opensearch.plugins.PluginsService;
import org.opensearch.script.ScriptService;
import org.opensearch.search.aggregations.support.AggregationUsageService;
import org.opensearch.search.backpressure.SearchBackpressureService;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

@@ -81,6 +82,7 @@ public class NodeService implements Closeable {
private final SearchTransportService searchTransportService;
private final IndexingPressureService indexingPressureService;
private final AggregationUsageService aggregationUsageService;
private final SearchBackpressureService searchBackpressureService;

private final Discovery discovery;

@@ -101,7 +103,8 @@ public class NodeService implements Closeable {
ResponseCollectorService responseCollectorService,
SearchTransportService searchTransportService,
IndexingPressureService indexingPressureService,
AggregationUsageService aggregationUsageService
AggregationUsageService aggregationUsageService,
SearchBackpressureService searchBackpressureService
) {
this.settings = settings;
this.threadPool = threadPool;
Expand All @@ -119,6 +122,7 @@ public class NodeService implements Closeable {
this.searchTransportService = searchTransportService;
this.indexingPressureService = indexingPressureService;
this.aggregationUsageService = aggregationUsageService;
this.searchBackpressureService = searchBackpressureService;
clusterService.addStateApplier(ingestService);
}

@@ -203,6 +207,10 @@ public MonitorService getMonitorService() {
return monitorService;
}

public SearchBackpressureService getSearchBackpressureService() {
return searchBackpressureService;
}

@Override
public void close() throws IOException {
IOUtils.close(indicesService);