Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make cluster manager throttling retry delay as dynamic setting #6998

Merged
merged 2 commits into from
Apr 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add new cluster settings to ignore weighted round-robin routing and fallback to default behaviour. ([#6834](https://github.com/opensearch-project/OpenSearch/pull/6834))
- Add experimental support for ZSTD compression. ([#3577](https://github.com/opensearch-project/OpenSearch/pull/3577))
- [Segment Replication] Add point in time and scroll query compatibility. ([#6644](https://github.com/opensearch-project/OpenSearch/pull/6644))
- Add retry delay as dynamic setting for cluster maanger throttling. ([#6998](https://github.com/opensearch-project/OpenSearch/pull/6998))

### Dependencies
- Bump `org.apache.logging.log4j:log4j-core` from 2.18.0 to 2.20.0 ([#6490](https://github.com/opensearch-project/OpenSearch/pull/6490))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public static BackoffPolicy exponentialBackoff(TimeValue initialDelay, int maxNu
* @param maxDelayForRetry MaxDelay that can be returned from backoff policy
* @return A backoff policy with exponential backoff with equal jitter which can't return delay more than given max delay
*/
public static BackoffPolicy exponentialEqualJitterBackoff(int baseDelay, int maxDelayForRetry) {
public static BackoffPolicy exponentialEqualJitterBackoff(long baseDelay, long maxDelayForRetry) {
return new ExponentialEqualJitterBackoff(baseDelay, maxDelayForRetry);
}

Expand Down Expand Up @@ -223,10 +223,10 @@ public TimeValue next() {
}

private static class ExponentialEqualJitterBackoff extends BackoffPolicy {
private final int maxDelayForRetry;
private final int baseDelay;
private final long maxDelayForRetry;
private final long baseDelay;

private ExponentialEqualJitterBackoff(int baseDelay, int maxDelayForRetry) {
private ExponentialEqualJitterBackoff(long baseDelay, long maxDelayForRetry) {
this.maxDelayForRetry = maxDelayForRetry;
this.baseDelay = baseDelay;
}
Expand All @@ -252,11 +252,11 @@ private static class ExponentialEqualJitterBackoffIterator implements Iterator<T
* Once delay has exceeded maxDelayForRetry, it will return maxDelayForRetry only
* and not increase the delay.
*/
private final int maxDelayForRetry;
private final int baseDelay;
private final long maxDelayForRetry;
private final long baseDelay;
private int retriesAttempted;

private ExponentialEqualJitterBackoffIterator(int baseDelay, int maxDelayForRetry) {
private ExponentialEqualJitterBackoffIterator(long baseDelay, long maxDelayForRetry) {
this.baseDelay = baseDelay;
this.maxDelayForRetry = maxDelayForRetry;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.opensearch.cluster.metadata.ProcessClusterEventTimeoutException;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.service.ClusterManagerTaskThrottler;
import org.opensearch.cluster.service.ClusterManagerThrottlingException;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -178,17 +179,18 @@ class AsyncSingleAction extends RetryableAction {
private ClusterStateObserver observer;
private final long startTime;
private final Task task;
private static final int BASE_DELAY_MILLIS = 10;
private static final int MAX_DELAY_MILLIS = 5000;

AsyncSingleAction(Task task, Request request, ActionListener<Response> listener) {
super(
logger,
threadPool,
TimeValue.timeValueMillis(BASE_DELAY_MILLIS),
ClusterManagerTaskThrottler.getBaseDelayForRetry(),
request.clusterManagerNodeTimeout,
listener,
BackoffPolicy.exponentialEqualJitterBackoff(BASE_DELAY_MILLIS, MAX_DELAY_MILLIS),
BackoffPolicy.exponentialEqualJitterBackoff(
ClusterManagerTaskThrottler.getBaseDelayForRetry().millis(),
ClusterManagerTaskThrottler.getMaxDelayForRetry().millis()
),
ThreadPool.Names.SAME
);
this.task = task;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;

import java.util.HashSet;
import java.util.List;
Expand All @@ -37,12 +38,31 @@ public class ClusterManagerTaskThrottler implements TaskBatcherListener {
private static final Logger logger = LogManager.getLogger(ClusterManagerTaskThrottler.class);
public static final ThrottlingKey DEFAULT_THROTTLING_KEY = new ThrottlingKey("default-task-key", false);

// default value for base delay is 5s
static volatile TimeValue baseDelay = TimeValue.timeValueSeconds(5);
// default values for max delay is 30s
static volatile TimeValue maxDelay = TimeValue.timeValueSeconds(30);

public static final Setting<Settings> THRESHOLD_SETTINGS = Setting.groupSetting(
"cluster_manager.throttling.thresholds.",
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

public static final Setting<TimeValue> BASE_DELAY_SETTINGS = Setting.timeSetting(
"cluster_manager.throttling.retry.base.delay",
baseDelay,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

public static final Setting<TimeValue> MAX_DELAY_SETTINGS = Setting.timeSetting(
"cluster_manager.throttling.retry.max.delay",
maxDelay,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

protected Map<String, ThrottlingKey> THROTTLING_TASK_KEYS = new ConcurrentHashMap<>();

private final int MIN_THRESHOLD_VALUE = -1; // Disabled throttling
Expand All @@ -68,8 +88,28 @@ public ClusterManagerTaskThrottler(
this.minNodeVersionSupplier = minNodeVersionSupplier;
this.clusterManagerTaskThrottlerListener = clusterManagerTaskThrottlerListener;
clusterSettings.addSettingsUpdateConsumer(THRESHOLD_SETTINGS, this::updateSetting, this::validateSetting);
clusterSettings.addSettingsUpdateConsumer(BASE_DELAY_SETTINGS, this::updateBaseDelay);
clusterSettings.addSettingsUpdateConsumer(MAX_DELAY_SETTINGS, this::updateMaxDelay);
// Required for setting values as per current settings during node bootstrap
updateSetting(THRESHOLD_SETTINGS.get(settings));
updateBaseDelay(BASE_DELAY_SETTINGS.get(settings));
updateMaxDelay(MAX_DELAY_SETTINGS.get(settings));
}

void updateBaseDelay(TimeValue newBaseValue) {
baseDelay = newBaseValue;
}

void updateMaxDelay(TimeValue newMaxValue) {
maxDelay = newMaxValue;
}

public static TimeValue getBaseDelayForRetry() {
return baseDelay;
}

public static TimeValue getMaxDelayForRetry() {
return maxDelay;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -600,7 +600,8 @@ public void apply(Settings value, Settings current, Settings previous) {
TaskResourceTrackingService.TASK_RESOURCE_TRACKING_ENABLED,
TaskManager.TASK_RESOURCE_CONSUMERS_ENABLED,
ClusterManagerTaskThrottler.THRESHOLD_SETTINGS,

ClusterManagerTaskThrottler.BASE_DELAY_SETTINGS,
ClusterManagerTaskThrottler.MAX_DELAY_SETTINGS,
// Settings related to search backpressure
SearchBackpressureSettings.SETTING_MODE,

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -669,7 +669,7 @@ public void testThrottlingRetryRemoteMaster() throws ExecutionException, Interru
assertFalse(listener.isDone());

// waiting for retry to trigger
Thread.sleep(100);
Thread.sleep(10000);

// Retry for above throttling exception
capturedRequests = transport.getCapturedRequestsAndClear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,12 @@ public void testSettingsOnBootstrap() {

ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
int put_mapping_threshold_value = randomIntBetween(1, 10);
int baseDelay = randomIntBetween(1, 10);
int maxDelay = randomIntBetween(1, 10);
Settings initialSettings = Settings.builder()
.put("cluster_manager.throttling.thresholds.put-mapping.value", put_mapping_threshold_value)
.put("cluster_manager.throttling.retry.base.delay", baseDelay + "s")
.put("cluster_manager.throttling.retry.max.delay", maxDelay + "s")
.build();
ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(initialSettings, clusterSettings, () -> {
return clusterService.getMasterService().getMinNodeVersion();
Expand All @@ -176,6 +180,32 @@ public void testSettingsOnBootstrap() {

// assert that limit is applied on throttler
assertEquals(put_mapping_threshold_value, throttler.getThrottlingLimit("put-mapping").intValue());
// assert that delay setting is applied on throttler
assertEquals(baseDelay, ClusterManagerTaskThrottler.getBaseDelayForRetry().seconds());
assertEquals(maxDelay, ClusterManagerTaskThrottler.getMaxDelayForRetry().seconds());
}

public void testUpdateRetryDelaySetting() {
ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(Settings.EMPTY, clusterSettings, () -> {
return clusterService.getMasterService().getMinNodeVersion();
}, new ClusterManagerThrottlingStats());

// verify defaults
assertEquals(ClusterManagerTaskThrottler.baseDelay, ClusterManagerTaskThrottler.getBaseDelayForRetry());
assertEquals(ClusterManagerTaskThrottler.maxDelay, ClusterManagerTaskThrottler.getMaxDelayForRetry());

// verify update base delay
int baseDelay = randomIntBetween(1, 10);
Settings newSettings = Settings.builder().put("cluster_manager.throttling.retry.base.delay", baseDelay + "s").build();
clusterSettings.applySettings(newSettings);
assertEquals(baseDelay, ClusterManagerTaskThrottler.getBaseDelayForRetry().seconds());

// verify update max delay
int maxDelay = randomIntBetween(1, 10);
newSettings = Settings.builder().put("cluster_manager.throttling.retry.max.delay", maxDelay + "s").build();
clusterSettings.applySettings(newSettings);
assertEquals(maxDelay, ClusterManagerTaskThrottler.getMaxDelayForRetry().seconds());
}

public void testValidateSettingsForUnknownTask() {
Expand Down