From 5cbfa8a13cb411985c83e00899e8b318070a62bc Mon Sep 17 00:00:00 2001 From: Ketan Verma Date: Fri, 21 Oct 2022 00:33:09 +0530 Subject: [PATCH] Refactoring changes Signed-off-by: Ketan Verma --- .../SearchBackpressureService.java | 2 +- .../trackers/CpuUsageTracker.java | 9 ++---- .../trackers/ElapsedTimeTracker.java | 9 ++---- .../trackers/HeapUsageTracker.java | 6 ++-- .../trackers/TaskResourceUsageTracker.java | 4 +-- .../TaskResourceUsageTrackerType.java | 28 +++++++++++++++++++ .../SearchBackpressureServiceTests.java | 2 +- .../trackers/CpuUsageTrackerTests.java | 4 +-- .../trackers/ElapsedTimeTrackerTests.java | 4 +-- .../trackers/HeapUsageTrackerTests.java | 8 +++--- .../tasks/TaskCancellationTests.java | 2 +- 11 files changed, 50 insertions(+), 28 deletions(-) create mode 100644 server/src/main/java/org/opensearch/search/backpressure/trackers/TaskResourceUsageTrackerType.java diff --git a/server/src/main/java/org/opensearch/search/backpressure/SearchBackpressureService.java b/server/src/main/java/org/opensearch/search/backpressure/SearchBackpressureService.java index 4e980199cb17e..66efa832dec37 100644 --- a/server/src/main/java/org/opensearch/search/backpressure/SearchBackpressureService.java +++ b/server/src/main/java/org/opensearch/search/backpressure/SearchBackpressureService.java @@ -215,7 +215,7 @@ TaskCancellation getTaskCancellation(CancellableTask task) { List callbacks = new ArrayList<>(); for (TaskResourceUsageTracker tracker : taskResourceUsageTrackers) { - Optional reason = tracker.cancellationReason(task); + Optional reason = tracker.checkAndMaybeGetCancellationReason(task); if (reason.isPresent()) { reasons.add(reason.get()); callbacks.add(tracker::incrementCancellations); diff --git a/server/src/main/java/org/opensearch/search/backpressure/trackers/CpuUsageTracker.java b/server/src/main/java/org/opensearch/search/backpressure/trackers/CpuUsageTracker.java index 6a6f601642676..608c1fe5ebde9 100644 --- a/server/src/main/java/org/opensearch/search/backpressure/trackers/CpuUsageTracker.java +++ b/server/src/main/java/org/opensearch/search/backpressure/trackers/CpuUsageTracker.java @@ -23,7 +23,7 @@ * @opensearch.internal */ public class CpuUsageTracker extends TaskResourceUsageTracker { - public static final String NAME = "cpu_usage_tracker"; + public static final TaskResourceUsageTrackerType TYPE = TaskResourceUsageTrackerType.CPU_USAGE_TRACKER; private final LongSupplier cpuTimeNanosThresholdSupplier; @@ -33,14 +33,11 @@ public CpuUsageTracker(SearchBackpressureSettings settings) { @Override public String name() { - return NAME; + return TYPE.getName(); } @Override - public void update(Task task) {} - - @Override - public Optional cancellationReason(Task task) { + public Optional checkAndMaybeGetCancellationReason(Task task) { long usage = task.getTotalResourceStats().getCpuTimeInNanos(); long threshold = cpuTimeNanosThresholdSupplier.getAsLong(); diff --git a/server/src/main/java/org/opensearch/search/backpressure/trackers/ElapsedTimeTracker.java b/server/src/main/java/org/opensearch/search/backpressure/trackers/ElapsedTimeTracker.java index 6e14113b7245a..cc1cb25ea8465 100644 --- a/server/src/main/java/org/opensearch/search/backpressure/trackers/ElapsedTimeTracker.java +++ b/server/src/main/java/org/opensearch/search/backpressure/trackers/ElapsedTimeTracker.java @@ -23,7 +23,7 @@ * @opensearch.internal */ public class ElapsedTimeTracker extends TaskResourceUsageTracker { - public static final String NAME = "elapsed_time_tracker"; + public static final TaskResourceUsageTrackerType TYPE = TaskResourceUsageTrackerType.ELAPSED_TIME_TRACKER; private final LongSupplier timeNanosSupplier; private final LongSupplier elapsedTimeNanosThresholdSupplier; @@ -35,14 +35,11 @@ public ElapsedTimeTracker(SearchBackpressureSettings settings, LongSupplier time @Override public String name() { - return NAME; + return TYPE.getName(); } @Override - public void update(Task task) {} - - @Override - public Optional cancellationReason(Task task) { + public Optional checkAndMaybeGetCancellationReason(Task task) { long usage = timeNanosSupplier.getAsLong() - task.getStartTimeNanos(); long threshold = elapsedTimeNanosThresholdSupplier.getAsLong(); diff --git a/server/src/main/java/org/opensearch/search/backpressure/trackers/HeapUsageTracker.java b/server/src/main/java/org/opensearch/search/backpressure/trackers/HeapUsageTracker.java index 99b73ea6d3585..9930504c4a7f1 100644 --- a/server/src/main/java/org/opensearch/search/backpressure/trackers/HeapUsageTracker.java +++ b/server/src/main/java/org/opensearch/search/backpressure/trackers/HeapUsageTracker.java @@ -27,7 +27,7 @@ * @opensearch.internal */ public class HeapUsageTracker extends TaskResourceUsageTracker implements SearchShardTaskSettings.Listener { - public static final String NAME = "heap_usage_tracker"; + public static final TaskResourceUsageTrackerType TYPE = TaskResourceUsageTrackerType.HEAP_USAGE_TRACKER; private final LongSupplier heapBytesThresholdSupplier; private final DoubleSupplier heapVarianceThresholdSupplier; @@ -44,7 +44,7 @@ public HeapUsageTracker(SearchBackpressureSettings settings) { @Override public String name() { - return NAME; + return TYPE.getName(); } @Override @@ -53,7 +53,7 @@ public void update(Task task) { } @Override - public Optional cancellationReason(Task task) { + public Optional checkAndMaybeGetCancellationReason(Task task) { MovingAverage movingAverage = movingAverageReference.get(); // There haven't been enough measurements. diff --git a/server/src/main/java/org/opensearch/search/backpressure/trackers/TaskResourceUsageTracker.java b/server/src/main/java/org/opensearch/search/backpressure/trackers/TaskResourceUsageTracker.java index 8f1842efa5771..1765dee42ae15 100644 --- a/server/src/main/java/org/opensearch/search/backpressure/trackers/TaskResourceUsageTracker.java +++ b/server/src/main/java/org/opensearch/search/backpressure/trackers/TaskResourceUsageTracker.java @@ -41,10 +41,10 @@ public long getCancellations() { /** * Notifies the tracker to update its state when a task execution completes. */ - public abstract void update(Task task); + public void update(Task task) {} /** * Returns the cancellation reason for the given task, if it's eligible for cancellation. */ - public abstract Optional cancellationReason(Task task); + public abstract Optional checkAndMaybeGetCancellationReason(Task task); } diff --git a/server/src/main/java/org/opensearch/search/backpressure/trackers/TaskResourceUsageTrackerType.java b/server/src/main/java/org/opensearch/search/backpressure/trackers/TaskResourceUsageTrackerType.java new file mode 100644 index 0000000000000..7a74321241534 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/backpressure/trackers/TaskResourceUsageTrackerType.java @@ -0,0 +1,28 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.backpressure.trackers; + +/** + * Defines the type of TaskResourceUsageTracker. + */ +public enum TaskResourceUsageTrackerType { + CPU_USAGE_TRACKER("cpu_usage_tracker"), + HEAP_USAGE_TRACKER("heap_usage_tracker"), + ELAPSED_TIME_TRACKER("elapsed_time_tracker"); + + private final String name; + + TaskResourceUsageTrackerType(String name) { + this.name = name; + } + + public String getName() { + return name; + } +} diff --git a/server/src/test/java/org/opensearch/search/backpressure/SearchBackpressureServiceTests.java b/server/src/test/java/org/opensearch/search/backpressure/SearchBackpressureServiceTests.java index d2998f244f4e8..7e16e55e16e59 100644 --- a/server/src/test/java/org/opensearch/search/backpressure/SearchBackpressureServiceTests.java +++ b/server/src/test/java/org/opensearch/search/backpressure/SearchBackpressureServiceTests.java @@ -132,7 +132,7 @@ public String name() { public void update(Task task) {} @Override - public Optional cancellationReason(Task task) { + public Optional checkAndMaybeGetCancellationReason(Task task) { if (task.getTotalResourceStats().getCpuTimeInNanos() < 300) { return Optional.empty(); } diff --git a/server/src/test/java/org/opensearch/search/backpressure/trackers/CpuUsageTrackerTests.java b/server/src/test/java/org/opensearch/search/backpressure/trackers/CpuUsageTrackerTests.java index a4b5d4ee76777..83c2e3a7673c8 100644 --- a/server/src/test/java/org/opensearch/search/backpressure/trackers/CpuUsageTrackerTests.java +++ b/server/src/test/java/org/opensearch/search/backpressure/trackers/CpuUsageTrackerTests.java @@ -33,7 +33,7 @@ public void testEligibleForCancellation() { Task task = createMockTaskWithResourceStats(SearchShardTask.class, 200000000, 200); CpuUsageTracker tracker = new CpuUsageTracker(mockSettings); - Optional reason = tracker.cancellationReason(task); + Optional reason = tracker.checkAndMaybeGetCancellationReason(task); assertTrue(reason.isPresent()); assertEquals(1, reason.get().getCancellationScore()); assertEquals("cpu usage exceeded [200ms >= 15ms]", reason.get().getMessage()); @@ -43,7 +43,7 @@ public void testNotEligibleForCancellation() { Task task = createMockTaskWithResourceStats(SearchShardTask.class, 5000000, 200); CpuUsageTracker tracker = new CpuUsageTracker(mockSettings); - Optional reason = tracker.cancellationReason(task); + Optional reason = tracker.checkAndMaybeGetCancellationReason(task); assertFalse(reason.isPresent()); } } diff --git a/server/src/test/java/org/opensearch/search/backpressure/trackers/ElapsedTimeTrackerTests.java b/server/src/test/java/org/opensearch/search/backpressure/trackers/ElapsedTimeTrackerTests.java index b3e33be671e04..1124c290cd59d 100644 --- a/server/src/test/java/org/opensearch/search/backpressure/trackers/ElapsedTimeTrackerTests.java +++ b/server/src/test/java/org/opensearch/search/backpressure/trackers/ElapsedTimeTrackerTests.java @@ -34,7 +34,7 @@ public void testEligibleForCancellation() { Task task = createMockTaskWithResourceStats(SearchShardTask.class, 1, 1, 0); ElapsedTimeTracker tracker = new ElapsedTimeTracker(mockSettings, () -> 200000000); - Optional reason = tracker.cancellationReason(task); + Optional reason = tracker.checkAndMaybeGetCancellationReason(task); assertTrue(reason.isPresent()); assertEquals(1, reason.get().getCancellationScore()); assertEquals("elapsed time exceeded [200ms >= 100ms]", reason.get().getMessage()); @@ -44,7 +44,7 @@ public void testNotEligibleForCancellation() { Task task = createMockTaskWithResourceStats(SearchShardTask.class, 1, 1, 150000000); ElapsedTimeTracker tracker = new ElapsedTimeTracker(mockSettings, () -> 200000000); - Optional reason = tracker.cancellationReason(task); + Optional reason = tracker.checkAndMaybeGetCancellationReason(task); assertFalse(reason.isPresent()); } } diff --git a/server/src/test/java/org/opensearch/search/backpressure/trackers/HeapUsageTrackerTests.java b/server/src/test/java/org/opensearch/search/backpressure/trackers/HeapUsageTrackerTests.java index 815da3a6dfe9b..6bb34fd1733a7 100644 --- a/server/src/test/java/org/opensearch/search/backpressure/trackers/HeapUsageTrackerTests.java +++ b/server/src/test/java/org/opensearch/search/backpressure/trackers/HeapUsageTrackerTests.java @@ -44,7 +44,7 @@ public void testEligibleForCancellation() { // Task that has heap usage >= heapBytesThreshold and (movingAverage * heapVariance). task = createMockTaskWithResourceStats(SearchShardTask.class, 1, 200); - Optional reason = tracker.cancellationReason(task); + Optional reason = tracker.checkAndMaybeGetCancellationReason(task); assertTrue(reason.isPresent()); assertEquals(4, reason.get().getCancellationScore()); assertEquals("heap usage exceeded [200b >= 100b]", reason.get().getMessage()); @@ -59,7 +59,7 @@ public void testNotEligibleForCancellation() { task = createMockTaskWithResourceStats(SearchShardTask.class, 1, 99); // Not enough observations. - reason = tracker.cancellationReason(task); + reason = tracker.checkAndMaybeGetCancellationReason(task); assertFalse(reason.isPresent()); // Record enough observations to make the moving average 'ready'. @@ -68,13 +68,13 @@ public void testNotEligibleForCancellation() { } // Task with heap usage < heapBytesThreshold should not be cancelled. - reason = tracker.cancellationReason(task); + reason = tracker.checkAndMaybeGetCancellationReason(task); assertFalse(reason.isPresent()); // Task with heap usage between heapBytesThreshold and (movingAverage * heapVariance) should not be cancelled. double allowedHeapUsage = 99.0 * 2.0; task = createMockTaskWithResourceStats(SearchShardTask.class, 1, randomLongBetween(99, (long) allowedHeapUsage - 1)); - reason = tracker.cancellationReason(task); + reason = tracker.checkAndMaybeGetCancellationReason(task); assertFalse(reason.isPresent()); } } diff --git a/server/src/test/java/org/opensearch/tasks/TaskCancellationTests.java b/server/src/test/java/org/opensearch/tasks/TaskCancellationTests.java index d6a82702e074d..50a510f954677 100644 --- a/server/src/test/java/org/opensearch/tasks/TaskCancellationTests.java +++ b/server/src/test/java/org/opensearch/tasks/TaskCancellationTests.java @@ -64,7 +64,7 @@ public String name() { public void update(Task task) {} @Override - public Optional cancellationReason(Task task) { + public Optional checkAndMaybeGetCancellationReason(Task task) { return Optional.empty(); } };