Skip to content

Commit

Permalink
Merge pull request #47522 from nextcloud/enh/noid/taskprocessing-runtask
Browse files Browse the repository at this point in the history
[TaskProcessing] Add manager::runTask method
  • Loading branch information
julien-nc authored Aug 27, 2024
2 parents 9754f4f + 396b8f5 commit 558877c
Show file tree
Hide file tree
Showing 3 changed files with 165 additions and 83 deletions.
164 changes: 122 additions & 42 deletions lib/private/TaskProcessing/Manager.php
Original file line number Diff line number Diff line change
Expand Up @@ -716,55 +716,69 @@ public function scheduleTask(Task $task): void {
if (!$this->canHandleTask($task)) {
throw new \OCP\TaskProcessing\Exception\PreConditionNotMetException('No task processing provider is installed that can handle this task type: ' . $task->getTaskTypeId());
}
$taskTypes = $this->getAvailableTaskTypes();
$inputShape = $taskTypes[$task->getTaskTypeId()]['inputShape'];
$inputShapeDefaults = $taskTypes[$task->getTaskTypeId()]['inputShapeDefaults'];
$inputShapeEnumValues = $taskTypes[$task->getTaskTypeId()]['inputShapeEnumValues'];
$optionalInputShape = $taskTypes[$task->getTaskTypeId()]['optionalInputShape'];
$optionalInputShapeEnumValues = $taskTypes[$task->getTaskTypeId()]['optionalInputShapeEnumValues'];
$optionalInputShapeDefaults = $taskTypes[$task->getTaskTypeId()]['optionalInputShapeDefaults'];
// validate input
$this->validateInput($inputShape, $inputShapeDefaults, $inputShapeEnumValues, $task->getInput());
$this->validateInput($optionalInputShape, $optionalInputShapeDefaults, $optionalInputShapeEnumValues, $task->getInput(), true);
// authenticate access to mentioned files
$ids = [];
foreach ($inputShape + $optionalInputShape as $key => $descriptor) {
if (in_array(EShapeType::getScalarType($descriptor->getShapeType()), [EShapeType::File, EShapeType::Image, EShapeType::Audio, EShapeType::Video], true)) {
/** @var list<int>|int $inputSlot */
$inputSlot = $task->getInput()[$key];
if (is_array($inputSlot)) {
$ids += $inputSlot;
} else {
$ids[] = $inputSlot;
}
}
}
foreach ($ids as $fileId) {
$this->validateFileId($fileId);
$this->validateUserAccessToFile($fileId, $task->getUserId());
}
// remove superfluous keys and set input
$input = $this->removeSuperfluousArrayKeys($task->getInput(), $inputShape, $optionalInputShape);
$inputWithDefaults = $this->fillInputDefaults($input, $inputShapeDefaults, $optionalInputShapeDefaults);
$task->setInput($inputWithDefaults);
$this->prepareTask($task);
$task->setStatus(Task::STATUS_SCHEDULED);
$task->setScheduledAt(time());
$provider = $this->getPreferredProvider($task->getTaskTypeId());
// calculate expected completion time
$completionExpectedAt = new \DateTime('now');
$completionExpectedAt->add(new \DateInterval('PT'.$provider->getExpectedRuntime().'S'));
$task->setCompletionExpectedAt($completionExpectedAt);
// create a db entity and insert into db table
$taskEntity = \OC\TaskProcessing\Db\Task::fromPublicTask($task);
$this->taskMapper->insert($taskEntity);
// make sure the scheduler knows the id
$task->setId($taskEntity->getId());
$this->storeTask($task);
// schedule synchronous job if the provider is synchronous
$provider = $this->getPreferredProvider($task->getTaskTypeId());
if ($provider instanceof ISynchronousProvider) {
$this->jobList->add(SynchronousBackgroundJob::class, null);
}
}

public function runTask(Task $task): Task {
if (!$this->canHandleTask($task)) {
throw new \OCP\TaskProcessing\Exception\PreConditionNotMetException('No task processing provider is installed that can handle this task type: ' . $task->getTaskTypeId());
}

$provider = $this->getPreferredProvider($task->getTaskTypeId());
if ($provider instanceof ISynchronousProvider) {
$this->prepareTask($task);
$task->setStatus(Task::STATUS_SCHEDULED);
$this->storeTask($task);
$this->processTask($task, $provider);
$task = $this->getTask($task->getId());
} else {
$this->scheduleTask($task);
// poll task
while ($task->getStatus() === Task::STATUS_SCHEDULED || $task->getStatus() === Task::STATUS_RUNNING) {
sleep(1);
$task = $this->getTask($task->getId());
}
}
return $task;
}

