From de643399a83f8dfa77901cd96e624f95aca8c736 Mon Sep 17 00:00:00 2001 From: Ketan Verma Date: Fri, 21 Oct 2022 00:33:09 +0530 Subject: [PATCH] Refactoring changes Signed-off-by: Ketan Verma --- .../SearchBackpressureService.java | 2 +- .../trackers/CpuUsageTracker.java | 11 +++----- .../trackers/ElapsedTimeTracker.java | 11 +++----- .../trackers/HeapUsageTracker.java | 8 +++--- .../trackers/TaskResourceUsageTracker.java | 4 +-- .../TaskResourceUsageTrackerType.java | 28 +++++++++++++++++++ .../SearchBackpressureServiceTests.java | 2 +- .../trackers/CpuUsageTrackerTests.java | 4 +-- .../trackers/ElapsedTimeTrackerTests.java | 4 +-- .../trackers/HeapUsageTrackerTests.java | 8 +++--- .../tasks/TaskCancellationTests.java | 2 +- 11 files changed, 53 insertions(+), 31 deletions(-) create mode 100644 server/src/main/java/org/opensearch/search/backpressure/trackers/TaskResourceUsageTrackerType.java diff --git a/server/src/main/java/org/opensearch/search/backpressure/SearchBackpressureService.java b/server/src/main/java/org/opensearch/search/backpressure/SearchBackpressureService.java index 4e980199cb17e..66efa832dec37 100644 --- a/server/src/main/java/org/opensearch/search/backpressure/SearchBackpressureService.java +++ b/server/src/main/java/org/opensearch/search/backpressure/SearchBackpressureService.java @@ -215,7 +215,7 @@ TaskCancellation getTaskCancellation(CancellableTask task) { List callbacks = new ArrayList<>(); for (TaskResourceUsageTracker tracker : taskResourceUsageTrackers) { - Optional reason = tracker.cancellationReason(task); + Optional reason = tracker.checkAndMaybeGetCancellationReason(task); if (reason.isPresent()) { reasons.add(reason.get()); callbacks.add(tracker::incrementCancellations); diff --git a/server/src/main/java/org/opensearch/search/backpressure/trackers/CpuUsageTracker.java b/server/src/main/java/org/opensearch/search/backpressure/trackers/CpuUsageTracker.java index 6a6f601642676..03f621dbae06c 100644 --- a/server/src/main/java/org/opensearch/search/backpressure/trackers/CpuUsageTracker.java +++ b/server/src/main/java/org/opensearch/search/backpressure/trackers/CpuUsageTracker.java @@ -17,14 +17,14 @@ import java.util.concurrent.TimeUnit; import java.util.function.LongSupplier; +import static org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackerType.CPU_USAGE_TRACKER; + /** * CpuUsageTracker evaluates if the task has consumed too many CPU cycles than allowed. * * @opensearch.internal */ public class CpuUsageTracker extends TaskResourceUsageTracker { - public static final String NAME = "cpu_usage_tracker"; - private final LongSupplier cpuTimeNanosThresholdSupplier; public CpuUsageTracker(SearchBackpressureSettings settings) { @@ -33,14 +33,11 @@ public CpuUsageTracker(SearchBackpressureSettings settings) { @Override public String name() { - return NAME; + return CPU_USAGE_TRACKER.getName(); } @Override - public void update(Task task) {} - - @Override - public Optional cancellationReason(Task task) { + public Optional checkAndMaybeGetCancellationReason(Task task) { long usage = task.getTotalResourceStats().getCpuTimeInNanos(); long threshold = cpuTimeNanosThresholdSupplier.getAsLong(); diff --git a/server/src/main/java/org/opensearch/search/backpressure/trackers/ElapsedTimeTracker.java b/server/src/main/java/org/opensearch/search/backpressure/trackers/ElapsedTimeTracker.java index 6e14113b7245a..84d3b41933002 100644 --- a/server/src/main/java/org/opensearch/search/backpressure/trackers/ElapsedTimeTracker.java +++ b/server/src/main/java/org/opensearch/search/backpressure/trackers/ElapsedTimeTracker.java @@ -17,14 +17,14 @@ import java.util.concurrent.TimeUnit; import java.util.function.LongSupplier; +import static org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackerType.ELAPSED_TIME_TRACKER; + /** * ElapsedTimeTracker evaluates if the task has been running for more time than allowed. * * @opensearch.internal */ public class ElapsedTimeTracker extends TaskResourceUsageTracker { - public static final String NAME = "elapsed_time_tracker"; - private final LongSupplier timeNanosSupplier; private final LongSupplier elapsedTimeNanosThresholdSupplier; @@ -35,14 +35,11 @@ public ElapsedTimeTracker(SearchBackpressureSettings settings, LongSupplier time @Override public String name() { - return NAME; + return ELAPSED_TIME_TRACKER.getName(); } @Override - public void update(Task task) {} - - @Override - public Optional cancellationReason(Task task) { + public Optional checkAndMaybeGetCancellationReason(Task task) { long usage = timeNanosSupplier.getAsLong() - task.getStartTimeNanos(); long threshold = elapsedTimeNanosThresholdSupplier.getAsLong(); diff --git a/server/src/main/java/org/opensearch/search/backpressure/trackers/HeapUsageTracker.java b/server/src/main/java/org/opensearch/search/backpressure/trackers/HeapUsageTracker.java index 99b73ea6d3585..b6e7d3c0a797d 100644 --- a/server/src/main/java/org/opensearch/search/backpressure/trackers/HeapUsageTracker.java +++ b/server/src/main/java/org/opensearch/search/backpressure/trackers/HeapUsageTracker.java @@ -20,6 +20,8 @@ import java.util.function.DoubleSupplier; import java.util.function.LongSupplier; +import static org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackerType.HEAP_USAGE_TRACKER; + /** * HeapUsageTracker evaluates if the task has consumed too much heap than allowed. * It also compares the task's heap usage against a historical moving average of previously completed tasks. @@ -27,8 +29,6 @@ * @opensearch.internal */ public class HeapUsageTracker extends TaskResourceUsageTracker implements SearchShardTaskSettings.Listener { - public static final String NAME = "heap_usage_tracker"; - private final LongSupplier heapBytesThresholdSupplier; private final DoubleSupplier heapVarianceThresholdSupplier; private final AtomicReference movingAverageReference; @@ -44,7 +44,7 @@ public HeapUsageTracker(SearchBackpressureSettings settings) { @Override public String name() { - return NAME; + return HEAP_USAGE_TRACKER.getName(); } @Override @@ -53,7 +53,7 @@ public void update(Task task) { } @Override - public Optional cancellationReason(Task task) { + public Optional checkAndMaybeGetCancellationReason(Task task) { MovingAverage movingAverage = movingAverageReference.get(); // There haven't been enough measurements. diff --git a/server/src/main/java/org/opensearch/search/backpressure/trackers/TaskResourceUsageTracker.java b/server/src/main/java/org/opensearch/search/backpressure/trackers/TaskResourceUsageTracker.java index 8f1842efa5771..1765dee42ae15 100644 --- a/server/src/main/java/org/opensearch/search/backpressure/trackers/TaskResourceUsageTracker.java +++ b/server/src/main/java/org/opensearch/search/backpressure/trackers/TaskResourceUsageTracker.java @@ -41,10 +41,10 @@ public long getCancellations() { /** * Notifies the tracker to update its state when a task execution completes. */ - public abstract void update(Task task); + public void update(Task task) {} /** * Returns the cancellation reason for the given task, if it's eligible for cancellation. */ - public abstract Optional cancellationReason(Task task); + public abstract Optional checkAndMaybeGetCancellationReason(Task task); } diff --git a/server/src/main/java/org/opensearch/search/backpressure/trackers/TaskResourceUsageTrackerType.java b/server/src/main/java/org/opensearch/search/backpressure/trackers/TaskResourceUsageTrackerType.java new file mode 100644 index 0000000000000..7a74321241534 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/backpressure/trackers/TaskResourceUsageTrackerType.java @@ -0,0 +1,28 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.backpressure.trackers; + +/** + * Defines the type of TaskResourceUsageTracker. + */ +public enum TaskResourceUsageTrackerType { + CPU_USAGE_TRACKER("cpu_usage_tracker"), + HEAP_USAGE_TRACKER("heap_usage_tracker"), + ELAPSED_TIME_TRACKER("elapsed_time_tracker"); + + private final String name; + + TaskResourceUsageTrackerType(String name) { + this.name = name; + } + + public String getName() { + return name; + } +} diff --git a/server/src/test/java/org/opensearch/search/backpressure/SearchBackpressureServiceTests.java b/server/src/test/java/org/opensearch/search/backpressure/SearchBackpressureServiceTests.java index d2998f244f4e8..7e16e55e16e59 100644 --- a/server/src/test/java/org/opensearch/search/backpressure/SearchBackpressureServiceTests.java +++ b/server/src/test/java/org/opensearch/search/backpressure/SearchBackpressureServiceTests.java @@ -132,7 +132,7 @@ public String name() { public void update(Task task) {} @Override - public Optional cancellationReason(Task task) { + public Optional checkAndMaybeGetCancellationReason(Task task) { if (task.getTotalResourceStats().getCpuTimeInNanos() < 300) { return Optional.empty(); } diff --git a/server/src/test/java/org/opensearch/search/backpressure/trackers/CpuUsageTrackerTests.java b/server/src/test/java/org/opensearch/search/backpressure/trackers/CpuUsageTrackerTests.java index a4b5d4ee76777..83c2e3a7673c8 100644 --- a/server/src/test/java/org/opensearch/search/backpressure/trackers/CpuUsageTrackerTests.java +++ b/server/src/test/java/org/opensearch/search/backpressure/trackers/CpuUsageTrackerTests.java @@ -33,7 +33,7 @@ public void testEligibleForCancellation() { Task task = createMockTaskWithResourceStats(SearchShardTask.class, 200000000, 200); CpuUsageTracker tracker = new CpuUsageTracker(mockSettings); - Optional reason = tracker.cancellationReason(task); + Optional reason = tracker.checkAndMaybeGetCancellationReason(task); assertTrue(reason.isPresent()); assertEquals(1, reason.get().getCancellationScore()); assertEquals("cpu usage exceeded [200ms >= 15ms]", reason.get().getMessage()); @@ -43,7 +43,7 @@ public void testNotEligibleForCancellation() { Task task = createMockTaskWithResourceStats(SearchShardTask.class, 5000000, 200); CpuUsageTracker tracker = new CpuUsageTracker(mockSettings); - Optional reason = tracker.cancellationReason(task); + Optional reason = tracker.checkAndMaybeGetCancellationReason(task); assertFalse(reason.isPresent()); } } diff --git a/server/src/test/java/org/opensearch/search/backpressure/trackers/ElapsedTimeTrackerTests.java b/server/src/test/java/org/opensearch/search/backpressure/trackers/ElapsedTimeTrackerTests.java index b3e33be671e04..1124c290cd59d 100644 --- a/server/src/test/java/org/opensearch/search/backpressure/trackers/ElapsedTimeTrackerTests.java +++ b/server/src/test/java/org/opensearch/search/backpressure/trackers/ElapsedTimeTrackerTests.java @@ -34,7 +34,7 @@ public void testEligibleForCancellation() { Task task = createMockTaskWithResourceStats(SearchShardTask.class, 1, 1, 0); ElapsedTimeTracker tracker = new ElapsedTimeTracker(mockSettings, () -> 200000000); - Optional reason = tracker.cancellationReason(task); + Optional reason = tracker.checkAndMaybeGetCancellationReason(task); assertTrue(reason.isPresent()); assertEquals(1, reason.get().getCancellationScore()); assertEquals("elapsed time exceeded [200ms >= 100ms]", reason.get().getMessage()); @@ -44,7 +44,7 @@ public void testNotEligibleForCancellation() { Task task = createMockTaskWithResourceStats(SearchShardTask.class, 1, 1, 150000000); ElapsedTimeTracker tracker = new ElapsedTimeTracker(mockSettings, () -> 200000000); - Optional reason = tracker.cancellationReason(task); + Optional reason = tracker.checkAndMaybeGetCancellationReason(task); assertFalse(reason.isPresent()); } } diff --git a/server/src/test/java/org/opensearch/search/backpressure/trackers/HeapUsageTrackerTests.java b/server/src/test/java/org/opensearch/search/backpressure/trackers/HeapUsageTrackerTests.java index 815da3a6dfe9b..6bb34fd1733a7 100644 --- a/server/src/test/java/org/opensearch/search/backpressure/trackers/HeapUsageTrackerTests.java +++ b/server/src/test/java/org/opensearch/search/backpressure/trackers/HeapUsageTrackerTests.java @@ -44,7 +44,7 @@ public void testEligibleForCancellation() { // Task that has heap usage >= heapBytesThreshold and (movingAverage * heapVariance). task = createMockTaskWithResourceStats(SearchShardTask.class, 1, 200); - Optional reason = tracker.cancellationReason(task); + Optional reason = tracker.checkAndMaybeGetCancellationReason(task); assertTrue(reason.isPresent()); assertEquals(4, reason.get().getCancellationScore()); assertEquals("heap usage exceeded [200b >= 100b]", reason.get().getMessage()); @@ -59,7 +59,7 @@ public void testNotEligibleForCancellation() { task = createMockTaskWithResourceStats(SearchShardTask.class, 1, 99); // Not enough observations. - reason = tracker.cancellationReason(task); + reason = tracker.checkAndMaybeGetCancellationReason(task); assertFalse(reason.isPresent()); // Record enough observations to make the moving average 'ready'. @@ -68,13 +68,13 @@ public void testNotEligibleForCancellation() { } // Task with heap usage < heapBytesThreshold should not be cancelled. - reason = tracker.cancellationReason(task); + reason = tracker.checkAndMaybeGetCancellationReason(task); assertFalse(reason.isPresent()); // Task with heap usage between heapBytesThreshold and (movingAverage * heapVariance) should not be cancelled. double allowedHeapUsage = 99.0 * 2.0; task = createMockTaskWithResourceStats(SearchShardTask.class, 1, randomLongBetween(99, (long) allowedHeapUsage - 1)); - reason = tracker.cancellationReason(task); + reason = tracker.checkAndMaybeGetCancellationReason(task); assertFalse(reason.isPresent()); } } diff --git a/server/src/test/java/org/opensearch/tasks/TaskCancellationTests.java b/server/src/test/java/org/opensearch/tasks/TaskCancellationTests.java index d6a82702e074d..50a510f954677 100644 --- a/server/src/test/java/org/opensearch/tasks/TaskCancellationTests.java +++ b/server/src/test/java/org/opensearch/tasks/TaskCancellationTests.java @@ -64,7 +64,7 @@ public String name() { public void update(Task task) {} @Override - public Optional cancellationReason(Task task) { + public Optional checkAndMaybeGetCancellationReason(Task task) { return Optional.empty(); } };