Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support task resource tracking in OpenSearch #3982

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Fixed a race-condition when Task is unregistered before its threads a…
…re stopped

Signed-off-by: Ketan Verma <ketan9495@gmail.com>
  • Loading branch information
ketanv3 committed Jul 30, 2022
commit 3190e4469ed1cac36cc4924e8d89ed5c4fe440c9
20 changes: 20 additions & 0 deletions server/src/main/java/org/opensearch/tasks/Task.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
* Current task information
Expand Down Expand Up @@ -74,6 +77,8 @@ public class Task {

private final Map<Long, List<ThreadResourceInfo>> resourceStats;

private final Phaser resourceTrackingThreadsBarrier = new Phaser(1);

/**
* The task's start time as a wall clock time since epoch ({@link System#currentTimeMillis()} style).
*/
Expand Down Expand Up @@ -288,6 +293,7 @@ public void startThreadResourceTracking(long threadId, ResourceStatsType statsTy
}
}
threadResourceInfoList.add(new ThreadResourceInfo(threadId, statsType, resourceUsageMetrics));
resourceTrackingThreadsBarrier.register();
}

/**
Expand Down Expand Up @@ -327,6 +333,7 @@ public void stopThreadResourceTracking(long threadId, ResourceStatsType statsTyp
if (threadResourceInfo.getStatsType() == statsType && threadResourceInfo.isActive()) {
threadResourceInfo.setActive(false);
threadResourceInfo.recordResourceUsageMetrics(resourceUsageMetrics);
resourceTrackingThreadsBarrier.arriveAndDeregister();
return;
}
}
Expand Down Expand Up @@ -377,4 +384,17 @@ public TaskResult result(DiscoveryNode node, ActionResponse response) throws IOE
throw new IllegalStateException("response has to implement ToXContent to be able to store the results");
}
}

/**
* Awaits for stopThreadResourceTracking to be called for all threads of the current task.
* @throws InterruptedException if thread interrupted while waiting
* @throws TimeoutException if timed out while waiting
*/
public void awaitResourceTrackingThreadsCompletion() throws InterruptedException, TimeoutException {
resourceTrackingThreadsBarrier.awaitAdvanceInterruptibly(
resourceTrackingThreadsBarrier.arriveAndDeregister(),
10L,
TimeUnit.SECONDS
);
ketanv3 marked this conversation as resolved.
Show resolved Hide resolved
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ public void stopTracking(Task task) {
taskExecutionFinishedOnThread(task.getId(), Thread.currentThread().getId());
}

task.awaitResourceTrackingThreadsCompletion();
List<Long> threadsWorkingOnTask = getThreadsWorkingOnTask(task);
if (threadsWorkingOnTask.size() > 0) {
logger.warn("No thread should be active when task finishes. Active threads: {}", threadsWorkingOnTask);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.opensearch.tasks.TaskCancelledException;
import org.opensearch.tasks.TaskId;
import org.opensearch.tasks.TaskInfo;
import org.opensearch.tasks.ThreadResourceInfo;
import org.opensearch.test.tasks.MockTaskManager;
import org.opensearch.test.tasks.MockTaskManagerListener;
import org.opensearch.threadpool.ThreadPool;
Expand All @@ -42,7 +41,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
Expand Down Expand Up @@ -215,13 +214,9 @@ protected void doRun() {
} finally {
if (taskTestContext.operationFinishedValidator != null) {
try {
// Wait until Task.stopThreadResourceTracking is called and threads are marked inactive
// before performing validation checks.
assertTrue(
"threads should not be marked active when task finishes",
waitUntil(() -> !hasActiveThreads(task), 5, TimeUnit.SECONDS)
);
} catch (InterruptedException ignored) {}
// Wait for threads to be marked inactive before performing validation checks.
task.awaitResourceTrackingThreadsCompletion();
} catch (InterruptedException | TimeoutException ignored) {}
taskTestContext.operationFinishedValidator.accept(threadId.get());
}
}
Expand Down Expand Up @@ -655,16 +650,4 @@ private void assertMemoryUsageWithinLimits(long actual, long expected) {
long maxOverhead = Math.min(200000, expected * 5 / 100);
assertThat(actual, lessThanOrEqualTo(expected + maxOverhead));
}

private boolean hasActiveThreads(Task task) {
for (List<ThreadResourceInfo> threadResourceInfos : task.getResourceStats().values()) {
for (ThreadResourceInfo threadResourceInfo : threadResourceInfos) {
if (threadResourceInfo.isActive()) {
return true;
}
}
}

return false;
}
}