Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Workload Management] QueryGroup Resource Cancellation #15151

1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add took time to request nodes stats ([#15054](https://github.com/opensearch-project/OpenSearch/pull/15054))
- [Workload Management] Add Get QueryGroup API Logic ([14709](https://github.com/opensearch-project/OpenSearch/pull/14709))
- [Workload Management] Add Settings for Workload Management feature ([#15028](https://github.com/opensearch-project/OpenSearch/pull/15028))
- [Workload Management] QueryGroup resource cancellation framework changes ([#15151](https://github.com/opensearch-project/OpenSearch/pull/15151))
- [Workload Management] QueryGroup resource tracking framework changes ([#13897](https://github.com/opensearch-project/OpenSearch/pull/13897))
- Support filtering on a large list encoded by bitmap ([#14774](https://github.com/opensearch-project/OpenSearch/pull/14774))
- Add slice execution listeners to SearchOperationListener interface ([#15153](https://github.com/opensearch-project/OpenSearch/pull/15153))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,277 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.wlm.cancellation;

import org.opensearch.cluster.metadata.QueryGroup;
import org.opensearch.monitor.jvm.JvmStats;
import org.opensearch.monitor.process.ProcessProbe;
import org.opensearch.search.ResourceType;
import org.opensearch.tasks.CancellableTask;
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskCancellation;
import org.opensearch.wlm.QueryGroupLevelResourceUsageView;
import org.opensearch.wlm.WorkloadManagementSettings;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import static org.opensearch.wlm.tracker.QueryGroupResourceUsageTrackerService.TRACKED_RESOURCES;

/**
* Manages the cancellation of tasks enforced by QueryGroup thresholds on resource usage criteria.
* This class utilizes a strategy pattern through {@link DefaultTaskSelectionStrategy} to identify tasks that exceed
* predefined resource usage limits and are therefore eligible for cancellation.
*
* <p>The cancellation process is initiated by evaluating the resource usage of each QueryGroup against its
* resource limits. Tasks that contribute to exceeding these limits are selected for cancellation based on the
* implemented task selection strategy.</p>
*
* <p>Instances of this class are configured with a map linking QueryGroup IDs to their corresponding resource usage
* views, a set of active QueryGroups, and a task selection strategy. These components collectively facilitate the
* identification and cancellation of tasks that threaten to breach QueryGroup resource limits.</p>
*
* @see DefaultTaskSelectionStrategy
* @see QueryGroup
* @see ResourceType
*/
public class DefaultTaskCancellation {
private static final long HEAP_SIZE_BYTES = JvmStats.jvmStats().getMem().getHeapMax().getBytes();

protected final WorkloadManagementSettings workloadManagementSettings;
protected final DefaultTaskSelectionStrategy defaultTaskSelectionStrategy;
// a map of QueryGroupId to its corresponding QueryGroupLevelResourceUsageView object
protected final Map<String, QueryGroupLevelResourceUsageView> queryGroupLevelResourceUsageViews;
protected final Collection<QueryGroup> activeQueryGroups;
protected final Collection<QueryGroup> deletedQueryGroups;
protected BooleanSupplier isNodeInDuress;

public DefaultTaskCancellation(
WorkloadManagementSettings workloadManagementSettings,
DefaultTaskSelectionStrategy defaultTaskSelectionStrategy,
Map<String, QueryGroupLevelResourceUsageView> queryGroupLevelResourceUsageViews,
Collection<QueryGroup> activeQueryGroups,
Collection<QueryGroup> deletedQueryGroups,
BooleanSupplier isNodeInDuress
) {
this.workloadManagementSettings = workloadManagementSettings;
this.defaultTaskSelectionStrategy = defaultTaskSelectionStrategy;
this.queryGroupLevelResourceUsageViews = queryGroupLevelResourceUsageViews;
this.activeQueryGroups = activeQueryGroups;
this.deletedQueryGroups = deletedQueryGroups;
this.isNodeInDuress = isNodeInDuress;
}

/**
* Cancel tasks based on the implemented strategy.
*/
public final void cancelTasks() {
kiranprakash154 marked this conversation as resolved.
Show resolved Hide resolved
// cancel tasks from QueryGroups that are in Enforced mode that are breaching their resource limits
cancelTasks(QueryGroup.ResiliencyMode.ENFORCED);
// if the node is in duress, cancel tasks accordingly.
handleNodeDuress();
}

private void handleNodeDuress() {
if (!isNodeInDuress.getAsBoolean()) {
return;
}
// List of tasks to be executed in order if the node is in duress
List<Consumer<Void>> duressActions = List.of(
v -> cancelTasksFromDeletedQueryGroups(),
v -> cancelTasks(QueryGroup.ResiliencyMode.SOFT)
);

for (Consumer<Void> duressAction : duressActions) {
if (!isNodeInDuress.getAsBoolean()) {
break;
}
duressAction.accept(null);
}
}

private void cancelTasksFromDeletedQueryGroups() {
cancelTasks(getAllCancellableTasks(this.deletedQueryGroups));
}

/**
* Get all cancellable tasks from the QueryGroups.
*
* @return List of tasks that can be cancelled
*/
protected List<TaskCancellation> getAllCancellableTasks(QueryGroup.ResiliencyMode resiliencyMode) {
return getAllCancellableTasks(getQueryGroupsToCancelFrom(resiliencyMode));
}

/**
* Get all cancellable tasks from the given QueryGroups.
*
* @return List of tasks that can be cancelled
*/
protected List<TaskCancellation> getAllCancellableTasks(Collection<QueryGroup> queryGroups) {
return queryGroups.stream().flatMap(queryGroup -> getCancellableTasksFrom(queryGroup).stream()).collect(Collectors.toList());
}

/**
* returns the list of QueryGroups breaching their resource limits.
*
* @return List of QueryGroups
*/
private List<QueryGroup> getQueryGroupsToCancelFrom(QueryGroup.ResiliencyMode resiliencyMode) {
final List<QueryGroup> queryGroupsToCancelFrom = new ArrayList<>();

for (QueryGroup queryGroup : this.activeQueryGroups) {
if (queryGroup.getResiliencyMode() != resiliencyMode) {
continue;
}
Map<ResourceType, Long> queryGroupResourceUsage = queryGroupLevelResourceUsageViews.get(queryGroup.get_id())
.getResourceUsageData();

for (ResourceType resourceType : TRACKED_RESOURCES) {
if (queryGroup.getResourceLimits().containsKey(resourceType) && queryGroupResourceUsage.containsKey(resourceType)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think right side of this && operator is not required

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

its just a check to proceed with isBreachingThreshold only if the querygroup has limits set on resourceType and there is usage recorded for the usage type.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is true but it is unnecessary because the queryGroupResourceUsage map contains resource usage for all tracked resources

Double resourceLimit = (Double) queryGroup.getResourceLimits().get(resourceType);
Long resourceUsage = queryGroupResourceUsage.get(resourceType);

if (isBreachingThreshold(resourceType, resourceLimit, resourceUsage)) {
queryGroupsToCancelFrom.add(queryGroup);
break;
}
}
}
}

return queryGroupsToCancelFrom;
}

private void cancelTasks(QueryGroup.ResiliencyMode resiliencyMode) {
cancelTasks(getAllCancellableTasks(resiliencyMode));
}

private void cancelTasks(List<TaskCancellation> cancellableTasks) {
cancellableTasks.forEach(TaskCancellation::cancel);
}

/**
* Get cancellable tasks from a specific queryGroup.
*
* @param queryGroup The QueryGroup from which to get cancellable tasks
* @return List of tasks that can be cancelled
*/
protected List<TaskCancellation> getCancellableTasksFrom(QueryGroup queryGroup) {
return TRACKED_RESOURCES.stream()
.filter(resourceType -> shouldCancelTasks(queryGroup, resourceType))
.flatMap(resourceType -> getTaskCancellations(queryGroup, resourceType).stream())
.collect(Collectors.toList());
}

private boolean shouldCancelTasks(QueryGroup queryGroup, ResourceType resourceType) {
return getReduceBy(queryGroup, resourceType) > 0;
}

private List<TaskCancellation> getTaskCancellations(QueryGroup queryGroup, ResourceType resourceType) {
List<Task> selectedTasksToCancel = defaultTaskSelectionStrategy.selectTasksForCancellation(
queryGroupLevelResourceUsageViews.get(queryGroup.get_id()).getActiveTasks(),
getReduceBy(queryGroup, resourceType),
resourceType
);
List<TaskCancellation> taskCancellations = new ArrayList<>();
for (Task task : selectedTasksToCancel) {
String cancellationReason = createCancellationReason(queryGroup, task, resourceType);
taskCancellations.add(createTaskCancellation((CancellableTask) task, cancellationReason));
}
return taskCancellations;
}

private String createCancellationReason(QueryGroup querygroup, Task task, ResourceType resourceType) {
Double thresholdInPercent = getThresholdInPercent(querygroup, resourceType);
return "[Workload Management] Cancelling Task ID : "
+ task.getId()
+ " from QueryGroup ID : "
+ querygroup.get_id()
+ " breached the resource limit of : "
+ thresholdInPercent
+ " for resource type : "
+ resourceType.getName();
}

private Double getThresholdInPercent(QueryGroup querygroup, ResourceType resourceType) {
return ((Double) (querygroup.getResourceLimits().get(resourceType))) * 100;
}

private TaskCancellation createTaskCancellation(CancellableTask task, String cancellationReason) {
return new TaskCancellation(task, List.of(new TaskCancellation.Reason(cancellationReason, 5)), List.of(this::callbackOnCancel));
}

protected List<TaskCancellation> getTaskCancellationsForDeletedQueryGroup(QueryGroup queryGroup) {
List<Task> tasks = defaultTaskSelectionStrategy.selectTasksFromDeletedQueryGroup(
queryGroupLevelResourceUsageViews.get(queryGroup.get_id()).getActiveTasks()
);
List<TaskCancellation> taskCancellations = new ArrayList<>();
for (Task task : tasks) {
String cancellationReason = "[Workload Management] Cancelling Task ID : "
+ task.getId()
+ " from QueryGroup ID : "
+ queryGroup.get_id();
taskCancellations.add(createTaskCancellation((CancellableTask) task, cancellationReason));
}
return taskCancellations;
}

private long getReduceBy(QueryGroup queryGroup, ResourceType resourceType) {
if (queryGroup.getResourceLimits().get(resourceType) == null) {
return 0;
}
Double threshold = (Double) queryGroup.getResourceLimits().get(resourceType);
return getResourceUsage(queryGroup, resourceType) - convertThresholdIntoLong(resourceType, threshold);
}

private Long convertThresholdIntoLong(ResourceType resourceType, Double resourceThresholdInPercentage) {
Long threshold = null;
if (resourceType == ResourceType.MEMORY) {
// Check if resource usage is breaching the threshold
double nodeLevelCancellationThreshold = this.workloadManagementSettings.getNodeLevelMemoryCancellationThreshold()
* HEAP_SIZE_BYTES;
threshold = (long) (resourceThresholdInPercentage * nodeLevelCancellationThreshold);
} else if (resourceType == ResourceType.CPU) {
// Get the total CPU time of the process in milliseconds
long cpuTotalTimeInMillis = ProcessProbe.getInstance().getProcessCpuTotalTime();
double nodeLevelCancellationThreshold = this.workloadManagementSettings.getNodeLevelCpuCancellationThreshold()
* cpuTotalTimeInMillis;
// Check if resource usage is breaching the threshold
threshold = (long) (resourceThresholdInPercentage * nodeLevelCancellationThreshold);
}
Comment on lines +246 to +252
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not correct way, since we are taking process's total CPU time which could be well lets say in hours/days/months magnitude while on the other the hand, tasks may only be running since last couple milliseconds or even minutes in worst case before the request times out.

This will not refelect actual nano/milli seconds worth of a percentage. We are also not considering the number of processors available in this calculation as well

return threshold;
}

private Long getResourceUsage(QueryGroup queryGroup, ResourceType resourceType) {
if (!queryGroupLevelResourceUsageViews.containsKey(queryGroup.get_id())) {
return 0L;
}
return queryGroupLevelResourceUsageViews.get(queryGroup.get_id()).getResourceUsageData().get(resourceType);
}

private boolean isBreachingThreshold(ResourceType resourceType, Double resourceThresholdInPercentage, long resourceUsage) {
kiranprakash154 marked this conversation as resolved.
Show resolved Hide resolved
if (resourceType == ResourceType.MEMORY) {
// Check if resource usage is breaching the threshold
return resourceUsage > convertThresholdIntoLong(resourceType, resourceThresholdInPercentage);
}
// Resource types should be CPU, resourceUsage is in nanoseconds, convert to milliseconds
long resourceUsageInMillis = resourceUsage / 1_000_000;
// Check if resource usage is breaching the threshold
return resourceUsageInMillis > convertThresholdIntoLong(resourceType, resourceThresholdInPercentage);
}

private void callbackOnCancel() {
// TODO Implement callback logic here mostly used for Stats
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.wlm.cancellation;

import org.opensearch.search.ResourceType;
kiranprakash154 marked this conversation as resolved.
Show resolved Hide resolved
import org.opensearch.tasks.CancellableTask;
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskCancellation;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

/**
* Represents an abstract task selection strategy.
* This class implements the DefaultTaskSelectionStrategy interface and provides a method to select tasks for cancellation based on a sorting condition.
* The specific sorting condition depends on the implementation.
*/
public class DefaultTaskSelectionStrategy {

/**
* Returns a comparator that defines the sorting condition for tasks.
* This is the default implementation since the longest running tasks are the ones that consume the most resources.
*
* @return The comparator
*/
public Comparator<Task> sortingCondition() {
return Comparator.comparingLong(Task::getStartTime);
}

/**
* Selects tasks for cancellation based on the provided limit and resource type.
* The tasks are sorted based on the sorting condition and then selected until the accumulated resource usage reaches the limit.
*
* @param tasks The list of tasks from which to select
* @param limit The limit on the accumulated resource usage
* @param resourceType The type of resource to consider
* @return The list of selected tasks
* @throws IllegalArgumentException If the limit is less than zero
*/
public List<Task> selectTasksForCancellation(List<Task> tasks, long limit, ResourceType resourceType) {
if (limit < 0) {
throw new IllegalArgumentException("limit has to be greater than zero");
}
if (limit == 0) {
return Collections.emptyList();
}

List<Task> sortedTasks = tasks.stream().sorted(sortingCondition()).collect(Collectors.toList());

List<Task> selectedTasks = new ArrayList<>();
long accumulated = 0;
for (Task task : sortedTasks) {
if (task instanceof CancellableTask) {
selectedTasks.add(task);
accumulated += resourceType.getResourceUsage(task);
if (accumulated >= limit) {
break;
}
}
}
return selectedTasks;
}

/**
* Selects tasks for cancellation from deleted query group.
* This method iterates over the provided list of tasks and selects those that are instances of
* {@link CancellableTask}. For each selected task, it creates a cancellation reason and adds
* a {@link TaskCancellation} object to the list of selected tasks.
*
* @param tasks The list of {@link Task} objects to be evaluated for cancellation.
* @return A list of {@link TaskCancellation} objects representing the tasks selected for cancellation.
*/
public List<Task> selectTasksFromDeletedQueryGroup(List<Task> tasks) {
return tasks.stream().filter(task -> task instanceof CancellableTask).collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/**
* Workload management resource based cancellation artifacts
*/
package org.opensearch.wlm.cancellation;
Loading
Loading