
Commit

Issue #28 initial implementation of parallel requests (on /jobs)
soxofaan committed Oct 4, 2023
1 parent b91e8cb commit 6ae502a
Showing 2 changed files with 127 additions and 18 deletions.
34 changes: 20 additions & 14 deletions src/openeo_aggregator/backend.py
@@ -599,22 +599,28 @@ def __init__(
     def get_user_jobs(self, user_id: str) -> Union[List[BatchJobMetadata], dict]:
         jobs = []
         federation_missing = set()
-        for con in self.backends:
-            with con.authenticated_from_request(request=flask.request, user=User(user_id)), \
-                    TimingLogger(f"get_user_jobs: {con.id}", logger=_log.debug):
-                try:
-                    backend_jobs = con.list_jobs()
-                except Exception as e:
-                    # TODO: user warning https://github.com/Open-EO/openeo-api/issues/412
-                    _log.warning(f"Failed to get job listing from backend {con.id!r}: {e!r}")
-                    federation_missing.add(con.id)
-                    backend_jobs = []
-                for job in backend_jobs:
-                    try:
-                        job["id"] = JobIdMapping.get_aggregator_job_id(backend_job_id=job["id"], backend_id=con.id)
-                        jobs.append(BatchJobMetadata.from_api_dict(job))
-                    except Exception as e:
-                        _log.error(f"get_user_jobs: skipping job with parse issue: {e!r}", exc_info=True)
+
+        results = self.backends.request_parallel(
+            path="/jobs", method="GET", expected_status=[200], authenticated_from_request=flask.request
+        )
+        for backend_id, success, result in results:
+            if success:
+                try:
+                    for job in result["jobs"]:
+                        try:
+                            job["id"] = JobIdMapping.get_aggregator_job_id(
+                                backend_job_id=job["id"], backend_id=backend_id
+                            )
+                            jobs.append(BatchJobMetadata.from_api_dict(job))
+                        except Exception as e:
+                            _log.error(f"get_user_jobs: skipping job with parse issue: {e!r}", exc_info=True)
+                except Exception as e:
+                    _log.warning(f"Invalid job listing from backend {backend_id!r}: {e!r}")
+                    federation_missing.add(backend_id)
+            else:
+                # TODO: user warning https://github.com/Open-EO/openeo-api/issues/412
+                _log.warning(f"Failed to get job listing from backend {backend_id!r}: {result!r}")
+                federation_missing.add(backend_id)

         if self.partitioned_job_tracker:
             for job in self.partitioned_job_tracker.list_user_jobs(user_id=user_id):
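With this change, get_user_jobs fans a single GET /jobs out to all upstream backends at once and then folds the per-backend (backend_id, success, result) tuples back into one job listing, instead of looping over the connections sequentially. The same result contract can serve other aggregations; the following is a minimal, hypothetical sketch (the /file_formats endpoint choice, the aggregate_file_formats helper and the backends parameter are illustrative assumptions, not part of this commit) of how a caller might split successes from failures:

import logging

_log = logging.getLogger(__name__)


def aggregate_file_formats(backends) -> dict:
    """Hypothetical aggregation built on the (backend_id, success, result) contract."""
    merged = {"input": {}, "output": {}}
    # Each entry is (backend_id, success, result): on success `result` is the parsed
    # JSON body, on failure it is the exception raised for that backend.
    for backend_id, success, result in backends.request_parallel(
        path="/file_formats", method="GET", expected_status=[200]
    ):
        if success:
            merged["input"].update(result.get("input", {}))
            merged["output"].update(result.get("output", {}))
        else:
            _log.warning(f"Skipping backend {backend_id!r} for /file_formats: {result!r}")
    return merged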
111 changes: 107 additions & 4 deletions src/openeo_aggregator/connection.py
@@ -1,14 +1,28 @@
 import collections
+import concurrent.futures
 import contextlib
 import logging
 import re
-from typing import Any, Callable, Dict, Iterator, List, Optional, Set, Tuple, Union
+from typing import (
+    Any,
+    Callable,
+    Dict,
+    Iterable,
+    Iterator,
+    List,
+    Optional,
+    Set,
+    Tuple,
+    Union,
+)

 import flask
 import requests
 from openeo import Connection
 from openeo.capabilities import ComparableVersion
 from openeo.rest.auth.auth import BearerAuth, OpenEoApiAuthBase
+from openeo.rest.connection import RestApiConnection
 from openeo.util import TimingLogger
 from openeo_driver.backend import OidcProvider
 from openeo_driver.errors import (
     AuthenticationRequiredException,
@@ -31,6 +45,10 @@
 _log = logging.getLogger(__name__)


+# Type annotation aliases
+
+BackendId = str
+
 class LockedAuthException(InternalException):
     def __init__(self):
         super().__init__(message="Setting auth while locked.")
@@ -105,8 +123,11 @@ def _build_oidc_provider_map(self, configured_providers: List[OidcProvider]) ->
     def get_oidc_provider_map(self) -> Dict[str, str]:
         return self._oidc_provider_map

-    def _get_bearer(self, request: flask.Request) -> str:
-        """Extract authorization header from request and (optionally) transform for given backend """
+    def extract_bearer(self, request: flask.Request) -> str:
+        """
+        Extract authorization header from flask request
+        and (optionally) transform for current backend.
+        """
         if "Authorization" not in request.headers:
             raise AuthenticationRequiredException
         auth = request.headers["Authorization"]
@@ -134,7 +155,7 @@ def authenticated_from_request(
         Context manager to temporarily authenticate upstream connection based on current incoming flask request.
         """
         self._auth_locked = False
-        self.auth = BearerAuth(bearer=self._get_bearer(request=request))
+        self.auth = BearerAuth(bearer=self.extract_bearer(request=request))
         # TODO store and use `user` object?
         try:
             yield self
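The authenticated_from_request context manager temporarily installs a BearerAuth built from the incoming flask request on the upstream connection; renaming _get_bearer to extract_bearer makes that extraction reusable from request_parallel below, which needs a bearer per backend without entering the context manager. A rough, standalone sketch of the temporary-auth pattern (simplified names, not the repository's exact code):

import contextlib


class UpstreamConnection:
    """Toy stand-in for a backend connection with swappable auth."""

    def __init__(self):
        self.auth = None

    @contextlib.contextmanager
    def authenticated_with(self, bearer: str):
        # Install auth for the duration of the `with` block and restore the
        # previous value afterwards, even if the block raises.
        previous = self.auth
        self.auth = ("bearer", bearer)
        try:
            yield self
        finally:
            self.auth = previous


con = UpstreamConnection()
with con.authenticated_with("oidc/egi/t0k3n"):
    assert con.auth is not None  # authenticated only inside the block
assert con.auth is None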
@@ -256,6 +277,9 @@ def get_connections(self) -> List[BackendConnection]:

         return self._connections_cache.connections

+    def __len__(self) -> int:
+        return len(self._backend_urls)
+
     def __iter__(self) -> Iterator[BackendConnection]:
         return iter(self.get_connections())

@@ -320,6 +344,85 @@ def map(
# TODO: customizable exception handling: skip, warn, re-raise?
yield con.id, res

def request_parallel(
self,
path: str,
*,
method: str = "GET",
parse_json: bool = True,
authenticated_from_request: Optional[flask.Request] = None,
expected_status: Union[int, Iterable[int], None] = None,
request_timeout: float = 5,
overall_timeout: float = 8,
max_workers=5,
) -> List[Tuple[BackendId, bool, Any]]:
"""
Request a given (relative) url on each backend in parallel
:param path: relative (openEO) path to request
:return:
"""

def do_request(
root_url: str,
path: str,
*,
method: str = "GET",
headers: Optional[dict] = None,
auth: Optional[str] = None,
) -> Union[dict, bytes]:
"""Isolated request, to behanled by future."""
with TimingLogger(title=f"request_parallel {method} {path} on {root_url}", logger=_log):
con = RestApiConnection(root_url=root_url)
resp = con.request(
method=method,
path=path,
headers=headers,
auth=auth,
timeout=request_timeout,
expected_status=expected_status,
)
if parse_json:
return resp.json()
else:
return resp.content

max_workers = min(max_workers, len(self))

with TimingLogger(
title=f"request_parallel {method} {path} on {len(self)} backends with thread pool {max_workers=}",
logger=_log,
), concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
# Submit all futures (one for each backend connection)
futures: List[Tuple[BackendId, concurrent.futures.Future]] = []
for con in self.get_connections():
if authenticated_from_request:
auth = BearerAuth(bearer=con.extract_bearer(request=authenticated_from_request))
else:
auth = None
future = executor.submit(
do_request,
root_url=con.root_url,
path=path,
method=method,
headers=con.default_headers,
auth=auth,
)
futures.append((con.id, future))

# Give futures some time to finish
concurrent.futures.wait([f for (_, f) in futures], timeout=overall_timeout)

# Collect results.
results: List[Tuple[BackendId, bool, Any]] = []
for backend_id, future in futures:
try:
result = future.result(timeout=0)
results.append((backend_id, True, result))
except Exception as e:
results.append((backend_id, False, e))

return results


def streaming_flask_response(
backend_response: requests.Response,
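In request_parallel, one do_request future is submitted per backend, concurrent.futures.wait gives the whole batch up to overall_timeout seconds, and each future is then collected with future.result(timeout=0), so a future that has not finished raises TimeoutError and is recorded as a (backend_id, False, exception) entry instead of a result. A minimal, standalone sketch of that wait-then-collect pattern (standard library only; the fake workload is an illustrative assumption):

import concurrent.futures
import time


def fake_backend_call(backend_id: str, delay: float) -> str:
    """Stand-in for an upstream HTTP request with a given latency."""
    time.sleep(delay)
    return f"response from {backend_id}"


with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    futures = {
        backend_id: executor.submit(fake_backend_call, backend_id, delay)
        for backend_id, delay in [("b1", 0.1), ("b2", 0.2), ("b3", 5.0)]
    }
    # Give the whole batch at most 1 second; slow futures are simply left pending.
    concurrent.futures.wait(futures.values(), timeout=1.0)

    # Collect with timeout=0: finished futures return their result, unfinished ones
    # raise TimeoutError, which is recorded as a failure instead of waiting longer.
    results = []
    for backend_id, future in futures.items():
        try:
            results.append((backend_id, True, future.result(timeout=0)))
        except Exception as exc:
            results.append((backend_id, False, exc))

print(results)  # ("b1", True, ...), ("b2", True, ...), ("b3", False, TimeoutError(...))

Note that leaving the ThreadPoolExecutor context still waits for stragglers to finish during shutdown; the timeout only bounds which results make it into the collected list.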
