Skip to content

Commit

Permalink
Add version check during task submission for bwc for static threshold…
Browse files Browse the repository at this point in the history
… setting

Signed-off-by: Dhwanil Patel <dhwanip@amazon.com>
  • Loading branch information
dhwanilpatel committed Dec 26, 2022
1 parent 35605c6 commit e8c99a3
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix case sensitivity for wildcard queries ([#5462](https://github.com/opensearch-project/OpenSearch/pull/5462))
- Apply cluster manager throttling settings during bootstrap ([#5524](https://github.com/opensearch-project/OpenSearch/pull/5524))
- Update thresholds map when cluster manager throttling setting is removed ([#5524](https://github.com/opensearch-project/OpenSearch/pull/5524))
- Fix backward compatibility for static cluster manager throttling threshold setting ([]())
### Security

[Unreleased 3.0]: https://github.com/opensearch-project/OpenSearch/compare/2.4...HEAD
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1613,7 +1613,7 @@ private enum OpenSearchExceptionHandle {
ClusterManagerThrottlingException.class,
ClusterManagerThrottlingException::new,
165,
Version.V_2_4_0
Version.V_2_5_0
),
SNAPSHOT_IN_USE_DELETION_EXCEPTION(
SnapshotInUseDeletionException.class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

/**
Expand Down Expand Up @@ -50,6 +51,9 @@ public class ClusterManagerTaskThrottler implements TaskBatcherListener {
private final ConcurrentMap<String, Long> tasksCount;
private final ConcurrentMap<String, Long> tasksThreshold;
private final Supplier<Version> minNodeVersionSupplier;
// once all nodes are higher than 2.5.0 version, then only it will start throttling.
// Needed for static threshold settings.
private AtomicBoolean startThrottling = new AtomicBoolean();

public ClusterManagerTaskThrottler(
final Settings settings,
Expand Down Expand Up @@ -168,7 +172,7 @@ public void onBeginSubmit(List<? extends TaskBatcher.BatchedTask> tasks) {
int size = tasks.size();
if (clusterManagerThrottlingKey.isThrottlingEnabled()) {
Long threshold = tasksThreshold.get(clusterManagerThrottlingKey.getTaskThrottlingKey());
if (threshold != null && (count + size > threshold)) {
if (threshold != null && checkForThrottling(threshold, count, size)) {
clusterManagerTaskThrottlerListener.onThrottle(clusterManagerThrottlingKey.getTaskThrottlingKey(), size);
logger.warn(
"Throwing Throttling Exception for [{}]. Trying to add [{}] tasks to queue, limit is set to [{}]",
Expand All @@ -185,6 +189,27 @@ public void onBeginSubmit(List<? extends TaskBatcher.BatchedTask> tasks) {
});
}

/**
* If throttling thresholds are set via static setting, it will update the threshold map.
* It may start throwing throttling exception to older nodes in cluster.
*
* On submission of first task, it validates if all nodes are of 2.5.0 or not and
* update startThrottling flag.
* Once the flag is set it will start throttling, and won't perform check for next tasks.
*/
private boolean checkForThrottling(Long threshold, Long count, int size) {
if(!startThrottling.get()) {
if(minNodeVersionSupplier.get().compareTo(Version.V_2_5_0) >= 0) {
startThrottling.compareAndSet(false, true);
logger.info("Starting cluster manager throttling as all nodes are higher than or equal to 2.5.0");
} else {
logger.info("Skipping cluster manager throttling as node older than 2.5.0 is present in cluster");
return false;
}
}
return count + size > threshold;
}

@Override
public void onSubmitFailure(List<? extends TaskBatcher.BatchedTask> tasks) {
reduceTaskCount(tasks);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,48 @@ public void testThrottlingForDisabledThrottlingTask() {
assertEquals(0L, throttlingStats.getThrottlingCount(taskKey));
}

public void testThrottlingForStaticVersionCheck() {
ClusterManagerThrottlingStats throttlingStats = new ClusterManagerThrottlingStats();
DiscoveryNode clusterManagerNode = getClusterManagerNode(Version.V_2_5_0);
DiscoveryNode dataNode = getDataNode(Version.V_2_4_0);
setState(
clusterService,
ClusterStateCreationUtils.state(clusterManagerNode, clusterManagerNode, new DiscoveryNode[] { clusterManagerNode, dataNode })
);

// setting threshold in initial settings
ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
int put_mapping_threshold_value = randomIntBetween(1, 10);
Settings initialSettings = Settings.builder()
.put("cluster_manager.throttling.thresholds.put-mapping.value", put_mapping_threshold_value)
.build();
ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(
initialSettings,
clusterSettings,
() -> { return clusterService.getMasterService().getMinNodeVersion(); },
throttlingStats
);
ClusterManagerTaskThrottler.ThrottlingKey throttlingKey = throttler.registerClusterManagerTask("put-mapping", true);

// verifying adding more tasks then threshold passes
throttler.onBeginSubmit(getMockUpdateTaskList("put-mapping", throttlingKey, put_mapping_threshold_value + 5));
assertEquals(0L, throttlingStats.getThrottlingCount("put-mapping"));

// Removing older version node from cluster
setState(
clusterService,
ClusterStateCreationUtils.state(clusterManagerNode, clusterManagerNode, new DiscoveryNode[] { clusterManagerNode })
);

// adding more tasks, these tasks should be throttled
// As queue already have more tasks than threshold from previous call.
assertThrows(
ClusterManagerThrottlingException.class,
() -> throttler.onBeginSubmit(getMockUpdateTaskList("put-mapping", throttlingKey, 3))
);
assertEquals(3L, throttlingStats.getThrottlingCount("put-mapping"));
}

public void testThrottling() {
ClusterManagerThrottlingStats throttlingStats = new ClusterManagerThrottlingStats();
String taskKey = "test";
Expand Down

0 comments on commit e8c99a3

Please sign in to comment.