-
Notifications
You must be signed in to change notification settings - Fork 1.8k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Signed-off-by: Kiran Prakash <awskiran@amazon.com>
- Loading branch information
1 parent
97c1bf0
commit 1a27813
Showing
10 changed files
with
930 additions
and
0 deletions.
There are no files selected for viewing
81 changes: 81 additions & 0 deletions
81
server/src/main/java/org/opensearch/wlm/cancellation/AbstractTaskSelectionStrategy.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,81 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.wlm.cancellation; | ||
|
||
import org.opensearch.search.ResourceType; | ||
import org.opensearch.tasks.CancellableTask; | ||
import org.opensearch.tasks.Task; | ||
import org.opensearch.tasks.TaskCancellation; | ||
|
||
import java.util.ArrayList; | ||
import java.util.Collections; | ||
import java.util.Comparator; | ||
import java.util.List; | ||
import java.util.stream.Collectors; | ||
|
||
/** | ||
* Represents an abstract task selection strategy. | ||
* This class implements the TaskSelectionStrategy interface and provides a method to select tasks for cancellation based on a sorting condition. | ||
* The specific sorting condition depends on the implementation. | ||
*/ | ||
public abstract class AbstractTaskSelectionStrategy implements TaskSelectionStrategy { | ||
|
||
/** | ||
* Returns a comparator that defines the sorting condition for tasks. | ||
* The specific sorting condition depends on the implementation. | ||
* | ||
* @return The comparator | ||
*/ | ||
public abstract Comparator<Task> sortingCondition(); | ||
|
||
/** | ||
* Selects tasks for cancellation based on the provided limit and resource type. | ||
* The tasks are sorted based on the sorting condition and then selected until the accumulated resource usage reaches the limit. | ||
* | ||
* @param tasks The list of tasks from which to select | ||
* @param limit The limit on the accumulated resource usage | ||
* @param resourceType The type of resource to consider | ||
* @return The list of selected tasks | ||
* @throws IllegalArgumentException If the limit is less than zero | ||
*/ | ||
@Override | ||
public List<TaskCancellation> selectTasksForCancellation(List<Task> tasks, long limit, ResourceType resourceType) { | ||
if (limit < 0) { | ||
throw new IllegalArgumentException("reduceBy has to be greater than zero"); | ||
} | ||
if (limit == 0) { | ||
return Collections.emptyList(); | ||
} | ||
|
||
List<Task> sortedTasks = tasks.stream().sorted(sortingCondition()).collect(Collectors.toList()); | ||
|
||
List<TaskCancellation> selectedTasks = new ArrayList<>(); | ||
long accumulated = 0; | ||
|
||
for (Task task : sortedTasks) { | ||
if (task instanceof CancellableTask) { | ||
selectedTasks.add(createTaskCancellation((CancellableTask) task)); | ||
accumulated += resourceType.getResourceUsage(task); | ||
if (accumulated >= limit) { | ||
break; | ||
} | ||
} | ||
} | ||
return selectedTasks; | ||
} | ||
|
||
private TaskCancellation createTaskCancellation(CancellableTask task) { | ||
// TODO add correct reason and callbacks | ||
return new TaskCancellation(task, List.of(new TaskCancellation.Reason("limits exceeded", 5)), List.of(this::callbackOnCancel)); | ||
} | ||
|
||
private void callbackOnCancel() { | ||
// todo Implement callback logic here mostly used for Stats | ||
} | ||
} |
218 changes: 218 additions & 0 deletions
218
server/src/main/java/org/opensearch/wlm/cancellation/DefaultTaskCancellation.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,218 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.wlm.cancellation; | ||
|
||
import org.opensearch.cluster.metadata.QueryGroup; | ||
import org.opensearch.common.settings.ClusterSettings; | ||
import org.opensearch.common.settings.Settings; | ||
import org.opensearch.monitor.jvm.JvmStats; | ||
import org.opensearch.monitor.process.ProcessProbe; | ||
import org.opensearch.search.ResourceType; | ||
import org.opensearch.search.backpressure.settings.NodeDuressSettings; | ||
import org.opensearch.search.backpressure.trackers.NodeDuressTrackers; | ||
import org.opensearch.tasks.TaskCancellation; | ||
import org.opensearch.wlm.QueryGroupLevelResourceUsageView; | ||
|
||
import java.util.ArrayList; | ||
import java.util.EnumMap; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Set; | ||
import java.util.stream.Collectors; | ||
|
||
import static org.opensearch.wlm.tracker.QueryGroupResourceUsageTrackerService.TRACKED_RESOURCES; | ||
|
||
/** | ||
* Manages the cancellation of tasks enforced by QueryGroup thresholds on resource usage criteria. | ||
* This class utilizes a strategy pattern through {@link TaskSelectionStrategy} to identify tasks that exceed | ||
* predefined resource usage limits and are therefore eligible for cancellation. | ||
* | ||
* <p>The cancellation process is initiated by evaluating the resource usage of each QueryGroup against its | ||
* resource limits. Tasks that contribute to exceeding these limits are selected for cancellation based on the | ||
* implemented task selection strategy.</p> | ||
* | ||
* <p>Instances of this class are configured with a map linking QueryGroup IDs to their corresponding resource usage | ||
* views, a set of active QueryGroups, and a task selection strategy. These components collectively facilitate the | ||
* identification and cancellation of tasks that threaten to breach QueryGroup resource limits.</p> | ||
* | ||
* @see TaskSelectionStrategy | ||
* @see QueryGroup | ||
* @see ResourceType | ||
*/ | ||
public class DefaultTaskCancellation { | ||
private static final long HEAP_SIZE_BYTES = JvmStats.jvmStats().getMem().getHeapMax().getBytes(); | ||
|
||
protected final TaskSelectionStrategy taskSelectionStrategy; | ||
// a map of QueryGroupId to its corresponding QueryGroupLevelResourceUsageView object | ||
protected final Map<String, QueryGroupLevelResourceUsageView> queryGroupLevelResourceUsageViews; | ||
protected final Set<QueryGroup> activeQueryGroups; | ||
protected NodeDuressTrackers nodeDuressTrackers; | ||
|
||
public DefaultTaskCancellation( | ||
TaskSelectionStrategy taskSelectionStrategy, | ||
Map<String, QueryGroupLevelResourceUsageView> queryGroupLevelResourceUsageViews, | ||
Set<QueryGroup> activeQueryGroups, | ||
Settings settings, | ||
ClusterSettings clusterSettings | ||
) { | ||
this.taskSelectionStrategy = taskSelectionStrategy; | ||
this.queryGroupLevelResourceUsageViews = queryGroupLevelResourceUsageViews; | ||
this.activeQueryGroups = activeQueryGroups; | ||
this.nodeDuressTrackers = setupNodeDuressTracker(settings, clusterSettings); | ||
} | ||
|
||
/** | ||
* Cancel tasks based on the implemented strategy. | ||
*/ | ||
public final void cancelTasks() { | ||
cancelTasksForMode(QueryGroup.ResiliencyMode.ENFORCED); | ||
|
||
if (nodeDuressTrackers.isNodeInDuress()) { | ||
cancelTasksForMode(QueryGroup.ResiliencyMode.SOFT); | ||
} | ||
} | ||
|
||
private void cancelTasksForMode(QueryGroup.ResiliencyMode resiliencyMode) { | ||
List<TaskCancellation> cancellableTasks = getAllCancellableTasksFrom(resiliencyMode); | ||
for (TaskCancellation taskCancellation : cancellableTasks) { | ||
taskCancellation.cancel(); | ||
} | ||
} | ||
|
||
/** | ||
* Get all cancellable tasks from the QueryGroups. | ||
* | ||
* @return List of tasks that can be cancelled | ||
*/ | ||
protected List<TaskCancellation> getAllCancellableTasksFrom(QueryGroup.ResiliencyMode resiliencyMode) { | ||
return getQueryGroupsToCancelFrom(resiliencyMode).stream() | ||
.flatMap(queryGroup -> getCancellableTasksFrom(queryGroup).stream()) | ||
.collect(Collectors.toList()); | ||
} | ||
|
||
/** | ||
* returns the list of QueryGroups breaching their resource limits. | ||
* | ||
* @return List of QueryGroups | ||
*/ | ||
private List<QueryGroup> getQueryGroupsToCancelFrom(QueryGroup.ResiliencyMode resiliencyMode) { | ||
final List<QueryGroup> queryGroupsToCancelFrom = new ArrayList<>(); | ||
|
||
for (QueryGroup queryGroup : this.activeQueryGroups) { | ||
if (queryGroup.getResiliencyMode() != resiliencyMode) { | ||
continue; | ||
} | ||
Map<ResourceType, Long> queryGroupResourceUsage = queryGroupLevelResourceUsageViews.get(queryGroup.get_id()) | ||
.getResourceUsageData(); | ||
|
||
for (ResourceType resourceType : TRACKED_RESOURCES) { | ||
if (queryGroup.getResourceLimits().containsKey(resourceType) && queryGroupResourceUsage.containsKey(resourceType)) { | ||
Double resourceLimit = (Double) queryGroup.getResourceLimits().get(resourceType); | ||
Long resourceUsage = queryGroupResourceUsage.get(resourceType); | ||
|
||
if (isBreachingThreshold(resourceType, resourceLimit, resourceUsage)) { | ||
queryGroupsToCancelFrom.add(queryGroup); | ||
break; | ||
} | ||
} | ||
} | ||
} | ||
|
||
return queryGroupsToCancelFrom; | ||
} | ||
|
||
/** | ||
* Get cancellable tasks from a specific queryGroup. | ||
* | ||
* @param queryGroup The QueryGroup from which to get cancellable tasks | ||
* @return List of tasks that can be cancelled | ||
*/ | ||
protected List<TaskCancellation> getCancellableTasksFrom(QueryGroup queryGroup) { | ||
return TRACKED_RESOURCES.stream() | ||
.filter(resourceType -> shouldCancelTasks(queryGroup, resourceType)) | ||
.flatMap(resourceType -> getTaskCancellations(queryGroup, resourceType).stream()) | ||
.collect(Collectors.toList()); | ||
} | ||
|
||
private boolean shouldCancelTasks(QueryGroup queryGroup, ResourceType resourceType) { | ||
long reduceBy = getReduceBy(queryGroup, resourceType); | ||
return reduceBy > 0; | ||
} | ||
|
||
private List<TaskCancellation> getTaskCancellations(QueryGroup queryGroup, ResourceType resourceType) { | ||
return taskSelectionStrategy.selectTasksForCancellation( | ||
// get the active tasks in the query group | ||
queryGroupLevelResourceUsageViews.get(queryGroup.get_id()).getActiveTasks(), | ||
getReduceBy(queryGroup, resourceType), | ||
resourceType | ||
); | ||
} | ||
|
||
private long getReduceBy(QueryGroup queryGroup, ResourceType resourceType) { | ||
if (queryGroup.getResourceLimits().get(resourceType) == null) { | ||
return 0; | ||
} | ||
Double threshold = (Double) queryGroup.getResourceLimits().get(resourceType); | ||
return getResourceUsage(queryGroup, resourceType) - convertThresholdIntoLong(resourceType, threshold); | ||
} | ||
|
||
private Long convertThresholdIntoLong(ResourceType resourceType, Double resourceThresholdInPercentage) { | ||
Long threshold = null; | ||
if (resourceType == ResourceType.MEMORY) { | ||
// Check if resource usage is breaching the threshold | ||
threshold = (long) (resourceThresholdInPercentage * HEAP_SIZE_BYTES); | ||
} else if (resourceType == ResourceType.CPU) { | ||
// Get the total CPU time of the process in milliseconds | ||
long cpuTotalTimeInMillis = ProcessProbe.getInstance().getProcessCpuTotalTime(); | ||
// Check if resource usage is breaching the threshold | ||
threshold = (long) (resourceThresholdInPercentage * cpuTotalTimeInMillis); | ||
} | ||
return threshold; | ||
} | ||
|
||
private Long getResourceUsage(QueryGroup queryGroup, ResourceType resourceType) { | ||
if (!queryGroupLevelResourceUsageViews.containsKey(queryGroup.get_id())) { | ||
return 0L; | ||
} | ||
return queryGroupLevelResourceUsageViews.get(queryGroup.get_id()).getResourceUsageData().get(resourceType); | ||
} | ||
|
||
private boolean isBreachingThreshold(ResourceType resourceType, Double resourceThresholdInPercentage, long resourceUsage) { | ||
if (resourceType == ResourceType.MEMORY) { | ||
// Check if resource usage is breaching the threshold | ||
return resourceUsage > convertThresholdIntoLong(resourceType, resourceThresholdInPercentage); | ||
} | ||
// Resource types should be CPU, resourceUsage is in nanoseconds, convert to milliseconds | ||
long resourceUsageInMillis = resourceUsage / 1_000_000; | ||
// Check if resource usage is breaching the threshold | ||
return resourceUsageInMillis > convertThresholdIntoLong(resourceType, resourceThresholdInPercentage); | ||
} | ||
|
||
private NodeDuressTrackers setupNodeDuressTracker(Settings settings, ClusterSettings clusterSettings) { | ||
NodeDuressSettings nodeDuressSettings = new NodeDuressSettings(settings, clusterSettings); | ||
return new NodeDuressTrackers(new EnumMap<>(ResourceType.class) { | ||
{ | ||
put( | ||
ResourceType.CPU, | ||
new NodeDuressTrackers.NodeDuressTracker( | ||
() -> ProcessProbe.getInstance().getProcessCpuPercent() / 100.0 >= nodeDuressSettings.getCpuThreshold(), | ||
nodeDuressSettings::getNumSuccessiveBreaches | ||
) | ||
); | ||
put( | ||
ResourceType.MEMORY, | ||
new NodeDuressTrackers.NodeDuressTracker( | ||
() -> JvmStats.jvmStats().getMem().getHeapUsedPercent() / 100.0 >= nodeDuressSettings.getHeapThreshold(), | ||
nodeDuressSettings::getNumSuccessiveBreaches | ||
) | ||
); | ||
} | ||
}); | ||
} | ||
} |
29 changes: 29 additions & 0 deletions
29
...c/main/java/org/opensearch/wlm/cancellation/LongestRunningTaskFirstSelectionStrategy.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.wlm.cancellation; | ||
|
||
import org.opensearch.tasks.Task; | ||
|
||
import java.util.Comparator; | ||
|
||
/** | ||
* Represents a task selection strategy that prioritizes the longest running tasks first. | ||
*/ | ||
public class LongestRunningTaskFirstSelectionStrategy extends AbstractTaskSelectionStrategy { | ||
|
||
/** | ||
* Returns a comparator that sorts tasks based on their start time in descending order. | ||
* | ||
* @return The comparator | ||
*/ | ||
@Override | ||
public Comparator<Task> sortingCondition() { | ||
return Comparator.comparingLong(Task::getStartTime); | ||
} | ||
} |
29 changes: 29 additions & 0 deletions
29
.../main/java/org/opensearch/wlm/cancellation/ShortestRunningTaskFirstSelectionStrategy.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.wlm.cancellation; | ||
|
||
import org.opensearch.tasks.Task; | ||
|
||
import java.util.Comparator; | ||
|
||
/** | ||
* Represents a task selection strategy that prioritizes the shortest running tasks first. | ||
*/ | ||
public class ShortestRunningTaskFirstSelectionStrategy extends AbstractTaskSelectionStrategy { | ||
|
||
/** | ||
* Returns a comparator that sorts tasks based on their start time in ascending order. | ||
* | ||
* @return The comparator | ||
*/ | ||
@Override | ||
public Comparator<Task> sortingCondition() { | ||
return Comparator.comparingLong(Task::getStartTime).reversed(); | ||
} | ||
} |
32 changes: 32 additions & 0 deletions
32
server/src/main/java/org/opensearch/wlm/cancellation/TaskSelectionStrategy.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.wlm.cancellation; | ||
|
||
import org.opensearch.search.ResourceType; | ||
import org.opensearch.tasks.Task; | ||
import org.opensearch.tasks.TaskCancellation; | ||
|
||
import java.util.List; | ||
|
||
/** | ||
* Interface for strategies to select tasks for cancellation. | ||
* Implementations of this interface define how tasks are selected for cancellation based on resource usage. | ||
*/ | ||
public interface TaskSelectionStrategy { | ||
/** | ||
* Determines which tasks should be cancelled based on the provided criteria. | ||
* | ||
* @param tasks List of tasks available for cancellation. | ||
* @param limit The amount of tasks to select whose resources reach this limit | ||
* @param resourceType The type of resource that needs to be reduced, guiding the selection process. | ||
* | ||
* @return List of tasks that should be cancelled. | ||
*/ | ||
List<TaskCancellation> selectTasksForCancellation(List<Task> tasks, long limit, ResourceType resourceType); | ||
} |
Oops, something went wrong.