Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ public enum ControllerMeter implements AbstractMetrics.Meter {
NUMBER_TASKS_SUBMITTED("tasks", false),
NUMBER_SEGMENT_UPLOAD_TIMEOUT_EXCEEDED("SegmentUploadTimeouts", true),
CRON_SCHEDULER_JOB_TRIGGERED("cronSchedulerJobTriggered", false),
LLC_SEGMENTS_DEEP_STORE_UPLOAD_RETRY_ERROR("LLCSegmentDeepStoreUploadRetryError", false);
LLC_SEGMENTS_DEEP_STORE_UPLOAD_RETRY_ERROR("LLCSegmentDeepStoreUploadRetryError", false),
NUMBER_ADHOC_TASKS_SUBMITTED("adhocTasks", false);


private final String _brokerMeterName;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.controller.api.exception;

public class NoTaskScheduledException extends RuntimeException {
public NoTaskScheduledException(String message) {
super(message);
}

public NoTaskScheduledException(String message, Throwable cause) {
super(message, cause);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.controller.api.exception;

public class TaskAlreadyExistsException extends RuntimeException {
public TaskAlreadyExistsException(String message) {
super(message);
}

public TaskAlreadyExistsException(String message, Throwable cause) {
super(message, cause);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.controller.api.exception;

public class UnknownTaskTypeException extends RuntimeException {
public UnknownTaskTypeException(String message) {
super(message);
}

public UnknownTaskTypeException(String message, Throwable cause) {
super(message, cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,21 @@
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Response;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.helix.task.TaskPartitionState;
import org.apache.helix.task.TaskState;
import org.apache.pinot.common.exception.TableNotFoundException;
import org.apache.pinot.controller.api.access.AccessType;
import org.apache.pinot.controller.api.access.Authenticate;
import org.apache.pinot.controller.api.exception.ControllerApplicationException;
import org.apache.pinot.controller.api.exception.NoTaskScheduledException;
import org.apache.pinot.controller.api.exception.TaskAlreadyExistsException;
import org.apache.pinot.controller.api.exception.UnknownTaskTypeException;
import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.spi.config.task.AdhocTaskConfig;
import org.quartz.CronTrigger;
import org.quartz.JobDataMap;
import org.quartz.JobDetail;
Expand All @@ -55,6 +63,8 @@
import org.quartz.SimpleTrigger;
import org.quartz.Trigger;
import org.quartz.impl.matchers.GroupMatcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
Expand All @@ -67,6 +77,7 @@
* <li>GET '/tasks/task/{taskName}/state': Get the task state for the given task</li>
* <li>GET '/tasks/task/{taskName}/config': Get the task config (a list of child task configs) for the given task</li>
* <li>POST '/tasks/schedule': Schedule tasks</li>
* <li>POST '/tasks/execute': Execute an adhoc task</li>
* <li>PUT '/tasks/{taskType}/cleanup': Clean up finished tasks (COMPLETED, FAILED) for the given task type</li>
* <li>PUT '/tasks/{taskType}/stop': Stop all running/pending tasks (as well as the task queue) for the given task
* type</li>
Expand All @@ -78,6 +89,8 @@
@Api(tags = Constants.TASK_TAG)
@Path("/")
public class PinotTaskRestletResource {
public static final Logger LOGGER = LoggerFactory.getLogger(PinotTaskRestletResource.class);

private static final String TASK_QUEUE_STATE_STOP = "STOP";
private static final String TASK_QUEUE_STATE_RESUME = "RESUME";

Expand Down Expand Up @@ -366,6 +379,33 @@ public Map<String, String> scheduleTasks(@ApiParam(value = "Task type") @QueryPa
}
}

@POST
@Path("/tasks/execute")
@Authenticate(AccessType.CREATE)
@ApiOperation("Execute a task on minion")
public Map<String, String> executeAdhocTask(AdhocTaskConfig adhocTaskConfig) {
try {
return _pinotTaskManager.createTask(adhocTaskConfig.getTaskType(), adhocTaskConfig.getTableName(),
adhocTaskConfig.getTaskName(), adhocTaskConfig.getTaskConfigs());
} catch (TableNotFoundException e) {
throw new ControllerApplicationException(LOGGER, "Failed to find table: " + adhocTaskConfig.getTableName(),
Response.Status.NOT_FOUND, e);
} catch (TaskAlreadyExistsException e) {
throw new ControllerApplicationException(LOGGER, "Task already exists: " + adhocTaskConfig.getTaskName(),
Response.Status.CONFLICT, e);
} catch (UnknownTaskTypeException e) {
throw new ControllerApplicationException(LOGGER, "Unknown task type: " + adhocTaskConfig.getTaskType(),
Response.Status.NOT_FOUND, e);
} catch (NoTaskScheduledException e) {
throw new ControllerApplicationException(LOGGER,
"No task is generated for table: " + adhocTaskConfig.getTableName() + ", with task type: "
+ adhocTaskConfig.getTaskType(), Response.Status.BAD_REQUEST);
} catch (Exception e) {
throw new ControllerApplicationException(LOGGER,
"Failed to create adhoc task: " + ExceptionUtils.getStackTrace(e), Response.Status.INTERNAL_SERVER_ERROR, e);
}
}

@Deprecated
@PUT
@Path("/tasks/scheduletasks")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.commons.lang.StringUtils;
Expand All @@ -44,6 +45,7 @@
import org.apache.helix.task.WorkflowConfig;
import org.apache.helix.task.WorkflowContext;
import org.apache.pinot.common.utils.DateTimeUtils;
import org.apache.pinot.controller.api.exception.UnknownTaskTypeException;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.spi.utils.CommonConstants.Helix;
import org.slf4j.Logger;
Expand Down Expand Up @@ -232,7 +234,29 @@ public synchronized String submitTask(List<PinotTaskConfig> pinotTaskConfigs, St
Preconditions.checkState(numConcurrentTasksPerInstance > 0);

String taskType = pinotTaskConfigs.get(0).getTaskType();
String parentTaskName = TASK_PREFIX + taskType + TASK_NAME_SEPARATOR + System.currentTimeMillis();
String parentTaskName =
getParentTaskName(taskType, UUID.randomUUID() + "_" + System.currentTimeMillis());
return submitTask(parentTaskName, pinotTaskConfigs, minionInstanceTag, taskTimeoutMs,
numConcurrentTasksPerInstance);
}

/**
* Submit a list of child tasks with same task type to the Minion instances with the given tag.
*
* @param parentTaskName Parent task name to be submitted
* @param pinotTaskConfigs List of child task configs to be submitted
* @param minionInstanceTag Tag of the Minion instances to submit the task to
* @param taskTimeoutMs Timeout in milliseconds for each task
* @param numConcurrentTasksPerInstance Maximum number of concurrent tasks allowed per instance
* @return Name of the submitted parent task
*/
public synchronized String submitTask(String parentTaskName, List<PinotTaskConfig> pinotTaskConfigs,
String minionInstanceTag, long taskTimeoutMs, int numConcurrentTasksPerInstance) {
int numChildTasks = pinotTaskConfigs.size();
Preconditions.checkState(numChildTasks > 0);
Preconditions.checkState(numConcurrentTasksPerInstance > 0);

String taskType = pinotTaskConfigs.get(0).getTaskType();
LOGGER
.info("Submitting parent task: {} of type: {} with {} child task configs: {} to Minion instances with tag: {}",
parentTaskName, taskType, numChildTasks, pinotTaskConfigs, minionInstanceTag);
Expand Down Expand Up @@ -347,7 +371,11 @@ public synchronized Set<String> getTasksInProgress(String taskType) {
*/
public synchronized TaskState getTaskState(String taskName) {
String taskType = getTaskType(taskName);
return _taskDriver.getWorkflowContext(getHelixJobQueueName(taskType)).getJobState(getHelixJobName(taskName));
WorkflowContext workflowContext = _taskDriver.getWorkflowContext(getHelixJobQueueName(taskType));
if (workflowContext == null) {
throw new UnknownTaskTypeException("Workflow context for task type doesn't exist: " + taskType);
}
return workflowContext.getJobState(getHelixJobName(taskName));
}

/**
Expand Down Expand Up @@ -611,6 +639,10 @@ private static String getTaskType(String name) {
return name.split(TASK_NAME_SEPARATOR)[1];
}

public String getParentTaskName(String taskType, String taskName) {
return TASK_PREFIX + taskType + TASK_NAME_SEPARATOR + taskName;
}

@JsonPropertyOrder({"taskState", "subtaskCount", "startTime", "executionStartTime", "subtaskInfos"})
@JsonInclude(JsonInclude.Include.NON_NULL)
public static class TaskDebugInfo {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,24 +27,31 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nullable;
import org.I0Itec.zkclient.IZkChildListener;
import org.apache.commons.collections.CollectionUtils;
import org.apache.helix.AccessOption;
import org.apache.helix.task.TaskState;
import org.apache.pinot.common.exception.TableNotFoundException;
import org.apache.pinot.common.metrics.ControllerGauge;
import org.apache.pinot.common.metrics.ControllerMeter;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.LeadControllerManager;
import org.apache.pinot.controller.api.exception.NoTaskScheduledException;
import org.apache.pinot.controller.api.exception.TaskAlreadyExistsException;
import org.apache.pinot.controller.api.exception.UnknownTaskTypeException;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.minion.generator.PinotTaskGenerator;
import org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorRegistry;
import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableTaskConfig;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.quartz.CronScheduleBuilder;
import org.quartz.JobBuilder;
import org.quartz.JobDataMap;
Expand Down Expand Up @@ -128,6 +135,71 @@ public PinotTaskManager(PinotHelixTaskResourceManager helixTaskResourceManager,
}
}

public Map<String, String> createTask(String taskType, String tableName, @Nullable String taskName,
Map<String, String> taskConfigs)
throws Exception {
if (taskName == null) {
taskName = tableName + "_" + UUID.randomUUID();
LOGGER.info("Task name is missing, auto-generate one: {}", taskName);
}
String minionInstanceTag =
taskConfigs.getOrDefault("minionInstanceTag", CommonConstants.Helix.UNTAGGED_MINION_INSTANCE);
String parentTaskName = _helixTaskResourceManager.getParentTaskName(taskType, taskName);
TaskState taskState = _helixTaskResourceManager.getTaskState(parentTaskName);
if (taskState != null) {
throw new TaskAlreadyExistsException(
"Task [" + taskName + "] of type [" + taskType + "] is already created. Current state is " + taskState);
}
List<String> tableNameWithTypes = new ArrayList<>();
if (TableNameBuilder.getTableTypeFromTableName(tableName) == null) {
String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(tableName);
if (_pinotHelixResourceManager.hasOfflineTable(offlineTableName)) {
tableNameWithTypes.add(offlineTableName);
}
String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(tableName);
if (_pinotHelixResourceManager.hasRealtimeTable(realtimeTableName)) {
tableNameWithTypes.add(realtimeTableName);
}
} else {
if (_pinotHelixResourceManager.hasTable(tableName)) {
tableNameWithTypes.add(tableName);
}
}
if (tableNameWithTypes.isEmpty()) {
throw new TableNotFoundException("'tableName' " + tableName + " is not found");
}

PinotTaskGenerator taskGenerator = _taskGeneratorRegistry.getTaskGenerator(taskType);
// Generate each type of tasks
if (taskGenerator == null) {
throw new UnknownTaskTypeException(
"Task type: " + taskType + " is not registered, cannot enable it for table: " + tableName);
}
_helixTaskResourceManager.ensureTaskQueueExists(taskType);
addTaskTypeMetricsUpdaterIfNeeded(taskType);

// responseMap holds the table to task name mapping.
Map<String, String> responseMap = new HashMap<>();
for (String tableNameWithType : tableNameWithTypes) {
TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
LOGGER.info("Trying to create tasks of type: {}, table: {}", taskType, tableNameWithType);
List<PinotTaskConfig> pinotTaskConfigs = taskGenerator.generateTasks(tableConfig, taskConfigs);
if (pinotTaskConfigs.isEmpty()) {
LOGGER.warn("No ad-hoc task generated for task type: {}", taskType);
continue;
}
LOGGER.info("Submitting ad-hoc task for task type: {} with task configs: {}", taskType, pinotTaskConfigs);
_controllerMetrics.addMeteredTableValue(taskType, ControllerMeter.NUMBER_ADHOC_TASKS_SUBMITTED, 1);
responseMap.put(tableNameWithType,
_helixTaskResourceManager.submitTask(parentTaskName, pinotTaskConfigs, minionInstanceTag,
taskGenerator.getTaskTimeoutMs(), taskGenerator.getNumConcurrentTasksPerInstance()));
}
if (responseMap.isEmpty()) {
throw new NoTaskScheduledException("No task scheduled for 'tableName': " + tableName);
}
return responseMap;
}

private class ZkTableConfigChangeListener implements IZkChildListener {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,14 @@
*/
package org.apache.pinot.controller.helix.core.minion.generator;

import java.util.List;
import java.util.Map;
import org.apache.helix.task.JobConfig;
import org.apache.pinot.controller.api.exception.UnknownTaskTypeException;
import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -68,4 +73,10 @@ public int getNumConcurrentTasksPerInstance() {
}
return JobConfig.DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE;
}

@Override
public List<PinotTaskConfig> generateTasks(TableConfig tableConfig, Map<String, String> taskConfigs)
throws Exception {
throw new UnknownTaskTypeException("Adhoc task generation is not supported for task type - " + this.getTaskType());
}
}
Loading