Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>
  • Loading branch information
kaushalmahi12 committed May 21, 2024
1 parent 5bcac55 commit 6b1c658
Show file tree
Hide file tree
Showing 7 changed files with 160 additions and 145 deletions.
32 changes: 32 additions & 0 deletions server/src/main/java/org/opensearch/search/ResourceType.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search;

public enum ResourceType {
CPU("cpu"),
JVM("jvm");

private final String name;
ResourceType(String name) {
this.name = name;
}

public static ResourceType fromName(String s) {
for (ResourceType resourceType: values()) {
if (resourceType.getName().equals(s)) {
return resourceType;
}
}
throw new IllegalArgumentException("Unknown resource type: [" + s + "]");
}

private String getName() {
return name;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.opensearch.common.settings.Setting;
import org.opensearch.monitor.jvm.JvmStats;
import org.opensearch.monitor.process.ProcessProbe;
import org.opensearch.search.ResourceType;
import org.opensearch.search.backpressure.settings.SearchBackpressureMode;
import org.opensearch.search.backpressure.settings.SearchBackpressureSettings;
import org.opensearch.search.backpressure.settings.SearchShardTaskSettings;
Expand All @@ -29,6 +30,7 @@
import org.opensearch.search.backpressure.trackers.ElapsedTimeTracker;
import org.opensearch.search.backpressure.trackers.HeapUsageTracker;
import org.opensearch.search.backpressure.trackers.NodeDuressTrackers;
import org.opensearch.search.backpressure.trackers.NodeDuressTrackers.NodeDuressTracker;
import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackerType;
import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackers;
import org.opensearch.tasks.CancellableTask;
Expand All @@ -43,11 +45,13 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.DoubleSupplier;
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;

Expand All @@ -61,7 +65,14 @@
*/
public class SearchBackpressureService extends AbstractLifecycleComponent implements TaskCompletionListener {
private static final Logger logger = LogManager.getLogger(SearchBackpressureService.class);

private static final List<Class<? extends SearchBackpressureTask>> TRACKED_TASK_TYPES = List.of(SearchTask.class, SearchShardTask.class);
private static final Map<TaskResourceUsageTrackerType, Function<NodeDuressTrackers, Boolean>> trackerApplyConditions =
Map.of(
TaskResourceUsageTrackerType.CPU_USAGE_TRACKER, (nodeDuressTrackers) -> nodeDuressTrackers.isResourceInDuress(ResourceType.CPU),
TaskResourceUsageTrackerType.HEAP_USAGE_TRACKER, (nodeDuressTrackers) ->
isHeapTrackingSupported() && nodeDuressTrackers.isResourceInDuress(ResourceType.JVM),
TaskResourceUsageTrackerType.ELAPSED_TIME_TRACKER, (nodeDuressTrackers) -> true
);
private volatile Scheduler.Cancellable scheduledFuture;

private final SearchBackpressureSettings settings;
Expand All @@ -86,16 +97,16 @@ public SearchBackpressureService(
taskResourceTrackingService,
threadPool,
System::nanoTime,
new NodeDuressTrackers(
new NodeDuressTrackers.NodeDuressTracker(
new NodeDuressTrackers(new EnumMap<>(ResourceType.class) {{
put(ResourceType.CPU, new NodeDuressTracker(
() -> ProcessProbe.getInstance().getProcessCpuPercent() / 100.0 >= settings.getNodeDuressSettings().getCpuThreshold(),
() -> settings.getNodeDuressSettings().getNumSuccessiveBreaches()
),
new NodeDuressTrackers.NodeDuressTracker(
));
put(ResourceType.JVM, new NodeDuressTracker(
() -> JvmStats.jvmStats().getMem().getHeapUsedPercent() / 100.0 >= settings.getNodeDuressSettings().getHeapThreshold(),
() -> settings.getNodeDuressSettings().getNumSuccessiveBreaches()
)
),
));
}}),
getTrackers(
settings.getSearchTaskSettings()::getCpuTimeNanosThreshold,
settings.getSearchTaskSettings()::getHeapVarianceThreshold,
Expand Down Expand Up @@ -170,18 +181,27 @@ void doRun() {

List<CancellableTask> searchTasks = getTaskByType(SearchTask.class);
List<CancellableTask> searchShardTasks = getTaskByType(SearchShardTask.class);
final Map<Class<? extends SearchBackpressureTask>, List<CancellableTask>> cancellableTasks =
Map.of(
SearchTask.class, searchTasks,
SearchShardTask.class, searchShardTasks
);

// Force-refresh usage stats of these tasks before making a cancellation decision.
taskResourceTrackingService.refreshResourceStats(searchTasks.toArray(new Task[0]));
taskResourceTrackingService.refreshResourceStats(searchShardTasks.toArray(new Task[0]));

List<TaskCancellation> taskCancellations = new ArrayList<>();

taskCancellations = addHeapBasedTaskCancellations(taskCancellations, searchTasks, searchShardTasks);

taskCancellations = addCPUBasedTaskCancellations(taskCancellations, searchTasks, searchShardTasks);

taskCancellations = addElapsedTimeBasedTaskCancellations(taskCancellations, searchTasks, searchShardTasks);
for (TaskResourceUsageTrackerType trackerType: TaskResourceUsageTrackerType.values()) {
if (shouldApply(trackerType)) {
addResourceTrackerBasedCancellations(
trackerType,
taskCancellations,
cancellableTasks
);
}
}

// Since these cancellations might be duplicate due to multiple trackers causing cancellation for same task
// We need to merge them
Expand Down Expand Up @@ -219,74 +239,31 @@ void doRun() {
}
}

private List<TaskCancellation> addElapsedTimeBasedTaskCancellations(
List<TaskCancellation> taskCancellations,
List<CancellableTask> searchTasks,
List<CancellableTask> searchShardTasks
) {
final Optional<TaskResourceUsageTrackers.TaskResourceUsageTracker> searchTaskElapsedTimeTracker =
getTaskResourceUsageTrackersByType(SearchTask.class).getElapsedTimeTracker();
final Optional<TaskResourceUsageTrackers.TaskResourceUsageTracker> searchShardTaskElapsedTimeTracker =
getTaskResourceUsageTrackersByType(SearchShardTask.class).getElapsedTimeTracker();

addTaskCancellationsFromTaskResourceUsageTracker(taskCancellations, searchTasks, searchTaskElapsedTimeTracker, SearchTask.class);

addTaskCancellationsFromTaskResourceUsageTracker(
taskCancellations,
searchShardTasks,
searchShardTaskElapsedTimeTracker,
SearchShardTask.class
);

return taskCancellations;
private boolean shouldApply(TaskResourceUsageTrackerType trackerType) {
return trackerApplyConditions.get(trackerType).apply(nodeDuressTrackers);
}

private List<TaskCancellation> addCPUBasedTaskCancellations(
private List<TaskCancellation> addResourceTrackerBasedCancellations(
TaskResourceUsageTrackerType type,
List<TaskCancellation> taskCancellations,
List<CancellableTask> searchTasks,
List<CancellableTask> searchShardTasks
Map<Class<? extends SearchBackpressureTask>, List<CancellableTask>> cancellableTasks
) {
if (nodeDuressTrackers.isCPUInDuress()) {
final Optional<TaskResourceUsageTrackers.TaskResourceUsageTracker> searchTaskCPUUsageTracker =
getTaskResourceUsageTrackersByType(SearchTask.class).getCpuUsageTracker();
final Optional<TaskResourceUsageTrackers.TaskResourceUsageTracker> searchShardTaskCPUUsageTracker =
getTaskResourceUsageTrackersByType(SearchShardTask.class).getCpuUsageTracker();

addTaskCancellationsFromTaskResourceUsageTracker(taskCancellations, searchTasks, searchTaskCPUUsageTracker, SearchTask.class);
for (Class<? extends SearchBackpressureTask> taskType: TRACKED_TASK_TYPES) {
final Optional<TaskResourceUsageTrackers.TaskResourceUsageTracker> taskResourceTracker =
getTaskResourceUsageTrackersByType(taskType).getTracker(type);

addTaskCancellationsFromTaskResourceUsageTracker(
taskCancellations,
searchShardTasks,
searchShardTaskCPUUsageTracker,
SearchShardTask.class
cancellableTasks.get(taskType),
taskResourceTracker,
taskType
);
}
return taskCancellations;
}

private List<TaskCancellation> addHeapBasedTaskCancellations(
List<TaskCancellation> taskCancellations,
List<CancellableTask> searchTasks,
List<CancellableTask> searchShardTasks
) {
if (isHeapTrackingSupported() && nodeDuressTrackers.isHeapInDuress()) {
final Optional<TaskResourceUsageTrackers.TaskResourceUsageTracker> searchTaskHeapUsageTracker =
getTaskResourceUsageTrackersByType(SearchTask.class).getHeapUsageTracker();
final Optional<TaskResourceUsageTrackers.TaskResourceUsageTracker> searchShardTaskHeapUsageTracker =
getTaskResourceUsageTrackersByType(SearchShardTask.class).getHeapUsageTracker();

addTaskCancellationsFromTaskResourceUsageTracker(taskCancellations, searchTasks, searchTaskHeapUsageTracker, SearchTask.class);

addTaskCancellationsFromTaskResourceUsageTracker(
taskCancellations,
searchShardTasks,
searchShardTaskHeapUsageTracker,
SearchShardTask.class
);
}
return taskCancellations;
}


private void addTaskCancellationsFromTaskResourceUsageTracker(
List<TaskCancellation> taskCancellations,
List<CancellableTask> tasks,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
package org.opensearch.search.backpressure.trackers;

import org.opensearch.common.util.Streak;
import org.opensearch.search.ResourceType;

import java.util.EnumMap;
import java.util.function.BooleanSupplier;
import java.util.function.IntSupplier;

Expand All @@ -19,37 +21,26 @@
* @opensearch.internal
*/
public class NodeDuressTrackers {
private final EnumMap<ResourceType, NodeDuressTracker> duressTrackers;

private final NodeDuressTracker heapDuressTracker;
private final NodeDuressTracker cpuDuressTracker;

public NodeDuressTrackers(final NodeDuressTracker heapDuressTracker, final NodeDuressTracker cpuDuressTracker) {
this.heapDuressTracker = heapDuressTracker;
this.cpuDuressTracker = cpuDuressTracker;
}

/**
* Method to check the heap duress
* @return true if heap is in duress
*/
public boolean isHeapInDuress() {
return heapDuressTracker.test();
public NodeDuressTrackers(EnumMap<ResourceType, NodeDuressTracker> duressTrackers) {
this.duressTrackers = duressTrackers;
}

/**
* Method to check the CPU duress
* @return true if cpu is in duress
* Method to check the {@link ResourceType} in duress
* @return Boolean
*/
public boolean isCPUInDuress() {
return cpuDuressTracker.test();
public boolean isResourceInDuress(ResourceType resourceType) {
return duressTrackers.get(resourceType).test();
}

/**
* Method to evaluate whether the node is in duress or not
* @return true if node is in duress because of either system resource
*/
public boolean isNodeInDuress() {
return isCPUInDuress() || isHeapInDuress();
return isResourceInDuress(ResourceType.CPU) || isResourceInDuress(ResourceType.JVM);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@

package org.opensearch.search.backpressure.trackers;

import org.opensearch.search.ResourceType;

import java.util.function.Function;

import static org.opensearch.search.backpressure.trackers.HeapUsageTracker.isHeapTrackingSupported;

/**
* Defines the type of TaskResourceUsageTracker.
*/
Expand All @@ -17,7 +23,6 @@ public enum TaskResourceUsageTrackerType {
ELAPSED_TIME_TRACKER("elapsed_time_tracker");

private final String name;

TaskResourceUsageTrackerType(String name) {
this.name = name;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,6 @@
* @opensearch.internal
*/
public class TaskResourceUsageTrackers {
private TaskResourceUsageTracker cpuUsageTracker;
private TaskResourceUsageTracker heapUsageTracker;
private TaskResourceUsageTracker elapsedTimeTracker;
private final EnumMap<TaskResourceUsageTrackerType, TaskResourceUsageTracker> all;

public TaskResourceUsageTrackers() {
Expand All @@ -41,7 +38,6 @@ public TaskResourceUsageTrackers() {
* @param cpuUsageTracker
*/
public void addCpuUsageTracker(final TaskResourceUsageTracker cpuUsageTracker) {
this.cpuUsageTracker = cpuUsageTracker;
all.put(TaskResourceUsageTrackerType.CPU_USAGE_TRACKER, cpuUsageTracker);
}

Expand All @@ -50,7 +46,6 @@ public void addCpuUsageTracker(final TaskResourceUsageTracker cpuUsageTracker) {
* @param heapUsageTracker
*/
public void addHeapUsageTracker(final TaskResourceUsageTracker heapUsageTracker) {
this.heapUsageTracker = heapUsageTracker;
all.put(TaskResourceUsageTrackerType.HEAP_USAGE_TRACKER, heapUsageTracker);
}

Expand All @@ -59,32 +54,15 @@ public void addHeapUsageTracker(final TaskResourceUsageTracker heapUsageTracker)
* @param elapsedTimeTracker
*/
public void addElapsedTimeTracker(final TaskResourceUsageTracker elapsedTimeTracker) {
this.elapsedTimeTracker = elapsedTimeTracker;
all.put(TaskResourceUsageTrackerType.ELAPSED_TIME_TRACKER, elapsedTimeTracker);
}

/**
* getter for cpuUsageTracker
* @return
*/
public Optional<TaskResourceUsageTracker> getCpuUsageTracker() {
return Optional.ofNullable(cpuUsageTracker);
}

/**
* getter for heapUsageTacker
* @return
*/
public Optional<TaskResourceUsageTracker> getHeapUsageTracker() {
return Optional.ofNullable(heapUsageTracker);
}

/**
* getter for elapsedTimeTracker
* @return
*/
public Optional<TaskResourceUsageTracker> getElapsedTimeTracker() {
return Optional.ofNullable(elapsedTimeTracker);
public Optional<TaskResourceUsageTracker> getTracker(TaskResourceUsageTrackerType type) {
return Optional.ofNullable(all.get(type));
}

/**
Expand Down
Loading

0 comments on commit 6b1c658

Please sign in to comment.