From d89de65f34f44dd21e49941bee0be63fa905be14 Mon Sep 17 00:00:00 2001 From: Ketan Verma Date: Tue, 2 Aug 2022 00:07:13 +0530 Subject: [PATCH] Improved error handling and simplified task resource tracking completion listener Signed-off-by: Ketan Verma --- .../main/java/org/opensearch/tasks/Task.java | 57 +++++++------------ .../org/opensearch/tasks/TaskManager.java | 29 ++++++---- .../tasks/TaskResourceTrackingListener.java | 42 -------------- .../node/tasks/ResourceAwareTasksTests.java | 11 +++- 4 files changed, 47 insertions(+), 92 deletions(-) delete mode 100644 server/src/main/java/org/opensearch/tasks/TaskResourceTrackingListener.java diff --git a/server/src/main/java/org/opensearch/tasks/Task.java b/server/src/main/java/org/opensearch/tasks/Task.java index e501f68be0f5b..4f6724041902d 100644 --- a/server/src/main/java/org/opensearch/tasks/Task.java +++ b/server/src/main/java/org/opensearch/tasks/Task.java @@ -34,7 +34,9 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.opensearch.ExceptionsHelper; import org.opensearch.action.ActionResponse; +import org.opensearch.action.NotifyOnceListener; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.io.stream.NamedWriteable; import org.opensearch.common.xcontent.ToXContent; @@ -47,7 +49,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; /** @@ -80,9 +81,7 @@ public class Task { private final Map> resourceStats; - private final List resourceTrackingListeners; - - private final AtomicBoolean isResourceTrackingCompleted = new AtomicBoolean(false); + private final List> resourceTrackingCompletionListeners; /** * Keeps track of the number of active resource tracking threads for this task. It is initialized to 1 to track @@ -112,7 +111,7 @@ public Task(long id, String type, String action, String description, TaskId pare System.nanoTime(), headers, new ConcurrentHashMap<>(), - Collections.synchronizedList(new ArrayList<>()) + new ArrayList<>() ); } @@ -126,7 +125,7 @@ public Task( long startTimeNanos, Map headers, ConcurrentHashMap> resourceStats, - List resourceTrackingListeners + List> resourceTrackingCompletionListeners ) { this.id = id; this.type = type; @@ -137,7 +136,7 @@ public Task( this.startTimeNanos = startTimeNanos; this.headers = headers; this.resourceStats = resourceStats; - this.resourceTrackingListeners = resourceTrackingListeners; + this.resourceTrackingCompletionListeners = resourceTrackingCompletionListeners; } /** @@ -319,13 +318,6 @@ public void startThreadResourceTracking(long threadId, ResourceStatsType statsTy } threadResourceInfoList.add(new ThreadResourceInfo(threadId, statsType, resourceUsageMetrics)); incrementResourceTrackingThreads(); - resourceTrackingListeners.forEach(listener -> { - try { - listener.onTaskExecutionStartedOnThread(this, threadId); - } catch (Exception e) { - logger.warn("failure in listener during handling of onTaskExecutionStartedOnThread", e); - } - }); } /** @@ -343,13 +335,6 @@ public void updateThreadResourceStats(long threadId, ResourceStatsType statsType // the active entry present in the list is updated if (threadResourceInfo.getStatsType() == statsType && threadResourceInfo.isActive()) { threadResourceInfo.recordResourceUsageMetrics(resourceUsageMetrics); - resourceTrackingListeners.forEach(listener -> { - try { - listener.onTaskResourceStatsUpdated(this); - } catch (Exception e) { - logger.warn("failure in listener during handling of onTaskResourceStatsUpdated", e); - } - }); return; } } @@ -373,14 +358,6 @@ public void stopThreadResourceTracking(long threadId, ResourceStatsType statsTyp threadResourceInfo.setActive(false); threadResourceInfo.recordResourceUsageMetrics(resourceUsageMetrics); decrementResourceTrackingThreads(); - resourceTrackingListeners.forEach(listener -> { - try { - listener.onTaskExecutionFinishedOnThread(this, threadId); - } catch (Exception e) { - logger.warn("failure in listener during handling of onTaskExecutionFinishedOnThread", e); - } - }); - return; } } @@ -433,10 +410,10 @@ public TaskResult result(DiscoveryNode node, ActionResponse response) throws IOE } /** - * Registers a TaskResourceTrackingListener callback listener on this task. + * Registers a task resource tracking completion listener on this task. */ - public void addTaskResourceTrackingListener(TaskResourceTrackingListener listener) { - resourceTrackingListeners.add(listener); + public void addResourceTrackingCompletionListener(NotifyOnceListener listener) { + resourceTrackingCompletionListeners.add(listener); } /** @@ -466,14 +443,20 @@ public int incrementResourceTrackingThreads() { public int decrementResourceTrackingThreads() { int count = numActiveResourceTrackingThreads.decrementAndGet(); - if (count == 0 && isResourceTrackingCompleted.compareAndSet(false, true)) { - resourceTrackingListeners.forEach(listener -> { + if (count == 0) { + List listenerExceptions = new ArrayList<>(); + resourceTrackingCompletionListeners.forEach(listener -> { try { - listener.onTaskResourceTrackingCompleted(this); - } catch (Exception e) { - logger.warn("failure in listener during handling of onTaskResourceTrackingCompleted", e); + listener.onResponse(this); + } catch (Exception e1) { + try { + listener.onFailure(e1); + } catch (Exception e2) { + listenerExceptions.add(e2); + } } }); + ExceptionsHelper.maybeThrowRuntimeAndSuppress(listenerExceptions); } return count; diff --git a/server/src/main/java/org/opensearch/tasks/TaskManager.java b/server/src/main/java/org/opensearch/tasks/TaskManager.java index b98f1e44df349..95f2ef0c4d731 100644 --- a/server/src/main/java/org/opensearch/tasks/TaskManager.java +++ b/server/src/main/java/org/opensearch/tasks/TaskManager.java @@ -44,6 +44,7 @@ import org.opensearch.OpenSearchTimeoutException; import org.opensearch.action.ActionListener; import org.opensearch.action.ActionResponse; +import org.opensearch.action.NotifyOnceListener; import org.opensearch.cluster.ClusterChangedEvent; import org.opensearch.cluster.ClusterStateApplier; import org.opensearch.cluster.node.DiscoveryNode; @@ -85,7 +86,7 @@ * * @opensearch.internal */ -public class TaskManager implements ClusterStateApplier, TaskResourceTrackingListener { +public class TaskManager implements ClusterStateApplier { private static final Logger logger = LogManager.getLogger(TaskManager.class); @@ -153,13 +154,29 @@ public Task register(String type, String action, TaskAwareRequest request) { } } Task task = request.createTask(taskIdGenerator.incrementAndGet(), type, action, request.getParentTask(), headers); - task.addTaskResourceTrackingListener(this); Objects.requireNonNull(task); assert task.getParentTaskId().equals(request.getParentTask()) : "Request [ " + request + "] didn't preserve it parentTaskId"; if (logger.isTraceEnabled()) { logger.trace("register {} [{}] [{}] [{}]", task.getId(), type, action, task.getDescription()); } + if (task.supportsResourceTracking()) { + task.addResourceTrackingCompletionListener(new NotifyOnceListener<>() { + @Override + protected void innerOnResponse(Task task) { + // Stop tracking the task once the last thread has been marked inactive. + if (taskResourceTrackingService.get() != null && task.supportsResourceTracking()) { + taskResourceTrackingService.get().stopTracking(task); + } + } + + @Override + protected void innerOnFailure(Exception e) { + ExceptionsHelper.reThrowIfNotNull(e); + } + }); + } + if (task instanceof CancellableTask) { registerCancellableTask(task); } else { @@ -700,12 +717,4 @@ public void cancelTaskAndDescendants(CancellableTask task, String reason, boolea throw new IllegalStateException("TaskCancellationService is not initialized"); } } - - @Override - public void onTaskResourceTrackingCompleted(Task task) { - // Stop tracking the task once the last thread has been marked inactive. - if (taskResourceTrackingService.get() != null && task.supportsResourceTracking()) { - taskResourceTrackingService.get().stopTracking(task); - } - } } diff --git a/server/src/main/java/org/opensearch/tasks/TaskResourceTrackingListener.java b/server/src/main/java/org/opensearch/tasks/TaskResourceTrackingListener.java deleted file mode 100644 index e29c6ec8a0256..0000000000000 --- a/server/src/main/java/org/opensearch/tasks/TaskResourceTrackingListener.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.tasks; - -/** - * Listener for events related to resource tracking of a Task. - */ -public interface TaskResourceTrackingListener { - - /** - * Invoked when a task execution is started on a thread. - * @param task object - * @param threadId on which execution of task started - */ - default void onTaskExecutionStartedOnThread(Task task, long threadId) {} - - /** - * Invoked when a task execution is finished on a thread. - * @param task object - * @param threadId on which execution of task finished. - */ - default void onTaskExecutionFinishedOnThread(Task task, long threadId) {} - - /** - * Invoked when a task's resource stats are updated. - * @param task object - */ - default void onTaskResourceStatsUpdated(Task task) {} - - /** - * Invoked when a task's resource tracking is completed. - * This happens when all of task's threads are marked inactive. - * @param task object - */ - default void onTaskResourceTrackingCompleted(Task task) {} -} diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/node/tasks/ResourceAwareTasksTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/node/tasks/ResourceAwareTasksTests.java index c7f53754d015c..c269bf6e7fa77 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/node/tasks/ResourceAwareTasksTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/node/tasks/ResourceAwareTasksTests.java @@ -11,6 +11,7 @@ import com.sun.management.ThreadMXBean; import org.opensearch.ExceptionsHelper; import org.opensearch.action.ActionListener; +import org.opensearch.action.NotifyOnceListener; import org.opensearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; import org.opensearch.action.admin.cluster.node.tasks.list.ListTasksRequest; import org.opensearch.action.admin.cluster.node.tasks.list.ListTasksResponse; @@ -28,7 +29,6 @@ import org.opensearch.tasks.TaskCancelledException; import org.opensearch.tasks.TaskId; import org.opensearch.tasks.TaskInfo; -import org.opensearch.tasks.TaskResourceTrackingListener; import org.opensearch.test.tasks.MockTaskManager; import org.opensearch.test.tasks.MockTaskManagerListener; import org.opensearch.threadpool.ThreadPool; @@ -185,11 +185,16 @@ protected void doRun() { // operationFinishedValidator will be called just after all task threads are marked inactive and // the task is unregistered. if (taskTestContext.operationFinishedValidator != null) { - task.addTaskResourceTrackingListener(new TaskResourceTrackingListener() { + task.addResourceTrackingCompletionListener(new NotifyOnceListener<>() { @Override - public void onTaskResourceTrackingCompleted(Task task) { + protected void innerOnResponse(Task task) { taskTestContext.operationFinishedValidator.accept(task, threadId.get()); } + + @Override + protected void innerOnFailure(Exception e) { + ExceptionsHelper.reThrowIfNotNull(e); + } }); }