Skip to content

Commit

Permalink
Addressed PR comments
Browse files Browse the repository at this point in the history
Signed-off-by: Ketan Verma <ketan9495@gmail.com>
  • Loading branch information
ketanv3 committed Oct 10, 2022
1 parent c7e96b7 commit 69a2ff8
Show file tree
Hide file tree
Showing 16 changed files with 641 additions and 512 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@
import org.opensearch.index.ShardIndexingPressureMemoryManager;
import org.opensearch.index.ShardIndexingPressureSettings;
import org.opensearch.index.ShardIndexingPressureStore;
import org.opensearch.search.backpressure.SearchBackpressureSettings;
import org.opensearch.search.backpressure.settings.NodeDuressSettings;
import org.opensearch.search.backpressure.settings.SearchBackpressureSettings;
import org.opensearch.search.backpressure.settings.SearchShardTaskSettings;
import org.opensearch.tasks.TaskManager;
import org.opensearch.tasks.TaskResourceTrackingService;
import org.opensearch.watcher.ResourceWatcherService;
Expand Down Expand Up @@ -583,19 +585,21 @@ public void apply(Settings value, Settings current, Settings previous) {
IndexingPressure.MAX_INDEXING_BYTES,
TaskResourceTrackingService.TASK_RESOURCE_TRACKING_ENABLED,
TaskManager.TASK_RESOURCE_CONSUMERS_ENABLED,

// Settings related to search backpressure
SearchBackpressureSettings.SETTING_ENABLED,
SearchBackpressureSettings.SETTING_ENFORCED,
SearchBackpressureSettings.SETTING_NODE_DURESS_NUM_CONSECUTIVE_BREACHES,
SearchBackpressureSettings.SETTING_NODE_DURESS_CPU_THRESHOLD,
SearchBackpressureSettings.SETTING_NODE_DURESS_HEAP_THRESHOLD,
SearchBackpressureSettings.SETTING_SEARCH_HEAP_THRESHOLD,
SearchBackpressureSettings.SETTING_SEARCH_TASK_HEAP_THRESHOLD,
SearchBackpressureSettings.SETTING_SEARCH_TASK_HEAP_VARIANCE_THRESHOLD,
SearchBackpressureSettings.SETTING_SEARCH_TASK_CPU_TIME_THRESHOLD,
SearchBackpressureSettings.SETTING_SEARCH_TASK_ELAPSED_TIME_THRESHOLD,
SearchBackpressureSettings.SETTING_CANCELLATION_RATIO,
SearchBackpressureSettings.SETTING_CANCELLATION_RATE,
SearchBackpressureSettings.SETTING_CANCELLATION_BURST
SearchBackpressureSettings.SETTING_CANCELLATION_BURST,
NodeDuressSettings.SETTING_NUM_SUCCESSIVE_BREACHES,
NodeDuressSettings.SETTING_CPU_THRESHOLD,
NodeDuressSettings.SETTING_HEAP_THRESHOLD,
SearchShardTaskSettings.SETTING_TOTAL_HEAP_THRESHOLD,
SearchShardTaskSettings.SETTING_HEAP_THRESHOLD,
SearchShardTaskSettings.SETTING_HEAP_VARIANCE_THRESHOLD,
SearchShardTaskSettings.SETTING_CPU_TIME_THRESHOLD,
SearchShardTaskSettings.SETTING_ELAPSED_TIME_THRESHOLD
)
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,28 +6,28 @@
* compatible open source license.
*/

package org.opensearch.common;
package org.opensearch.common.util;

import java.util.concurrent.atomic.AtomicInteger;

/**
* Streak is a data structure that keeps track of the number of consecutive successful events.
* Streak is a data structure that keeps track of the number of successive successful events.
*
* @opensearch.internal
*/
public class Streak {
private final AtomicInteger consecutiveSuccessfulEvents = new AtomicInteger();
private final AtomicInteger successiveSuccessfulEvents = new AtomicInteger();

public int record(boolean isSuccessful) {
if (isSuccessful) {
return consecutiveSuccessfulEvents.incrementAndGet();
return successiveSuccessfulEvents.incrementAndGet();
} else {
consecutiveSuccessfulEvents.set(0);
successiveSuccessfulEvents.set(0);
return 0;
}
}

public int length() {
return consecutiveSuccessfulEvents.get();
return successiveSuccessfulEvents.get();
}
}
73 changes: 50 additions & 23 deletions server/src/main/java/org/opensearch/common/util/TokenBucket.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

package org.opensearch.common.util;

import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongSupplier;

/**
Expand Down Expand Up @@ -36,9 +38,10 @@ public class TokenBucket {
*/
private final double burst;

