From 7fd120edd6cf2319da3d561ff2c1f849e29dd183 Mon Sep 17 00:00:00 2001 From: Dhwanil Patel Date: Tue, 27 Dec 2022 15:29:25 +0530 Subject: [PATCH] Incorporated comments Signed-off-by: Dhwanil Patel --- .../service/ClusterManagerTaskThrottler.java | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/service/ClusterManagerTaskThrottler.java b/server/src/main/java/org/opensearch/cluster/service/ClusterManagerTaskThrottler.java index 77b9c7778f8cf..c556a4436e17d 100644 --- a/server/src/main/java/org/opensearch/cluster/service/ClusterManagerTaskThrottler.java +++ b/server/src/main/java/org/opensearch/cluster/service/ClusterManagerTaskThrottler.java @@ -51,8 +51,9 @@ public class ClusterManagerTaskThrottler implements TaskBatcherListener { private final ConcurrentMap tasksCount; private final ConcurrentMap tasksThreshold; private final Supplier minNodeVersionSupplier; - // Once all nodes are higher than 2.5.0 version, then only it will start throttling. // Needed for static threshold settings. + // Once all nodes are higher than 2.5.0 version, then only it will start throttling. + // During upgrade, it will wait for all older version nodes to leave the cluster and then only start throttling. private AtomicBoolean startThrottling = new AtomicBoolean(); public ClusterManagerTaskThrottler( @@ -172,7 +173,7 @@ public void onBeginSubmit(List tasks) { int size = tasks.size(); if (clusterManagerThrottlingKey.isThrottlingEnabled()) { Long threshold = tasksThreshold.get(clusterManagerThrottlingKey.getTaskThrottlingKey()); - if (threshold != null && checkForThrottling(threshold, count, size)) { + if (threshold != null && shouldThrottle(threshold, count, size)) { clusterManagerTaskThrottlerListener.onThrottle(clusterManagerThrottlingKey.getTaskThrottlingKey(), size); logger.warn( "Throwing Throttling Exception for [{}]. Trying to add [{}] tasks to queue, limit is set to [{}]", @@ -192,18 +193,19 @@ public void onBeginSubmit(List tasks) { /** * If throttling thresholds are set via static setting, it will update the threshold map. * It may start throwing throttling exception to older nodes in cluster. + * Older version nodes will not be equipped to handle the throttling exception and + * this may result in unexpected behavior where internal tasks would start failing without any retries. * - * On submission of first task, it validates if all nodes are of 2.5.0 or not and - * update startThrottling flag. - * Once the flag is set it will start throttling, and won't perform check for next tasks. + * For every task submission request, it will validate if nodes version >= 2.5.0 and set the startThrottling flag. + * Once the startThrottling flag is set, it will not perform check for next set of tasks. */ - private boolean checkForThrottling(Long threshold, Long count, int size) { + private boolean shouldThrottle(Long threshold, Long count, int size) { if (!startThrottling.get()) { if (minNodeVersionSupplier.get().compareTo(Version.V_2_5_0) >= 0) { startThrottling.compareAndSet(false, true); logger.info("Starting cluster manager throttling as all nodes are higher than or equal to 2.5.0"); } else { - logger.info("Skipping cluster manager throttling as node older than 2.5.0 is present in cluster"); + logger.info("Skipping cluster manager throttling as at least one node < 2.5.0 is present in cluster"); return false; } }