Skip to content

Commit

Permalink
Improved error handling and simplified task resource tracking completion listener
Browse files Browse the repository at this point in the history

Signed-off-by: Ketan Verma <ketan9495@gmail.com>
  • Loading branch information
ketanv3 committed Aug 1, 2022
1 parent a09a60a commit d89de65
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 92 deletions.
57 changes: 20 additions & 37 deletions server/src/main/java/org/opensearch/tasks/Task.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.ExceptionsHelper;
import org.opensearch.action.ActionResponse;
import org.opensearch.action.NotifyOnceListener;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.io.stream.NamedWriteable;
import org.opensearch.common.xcontent.ToXContent;
Expand All @@ -47,7 +49,6 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/**
Expand Down Expand Up @@ -80,9 +81,7 @@ public class Task {

private final Map<Long, List<ThreadResourceInfo>> resourceStats;

private final List<TaskResourceTrackingListener> resourceTrackingListeners;

private final AtomicBoolean isResourceTrackingCompleted = new AtomicBoolean(false);
private final List<NotifyOnceListener<Task>> resourceTrackingCompletionListeners;

/**
* Keeps track of the number of active resource tracking threads for this task. It is initialized to 1 to track
Expand Down Expand Up @@ -112,7 +111,7 @@ public Task(long id, String type, String action, String description, TaskId pare
System.nanoTime(),
headers,
new ConcurrentHashMap<>(),
Collections.synchronizedList(new ArrayList<>())
new ArrayList<>()
);
}

Expand All @@ -126,7 +125,7 @@ public Task(
long startTimeNanos,
Map<String, String> headers,
ConcurrentHashMap<Long, List<ThreadResourceInfo>> resourceStats,
List<TaskResourceTrackingListener> resourceTrackingListeners
List<NotifyOnceListener<Task>> resourceTrackingCompletionListeners
) {
this.id = id;
this.type = type;
Expand All @@ -137,7 +136,7 @@ public Task(
this.startTimeNanos = startTimeNanos;
this.headers = headers;
this.resourceStats = resourceStats;
this.resourceTrackingListeners = resourceTrackingListeners;
this.resourceTrackingCompletionListeners = resourceTrackingCompletionListeners;
}

/**
Expand Down Expand Up @@ -319,13 +318,6 @@ public void startThreadResourceTracking(long threadId, ResourceStatsType statsTy
}
threadResourceInfoList.add(new ThreadResourceInfo(threadId, statsType, resourceUsageMetrics));
incrementResourceTrackingThreads();
resourceTrackingListeners.forEach(listener -> {
try {
listener.onTaskExecutionStartedOnThread(this, threadId);
} catch (Exception e) {
logger.warn("failure in listener during handling of onTaskExecutionStartedOnThread", e);
}
});
}

/**
Expand All @@ -343,13 +335,6 @@ public void updateThreadResourceStats(long threadId, ResourceStatsType statsType
// the active entry present in the list is updated
if (threadResourceInfo.getStatsType() == statsType && threadResourceInfo.isActive()) {
threadResourceInfo.recordResourceUsageMetrics(resourceUsageMetrics);
resourceTrackingListeners.forEach(listener -> {
try {
listener.onTaskResourceStatsUpdated(this);
} catch (Exception e) {
logger.warn("failure in listener during handling of onTaskResourceStatsUpdated", e);
}
});
return;
}
}
Expand All @@ -373,14 +358,6 @@ public void stopThreadResourceTracking(long threadId, ResourceStatsType statsTyp
threadResourceInfo.setActive(false);
threadResourceInfo.recordResourceUsageMetrics(resourceUsageMetrics);
decrementResourceTrackingThreads();
resourceTrackingListeners.forEach(listener -> {
try {
listener.onTaskExecutionFinishedOnThread(this, threadId);
} catch (Exception e) {
logger.warn("failure in listener during handling of onTaskExecutionFinishedOnThread", e);
}
});

return;
}
}
Expand Down Expand Up @@ -433,10 +410,10 @@ public TaskResult result(DiscoveryNode node, ActionResponse response) throws IOE
}

/**
* Registers a TaskResourceTrackingListener callback listener on this task.
* Registers a task resource tracking completion listener on this task.
*/
public void addTaskResourceTrackingListener(TaskResourceTrackingListener listener) {
resourceTrackingListeners.add(listener);
public void addResourceTrackingCompletionListener(NotifyOnceListener<Task> listener) {
resourceTrackingCompletionListeners.add(listener);
}

/**
Expand Down Expand Up @@ -466,14 +443,20 @@ public int incrementResourceTrackingThreads() {
public int decrementResourceTrackingThreads() {
int count = numActiveResourceTrackingThreads.decrementAndGet();

if (count == 0 && isResourceTrackingCompleted.compareAndSet(false, true)) {
resourceTrackingListeners.forEach(listener -> {
if (count == 0) {
List<Exception> listenerExceptions = new ArrayList<>();
resourceTrackingCompletionListeners.forEach(listener -> {
try {
listener.onTaskResourceTrackingCompleted(this);
} catch (Exception e) {
logger.warn("failure in listener during handling of onTaskResourceTrackingCompleted", e);
listener.onResponse(this);
} catch (Exception e1) {
try {
listener.onFailure(e1);
} catch (Exception e2) {
listenerExceptions.add(e2);
}
}
});
ExceptionsHelper.maybeThrowRuntimeAndSuppress(listenerExceptions);
}

return count;
Expand Down
29 changes: 19 additions & 10 deletions server/src/main/java/org/opensearch/tasks/TaskManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.opensearch.OpenSearchTimeoutException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.ActionResponse;
import org.opensearch.action.NotifyOnceListener;
import org.opensearch.cluster.ClusterChangedEvent;
import org.opensearch.cluster.ClusterStateApplier;
import org.opensearch.cluster.node.DiscoveryNode;
Expand Down Expand Up @@ -85,7 +86,7 @@
*
* @opensearch.internal
*/
public class TaskManager implements ClusterStateApplier, TaskResourceTrackingListener {
public class TaskManager implements ClusterStateApplier {

private static final Logger logger = LogManager.getLogger(TaskManager.class);

Expand Down Expand Up @@ -153,13 +154,29 @@ public Task register(String type, String action, TaskAwareRequest request) {
}
}
Task task = request.createTask(taskIdGenerator.incrementAndGet(), type, action, request.getParentTask(), headers);
task.addTaskResourceTrackingListener(this);
Objects.requireNonNull(task);
assert task.getParentTaskId().equals(request.getParentTask()) : "Request [ " + request + "] didn't preserve it parentTaskId";
if (logger.isTraceEnabled()) {
logger.trace("register {} [{}] [{}] [{}]", task.getId(), type, action, task.getDescription());
}

if (task.supportsResourceTracking()) {
task.addResourceTrackingCompletionListener(new NotifyOnceListener<>() {
@Override
protected void innerOnResponse(Task task) {
// Stop tracking the task once the last thread has been marked inactive.
if (taskResourceTrackingService.get() != null && task.supportsResourceTracking()) {
taskResourceTrackingService.get().stopTracking(task);
}
}

@Override
protected void innerOnFailure(Exception e) {
ExceptionsHelper.reThrowIfNotNull(e);
}
});
}

if (task instanceof CancellableTask) {
registerCancellableTask(task);
} else {
Expand Down Expand Up @@ -700,12 +717,4 @@ public void cancelTaskAndDescendants(CancellableTask task, String reason, boolea
throw new IllegalStateException("TaskCancellationService is not initialized");
}
}

@Override
public void onTaskResourceTrackingCompleted(Task task) {
// Stop tracking the task once the last thread has been marked inactive.
if (taskResourceTrackingService.get() != null && task.supportsResourceTracking()) {
taskResourceTrackingService.get().stopTracking(task);
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.sun.management.ThreadMXBean;
import org.opensearch.ExceptionsHelper;
import org.opensearch.action.ActionListener;
import org.opensearch.action.NotifyOnceListener;
import org.opensearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
import org.opensearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.opensearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
Expand All @@ -28,7 +29,6 @@
import org.opensearch.tasks.TaskCancelledException;
import org.opensearch.tasks.TaskId;
import org.opensearch.tasks.TaskInfo;
import org.opensearch.tasks.TaskResourceTrackingListener;
import org.opensearch.test.tasks.MockTaskManager;
import org.opensearch.test.tasks.MockTaskManagerListener;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -185,11 +185,16 @@ protected void doRun() {
// operationFinishedValidator will be called just after all task threads are marked inactive and
// the task is unregistered.
if (taskTestContext.operationFinishedValidator != null) {
task.addTaskResourceTrackingListener(new TaskResourceTrackingListener() {
task.addResourceTrackingCompletionListener(new NotifyOnceListener<>() {
@Override
public void onTaskResourceTrackingCompleted(Task task) {
protected void innerOnResponse(Task task) {
taskTestContext.operationFinishedValidator.accept(task, threadId.get());
}

@Override
protected void innerOnFailure(Exception e) {
ExceptionsHelper.reThrowIfNotNull(e);
}
});
}

Expand Down

0 comments on commit d89de65

Please sign in to comment.