private double tokens;

private long lastRefilledAt;
/**
* Defines the current state of the token bucket.
*/
private final AtomicReference<State> state;

public TokenBucket(LongSupplier clock, double rate, double burst) {
this(clock, rate, burst, burst);
Expand All @@ -56,18 +59,7 @@ public TokenBucket(LongSupplier clock, double rate, double burst, double initial
this.clock = clock;
this.rate = rate;
this.burst = burst;
this.tokens = Math.min(initialTokens, burst);
this.lastRefilledAt = clock.getAsLong();
}

/**
* Refills the token bucket.
*/
private void refill() {
long now = clock.getAsLong();
double incr = (now - lastRefilledAt) * rate;
tokens = Math.min(tokens + incr, burst);
lastRefilledAt = now;
this.state = new AtomicReference<>(new State(Math.min(initialTokens, burst), clock.getAsLong()));
}

/**
Expand All @@ -79,19 +71,54 @@ public boolean request(double n) {
throw new IllegalArgumentException("requested tokens must be greater than zero");
}

synchronized (this) {
refill();

if (tokens >= n) {
tokens -= n;
return true;
// Refill tokens
State currentState, updatedState;
do {
currentState = state.get();
long now = clock.getAsLong();
double incr = (now - currentState.lastRefilledAt) * rate;
updatedState = new State(Math.min(currentState.tokens + incr, burst), now);
} while (state.compareAndSet(currentState, updatedState) == false);

// Deduct tokens
do {
currentState = state.get();
if (currentState.tokens < n) {
return false;
}
updatedState = new State(currentState.tokens - n, currentState.lastRefilledAt);
} while (state.compareAndSet(currentState, updatedState) == false);

return false;
}
return true;
}

public boolean request() {
return request(1.0);
}

/**
* Represents an immutable token bucket state.
*/
private static class State {
final double tokens;
final double lastRefilledAt;

public State(double tokens, double lastRefilledAt) {
this.tokens = tokens;
this.lastRefilledAt = lastRefilledAt;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
State state = (State) o;
return Double.compare(state.tokens, tokens) == 0 && Double.compare(state.lastRefilledAt, lastRefilledAt) == 0;
}

@Override
public int hashCode() {
return Objects.hash(tokens, lastRefilledAt);
}
}
}
5 changes: 4 additions & 1 deletion server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
import org.opensearch.indices.replication.SegmentReplicationTargetService;
import org.opensearch.indices.replication.SegmentReplicationSourceService;
import org.opensearch.search.backpressure.SearchBackpressureService;
import org.opensearch.search.backpressure.SearchBackpressureSettings;
import org.opensearch.search.backpressure.settings.SearchBackpressureSettings;
import org.opensearch.tasks.TaskResourceTrackingService;
import org.opensearch.threadpool.RunnableTaskExecutionListener;
import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory;
Expand Down Expand Up @@ -1112,6 +1112,7 @@ public Node start() throws NodeValidationException {
injector.getInstance(SearchService.class).start();
injector.getInstance(FsHealthService.class).start();
nodeService.getMonitorService().start();
nodeService.getSearchBackpressureService().start();

final ClusterService clusterService = injector.getInstance(ClusterService.class);

Expand Down Expand Up @@ -1267,6 +1268,7 @@ private Node stop() {
injector.getInstance(NodeConnectionsService.class).stop();
injector.getInstance(FsHealthService.class).stop();
nodeService.getMonitorService().stop();
nodeService.getSearchBackpressureService().stop();
injector.getInstance(GatewayService.class).stop();
injector.getInstance(SearchService.class).stop();
injector.getInstance(TransportService.class).stop();
Expand Down Expand Up @@ -1326,6 +1328,7 @@ public synchronized void close() throws IOException {
toClose.add(injector.getInstance(Discovery.class));
toClose.add(() -> stopWatch.stop().start("monitor"));
toClose.add(nodeService.getMonitorService());
toClose.add(nodeService.getSearchBackpressureService());
toClose.add(() -> stopWatch.stop().start("fsHealth"));
toClose.add(injector.getInstance(FsHealthService.class));
toClose.add(() -> stopWatch.stop().start("gateway"));
Expand Down
4 changes: 4 additions & 0 deletions server/src/main/java/org/opensearch/node/NodeService.java
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,10 @@ public MonitorService getMonitorService() {
return monitorService;
}

public SearchBackpressureService getSearchBackpressureService() {
return searchBackpressureService;
}

@Override
public void close() throws IOException {
IOUtils.close(indicesService);
Expand Down
Loading

0 comments on commit 69a2ff8

Please sign in to comment.