Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import javax.inject.Inject;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
Expand All @@ -37,6 +38,7 @@
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.QueryParam;
import org.apache.helix.task.TaskPartitionState;
import org.apache.helix.task.TaskState;
import org.apache.pinot.controller.api.access.AccessType;
import org.apache.pinot.controller.api.access.Authenticate;
Expand Down Expand Up @@ -205,6 +207,14 @@ public StringResultResponse getTaskStateDeprecated(
return new StringResultResponse(_pinotHelixTaskResourceManager.getTaskState(taskName).toString());
}

@GET
@Path("/tasks/subtask/{taskName}/state")
@ApiOperation("Get the states of all the sub tasks for the given task")
public Map<String, TaskPartitionState> getSubtaskStates(
@ApiParam(value = "Task name", required = true) @PathParam("taskName") String taskName) {
return _pinotHelixTaskResourceManager.getSubtaskStates(taskName);
}

@GET
@Path("/tasks/task/{taskName}/config")
@ApiOperation("Get the task config (a list of child task configs) for the given task")
Expand All @@ -222,6 +232,16 @@ public List<PinotTaskConfig> getTaskConfigsDeprecated(
return _pinotHelixTaskResourceManager.getTaskConfigs(taskName);
}

@GET
@Path("/tasks/subtask/{taskName}/config")
@ApiOperation("Get the configs of specified sub tasks for the given task")
public Map<String, PinotTaskConfig> getSubtaskConfigs(
@ApiParam(value = "Task name", required = true) @PathParam("taskName") String taskName,
@ApiParam(value = "Sub task names separated by comma") @QueryParam("subtaskNames") @Nullable
String subtaskNames) {
return _pinotHelixTaskResourceManager.getSubtaskConfigs(taskName, subtaskNames);
}

@GET
@Path("/tasks/scheduler/information")
@ApiOperation("Fetch cron scheduler information")
Expand Down Expand Up @@ -428,6 +448,18 @@ public SuccessResponse deleteTasks(
return new SuccessResponse("Successfully deleted tasks for task type: " + taskType);
}

@DELETE
@Path("/tasks/task/{taskName}")
@Authenticate(AccessType.DELETE)
@ApiOperation("Delete a single task given its task name")
public SuccessResponse deleteTask(
@ApiParam(value = "Task name", required = true) @PathParam("taskName") String taskName,
@ApiParam(value = "Whether to force deleting the task (expert only option, enable with cautious")
@DefaultValue("false") @QueryParam("forceDelete") boolean forceDelete) {
_pinotHelixTaskResourceManager.deleteTask(taskName, forceDelete);
return new SuccessResponse("Successfully deleted task: " + taskName);
}