public function processTask(Task $task, ISynchronousProvider $provider): bool {
try {
try {
$input = $this->prepareInputData($task);
} catch (GenericFileException|NotPermittedException|LockedException|ValidationException|UnauthorizedException $e) {
$this->logger->warning('Failed to prepare input data for a TaskProcessing task with synchronous provider ' . $provider->getId(), ['exception' => $e]);
$this->setTaskResult($task->getId(), $e->getMessage(), null);
return false;
}
try {
$this->setTaskStatus($task, Task::STATUS_RUNNING);
$output = $provider->process($task->getUserId(), $input, fn (float $progress) => $this->setTaskProgress($task->getId(), $progress));
} catch (ProcessingException $e) {
$this->logger->warning('Failed to process a TaskProcessing task with synchronous provider ' . $provider->getId(), ['exception' => $e]);
$this->setTaskResult($task->getId(), $e->getMessage(), null);
return false;
} catch (\Throwable $e) {
$this->logger->error('Unknown error while processing TaskProcessing task', ['exception' => $e]);
$this->setTaskResult($task->getId(), $e->getMessage(), null);
return false;
}
$this->setTaskResult($task->getId(), null, $output);
} catch (NotFoundException $e) {
$this->logger->info('Could not find task anymore after execution. Moving on.', ['exception' => $e]);
} catch (Exception $e) {
$this->logger->error('Failed to report result of TaskProcessing task', ['exception' => $e]);
}
return true;
}

