Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support task resource tracking in OpenSearch #3982

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 52 additions & 9 deletions server/src/main/java/org/opensearch/tasks/Task.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@

package org.opensearch.tasks;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.ActionResponse;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.io.stream.NamedWriteable;
Expand All @@ -45,6 +47,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/**
Expand All @@ -54,6 +57,8 @@
*/
public class Task {

private static final Logger logger = LogManager.getLogger(Task.class);

/**
* The request header to mark tasks with specific ids
*/
Expand All @@ -77,9 +82,13 @@ public class Task {

private final List<TaskResourceTrackingListener> resourceTrackingListeners;

// Keeps track of the number of active resource tracking threads for this task. It is initialized to 1 to track
// the task's own/self thread. When this value becomes 0, all threads have been marked inactive and the resource
// tracking can be stopped for this task.
private final AtomicBoolean isResourceTrackingCompleted = new AtomicBoolean(false);
ketanv3 marked this conversation as resolved.
Show resolved Hide resolved

/**
* Keeps track of the number of active resource tracking threads for this task. It is initialized to 1 to track
* the task's own/self thread. When this value becomes 0, all threads have been marked inactive and the resource
* tracking can be stopped for this task.
*/
private final AtomicInteger numActiveResourceTrackingThreads = new AtomicInteger(1);

/**
Expand Down Expand Up @@ -310,7 +319,13 @@ public void startThreadResourceTracking(long threadId, ResourceStatsType statsTy
}
threadResourceInfoList.add(new ThreadResourceInfo(threadId, statsType, resourceUsageMetrics));
incrementResourceTrackingThreads();
resourceTrackingListeners.forEach(listener -> listener.onTaskExecutionStartedOnThread(this, threadId));
resourceTrackingListeners.forEach(listener -> {
try {
listener.onTaskExecutionStartedOnThread(this, threadId);
} catch (Exception e) {
logger.warn("failure in listener during handling of onTaskExecutionStartedOnThread", e);
}
});
ketanv3 marked this conversation as resolved.
Show resolved Hide resolved
}

/**
Expand All @@ -328,7 +343,13 @@ public void updateThreadResourceStats(long threadId, ResourceStatsType statsType
// the active entry present in the list is updated
if (threadResourceInfo.getStatsType() == statsType && threadResourceInfo.isActive()) {
threadResourceInfo.recordResourceUsageMetrics(resourceUsageMetrics);
resourceTrackingListeners.forEach(listener -> listener.onTaskResourceStatsUpdated(this));
resourceTrackingListeners.forEach(listener -> {
try {
listener.onTaskResourceStatsUpdated(this);
} catch (Exception e) {
logger.warn("failure in listener during handling of onTaskResourceStatsUpdated", e);
}
});
return;
}
}
Expand All @@ -352,7 +373,14 @@ public void stopThreadResourceTracking(long threadId, ResourceStatsType statsTyp
threadResourceInfo.setActive(false);
threadResourceInfo.recordResourceUsageMetrics(resourceUsageMetrics);
decrementResourceTrackingThreads();
resourceTrackingListeners.forEach(listener -> listener.onTaskExecutionFinishedOnThread(this, threadId));
resourceTrackingListeners.forEach(listener -> {
try {
listener.onTaskExecutionFinishedOnThread(this, threadId);
} catch (Exception e) {
logger.warn("failure in listener during handling of onTaskExecutionFinishedOnThread", e);
ketanv3 marked this conversation as resolved.
Show resolved Hide resolved
}
});

return;
}
}
Expand Down Expand Up @@ -422,15 +450,30 @@ public int incrementResourceTrackingThreads() {

/**
* Decrements the number of active resource tracking threads.
* When this value becomes zero, the onTaskResourceTrackingCompleted method is called on all registered listeners.
* This method is called when threads finish execution, and also when the task is unregistered (to mark the task's
* own thread as complete). When the active thread count becomes zero, the onTaskResourceTrackingCompleted method
* is called exactly once on all registered listeners.
*
* Since a task is unregistered after the message is processed, it implies that the threads responsible to produce
* the response must have started prior to it (i.e. startThreadResourceTracking called before unregister).
* This ensures that the number of active threads doesn't drop to zero pre-maturely.
*
* Rarely, some threads may even start execution after the task is unregistered. As resource stats are piggy-backed
* with the response, any thread usage info captured after the task is unregistered may be irrelevant.
*
* @return the number of active resource tracking threads.
*/
public int decrementResourceTrackingThreads() {
int count = numActiveResourceTrackingThreads.decrementAndGet();

if (count == 0) {
resourceTrackingListeners.forEach(listener -> listener.onTaskResourceTrackingCompleted(this));
if (count == 0 && isResourceTrackingCompleted.compareAndSet(false, true)) {
resourceTrackingListeners.forEach(listener -> {
try {
listener.onTaskResourceTrackingCompleted(this);
} catch (Exception e) {
logger.warn("failure in listener during handling of onTaskResourceTrackingCompleted", e);
}
});
}

return count;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ public void cancel(CancellableTask task, String reason, Runnable listener) {
public Task unregister(Task task) {
logger.trace("unregister task for id: {}", task.getId());

// Decrement the task's self-thread as a part of unregistration.
// Decrement the task's self-thread as part of unregistration.
task.decrementResourceTrackingThreads();

if (task instanceof CancellableTask) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,6 @@ public void stopTracking(Task task) {
if (isCurrentThreadWorkingOnTask(task)) {
taskExecutionFinishedOnThread(task.getId(), Thread.currentThread().getId());
}

List<Long> threadsWorkingOnTask = getThreadsWorkingOnTask(task);
if (threadsWorkingOnTask.size() > 0) {
logger.warn("No thread should be active when task finishes. Active threads: {}", threadsWorkingOnTask);
assert false : "No thread should be marked active when task finishes";
}
} catch (Exception e) {
logger.warn("Failed while trying to mark the task execution on current thread completed.", e);
assert false;
Expand Down Expand Up @@ -165,11 +159,10 @@ private void refreshResourceStats(Task resourceAwareTask) {
@Override
public void taskExecutionStartedOnThread(long taskId, long threadId) {
try {
if (resourceAwareTasks.containsKey(taskId)) {
final Task task = resourceAwareTasks.get(taskId);
if (task != null) {
logger.debug("Task execution started on thread. Task: {}, Thread: {}", taskId, threadId);

resourceAwareTasks.get(taskId)
.startThreadResourceTracking(threadId, WORKER_STATS, getResourceUsageMetricsForThread(threadId));
task.startThreadResourceTracking(threadId, WORKER_STATS, getResourceUsageMetricsForThread(threadId));
}
} catch (Exception e) {
logger.warn(new ParameterizedMessage("Failed to mark thread execution started for task: [{}]", taskId), e);
Expand All @@ -187,10 +180,10 @@ public void taskExecutionStartedOnThread(long taskId, long threadId) {
@Override
public void taskExecutionFinishedOnThread(long taskId, long threadId) {
try {
if (resourceAwareTasks.containsKey(taskId)) {
final Task task = resourceAwareTasks.get(taskId);
if (task != null) {
logger.debug("Task execution finished on thread. Task: {}, Thread: {}", taskId, threadId);
resourceAwareTasks.get(taskId)
.stopThreadResourceTracking(threadId, WORKER_STATS, getResourceUsageMetricsForThread(threadId));
task.stopThreadResourceTracking(threadId, WORKER_STATS, getResourceUsageMetricsForThread(threadId));
}
} catch (Exception e) {
logger.warn(new ParameterizedMessage("Failed to mark thread execution finished for task: [{}]", taskId), e);
Expand Down