Skip to content

Commit

Permalink
Incorporated comments
Browse files Browse the repository at this point in the history
Signed-off-by: Dhwanil Patel <dhwanip@amazon.com>
  • Loading branch information
dhwanilpatel committed Dec 27, 2022
1 parent d10d550 commit 31fb3a0
Showing 1 changed file with 9 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,9 @@ public class ClusterManagerTaskThrottler implements TaskBatcherListener {
private final ConcurrentMap<String, Long> tasksCount;
private final ConcurrentMap<String, Long> tasksThreshold;
private final Supplier<Version> minNodeVersionSupplier;
// Once all nodes are higher than 2.5.0 version, then only it will start throttling.
// Needed for static threshold settings.
// Once all nodes are higher than 2.5.0 version, then only it will start throttling.
// During upgrade, it will wait for all older version nodes to leave the cluster and then only start throttling.
private AtomicBoolean startThrottling = new AtomicBoolean();

public ClusterManagerTaskThrottler(
Expand Down Expand Up @@ -172,7 +173,7 @@ public void onBeginSubmit(List<? extends TaskBatcher.BatchedTask> tasks) {
int size = tasks.size();
if (clusterManagerThrottlingKey.isThrottlingEnabled()) {
Long threshold = tasksThreshold.get(clusterManagerThrottlingKey.getTaskThrottlingKey());
if (threshold != null && checkForThrottling(threshold, count, size)) {
if (threshold != null && shouldThrottle(threshold, count, size)) {
clusterManagerTaskThrottlerListener.onThrottle(clusterManagerThrottlingKey.getTaskThrottlingKey(), size);
logger.warn(
"Throwing Throttling Exception for [{}]. Trying to add [{}] tasks to queue, limit is set to [{}]",
Expand All @@ -192,18 +193,19 @@ public void onBeginSubmit(List<? extends TaskBatcher.BatchedTask> tasks) {
/**
* If throttling thresholds are set via static setting, it will update the threshold map.
* It may start throwing throttling exception to older nodes in cluster.
* Older version nodes will not be equipped to handle the throttling exception and
* this may result in unexpected behavior where internal tasks would start failing without any retries.
*
* On submission of first task, it validates if all nodes are of 2.5.0 or not and
* update startThrottling flag.
* Once the flag is set it will start throttling, and won't perform check for next tasks.
* For every task submission request, it will validate if nodes version is greater or equal to 2.5.0 and set the startThrottling flag.
* Once the startThrottling flag is set, it will not perform check for next set of tasks.
*/
private boolean checkForThrottling(Long threshold, Long count, int size) {
private boolean shouldThrottle(Long threshold, Long count, int size) {
if (!startThrottling.get()) {
if (minNodeVersionSupplier.get().compareTo(Version.V_2_5_0) >= 0) {
startThrottling.compareAndSet(false, true);
logger.info("Starting cluster manager throttling as all nodes are higher than or equal to 2.5.0");
} else {
logger.info("Skipping cluster manager throttling as node older than 2.5.0 is present in cluster");
logger.info("Skipping cluster manager throttling as at least one node < 2.5.0 is present in cluster");
return false;
}
}
Expand Down

0 comments on commit 31fb3a0

Please sign in to comment.