Skip to content

Commit

Permalink
Make cluster manager throttling retry delay as dynamic setting
Browse files Browse the repository at this point in the history
Signed-off-by: Dhwanil Patel <dhwanip@amazon.com>
  • Loading branch information
dhwanilpatel committed Apr 5, 2023
1 parent 59e881b commit 914c3f1
Show file tree
Hide file tree
Showing 7 changed files with 87 additions and 13 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Remove 'cluster_manager' role attachment when using 'node.master' deprecated setting ([#6331](https://github.com/opensearch-project/OpenSearch/pull/6331))
- Add new cluster settings to ignore weighted round-robin routing and fallback to default behaviour. ([#6834](https://github.com/opensearch-project/OpenSearch/pull/6834))
- Add experimental support for ZSTD compression. ([#3577](https://github.com/opensearch-project/OpenSearch/pull/3577))
- Add retry delay as dynamic setting for cluster manager throttling. ([#6998](https://github.com/opensearch-project/OpenSearch/pull/6998))

### Dependencies
- Bump `org.apache.logging.log4j:log4j-core` from 2.18.0 to 2.20.0 ([#6490](https://github.com/opensearch-project/OpenSearch/pull/6490))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public static BackoffPolicy exponentialBackoff(TimeValue initialDelay, int maxNu
* @param maxDelayForRetry MaxDelay that can be returned from backoff policy
* @return A backoff policy with exponential backoff with equal jitter which can't return delay more than given max delay
*/
public static BackoffPolicy exponentialEqualJitterBackoff(int baseDelay, int maxDelayForRetry) {
public static BackoffPolicy exponentialEqualJitterBackoff(long baseDelay, long maxDelayForRetry) {
return new ExponentialEqualJitterBackoff(baseDelay, maxDelayForRetry);
}

Expand Down Expand Up @@ -223,10 +223,10 @@ public TimeValue next() {
}

private static class ExponentialEqualJitterBackoff extends BackoffPolicy {
private final int maxDelayForRetry;
private final int baseDelay;
private final long maxDelayForRetry;
private final long baseDelay;

private ExponentialEqualJitterBackoff(int baseDelay, int maxDelayForRetry) {
private ExponentialEqualJitterBackoff(long baseDelay, long maxDelayForRetry) {
this.maxDelayForRetry = maxDelayForRetry;
this.baseDelay = baseDelay;
}
Expand All @@ -252,11 +252,11 @@ private static class ExponentialEqualJitterBackoffIterator implements Iterator<T
* Once delay has exceeded maxDelayForRetry, it will return maxDelayForRetry only
* and not increase the delay.
*/
private final int maxDelayForRetry;
private final int baseDelay;
private final long maxDelayForRetry;
private final long baseDelay;
private int retriesAttempted;

private ExponentialEqualJitterBackoffIterator(int baseDelay, int maxDelayForRetry) {
private ExponentialEqualJitterBackoffIterator(long baseDelay, long maxDelayForRetry) {
this.baseDelay = baseDelay;
this.maxDelayForRetry = maxDelayForRetry;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.opensearch.cluster.metadata.ProcessClusterEventTimeoutException;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.service.ClusterManagerTaskThrottler;
import org.opensearch.cluster.service.ClusterManagerThrottlingException;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -178,17 +179,18 @@ class AsyncSingleAction extends RetryableAction {
private ClusterStateObserver observer;
private final long startTime;
private final Task task;
private static final int BASE_DELAY_MILLIS = 10;
private static final int MAX_DELAY_MILLIS = 5000;

AsyncSingleAction(Task task, Request request, ActionListener<Response> listener) {
super(
logger,
threadPool,
TimeValue.timeValueMillis(BASE_DELAY_MILLIS),
ClusterManagerTaskThrottler.getBaseDelayForRetry(),
request.clusterManagerNodeTimeout,
listener,
BackoffPolicy.exponentialEqualJitterBackoff(BASE_DELAY_MILLIS, MAX_DELAY_MILLIS),
BackoffPolicy.exponentialEqualJitterBackoff(
ClusterManagerTaskThrottler.getBaseDelayForRetry().millis(),
ClusterManagerTaskThrottler.getMaxDelayForRetry().millis()
),
ThreadPool.Names.SAME
);
this.task = task;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;

import java.util.HashSet;
import java.util.List;
Expand All @@ -37,12 +38,31 @@ public class ClusterManagerTaskThrottler implements TaskBatcherListener {
private static final Logger logger = LogManager.getLogger(ClusterManagerTaskThrottler.class);
public static final ThrottlingKey DEFAULT_THROTTLING_KEY = new ThrottlingKey("default-task-key", false);

// default value for base delay is 5s
static volatile TimeValue baseDelay = TimeValue.timeValueSeconds(5);
// default value for max delay is 30s
static volatile TimeValue maxDelay = TimeValue.timeValueSeconds(30);

/**
 * Group setting covering every key under the {@code cluster_manager.throttling.thresholds.} prefix.
 * Declared dynamic and node-scoped; the constructor of this class registers an update consumer
 * and a validator for it.
 */
public static final Setting<Settings> THRESHOLD_SETTINGS = Setting.groupSetting(
"cluster_manager.throttling.thresholds.",
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

/**
 * Dynamic, node-scoped time setting {@code cluster_manager.throttling.retry.base.delay}.
 * Its default is whatever {@code baseDelay} holds when this constant is initialized
 * (the field's initial value, 5s, as long as the field is declared above this constant).
 */
public static final Setting<TimeValue> BASE_DELAY_SETTINGS = Setting.timeSetting(
"cluster_manager.throttling.retry.base.delay",
baseDelay,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

/**
 * Dynamic, node-scoped time setting {@code cluster_manager.throttling.retry.max.delay}.
 * Its default is whatever {@code maxDelay} holds when this constant is initialized
 * (the field's initial value, 30s, as long as the field is declared above this constant).
 */
public static final Setting<TimeValue> MAX_DELAY_SETTINGS = Setting.timeSetting(
"cluster_manager.throttling.retry.max.delay",
maxDelay,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

protected Map<String, ThrottlingKey> THROTTLING_TASK_KEYS = new ConcurrentHashMap<>();

private final int MIN_THRESHOLD_VALUE = -1; // Disabled throttling
Expand All @@ -68,8 +88,28 @@ public ClusterManagerTaskThrottler(
this.minNodeVersionSupplier = minNodeVersionSupplier;
this.clusterManagerTaskThrottlerListener = clusterManagerTaskThrottlerListener;
clusterSettings.addSettingsUpdateConsumer(THRESHOLD_SETTINGS, this::updateSetting, this::validateSetting);
clusterSettings.addSettingsUpdateConsumer(BASE_DELAY_SETTINGS, this::updateBaseDelay);
clusterSettings.addSettingsUpdateConsumer(MAX_DELAY_SETTINGS, this::updateMaxDelay);
// Required for setting values as per current settings during node bootstrap
updateSetting(THRESHOLD_SETTINGS.get(settings));
updateBaseDelay(BASE_DELAY_SETTINGS.get(settings));
updateMaxDelay(MAX_DELAY_SETTINGS.get(settings));
}

/**
 * Stores a new base retry delay in the class-level {@code baseDelay} field.
 * Note that the field is static, so the value is shared by every throttler instance.
 *
 * @param newBaseValue the base delay to publish
 */
void updateBaseDelay(TimeValue newBaseValue) {
    ClusterManagerTaskThrottler.baseDelay = newBaseValue;
}

/**
 * Stores a new maximum retry delay in the class-level {@code maxDelay} field.
 * Note that the field is static, so the value is shared by every throttler instance.
 *
 * @param newMaxValue the maximum delay to publish
 */
void updateMaxDelay(TimeValue newMaxValue) {
    ClusterManagerTaskThrottler.maxDelay = newMaxValue;
}

/**
 * Returns the current value of the static {@code baseDelay} field.
 *
 * @return the base retry delay currently stored on this class
 */
public static TimeValue getBaseDelayForRetry() {
    return ClusterManagerTaskThrottler.baseDelay;
}

/**
 * Returns the current value of the static {@code maxDelay} field.
 *
 * @return the maximum retry delay currently stored on this class
 */
public static TimeValue getMaxDelayForRetry() {
    return ClusterManagerTaskThrottler.maxDelay;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -600,7 +600,8 @@ public void apply(Settings value, Settings current, Settings previous) {
TaskResourceTrackingService.TASK_RESOURCE_TRACKING_ENABLED,
TaskManager.TASK_RESOURCE_CONSUMERS_ENABLED,
ClusterManagerTaskThrottler.THRESHOLD_SETTINGS,

ClusterManagerTaskThrottler.BASE_DELAY_SETTINGS,
ClusterManagerTaskThrottler.MAX_DELAY_SETTINGS,
// Settings related to search backpressure
SearchBackpressureSettings.SETTING_MODE,

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -669,7 +669,7 @@ public void testThrottlingRetryRemoteMaster() throws ExecutionException, Interru
assertFalse(listener.isDone());

// waiting for retry to trigger
Thread.sleep(100);
Thread.sleep(10000);

// Retry for above throttling exception
capturedRequests = transport.getCapturedRequestsAndClear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,12 @@ public void testSettingsOnBootstrap() {

ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
int put_mapping_threshold_value = randomIntBetween(1, 10);
int baseDelay = randomIntBetween(1, 10);
int maxDelay = randomIntBetween(1, 10);
Settings initialSettings = Settings.builder()
.put("cluster_manager.throttling.thresholds.put-mapping.value", put_mapping_threshold_value)
.put("cluster_manager.throttling.retry.base.delay", baseDelay + "s")
.put("cluster_manager.throttling.retry.max.delay", maxDelay + "s")
.build();
ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(initialSettings, clusterSettings, () -> {
return clusterService.getMasterService().getMinNodeVersion();
Expand All @@ -176,6 +180,32 @@ public void testSettingsOnBootstrap() {

// assert that limit is applied on throttler
assertEquals(put_mapping_threshold_value, throttler.getThrottlingLimit("put-mapping").intValue());
// assert that delay setting is applied on throttler
assertEquals(baseDelay, ClusterManagerTaskThrottler.getBaseDelayForRetry().seconds());
assertEquals(maxDelay, ClusterManagerTaskThrottler.getMaxDelayForRetry().seconds());
}

/**
 * Verifies that the retry-delay values start at the settings' defaults when the throttler is
 * built from empty settings, and that applying new cluster settings updates them.
 */
public void testUpdateRetryDelaySetting() {
    ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
    // Constructing the throttler registers the update consumers on clusterSettings and
    // applies the values resolved from Settings.EMPTY.
    ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(Settings.EMPTY, clusterSettings, () -> {
        return clusterService.getMasterService().getMinNodeVersion();
    }, new ClusterManagerThrottlingStats());

    // verify defaults: compare against what the settings resolve to for empty settings,
    // not against the static field the getter itself returns (that check can never fail)
    assertEquals(
        ClusterManagerTaskThrottler.BASE_DELAY_SETTINGS.get(Settings.EMPTY),
        ClusterManagerTaskThrottler.getBaseDelayForRetry()
    );
    assertEquals(
        ClusterManagerTaskThrottler.MAX_DELAY_SETTINGS.get(Settings.EMPTY),
        ClusterManagerTaskThrottler.getMaxDelayForRetry()
    );

    // verify update base delay
    int baseDelay = randomIntBetween(1, 10);
    Settings newSettings = Settings.builder().put("cluster_manager.throttling.retry.base.delay", baseDelay + "s").build();
    clusterSettings.applySettings(newSettings);
    assertEquals(baseDelay, ClusterManagerTaskThrottler.getBaseDelayForRetry().seconds());

    // verify update max delay
    int maxDelay = randomIntBetween(1, 10);
    newSettings = Settings.builder().put("cluster_manager.throttling.retry.max.delay", maxDelay + "s").build();
    clusterSettings.applySettings(newSettings);
    assertEquals(maxDelay, ClusterManagerTaskThrottler.getMaxDelayForRetry().seconds());
}

public void testValidateSettingsForUnknownTask() {
Expand Down

0 comments on commit 914c3f1

Please sign in to comment.