Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Add version check during task submission for bwc for static threshold setting #5647

Merged
merged 1 commit into from
Jan 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Add version check during task submission for bwc for static threshold…
… setting (#5633)

* Add version check during task submission for bwc for static threshold setting

Signed-off-by: Dhwanil Patel <dhwanip@amazon.com>
(cherry picked from commit ea1cc9d)
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
github-actions[bot] committed Dec 28, 2022
commit 80d06b93bc8a3e7a6d33ed975af6d632eea58f33
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Support OpenSSL Provider with default Netty allocator ([#5499](https://github.com/opensearch-project/OpenSearch/pull/5499))
- Apply cluster manager throttling settings during bootstrap ([#5524](https://github.com/opensearch-project/OpenSearch/pull/5524))
- Update thresholds map when cluster manager throttling setting is removed ([#5524](https://github.com/opensearch-project/OpenSearch/pull/5524))
- Fix backward compatibility for static cluster manager throttling threshold setting ([#5633](https://github.com/opensearch-project/OpenSearch/pull/5633))
### Security

[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.4...2.x
Original file line number Diff line number Diff line change
Expand Up @@ -1628,7 +1628,7 @@ private enum OpenSearchExceptionHandle {
ClusterManagerThrottlingException.class,
ClusterManagerThrottlingException::new,
165,
Version.V_2_4_0
Version.V_2_5_0
),
SNAPSHOT_IN_USE_DELETION_EXCEPTION(
SnapshotInUseDeletionException.class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

/**
Expand Down Expand Up @@ -51,6 +52,11 @@ public class ClusterManagerTaskThrottler implements TaskBatcherListener {
private final ConcurrentMap<String, Long> tasksThreshold;
private final Supplier<Version> minNodeVersionSupplier;

// Once all nodes are greater than or equal 2.5.0 version, then only it will start throttling.
// During upgrade as well, it will wait for all older version nodes to leave the cluster before starting throttling.
// This is needed specifically for static setting to enable throttling.
private AtomicBoolean startThrottling = new AtomicBoolean();

public ClusterManagerTaskThrottler(
final Settings settings,
final ClusterSettings clusterSettings,
Expand Down Expand Up @@ -168,7 +174,7 @@ public void onBeginSubmit(List<? extends TaskBatcher.BatchedTask> tasks) {
int size = tasks.size();
if (clusterManagerThrottlingKey.isThrottlingEnabled()) {
Long threshold = tasksThreshold.get(clusterManagerThrottlingKey.getTaskThrottlingKey());
if (threshold != null && (count + size > threshold)) {
if (threshold != null && shouldThrottle(threshold, count, size)) {
clusterManagerTaskThrottlerListener.onThrottle(clusterManagerThrottlingKey.getTaskThrottlingKey(), size);
logger.warn(
"Throwing Throttling Exception for [{}]. Trying to add [{}] tasks to queue, limit is set to [{}]",
Expand All @@ -185,6 +191,28 @@ public void onBeginSubmit(List<? extends TaskBatcher.BatchedTask> tasks) {
});
}

/**
* If throttling thresholds are set via static setting, it will update the threshold map.
* It may start throwing throttling exception to older nodes in cluster.
* Older version nodes will not be equipped to handle the throttling exception and
* this may result in unexpected behavior where internal tasks would start failing without any retries.
*
* For every task submission request, it will validate if nodes version is greater or equal to 2.5.0 and set the startThrottling flag.
* Once the startThrottling flag is set, it will not perform check for next set of tasks.
*/
private boolean shouldThrottle(Long threshold, Long count, int size) {
if (!startThrottling.get()) {
if (minNodeVersionSupplier.get().compareTo(Version.V_2_5_0) >= 0) {
startThrottling.compareAndSet(false, true);
logger.info("Starting cluster manager throttling as all nodes are higher than or equal to 2.5.0");
} else {
logger.info("Skipping cluster manager throttling as at least one node < 2.5.0 is present in cluster");
return false;
}
}
return count + size > threshold;
}

@Override
public void onSubmitFailure(List<? extends TaskBatcher.BatchedTask> tasks) {
reduceTaskCount(tasks);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,48 @@ public void testThrottlingForDisabledThrottlingTask() {
assertEquals(0L, throttlingStats.getThrottlingCount(taskKey));
}

public void testThrottlingForInitialStaticSettingAndVersionCheck() {
ClusterManagerThrottlingStats throttlingStats = new ClusterManagerThrottlingStats();
DiscoveryNode clusterManagerNode = getClusterManagerNode(Version.V_2_5_0);
DiscoveryNode dataNode = getDataNode(Version.V_2_4_0);
setState(
clusterService,
ClusterStateCreationUtils.state(clusterManagerNode, clusterManagerNode, new DiscoveryNode[] { clusterManagerNode, dataNode })
);

// setting threshold in initial settings
ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
int put_mapping_threshold_value = randomIntBetween(1, 10);
Settings initialSettings = Settings.builder()
.put("cluster_manager.throttling.thresholds.put-mapping.value", put_mapping_threshold_value)
.build();
ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(
initialSettings,
clusterSettings,
() -> { return clusterService.getMasterService().getMinNodeVersion(); },
throttlingStats
);
ClusterManagerTaskThrottler.ThrottlingKey throttlingKey = throttler.registerClusterManagerTask("put-mapping", true);

// verifying adding more tasks then threshold passes
throttler.onBeginSubmit(getMockUpdateTaskList("put-mapping", throttlingKey, put_mapping_threshold_value + 5));
assertEquals(0L, throttlingStats.getThrottlingCount("put-mapping"));

// Removing older version node from cluster
setState(
clusterService,
ClusterStateCreationUtils.state(clusterManagerNode, clusterManagerNode, new DiscoveryNode[] { clusterManagerNode })
);

// adding more tasks, these tasks should be throttled
// As queue already have more tasks than threshold from previous call.
assertThrows(
ClusterManagerThrottlingException.class,
() -> throttler.onBeginSubmit(getMockUpdateTaskList("put-mapping", throttlingKey, 3))
);
assertEquals(3L, throttlingStats.getThrottlingCount("put-mapping"));
}

public void testThrottling() {
ClusterManagerThrottlingStats throttlingStats = new ClusterManagerThrottlingStats();
String taskKey = "test";
Expand Down