public function deleteTask(Task $task): void {
$taskEntity = \OC\TaskProcessing\Db\Task::fromPublicTask($task);
$this->taskMapper->delete($taskEntity);
Expand Down Expand Up @@ -1095,6 +1109,72 @@ public function setTaskStatus(Task $task, int $status): void {
$this->taskMapper->update($taskEntity);
}

/**
* Validate input, fill input default values, set completionExpectedAt, set scheduledAt
*
* @param Task $task
* @return void
* @throws UnauthorizedException
* @throws ValidationException
* @throws \OCP\TaskProcessing\Exception\Exception
*/
private function prepareTask(Task $task): void {
$taskTypes = $this->getAvailableTaskTypes();
$taskType = $taskTypes[$task->getTaskTypeId()];
$inputShape = $taskType['inputShape'];
$inputShapeDefaults = $taskType['inputShapeDefaults'];
$inputShapeEnumValues = $taskType['inputShapeEnumValues'];
$optionalInputShape = $taskType['optionalInputShape'];
$optionalInputShapeEnumValues = $taskType['optionalInputShapeEnumValues'];
$optionalInputShapeDefaults = $taskType['optionalInputShapeDefaults'];
// validate input
$this->validateInput($inputShape, $inputShapeDefaults, $inputShapeEnumValues, $task->getInput());
$this->validateInput($optionalInputShape, $optionalInputShapeDefaults, $optionalInputShapeEnumValues, $task->getInput(), true);
// authenticate access to mentioned files
$ids = [];
foreach ($inputShape + $optionalInputShape as $key => $descriptor) {
if (in_array(EShapeType::getScalarType($descriptor->getShapeType()), [EShapeType::File, EShapeType::Image, EShapeType::Audio, EShapeType::Video], true)) {
/** @var list<int>|int $inputSlot */
$inputSlot = $task->getInput()[$key];
if (is_array($inputSlot)) {
$ids += $inputSlot;
} else {
$ids[] = $inputSlot;
}
}
}
foreach ($ids as $fileId) {
$this->validateFileId($fileId);
$this->validateUserAccessToFile($fileId, $task->getUserId());
}
// remove superfluous keys and set input
$input = $this->removeSuperfluousArrayKeys($task->getInput(), $inputShape, $optionalInputShape);
$inputWithDefaults = $this->fillInputDefaults($input, $inputShapeDefaults, $optionalInputShapeDefaults);
$task->setInput($inputWithDefaults);
$task->setScheduledAt(time());
$provider = $this->getPreferredProvider($task->getTaskTypeId());
// calculate expected completion time
$completionExpectedAt = new \DateTime('now');
$completionExpectedAt->add(new \DateInterval('PT'.$provider->getExpectedRuntime().'S'));
$task->setCompletionExpectedAt($completionExpectedAt);
}

/**
* Store the task in the DB and set its ID in the \OCP\TaskProcessing\Task input param
*
* @param Task $task
* @return void
* @throws Exception
* @throws \JsonException
*/
private function storeTask(Task $task): void {
// create a db entity and insert into db table
$taskEntity = \OC\TaskProcessing\Db\Task::fromPublicTask($task);
$this->taskMapper->insert($taskEntity);
// make sure the scheduler knows the id
$task->setId($taskEntity->getId());
}

/**
* @param array $output
* @param ShapeDescriptor[] ...$specs the specs that define which keys to keep
Expand Down
57 changes: 16 additions & 41 deletions lib/private/TaskProcessing/SynchronousBackgroundJob.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,8 @@
use OCP\AppFramework\Utility\ITimeFactory;
use OCP\BackgroundJob\IJobList;
use OCP\BackgroundJob\QueuedJob;
use OCP\Files\GenericFileException;
use OCP\Files\NotPermittedException;
use OCP\Lock\LockedException;
use OCP\TaskProcessing\Exception\Exception;
use OCP\TaskProcessing\Exception\NotFoundException;
use OCP\TaskProcessing\Exception\ProcessingException;
use OCP\TaskProcessing\Exception\UnauthorizedException;
use OCP\TaskProcessing\Exception\ValidationException;
use OCP\TaskProcessing\IManager;
use OCP\TaskProcessing\ISynchronousProvider;
use OCP\TaskProcessing\Task;
Expand Down Expand Up @@ -57,46 +51,27 @@ protected function run($argument) {
$this->logger->error('Unknown error while retrieving scheduled TaskProcessing tasks', ['exception' => $e]);
continue;
}
try {
try {
$input = $this->taskProcessingManager->prepareInputData($task);
} catch (GenericFileException|NotPermittedException|LockedException|ValidationException|UnauthorizedException $e) {
$this->logger->warning('Failed to prepare input data for a TaskProcessing task with synchronous provider ' . $provider->getId(), ['exception' => $e]);
$this->taskProcessingManager->setTaskResult($task->getId(), $e->getMessage(), null);
// Schedule again
$this->jobList->add(self::class, $argument);
return;
}
try {
$this->taskProcessingManager->setTaskStatus($task, Task::STATUS_RUNNING);
$output = $provider->process($task->getUserId(), $input, fn (float $progress) => $this->taskProcessingManager->setTaskProgress($task->getId(), $progress));
} catch (ProcessingException $e) {
$this->logger->warning('Failed to process a TaskProcessing task with synchronous provider ' . $provider->getId(), ['exception' => $e]);
$this->taskProcessingManager->setTaskResult($task->getId(), $e->getMessage(), null);
// Schedule again
$this->jobList->add(self::class, $argument);
return;
} catch (\Throwable $e) {
$this->logger->error('Unknown error while processing TaskProcessing task', ['exception' => $e]);
$this->taskProcessingManager->setTaskResult($task->getId(), $e->getMessage(), null);
// Schedule again
$this->jobList->add(self::class, $argument);
return;
}
$this->taskProcessingManager->setTaskResult($task->getId(), null, $output);
} catch (NotFoundException $e) {
$this->logger->info('Could not find task anymore after execution. Moving on.', ['exception' => $e]);
} catch (Exception $e) {
$this->logger->error('Failed to report result of TaskProcessing task', ['exception' => $e]);
if (!$this->taskProcessingManager->processTask($task, $provider)) {
// Schedule again
$this->jobList->add(self::class, $argument);
}
}

// check if this job needs to be scheduled again:
// if there is at least one preferred synchronous provider that has a scheduled task
$synchronousProviders = array_filter($providers, fn ($provider) =>
$provider instanceof ISynchronousProvider);
$taskTypes = array_values(array_map(fn ($provider) =>
$provider->getTaskTypeId(),
$synchronousProviders
));
$synchronousPreferredProviders = array_filter($synchronousProviders, function ($provider) {
$taskTypeId = $provider->getTaskTypeId();
$preferredProvider = $this->taskProcessingManager->getPreferredProvider($taskTypeId);
return $provider->getId() === $preferredProvider->getId();
});
$taskTypes = array_values(
array_map(
fn ($provider) => $provider->getTaskTypeId(),
$synchronousPreferredProviders
)
);
$taskTypesWithTasks = array_filter($taskTypes, function ($taskType) {
try {
$this->taskProcessingManager->getNextScheduledTask([$taskType]);
Expand Down
27 changes: 27 additions & 0 deletions lib/public/TaskProcessing/IManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,33 @@ public function getAvailableTaskTypes(): array;
*/
public function scheduleTask(Task $task): void;

/**
* Run the task and return the finished task
*
* @param Task $task The task to run
* @return Task The result task
* @throws PreConditionNotMetException If no or not the requested provider was registered but this method was still called
* @throws ValidationException the given task input didn't pass validation against the task type's input shape and/or the providers optional input shape specs
* @throws Exception storing the task in the database failed
* @throws UnauthorizedException the user scheduling the task does not have access to the files used in the input
* @since 30.0.0
*/
public function runTask(Task $task): Task;

/**
* Process task with a synchronous provider
*
* Prepare task input data and run the process method of the provider
* This should only be used by OC\TaskProcessing\SynchronousBackgroundJob::run() and OCP\TaskProcessing\IManager::runTask()
*
* @param Task $task
* @param ISynchronousProvider $provider
* @return bool True if the task has run successfully
* @throws Exception
* @since 30.0.0
*/
public function processTask(Task $task, ISynchronousProvider $provider): bool;

/**
* Delete a task that has been scheduled before
*
Expand Down

0 comments on commit 558877c

Please sign in to comment.