Skip to content

♻️ common http API interface for long_running_tasks #7843

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 39 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
07dbc67
changing visibility
Jun 5, 2025
4c689a2
fixed imports
Jun 5, 2025
7977cd7
fixed imports
Jun 5, 2025
d7f04d1
removed unrequired
Jun 5, 2025
f50e77d
refactor imports
Jun 5, 2025
4cfa96d
refactor more imports
Jun 5, 2025
421f8cf
refactor
Jun 5, 2025
e737641
fixed import
Jun 6, 2025
ada5069
fixed broken imports
Jun 6, 2025
7070b66
fixed imports
Jun 6, 2025
61daf54
fixeed imports
Jun 6, 2025
2e752e4
Merge remote-tracking branch 'upstream/master' into pr-osparc-long-ru…
Jun 6, 2025
75aa300
added missing specs
Jun 6, 2025
d943710
removed unused code
Jun 6, 2025
6a6677c
remove unrequired
Jun 6, 2025
47e5667
removed unused
Jun 6, 2025
e2dfb66
Merge remote-tracking branch 'upstream/master' into long-running-refa…
Jun 10, 2025
87dec05
reverterd interface change
Jun 10, 2025
ebe6a43
reverted
Jun 10, 2025
f630a39
updated openapi specs
Jun 10, 2025
757be74
using commond part to format responses
Jun 10, 2025
8497191
minor refacto
Jun 10, 2025
866656d
removed unused
Jun 10, 2025
2ee9203
refactor to use common constants
Jun 10, 2025
d886db0
using common renaming
Jun 10, 2025
e7df051
Merge remote-tracking branch 'upstream/master' into long-running-refa…
Jun 11, 2025
ecd1987
fixed imports
Jun 11, 2025
ea17b89
removed unused
Jun 11, 2025
0578e5c
refacto tasks
Jun 11, 2025
a4660de
minor
Jun 11, 2025
70a6921
revert changes
Jun 11, 2025
8374de5
fixed specs
Jun 11, 2025
221e051
fixed failing tests
Jun 11, 2025
fd50810
fixed imports
Jun 11, 2025
08a65e7
pylint
Jun 11, 2025
c30f80d
removed some constants
Jun 11, 2025
87b6b45
moved module
Jun 11, 2025
8d894ef
rename module
Jun 11, 2025
d2dbc95
import from correct place
Jun 11, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions api/specs/web-server/_long_running_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ def cancel_async_job(

@router.get(
"/tasks/{task_id}/result",
response_model=Any,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Enveloped or not?

name="get_task_result",
description="Retrieves the result of a task",
responses=_export_data_responses,
Expand Down
3 changes: 2 additions & 1 deletion api/specs/web-server/_long_running_tasks_legacy.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# pylint: disable=too-many-arguments


from typing import Annotated
from typing import Annotated, Any

from fastapi import APIRouter, Depends, status
from models_library.generics import Envelope
Expand Down Expand Up @@ -54,6 +54,7 @@ def cancel_and_delete_task(
@router.get(
"/{task_id}/result",
name="get_task_result",
response_model=Any,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

enveloped or not?

description="Retrieves the result of a task",
)
def get_task_result(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@ class TaskResult(BaseModel):
error: Any | None


class TaskGet(BaseModel):
class TaskGetWithoutHref(BaseModel):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a bit of a weird naming.
the base class defines what the inheritated class has?
maybe just Task ? or TaskBase ?

task_id: TaskId
task_name: str


class TaskGet(TaskGetWithoutHref):
status_href: str
result_href: str
abort_href: str
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
RQT_LONG_RUNNING_TASKS_CONTEXT_KEY,
)

# NOTE: figure out how to remove these and expose them differently if possible
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

disguised TODO? who is going to do something about it?



def get_tasks_manager(app: web.Application) -> TasksManager:
output: TasksManager = app[APP_LONG_RUNNING_TASKS_MANAGER_KEY]
Expand All @@ -17,7 +19,3 @@ def get_tasks_manager(app: web.Application) -> TasksManager:
def get_task_context(request: web.Request) -> dict[str, Any]:
output: dict[str, Any] = request[RQT_LONG_RUNNING_TASKS_CONTEXT_KEY]
return output


def create_task_name_from_request(request: web.Request) -> str:
return f"{request.method} {request.rel_url}"
Original file line number Diff line number Diff line change
@@ -1,18 +1,15 @@
import logging
from typing import Any

from aiohttp import web
from common_library.json_serialization import json_dumps
from pydantic import BaseModel
from servicelib.aiohttp import status
from servicelib.aiohttp.rest_responses import create_data_response

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

relative import

from ...long_running_tasks.errors import TaskNotCompletedError, TaskNotFoundError
from ...long_running_tasks import http_endpoint_responses
from ...long_running_tasks.models import TaskGet, TaskId, TaskStatus
from ...long_running_tasks.task import TrackedTask
from ..requests_validation import parse_request_path_parameters_as
from ._dependencies import get_task_context, get_tasks_manager

_logger = logging.getLogger(__name__)
routes = web.RouteTableDef()


Expand All @@ -24,24 +21,17 @@ class _PathParam(BaseModel):
async def list_tasks(request: web.Request) -> web.Response:
tasks_manager = get_tasks_manager(request.app)
task_context = get_task_context(request)
tracked_tasks: list[TrackedTask] = tasks_manager.list_tasks(
with_task_context=task_context
)

return web.json_response(
{
"data": [
TaskGet(
task_id=t.task_id,
task_name=t.task_name,
status_href=f"{request.app.router['get_task_status'].url_for(task_id=t.task_id)}",
result_href=f"{request.app.router['get_task_result'].url_for(task_id=t.task_id)}",
abort_href=f"{request.app.router['cancel_and_delete_task'].url_for(task_id=t.task_id)}",
)
for t in tracked_tasks
]
},
dumps=json_dumps,
return create_data_response(
[
TaskGet(
task_id=t.task_id,
task_name=t.task_name,
status_href=f"{request.app.router['get_task_status'].url_for(task_id=t.task_id)}",
result_href=f"{request.app.router['get_task_result'].url_for(task_id=t.task_id)}",
abort_href=f"{request.app.router['cancel_and_delete_task'].url_for(task_id=t.task_id)}",
)
for t in http_endpoint_responses.list_tasks(tasks_manager, task_context)
]
)


Expand All @@ -51,10 +41,10 @@ async def get_task_status(request: web.Request) -> web.Response:
tasks_manager = get_tasks_manager(request.app)
task_context = get_task_context(request)

task_status: TaskStatus = tasks_manager.get_task_status(
task_id=path_params.task_id, with_task_context=task_context
task_status: TaskStatus = http_endpoint_responses.get_task_status(
tasks_manager, task_context, path_params.task_id
)
return web.json_response({"data": task_status}, dumps=json_dumps)
return create_data_response(task_status)


@routes.get("/{task_id}/result", name="get_task_result")
Expand All @@ -64,37 +54,17 @@ async def get_task_result(request: web.Request) -> web.Response | Any:
task_context = get_task_context(request)

# NOTE: this might raise an exception that will be catched by the _error_handlers
try:
task_result = tasks_manager.get_task_result(
task_id=path_params.task_id, with_task_context=task_context
)
# NOTE: this will fail if the task failed for some reason....
await tasks_manager.remove_task(
path_params.task_id, with_task_context=task_context, reraise_errors=False
)
return task_result
except (TaskNotFoundError, TaskNotCompletedError):
raise
except Exception:
# the task shall be removed in this case
await tasks_manager.remove_task(
path_params.task_id, with_task_context=task_context, reraise_errors=False
)
raise
return await http_endpoint_responses.get_task_result(
tasks_manager, task_context, path_params.task_id
)


@routes.delete("/{task_id}", name="cancel_and_delete_task")
async def cancel_and_delete_task(request: web.Request) -> web.Response:
path_params = parse_request_path_parameters_as(_PathParam, request)
tasks_manager = get_tasks_manager(request.app)
task_context = get_task_context(request)
await tasks_manager.remove_task(path_params.task_id, with_task_context=task_context)
await http_endpoint_responses.remove_task(
tasks_manager, task_context, path_params.task_id
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this one not returning a response?

)
return web.json_response(status=status.HTTP_204_NO_CONTENT)


__all__: tuple[str, ...] = (
"get_tasks_manager",
"TaskId",
"TaskGet",
"TaskStatus",
)
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
import asyncio
import datetime
import logging
from collections.abc import AsyncGenerator, Callable
from functools import wraps
from typing import Any

from aiohttp import web
from common_library.json_serialization import json_dumps
from pydantic import AnyHttpUrl, PositiveFloat, TypeAdapter
from pydantic import AnyHttpUrl, TypeAdapter

from ...aiohttp import status
from ...long_running_tasks.constants import (
DEFAULT_STALE_TASK_CHECK_INTERVAL,
DEFAULT_STALE_TASK_DETECT_TIMEOUT,
)
from ...long_running_tasks.models import TaskGet
from ...long_running_tasks.task import (
TaskContext,
Expand All @@ -20,10 +25,9 @@
from . import _routes
from ._constants import (
APP_LONG_RUNNING_TASKS_MANAGER_KEY,
MINUTE,
RQT_LONG_RUNNING_TASKS_CONTEXT_KEY,
)
from ._dependencies import create_task_name_from_request, get_tasks_manager
from ._dependencies import get_tasks_manager
from ._error_handlers import base_long_running_error_handler

_logger = logging.getLogger(__name__)
Expand All @@ -42,6 +46,10 @@ async def _wrap(request: web.Request):
return _wrap


def _create_task_name_from_request(request: web.Request) -> str:
return f"{request.method} {request.rel_url}"


async def start_long_running_task(
# NOTE: positional argument are suffixed with "_" to avoid name conflicts with "task_kwargs" keys
request_: web.Request,
Expand All @@ -52,7 +60,7 @@ async def start_long_running_task(
**task_kwargs: Any,
) -> web.Response:
task_manager = get_tasks_manager(request_.app)
task_name = create_task_name_from_request(request_)
task_name = _create_task_name_from_request(request_)
task_id = None
try:
task_id = start_task(
Expand Down Expand Up @@ -121,8 +129,8 @@ def setup(
router_prefix: str,
handler_check_decorator: Callable = no_ops_decorator,
task_request_context_decorator: Callable = no_task_context_decorator,
stale_task_check_interval_s: PositiveFloat = 1 * MINUTE,
stale_task_detect_timeout_s: PositiveFloat = 5 * MINUTE,
stale_task_check_interval: datetime.timedelta = DEFAULT_STALE_TASK_CHECK_INTERVAL,
stale_task_detect_timeout: datetime.timedelta = DEFAULT_STALE_TASK_DETECT_TIMEOUT,
) -> None:
"""
- `router_prefix` APIs are mounted on `/...`, this
Expand All @@ -135,21 +143,23 @@ def setup(
"""

async def on_cleanup_ctx(app: web.Application) -> AsyncGenerator[None, None]:
# add error handlers
app.middlewares.append(base_long_running_error_handler)

# add components to state
app[APP_LONG_RUNNING_TASKS_MANAGER_KEY] = long_running_task_manager = (
TasksManager(
stale_task_check_interval_s=stale_task_check_interval_s,
stale_task_detect_timeout_s=stale_task_detect_timeout_s,
stale_task_check_interval=stale_task_check_interval,
stale_task_detect_timeout=stale_task_detect_timeout,
)
)

# add error handlers
app.middlewares.append(base_long_running_error_handler)
await long_running_task_manager.setup()

yield

# cleanup
await long_running_task_manager.close()
await long_running_task_manager.teardown()

# add routing (done at setup-time)
_wrap_and_add_routes(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import asyncio
import logging
from collections.abc import AsyncGenerator
from typing import Any
from datetime import timedelta
from typing import Any, Final

from aiohttp import ClientConnectionError, ClientSession
from pydantic import PositiveFloat
from tenacity import TryAgain, retry
from tenacity.asyncio import AsyncRetrying
from tenacity.before_sleep import before_sleep_log
Expand All @@ -12,7 +14,7 @@
from tenacity.wait import wait_random_exponential
from yarl import URL

from ...long_running_tasks._constants import DEFAULT_POLL_INTERVAL_S, HOUR
from ...long_running_tasks.constants import DEFAULT_POLL_INTERVAL_S
from ...long_running_tasks.models import (
LRTask,
RequestBody,
Expand All @@ -26,6 +28,7 @@

_logger = logging.getLogger(__name__)

_DEFAULT_CLIENT_TIMEOUT_S: Final[PositiveFloat] = timedelta(hours=1).total_seconds()

_DEFAULT_AIOHTTP_RETRY_POLICY: dict[str, Any] = {
"retry": retry_if_exception_type(ClientConnectionError),
Expand All @@ -49,7 +52,7 @@ async def _wait_for_completion(
session: ClientSession,
task_id: TaskId,
status_url: URL,
client_timeout: int,
client_timeout: PositiveFloat,
) -> AsyncGenerator[TaskProgress, None]:
try:
async for attempt in AsyncRetrying(
Expand Down Expand Up @@ -98,7 +101,7 @@ async def long_running_task_request(
session: ClientSession,
url: URL,
json: RequestBody | None = None,
client_timeout: int = 1 * HOUR,
client_timeout: PositiveFloat = _DEFAULT_CLIENT_TIMEOUT_S,
) -> AsyncGenerator[LRTask, None]:
"""Will use the passed `ClientSession` to call an oSparc long
running task `url` passing `json` as request body.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,12 @@
"""

from ._dependencies import (
create_task_name_from_request,
get_task_context,
get_tasks_manager,
)
from ._server import setup, start_long_running_task

__all__: tuple[str, ...] = (
"create_task_name_from_request",
"get_task_context",
"get_tasks_manager",
"setup",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import asyncio
import functools
import logging
import warnings
from collections.abc import Awaitable, Callable
from typing import Any, Final

Expand Down Expand Up @@ -181,16 +180,6 @@ async def cancel_and_delete_task(
timeout=timeout,
)

if result.status_code == status.HTTP_200_OK:
warnings.warn(
"returning a 200 when cancelling a task has been deprecated with PR#3236"
"and will be removed after 11.2022"
"please do close your studies at least once before that date, so that the dy-sidecar"
"get replaced",
category=DeprecationWarning,
)
return

if result.status_code not in (
status.HTTP_204_NO_CONTENT,
status.HTTP_404_NOT_FOUND,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ async def base_long_running_error_handler(
_: Request, exception: BaseLongRunningError
) -> JSONResponse:
_logger.debug("%s", exception, stack_info=True)
error_fields = dict(code=exception.code, message=f"{exception}")
error_fields = {"code": exception.code, "message": f"{exception}"}
status_code = (
status.HTTP_404_NOT_FOUND
if isinstance(exception, (TaskNotFoundError, TaskNotCompletedError))
if isinstance(exception, TaskNotFoundError | TaskNotCompletedError)
Copy link
Preview

Copilot AI Jun 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using the union operator (|) in an isinstance() call is not valid; please replace it with a tuple, e.g., isinstance(exception, (TaskNotFoundError, TaskNotCompletedError)).

Suggested change
if isinstance(exception, TaskNotFoundError | TaskNotCompletedError)
if isinstance(exception, (TaskNotFoundError, TaskNotCompletedError))

Copilot uses AI. Check for mistakes.

else status.HTTP_400_BAD_REQUEST
)
return JSONResponse(content=jsonable_encoder(error_fields), status_code=status_code)
Loading
Loading