Skip to content

Commit

Permalink
[PLAT-5603] Add a way to cache task runtime data while the task is ru…
Browse files Browse the repository at this point in the history
…nning

Summary: We are going make changes to report transient, runtime progress data of task like data move % completion, number of tablets moved etc. This enables caching the data to the task while it is running.

Test Plan: Unit test + trivial change

Reviewers: amalyshev, shagarwal

Reviewed By: shagarwal

Subscribers: jenkins-bot, yugaware

Differential Revision: https://phabricator.dev.yugabyte.com/D19867
  • Loading branch information
nkhogen committed Sep 29, 2022
1 parent e588946 commit 485d32a
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.typesafe.config.Config;
import com.yugabyte.yw.commissioner.TaskExecutor.RunnableTask;
import com.yugabyte.yw.commissioner.TaskExecutor.SubTaskGroup;
import com.yugabyte.yw.commissioner.TaskExecutor.TaskCache;
import com.yugabyte.yw.commissioner.UserTaskDetails.SubTaskGroupType;
import com.yugabyte.yw.common.ConfigHelper;
import com.yugabyte.yw.common.PlatformExecutorFactory;
Expand All @@ -24,6 +25,7 @@
import com.yugabyte.yw.common.services.YBClientService;
import com.yugabyte.yw.forms.ITaskParams;
import com.yugabyte.yw.forms.UniverseDefinitionTaskParams;
import com.yugabyte.yw.models.TaskInfo;
import com.yugabyte.yw.models.Universe.UniverseUpdater;
import com.yugabyte.yw.models.helpers.NodeDetails;
import com.yugabyte.yw.models.helpers.NodeStatus;
Expand All @@ -49,6 +51,9 @@ public abstract class AbstractTaskBase implements ITask {
// The threadpool on which the tasks are executed.
protected ExecutorService executor;

// The UUID of this task.
protected UUID taskUUID;

// The UUID of the top-level user-facing task at the top of Task tree. Eg. CreateUniverse, etc.
protected UUID userTaskUUID;

Expand Down Expand Up @@ -129,6 +134,11 @@ public void createThreadpool() {
executor = platformExecutorFactory.createExecutor("task", namedThreadFactory);
}

@Override
public void setTaskUUID(UUID taskUUID) {
this.taskUUID = taskUUID;
}

@Override
public void setUserTaskUUID(UUID userTaskUUID) {
this.userTaskUUID = userTaskUUID;
Expand Down Expand Up @@ -221,4 +231,16 @@ protected SubTaskGroup createSubTaskGroup(String name, SubTaskGroupType subTaskG
protected void waitFor(Duration duration) {
getRunnableTask().waitFor(duration);
}

protected UUID getUserTaskUUID() {
return userTaskUUID;
}

protected UUID getTaskUUID() {
return taskUUID;
}

protected TaskCache getTaskCache() {
return getRunnableTask().getTaskCache();
}
}
7 changes: 7 additions & 0 deletions managed/src/main/java/com/yugabyte/yw/commissioner/ITask.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,13 @@ default void onFailure(TaskInfo taskInfo, Throwable cause) {}
*/
JsonNode getTaskDetails();

/**
* Sets the UUID info of the task. E.g subtask UUID. It is invoked by the task executor.
*
* @param taskUUID the task UUID.
*/
void setTaskUUID(UUID taskUUID);

/**
* Set the user-facing top-level task for the Task tree that this Task belongs to. E.g.
* CreateUniverse, EditUniverse, etc.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -709,6 +709,35 @@ public String toString() {
}
}

/** A simple cache for caching task runtime data. */
public static class TaskCache {
private final Map<String, JsonNode> data = new ConcurrentHashMap<>();

public void put(String key, JsonNode value) {
data.put(key, value);
}

public JsonNode get(String key) {
return data.get(key);
}

public Set<String> keys() {
return data.keySet();
}

public JsonNode delete(String key) {
return data.remove(key);
}

public void clear() {
data.clear();
}

public int size() {
return data.size();
}
}

/**
* Abstract implementation of a task runnable which handles the state update after the task has
* started running. Synchronization is on the this object for taskInfo.
Expand Down Expand Up @@ -782,6 +811,7 @@ public void run() {
}
setTaskState(TaskInfo.State.Running);
log.debug("Invoking run() of task {}", task.getName());
task.setTaskUUID(getTaskUUID());
task.run();
setTaskState(TaskInfo.State.Success);
} catch (CancellationException e) {
Expand Down Expand Up @@ -937,6 +967,8 @@ public class RunnableTask extends AbstractRunnableTask {
private final Queue<SubTaskGroup> subTaskGroups = new ConcurrentLinkedQueue<>();
// Latch for timed wait for this task.
private final CountDownLatch waiterLatch = new CountDownLatch(1);
// Cache for caching any runtime data when the task is being run.
private final TaskCache taskCache = new TaskCache();
// Current execution position of subtasks.
private int subTaskPosition = 0;
private AtomicReference<TaskExecutionListener> taskExecutionListenerRef =
Expand All @@ -958,6 +990,15 @@ public void setTaskExecutionListener(TaskExecutionListener taskExecutionListener
taskExecutionListenerRef.set(taskExecutionListener);
}

/**
* Get the task cache for caching any runtime data.
*
* @return the cache instance.
*/
public TaskCache getTaskCache() {
return taskCache;
}

/** Invoked by the ExecutorService. Do not invoke this directly. */
@Override
public void run() {
Expand All @@ -970,6 +1011,8 @@ public void run() {
} finally {
// Remove the task.
runnableTasks.remove(taskUUID);
// Empty the cache.
taskCache.clear();
// Update the customer task to a completed state.
CustomerTask customerTask = CustomerTask.findByTaskUUID(taskUUID);
if (customerTask != null) {
Expand Down
1 change: 0 additions & 1 deletion managed/src/main/java/com/yugabyte/yw/models/TaskInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.EnumType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.MoreExecutors;
import com.typesafe.config.Config;
import com.yugabyte.yw.commissioner.ITask.Abortable;
import com.yugabyte.yw.commissioner.TaskExecutor.RunnableTask;
import com.yugabyte.yw.commissioner.TaskExecutor.SubTaskGroup;
import com.yugabyte.yw.commissioner.TaskExecutor.TaskCache;
import com.yugabyte.yw.commissioner.TaskExecutor.TaskExecutionListener;
import com.yugabyte.yw.common.CustomWsClientFactory;
import com.yugabyte.yw.common.CustomWsClientFactoryProvider;
Expand Down Expand Up @@ -62,6 +64,7 @@
import org.mockito.junit.MockitoJUnitRunner;
import play.Application;
import play.inject.guice.GuiceApplicationBuilder;
import play.libs.Json;

@RunWith(MockitoJUnitRunner.class)
public class TaskExecutorTest extends PlatformGuiceApplicationBaseTest {
Expand Down Expand Up @@ -549,4 +552,56 @@ public void testRetryableAnnotation() {
.collect(Collectors.toSet());
assertEquals(RETRYABLE_TASKS, retryableTaskTypes);
}

@Test
public void testTaskCache() throws InterruptedException {
ITask task = mockTaskCommon(false);
ITask subTask = mockTaskCommon(false);
AtomicReference<UUID> taskUUIDRef = new AtomicReference<>();
doAnswer(
inv -> {
RunnableTask runnable = taskExecutor.getRunnableTask(taskUUIDRef.get());
TaskCache taskCache = runnable.getTaskCache();
// Retrieve the cached data in the subtask.
ObjectNode object = (ObjectNode) taskCache.get("key1");
assertEquals(2, object.size());
assertEquals("innerVal1", object.get("innerKey1").asText());
assertEquals("innerVal2", object.get("innerKey2").asText());
return null;
})
.when(subTask)
.run();
RunnableTask taskRunner = taskExecutor.createRunnableTask(task);
doAnswer(
inv -> {
RunnableTask runnable = taskExecutor.getRunnableTask(taskUUIDRef.get());
TaskCache taskCache = runnable.getTaskCache();
ObjectNode object = Json.newObject();
// Put in the cache in the parent task.
object.put("innerKey1", "innerVal1");
object.put("innerKey2", "innerVal2");
taskCache.put("key1", object);
// Invoke subTask from the parent task.
SubTaskGroup subTasksGroup = taskExecutor.createSubTaskGroup("test");
subTasksGroup.addSubTask(subTask);
runnable.addSubTaskGroup(subTasksGroup);
runnable.runSubTasks();
return null;
})
.when(task)
.run();

taskUUIDRef.set(taskRunner.getTaskUUID());
UUID taskUUID = taskExecutor.submit(taskRunner, Executors.newFixedThreadPool(1));
TaskInfo taskInfo = waitForTask(taskUUID);
List<TaskInfo> subTaskInfos = taskInfo.getSubTasks();
Map<Integer, List<TaskInfo>> subTasksByPosition =
subTaskInfos.stream().collect(Collectors.groupingBy(TaskInfo::getPosition));
assertEquals(1, subTasksByPosition.size());
verify(subTask, times(1)).run();
verify(task, times(1)).setTaskUUID(any());
verify(subTask, times(1)).setTaskUUID(any());
assertEquals(TaskInfo.State.Success, taskInfo.getTaskState());
assertEquals(TaskInfo.State.Success, subTaskInfos.get(0).getTaskState());
}
}

0 comments on commit 485d32a

Please sign in to comment.