@Deprecated
@DELETE
@Path("/tasks/taskqueue/{taskType}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,16 @@
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.commons.lang.StringUtils;
import org.apache.helix.task.JobConfig;
import org.apache.helix.task.JobContext;
import org.apache.helix.task.JobQueue;
Expand Down Expand Up @@ -162,6 +165,25 @@ public synchronized void deleteTaskQueue(String taskType, boolean forceDelete) {
_taskDriver.delete(helixJobQueueName, forceDelete);
}

/**
* Delete a single task from the task queue. The task queue should be
* stopped before deleting the task, otherwise it fails with exception.
*
* @param taskName the task to delete from the queue.
* @param forceDelete as said in helix comment, if set true, all job's related zk nodes will
* be clean up from zookeeper even if its workflow information can not be found.
*/
public synchronized void deleteTask(String taskName, boolean forceDelete) {
String taskType = getTaskType(taskName);
String helixJobQueueName = getHelixJobQueueName(taskType);
if (forceDelete) {
LOGGER.warn("Force deleting task: {} from queue: {} of task type: {}", taskName, helixJobQueueName, taskType);
} else {
LOGGER.info("Deleting task: {} from queue: {} of task type: {}", taskName, helixJobQueueName, taskType);
}
_taskDriver.deleteJob(helixJobQueueName, taskName, forceDelete);
}

/**
* Get all task queues.
*
Expand Down Expand Up @@ -328,6 +350,33 @@ public synchronized TaskState getTaskState(String taskName) {
return _taskDriver.getWorkflowContext(getHelixJobQueueName(taskType)).getJobState(getHelixJobName(taskName));
}

/**
* Get states of all the sub tasks for a given task.
*
* @param taskName the task whose sub tasks to check
* @return states of all the sub tasks
*/
public synchronized Map<String, TaskPartitionState> getSubtaskStates(String taskName) {
String taskType = getTaskType(taskName);
WorkflowContext workflowContext = _taskDriver.getWorkflowContext(getHelixJobQueueName(taskType));
if (workflowContext == null) {
return Collections.emptyMap();
}
String helixJobName = getHelixJobName(taskName);
JobContext jobContext = _taskDriver.getJobContext(helixJobName);
if (jobContext == null) {
return Collections.emptyMap();
}
Map<String, TaskPartitionState> subtaskStates = new HashMap<>();
Set<Integer> partitionSet = jobContext.getPartitionSet();
for (int partition : partitionSet) {
String taskIdForPartition = jobContext.getTaskIdForPartition(partition);
TaskPartitionState partitionState = jobContext.getPartitionState(partition);
subtaskStates.put(taskIdForPartition, partitionState);
}
return subtaskStates;
}

/**
* Get the child task configs for the given task name.
*
Expand All @@ -344,6 +393,33 @@ public synchronized List<PinotTaskConfig> getTaskConfigs(String taskName) {
return taskConfigs;
}

/**
* Get configs of the specified sub task for a given task.
*
* @param taskName the task whose sub tasks to check
* @param subtaskNames the sub tasks to check
* @return the configs of the sub tasks
*/
public synchronized Map<String, PinotTaskConfig> getSubtaskConfigs(String taskName, @Nullable String subtaskNames) {
JobConfig jobConfig = _taskDriver.getJobConfig(getHelixJobName(taskName));
if (jobConfig == null) {
return Collections.emptyMap();
}
Map<String, TaskConfig> helixTaskConfigs = jobConfig.getTaskConfigMap();
Map<String, PinotTaskConfig> taskConfigs = new HashMap<>(helixTaskConfigs.size());
if (StringUtils.isEmpty(subtaskNames)) {
helixTaskConfigs.forEach((sub, cfg) -> taskConfigs.put(sub, PinotTaskConfig.fromHelixTaskConfig(cfg)));
return taskConfigs;
}
for (String subtaskName : StringUtils.split(subtaskNames, ',')) {
TaskConfig taskConfig = helixTaskConfigs.get(subtaskName);
if (taskConfig != null) {
taskConfigs.put(subtaskName, PinotTaskConfig.fromHelixTaskConfig(taskConfig));
}
}
return taskConfigs;
}

/**
* Helper method to return a map of task names to corresponding task state
* where the task corresponds to the given Pinot table name. This is used to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ public void setUp()
// Set task timeout in cluster config
PinotHelixResourceManager helixResourceManager = _controllerStarter.getHelixResourceManager();
helixResourceManager.getHelixAdmin().setConfig(
new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(
helixResourceManager.getHelixClusterName()).build(),
new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER)
.forCluster(helixResourceManager.getHelixClusterName()).build(),
Collections.singletonMap(TASK_TYPE + MinionConstants.TIMEOUT_MS_KEY_SUFFIX, Long.toString(600_000L)));

// Add 3 offline tables, where 2 of them have TestTask enabled
Expand All @@ -106,9 +106,7 @@ private void verifyTaskCount(String task, int errors, int waiting, int running,
// Wait for at most 10 seconds for Helix to generate the tasks
TestUtils.waitForCondition((aVoid) -> {
PinotHelixTaskResourceManager.TaskCount taskCount = _helixTaskResourceManager.getTaskCount(task);
return taskCount.getError() == errors
&& taskCount.getWaiting() == waiting
&& taskCount.getRunning() == running
return taskCount.getError() == errors && taskCount.getWaiting() == waiting && taskCount.getRunning() == running
&& taskCount.getTotal() == total;
}, 10_000L, "Failed to reach expected task count");
}
Expand Down Expand Up @@ -199,19 +197,25 @@ public void testStopResumeDeleteTaskQueue() {
&& controllerMetrics.getValueOfTableGauge(completedGauge, ControllerGauge.TASK_STATUS) == 0,
ZK_CALLBACK_TIMEOUT_MS, "Failed to update the controller gauges");

// Task deletion requires the task queue to be stopped,
// so deleting task1 here before resuming the task queue.
assertTrue(_helixTaskResourceManager.getTaskStates(TASK_TYPE).containsKey(task1));
_helixTaskResourceManager.deleteTask(task1, false);
// Resume the task queue, and let the task complete
_helixTaskResourceManager.resumeTaskQueue(TASK_TYPE);
HOLD.set(false);

// Wait at most 60 seconds for all tasks COMPLETED
TestUtils.waitForCondition(input -> {
Collection<TaskState> taskStates = _helixTaskResourceManager.getTaskStates(TASK_TYPE).values();
assertEquals(taskStates.size(), NUM_TASKS);
for (TaskState taskState : taskStates) {
if (taskState != TaskState.COMPLETED) {
return false;
}
}
// Task deletion happens eventually along with other state transitions.
assertFalse(_helixTaskResourceManager.getTaskStates(TASK_TYPE).containsKey(task1));
assertEquals(taskStates.size(), (NUM_TASKS - 1));
assertTrue(TASK_START_NOTIFIED.get());
assertTrue(TASK_SUCCESS_NOTIFIED.get());
assertTrue(TASK_CANCELLED_NOTIFIED.get());
Expand All @@ -223,7 +227,7 @@ public void testStopResumeDeleteTaskQueue() {
TestUtils.waitForCondition(
input -> controllerMetrics.getValueOfTableGauge(inProgressGauge, ControllerGauge.TASK_STATUS) == 0
&& controllerMetrics.getValueOfTableGauge(stoppedGauge, ControllerGauge.TASK_STATUS) == 0
&& controllerMetrics.getValueOfTableGauge(completedGauge, ControllerGauge.TASK_STATUS) == NUM_TASKS,
&& controllerMetrics.getValueOfTableGauge(completedGauge, ControllerGauge.TASK_STATUS) == (NUM_TASKS - 1),
ZK_CALLBACK_TIMEOUT_MS, "Failed to update the controller gauges");

// Delete the task queue
Expand Down