Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 88 additions & 23 deletions core/Controller/TaskProcessingApiController.php
Original file line number Diff line number Diff line change
Expand Up @@ -532,29 +532,7 @@ public function cancelTask(int $taskId): DataResponse {
#[ApiRoute(verb: 'GET', url: '/tasks_provider/next', root: '/taskprocessing')]
public function getNextScheduledTask(array $providerIds, array $taskTypeIds): DataResponse {
try {
$providerIdsBasedOnTaskTypesWithNull = array_unique(array_map(function ($taskTypeId) {
try {
return $this->taskProcessingManager->getPreferredProvider($taskTypeId)->getId();
} catch (Exception) {
return null;
}
}, $taskTypeIds));

$providerIdsBasedOnTaskTypes = array_filter($providerIdsBasedOnTaskTypesWithNull, fn ($providerId) => $providerId !== null);

// restrict $providerIds to providers that are configured as preferred for the passed task types
$possibleProviderIds = array_values(array_intersect($providerIdsBasedOnTaskTypes, $providerIds));

// restrict $taskTypeIds to task types that can actually be run by one of the now restricted providers
$possibleTaskTypeIds = array_values(array_filter($taskTypeIds, function ($taskTypeId) use ($possibleProviderIds) {
try {
$providerForTaskType = $this->taskProcessingManager->getPreferredProvider($taskTypeId)->getId();
} catch (Exception) {
// no provider found for task type
return false;
}
return in_array($providerForTaskType, $possibleProviderIds, true);
}));
[$possibleProviderIds, $possibleTaskTypeIds] = $this->intersectTaskTypesAndProviders($taskTypeIds, $providerIds);

if (count($possibleProviderIds) === 0 || count($possibleTaskTypeIds) === 0) {
throw new NotFoundException();
Expand Down Expand Up @@ -596,6 +574,61 @@ public function getNextScheduledTask(array $providerIds, array $taskTypeIds): Da
}
}

/**
* Returns the next n scheduled tasks for the specified set of taskTypes and providers
* The returned tasks are capped at ~50MiB
*
* @param list<string> $providerIds The ids of the providers
* @param list<string> $taskTypeIds The ids of the task types
* @param int $numberOfTasks The number of tasks to return
* @return DataResponse<Http::STATUS_OK, array{tasks: list<array{task: CoreTaskProcessingTask, provider: string}>, has_more: bool}, array{}>|DataResponse<Http::STATUS_INTERNAL_SERVER_ERROR, array{message: string}, array{}>
*
* 200: Tasks returned
*/
#[ExAppRequired]
#[ApiRoute(verb: 'GET', url: '/tasks_provider/next_batch', root: '/taskprocessing')]
public function getNextScheduledTaskBatch(array $providerIds, array $taskTypeIds, int $numberOfTasks = 1): DataResponse {
try {
[$possibleProviderIds, $possibleTaskTypeIds] = $this->intersectTaskTypesAndProviders($taskTypeIds, $providerIds);

if (count($possibleProviderIds) === 0 || count($possibleTaskTypeIds) === 0) {
return new DataResponse([
'tasks' => [],
'has_more' => false,
]);
}

$tasks = $this->taskProcessingManager->getNextScheduledTasks($possibleTaskTypeIds, numberOfTasks: $numberOfTasks + 1);
$tasksJson = [];
// Stop when $numberOfTasks is reached or the json payload is larger than 50MiB
while (count($tasks) > 0 && count($tasksJson) < $numberOfTasks && strlen(json_encode($tasks)) < 50 * 1024 * 1024) {
// Until we find a task whose task type is set to be provided by the providers requested with this request
// Or no scheduled task is found anymore (given the taskIds to ignore)
$task = array_shift($tasks);
try {
$provider = $this->taskProcessingManager->getPreferredProvider($task->getTaskTypeId());
if (in_array($provider->getId(), $possibleProviderIds, true)) {
if ($this->taskProcessingManager->lockTask($task)) {
$tasksJson[] = ['task' => $task->jsonSerialize(), 'provider' => $provider->getId()];
continue;
}
}
} catch (Exception) {
// There is no provider set for the task type of this task
// proceed to ignore this task
}
}
$hasMore = count($tasks) > 0;

return new DataResponse([
'tasks' => $tasksJson,
'has_more' => $hasMore,
]);
} catch (Exception) {
return new DataResponse(['message' => $this->l->t('Internal error')], Http::STATUS_INTERNAL_SERVER_ERROR);
}
}

/**
* @param resource $data
* @return int
Expand All @@ -611,4 +644,36 @@ private function setFileContentsInternal($data): int {
$file = $folder->newFile(time() . '-' . rand(1, 100000), $data);
return $file->getId();
}

/**
* @param array $taskTypeIds
* @param array $providerIds
* @return array
*/
private function intersectTaskTypesAndProviders(array $taskTypeIds, array $providerIds): array {
$providerIdsBasedOnTaskTypesWithNull = array_unique(array_map(function ($taskTypeId) {
try {
return $this->taskProcessingManager->getPreferredProvider($taskTypeId)->getId();
} catch (Exception) {
return null;
}
}, $taskTypeIds));

$providerIdsBasedOnTaskTypes = array_filter($providerIdsBasedOnTaskTypesWithNull, fn ($providerId) => $providerId !== null);

// restrict $providerIds to providers that are configured as preferred for the passed task types
$possibleProviderIds = array_values(array_intersect($providerIdsBasedOnTaskTypes, $providerIds));

// restrict $taskTypeIds to task types that can actually be run by one of the now restricted providers
$possibleTaskTypeIds = array_values(array_filter($taskTypeIds, function ($taskTypeId) use ($possibleProviderIds) {
try {
$providerForTaskType = $this->taskProcessingManager->getPreferredProvider($taskTypeId)->getId();
} catch (Exception) {
// no provider found for task type
return false;
}
return in_array($providerForTaskType, $possibleProviderIds, true);
}));
return [$possibleProviderIds, $possibleTaskTypeIds];
}
}
217 changes: 217 additions & 0 deletions core/openapi-ex_app.json
Original file line number Diff line number Diff line change
Expand Up @@ -1364,6 +1364,223 @@
}
}
}
},
"/ocs/v2.php/taskprocessing/tasks_provider/next_batch": {
"get": {
"operationId": "task_processing_api-get-next-scheduled-task-batch",
"summary": "Returns the next n scheduled tasks for the specified set of taskTypes and providers The returned tasks are capped at ~50MiB",
"description": "This endpoint requires admin access",
"tags": [
"task_processing_api"
],
"security": [
{
"bearer_auth": []
},
{
"basic_auth": []
}
],
"parameters": [
{
"name": "providerIds[]",
"in": "query",
"description": "The ids of the providers",
"required": true,
"schema": {
"type": "array",
"items": {
"type": "string"
}
}
},
{
"name": "taskTypeIds[]",
"in": "query",
"description": "The ids of the task types",
"required": true,
"schema": {
"type": "array",
"items": {
"type": "string"
}
}
},
{
"name": "numberOfTasks",
"in": "query",
"description": "The number of tasks to return",
"schema": {
"type": "integer",
"format": "int64",
"default": 1
}
},
{
"name": "OCS-APIRequest",
"in": "header",
"description": "Required to be true for the API request to pass",
"required": true,
"schema": {
"type": "boolean",
"default": true
}
}
],
"responses": {
"200": {
"description": "Tasks returned",
"content": {
"application/json": {
"schema": {
"type": "object",
"required": [
"ocs"
],
"properties": {
"ocs": {
"type": "object",
"required": [
"meta",
"data"
],
"properties": {
"meta": {
"$ref": "#/components/schemas/OCSMeta"
},
"data": {
"type": "object",
"required": [
"tasks",
"has_more"
],
"properties": {
"tasks": {
"type": "array",
"items": {
"type": "object",
"required": [
"task",
"provider"
],
"properties": {
"task": {
"$ref": "#/components/schemas/TaskProcessingTask"
},
"provider": {
"type": "string"
}
}
}
},
"has_more": {
"type": "boolean"
}
}
}
}
}
}
}
}
}
},
"500": {
"description": "",
"content": {
"application/json": {
"schema": {
"type": "object",
"required": [
"ocs"
],
"properties": {
"ocs": {
"type": "object",
"required": [
"meta",
"data"
],
"properties": {
"meta": {
"$ref": "#/components/schemas/OCSMeta"
},
"data": {
"type": "object",
"required": [
"message"
],
"properties": {
"message": {
"type": "string"
}
}
}
}
}
}
}
}
}
},
"401": {
"description": "Current user is not logged in",
"content": {
"application/json": {
"schema": {
"type": "object",
"required": [
"ocs"
],
"properties": {
"ocs": {
"type": "object",
"required": [
"meta",
"data"
],
"properties": {
"meta": {
"$ref": "#/components/schemas/OCSMeta"
},
"data": {}
}
}
}
}
}
}
},
"403": {
"description": "Logged in account must be an admin",
"content": {
"application/json": {
"schema": {
"type": "object",
"required": [
"ocs"
],
"properties": {
"ocs": {
"type": "object",
"required": [
"meta",
"data"
],
"properties": {
"meta": {
"$ref": "#/components/schemas/OCSMeta"
},
"data": {}
}
}
}
}
}
}
}
}
}
}
},
"tags": [
Expand Down
Loading
Loading