Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -292,17 +292,17 @@ public String getTaskGenerationDebugInto(
// Relying on original schema that was used to query the controller
URI uri = _uriInfo.getRequestUri();
String scheme = uri.getScheme();
List<String> controllerUrls = controllers.stream().map(controller -> {
return String.format("%s://%s:%d/tasks/generator/%s/%s/debug?localOnly=true", scheme, controller.getHostName(),
Integer.parseInt(controller.getPort()), tableNameWithType, taskType);
}).collect(Collectors.toList());
List<String> controllerUrls = controllers.stream().map(controller -> String
.format("%s://%s:%d/tasks/generator/%s/%s/debug?localOnly=true", scheme, controller.getHostName(),
Integer.parseInt(controller.getPort()), tableNameWithType, taskType)).collect(Collectors.toList());

CompletionServiceHelper completionServiceHelper =
new CompletionServiceHelper(_executor, _connectionManager, HashBiMap.create(0));
Map<String, String> requestHeaders = new HashMap<>();
httpHeaders.getRequestHeaders().keySet().forEach(header -> {
requestHeaders.put(header, httpHeaders.getHeaderString(header));
});
LOGGER.debug("Getting task generation info with controllerUrls: {}", controllerUrls);
CompletionServiceHelper.CompletionServiceResponse serviceResponse =
completionServiceHelper.doMultiGetRequest(controllerUrls, null, true, requestHeaders, 10000);

Expand Down Expand Up @@ -386,7 +386,15 @@ public Map<String, TaskPartitionState> getSubtaskStates(
@ApiOperation("Get the task config (a list of child task configs) for the given task")
public List<PinotTaskConfig> getTaskConfigs(
@ApiParam(value = "Task name", required = true) @PathParam("taskName") String taskName) {
return _pinotHelixTaskResourceManager.getTaskConfigs(taskName);
return _pinotHelixTaskResourceManager.getSubtaskConfigs(taskName);
}

@GET
@Path("/tasks/task/{taskName}/runtime/config")
@ApiOperation("Get the task runtime config for the given task")
public Map<String, String> getTaskConfig(
@ApiParam(value = "Task name", required = true) @PathParam("taskName") String taskName) {
return _pinotHelixTaskResourceManager.getTaskRuntimeConfig(taskName);
}

@Deprecated
Expand All @@ -395,7 +403,7 @@ public List<PinotTaskConfig> getTaskConfigs(
@ApiOperation("Get the task config (a list of child task configs) for the given task (deprecated)")
public List<PinotTaskConfig> getTaskConfigsDeprecated(
@ApiParam(value = "Task name", required = true) @PathParam("taskName") String taskName) {
return _pinotHelixTaskResourceManager.getTaskConfigs(taskName);
return _pinotHelixTaskResourceManager.getSubtaskConfigs(taskName);
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ public Map<String, TaskState> getTaskStates(String taskType) {
* @return List of child task configs
*/
public List<PinotTaskConfig> getTaskConfigs(String taskName) {
return _pinotHelixTaskResourceManager.getTaskConfigs(taskName);
return _pinotHelixTaskResourceManager.getSubtaskConfigs(taskName);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ public synchronized Map<String, TaskPartitionState> getSubtaskStates(String task
* @param taskName Task name
* @return List of child task configs
*/
public synchronized List<PinotTaskConfig> getTaskConfigs(String taskName) {
public synchronized List<PinotTaskConfig> getSubtaskConfigs(String taskName) {
Collection<TaskConfig> helixTaskConfigs =
_taskDriver.getJobConfig(getHelixJobName(taskName)).getTaskConfigMap().values();
List<PinotTaskConfig> taskConfigs = new ArrayList<>(helixTaskConfigs.size());
Expand All @@ -444,6 +444,23 @@ public synchronized List<PinotTaskConfig> getTaskConfigs(String taskName) {
return taskConfigs;
}

/**
* Get the task runtime config for the given task name. A task can have multiple subtasks, whose configs can be
* retrieved via the getSubtaskConfigs() method instead.
*
* @param taskName Task name
* @return Configs for the task returned as a Map.
*/
public synchronized Map<String, String> getTaskRuntimeConfig(String taskName) {
JobConfig jobConfig = _taskDriver.getJobConfig(getHelixJobName(taskName));
HashMap<String, String> configs = new HashMap<>();
configs.put("ConcurrentTasksPerWorker", String.valueOf(jobConfig.getNumConcurrentTasksPerInstance()));
configs.put("TaskTimeoutMs", String.valueOf(jobConfig.getTimeoutPerTask()));
configs.put("TaskExpireTimeMs", String.valueOf(jobConfig.getExpiry()));
configs.put("MinionWorkerGroupTag", jobConfig.getInstanceGroupTag());
return configs;
}

/**
* Get configs of the specified sub task for a given task.
*
Expand Down Expand Up @@ -567,7 +584,7 @@ public synchronized Map<String, TaskState> getTaskStatesByTable(String taskType,
String taskName = taskState.getKey();

// Iterate through all task configs associated with this task name
for (PinotTaskConfig taskConfig : getTaskConfigs(taskName)) {
for (PinotTaskConfig taskConfig : getSubtaskConfigs(taskName)) {
Map<String, String> pinotConfigs = taskConfig.getConfigs();

// Filter task configs that matches this table name
Expand Down Expand Up @@ -650,7 +667,7 @@ public synchronized Map<String, TaskDebugInfo> getTasksDebugInfoByTable(String t
String pinotTaskName = getPinotTaskName(helixJobName);

// Iterate through all task configs associated with this task name
for (PinotTaskConfig taskConfig : getTaskConfigs(pinotTaskName)) {
for (PinotTaskConfig taskConfig : getSubtaskConfigs(pinotTaskName)) {
Map<String, String> pinotConfigs = taskConfig.getConfigs();

// Filter task configs that matches this table name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ public void testSingleLevelConcat()
for (String tasks = _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE);
tasks != null; tasks =
_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE), numTasks++) {
assertEquals(_helixTaskResourceManager.getTaskConfigs(tasks).size(), expectedNumSubTasks[numTasks]);
assertEquals(_helixTaskResourceManager.getSubtaskConfigs(tasks).size(), expectedNumSubTasks[numTasks]);
assertTrue(_helixTaskResourceManager.getTaskQueues()
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE)));
// Will not schedule task if there's incomplete task
Expand Down Expand Up @@ -393,7 +393,7 @@ public void testSingleLevelRollup()
for (String tasks = _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE);
tasks != null; tasks =
_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE), numTasks++) {
assertEquals(_helixTaskResourceManager.getTaskConfigs(tasks).size(), 1);
assertEquals(_helixTaskResourceManager.getSubtaskConfigs(tasks).size(), 1);
assertTrue(_helixTaskResourceManager.getTaskQueues()
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE)));
// Will not schedule task if there's incomplete task
Expand Down Expand Up @@ -541,7 +541,7 @@ public void testMultiLevelConcat()
for (String tasks = _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE);
tasks != null; tasks =
_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE), numTasks++) {
assertEquals(_helixTaskResourceManager.getTaskConfigs(tasks).size(), expectedNumSubTasks[numTasks]);
assertEquals(_helixTaskResourceManager.getSubtaskConfigs(tasks).size(), expectedNumSubTasks[numTasks]);
assertTrue(_helixTaskResourceManager.getTaskQueues()
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE)));
// Will not schedule task if there's incomplete task
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ private void startCleanup() {
LOGGER.info("Configured to clean up task event observers immediately");
return;
}
LOGGER.info("Configured to clean up task event observers with cleanupDelayMs: {}", _eventObserverCleanupDelayMs);
_cleanupExecutor.submit(() -> {
LOGGER.info("Start to cleanup task event observers with cleanupDelayMs: {}", _eventObserverCleanupDelayMs);
while (!Thread.interrupted()) {
Expand Down Expand Up @@ -97,7 +98,13 @@ public static void init(MinionConf config, ExecutorService executorService) {
}

public static MinionEventObservers getInstance() {
return _customInstance != null ? _customInstance : DEFAULT_INSTANCE;
if (_customInstance != null) {
return _customInstance;
}
// Test code might reach here, but this should never happen in prod case, as instance is created upon worker
// starts before any tasks can run. But log something for debugging just in case.
LOGGER.warn("Using default MinionEventObservers instance");
return DEFAULT_INSTANCE;
}

public MinionEventObserver getMinionEventObserver(String taskId) {
Expand Down