Skip to content

Send result events for failed analyses #733

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 28 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
0f4da15
REF: Make `Service.send_exception` private
cortadocodes May 30, 2025
29f1b4c
MRG: Merge branch 'main' into return-outputs-on-exception
cortadocodes May 30, 2025
890382d
FEA: Send result event for failed analyses
cortadocodes May 30, 2025
921cb0f
FIX: Fix analysis attribute setting
cortadocodes May 30, 2025
af59521
TST: Fix `make_new_child`
cortadocodes May 30, 2025
0790317
FIX: Prepare analysis object properly in runner
cortadocodes Jun 10, 2025
e8145ce
TST: Update service tests
cortadocodes Jun 10, 2025
8124e65
FIX: Avoid sending `Analysis` kwargs to superclasses
cortadocodes Jun 10, 2025
6e24ea4
TST: Simplify `make_run_function`
cortadocodes Jun 10, 2025
9caa15f
TST: Update child tests
cortadocodes Jun 10, 2025
2eee569
ENH: Allow not raising errors from exception events in event handler
cortadocodes Jun 10, 2025
7536ef8
ENH: Add `success` key to result events
cortadocodes Jun 14, 2025
707e403
REF: Factor out converting exception event to exception
cortadocodes Jun 14, 2025
1fccf23
ENH: Add exception to result event if success is `False`
cortadocodes Jun 14, 2025
ec63c50
ENH: Trigger question retries in `Child` if result `success=False`
cortadocodes Jun 14, 2025
c8b1d63
ENH: Add `raise_errors` option to `Service.wait_for_answer`
cortadocodes Jun 14, 2025
8cb5400
REF: Simplify handling of `analysis` in `Service.answer`
cortadocodes Jun 14, 2025
87e05c9
REF: Move private method further down class
cortadocodes Jun 14, 2025
3fbff4e
FIX: Avoid raising errors in child's first question attempt
cortadocodes Jun 14, 2025
00460a3
FIX: Update exception handling in `Child.ask`
cortadocodes Jun 14, 2025
ddbb536
DOC: Update docstrings
cortadocodes Jun 14, 2025
f8b63bc
FIX: Ensure full result event is returned from event handler
cortadocodes Jun 14, 2025
456e014
TST: Update test
cortadocodes Jun 14, 2025
0e2a215
REV: Revert "FIX: Ensure full result event is returned from event han…
cortadocodes Jun 14, 2025
e9baa4f
FIX: Add `success` key to result returned from event handler
cortadocodes Jun 14, 2025
13a4ed4
FIX: Add `result` key to result returned from event handler
cortadocodes Jun 14, 2025
7522472
REF: Remove extra whitespace
cortadocodes Jun 14, 2025
99c9cdc
TST: Update tests
cortadocodes Jun 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 20 additions & 17 deletions octue/cloud/events/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from octue.definitions import GOOGLE_COMPUTE_PROVIDERS
from octue.log_handlers import COLOUR_PALETTE
from octue.resources.manifest import Manifest
from octue.utils.exceptions import convert_exception_event_to_exception

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -37,6 +38,7 @@ class AbstractEventHandler:
:param str|None exclude_logs_containing: if provided, skip handling log messages containing this string
:param bool only_handle_result: if `True`, skip handling non-result events and only handle the "result" event when received (turning this on speeds up event handling)
:param bool validate_events: if `True`, validate events before attempting to handle them (turning this off speeds up event handling)
:param bool raise_errors: if `True`, raise any exceptions received; otherwise, just log them (just logging them allows a partial result event to be received afterwards and handled)
:return None:
"""

Expand All @@ -50,6 +52,7 @@ def __init__(
exclude_logs_containing=None,
only_handle_result=False,
validate_events=True,
raise_errors=True,
):
self.handle_monitor_message = handle_monitor_message
self.record_events = record_events
Expand All @@ -58,6 +61,7 @@ def __init__(
self.exclude_logs_containing = exclude_logs_containing
self.only_handle_result = only_handle_result
self.validate_events = validate_events
self.raise_errors = raise_errors

self.handled_events = []
self._start_time = None
Expand Down Expand Up @@ -221,29 +225,19 @@ def _handle_log_message(self, event, attributes):
logger.handle(record)

def _handle_exception(self, event, attributes):
"""Raise the exception from the child.
"""Raise or log the exception from the child.

:param dict event:
:param dict attributes: the event's attributes
:param octue.cloud.events.attributes.ResponseAttributes attributes: the event's attributes
:raise Exception:
:return None:
"""
exception_message = "\n\n".join(
(
event["exception_message"],
f"The following traceback was captured from the remote service {attributes.sender!r}:",
"".join(event["exception_traceback"]),
)
)

try:
exception_type = EXCEPTIONS_MAPPING[event["exception_type"]]
error = convert_exception_event_to_exception(event, attributes.sender, EXCEPTIONS_MAPPING)

# Allow unknown exception types to still be raised.
except KeyError:
exception_type = type(event["exception_type"], (Exception,), {})
if self.raise_errors:
raise error

raise exception_type(exception_message)
logger.error("", exc_info=error)

def _handle_result(self, event, attributes):
"""Extract any output values and output manifest from the result, deserialising the manifest if present.
Expand All @@ -259,4 +253,13 @@ def _handle_result(self, event, attributes):
else:
output_manifest = None

return {"output_values": event.get("output_values"), "output_manifest": output_manifest}
result = {
"output_values": event.get("output_values"),
"output_manifest": output_manifest,
"success": event["success"],
}

if event.get("exception"):
result["exception"] = event["exception"]

return result
3 changes: 3 additions & 0 deletions octue/cloud/events/replayer.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class EventReplayer(AbstractEventHandler):
:param str|None exclude_logs_containing: if provided, skip handling log messages containing this string
:param bool only_handle_result: if `True`, skip non-result events and only handle the "result" event if present (turning this on speeds up event handling)
:param bool validate_events: if `True`, validate events before attempting to handle them (this is off by default to speed up event handling)
:param bool raise_errors: if `True`, raise any exceptions received; otherwise, just log them (just logging them allows a partial result event to be received afterwards and handled)
:return None:
"""

Expand All @@ -32,6 +33,7 @@ def __init__(
exclude_logs_containing=None,
only_handle_result=False,
validate_events=False,
raise_errors=True,
):
event_handlers = event_handlers or {
"question": self._handle_question,
Expand All @@ -52,6 +54,7 @@ def __init__(
exclude_logs_containing=exclude_logs_containing,
only_handle_result=only_handle_result,
validate_events=validate_events,
raise_errors=raise_errors,
)

def handle_events(self, events):
Expand Down
3 changes: 3 additions & 0 deletions octue/cloud/pub_sub/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class GoogleCloudPubSubEventHandler(AbstractEventHandler):
:param str|None exclude_logs_containing: if provided, skip handling log messages containing this string
:param bool only_handle_result: if `True`, skip non-result events and only handle the "result" event if present (turning this on speeds up event handling)
:param bool validate_events: if `True`, validate events before attempting to handle them (turn this off to speed up event handling at risk of failure if an invalid event is received)
:param bool raise_errors: if `True`, raise any exceptions received; otherwise, just log them (just logging them allows a partial result event to be received afterwards and handled)
:return None:
"""

Expand All @@ -60,6 +61,7 @@ def __init__(
exclude_logs_containing=None,
only_handle_result=False,
validate_events=True,
raise_errors=True,
):
self.subscription = subscription

Expand All @@ -72,6 +74,7 @@ def __init__(
exclude_logs_containing=exclude_logs_containing,
only_handle_result=only_handle_result,
validate_events=validate_events,
raise_errors=raise_errors,
)

self._heartbeat_checker = None
Expand Down
59 changes: 38 additions & 21 deletions octue/cloud/pub_sub/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from octue.compatibility import warn_if_incompatible
from octue.definitions import DEFAULT_MAXIMUM_HEARTBEAT_INTERVAL, LOCAL_SDK_VERSION
import octue.exceptions
from octue.resources import Analysis
from octue.utils.dictionaries import make_minimal_dictionary
from octue.utils.encoders import OctueJSONEncoder
from octue.utils.exceptions import convert_exception_to_primitives
Expand Down Expand Up @@ -195,12 +196,16 @@ def answer(self, question, heartbeat_interval=120, timeout=30):
:raise Exception: if any exception arises during running analysis and sending its results
:return dict: the result event
"""
heartbeater = None

# Instantiate analysis here so outputs can be accessed even in the event of an exception in the run function.
analysis = Analysis()

try:
question, question_attributes = self._parse_question(question)
except jsonschema.ValidationError:
return

heartbeater = None
response_attributes = ResponseAttributes.from_question_attributes(question_attributes)

try:
Expand All @@ -225,7 +230,8 @@ def answer(self, question, heartbeat_interval=120, timeout=30):

handle_monitor_message = functools.partial(self._send_monitor_message, attributes=response_attributes)

analysis = self.run_function(
self.run_function(
analysis=analysis,
analysis_id=question_attributes.question_uuid,
input_values=question.get("input_values"),
input_manifest=question.get("input_manifest"),
Expand All @@ -237,7 +243,7 @@ def answer(self, question, heartbeat_interval=120, timeout=30):
originator=question_attributes.originator,
)

result = self._send_result(analysis, response_attributes)
result = self._send_result(analysis, response_attributes, success=True)
heartbeater.cancel()
logger.info("%r answered question %r.", self, question_attributes.question_uuid)
return result
Expand All @@ -251,7 +257,8 @@ def answer(self, question, heartbeat_interval=120, timeout=30):
sender_sdk_version=question_attributes.sender_sdk_version,
)

self.send_exception(attributes=response_attributes, timeout=timeout)
self._send_exception(attributes=response_attributes, timeout=timeout)
self._send_result(analysis, response_attributes, success=False, exception=self._serialise_exception())
raise error

def ask(
Expand Down Expand Up @@ -381,6 +388,7 @@ def wait_for_answer(
record_events=True,
timeout=60,
maximum_heartbeat_interval=DEFAULT_MAXIMUM_HEARTBEAT_INTERVAL,
raise_errors=True,
):
"""Wait for an answer to a question on the given subscription, deleting the subscription and its topic once
the answer is received.
Expand All @@ -390,8 +398,9 @@ def wait_for_answer(
:param bool record_events: if `True`, record messages received from the child in the `received_events` attribute
:param float|None timeout: how long in seconds to wait for an answer before raising a `TimeoutError`
:param float|int maximum_heartbeat_interval: the maximum amount of time (in seconds) allowed between child heartbeats before an error is raised
:param bool raise_errors:
:raise TimeoutError: if the timeout is exceeded
:return dict: dictionary containing the keys "output_values" and "output_manifest"
:return dict: dictionary containing the keys "output_values", "output_manifest", "success", and for a failed analysis, "exception"
"""
if subscription.is_push_subscription:
raise octue.exceptions.NotAPullSubscription(
Expand All @@ -403,6 +412,7 @@ def wait_for_answer(
subscription=subscription,
handle_monitor_message=handle_monitor_message,
record_events=record_events,
raise_errors=raise_errors,
)

try:
Expand Down Expand Up @@ -444,26 +454,15 @@ def wait_for_answer(
# self._emit_event({"kind": "cancellation"}, attributes=question_attributes, timeout=timeout)
# logger.info("Cancellation of question %r requested.", question_uuid)

def send_exception(self, attributes, timeout=30):
def _send_exception(self, attributes, timeout=30):
"""Serialise and send the exception being handled to the parent.

:param octue.cloud.events.attributes.ResponseAttributes attributes: the attributes to use for the exception event
:param float|None timeout: time in seconds to keep retrying sending of the exception
:return None:
"""
exception = convert_exception_to_primitives()
exception_message = f"Error in {self!r}: {exception['message']}"

self._emit_event(
{
"kind": "exception",
"exception_type": exception["type"],
"exception_message": exception_message,
"exception_traceback": exception["traceback"],
},
attributes=attributes,
timeout=timeout,
)
event = self._serialise_exception()
self._emit_event(event, attributes=attributes, timeout=timeout)

def _emit_event(self, event, attributes, wait=True, timeout=30):
"""Emit a JSON-serialised event as a Pub/Sub message to the services topic with optional message attributes.
Expand Down Expand Up @@ -592,22 +591,40 @@ def _send_monitor_message(self, data, attributes, timeout=30):
self._emit_event({"kind": "monitor_message", "data": data}, attributes=attributes, timeout=timeout, wait=False)
logger.debug("Monitor message sent by %r.", self)

def _send_result(self, analysis, attributes, timeout=30):
def _send_result(self, analysis, attributes, success, exception=None, timeout=30):
"""Send the result to the parent.

:param octue.resources.analysis.Analysis analysis: the analysis object containing the output values and/or output manifest
:param octue.cloud.events.attributes.ResponseAttributes attributes: the attributes to use for the result event
:param bool success:
:param dict|None exception:
:param float timeout: time in seconds to retry sending the message
:return dict: the result
"""
result = make_minimal_dictionary(kind="result", output_values=analysis.output_values)
result = make_minimal_dictionary(
kind="result",
output_values=analysis.output_values,
success=success,
exception=exception,
)

if analysis.output_manifest is not None:
result["output_manifest"] = analysis.output_manifest.to_primitive()

self._emit_event(event=result, attributes=attributes, timeout=timeout)
return result

def _serialise_exception(self):
exception = convert_exception_to_primitives()
exception_message = f"Error in {self!r}: {exception['message']}"

return {
"kind": "exception",
"exception_type": exception["type"],
"exception_message": exception_message,
"exception_traceback": exception["traceback"],
}

def _parse_question(self, question):
"""Parse a question in dictionary format or direct Google Pub/Sub format.

Expand Down
56 changes: 37 additions & 19 deletions octue/resources/analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,14 @@

import coolname

import twined.exceptions
from octue.cloud import storage
from octue.exceptions import InvalidMonitorMessage
from octue.mixins import Hashable, Identifiable, Labelable, Serialisable, Taggable
from octue.resources.manifest import Manifest
from octue.utils.encoders import OctueJSONEncoder
from octue.utils.threads import RepeatingTimer
from twined import ALL_STRANDS, Twine

import twined.exceptions

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -48,7 +47,7 @@ class Analysis(Identifiable, Serialisable, Labelable, Taggable):
If a strand is ``None``, so will its corresponding hash attribute be. The hash of a datafile is the hash of its
file, while the hash of a manifest or dataset is the cumulative hash of the files it refers to.

:param twined.Twine|dict|str twine: the twine, dictionary defining a twine, or path to "twine.json" file defining the service's data interface
:param twined.Twine|dict|str|None twine: the twine, dictionary defining a twine, or path to "twine.json" file defining the service's data interface
:param callable|None handle_monitor_message: an optional function for sending monitor messages to the parent that requested the analysis
:param any configuration_values: the configuration values for the analysis - this can be expressed as a python primitive (e.g. dict), a path to a JSON file, or a JSON string.
:param octue.resources.manifest.Manifest configuration_manifest: a manifest of configuration datasets for the analysis if required
Expand All @@ -61,16 +60,45 @@ class Analysis(Identifiable, Serialisable, Labelable, Taggable):
:return None:
"""

def __init__(self, twine, handle_monitor_message=None, **kwargs):
if isinstance(twine, Twine):
def __init__(self, twine=None, handle_monitor_message=None, **kwargs):
strand_kwargs = {name: kwargs.pop(name, None) for name in ALL_STRANDS}
output_location = kwargs.pop("output_location", None)
use_signed_urls_for_output_datasets = kwargs.pop("use_signed_urls_for_output_datasets", False)

self.prepare(
twine=twine,
handle_monitor_message=handle_monitor_message,
output_location=output_location,
use_signed_urls_for_output_datasets=use_signed_urls_for_output_datasets,
**strand_kwargs,
)

super().__init__(**kwargs)

@property
def finalised(self):
"""Check whether the analysis has been finalised (i.e. whether its outputs have been validated and, if an output
manifest is produced, its datasets uploaded).

:return bool:
"""
return self._finalised

def prepare(
self,
twine=None,
handle_monitor_message=None,
output_location=None,
use_signed_urls_for_output_datasets=None,
**strand_kwargs,
):
if twine is None or isinstance(twine, Twine):
self.twine = twine
else:
self.twine = Twine(source=twine)

self._handle_monitor_message = handle_monitor_message

strand_kwargs = {name: kwargs.pop(name, None) for name in ALL_STRANDS}

# Values strands.
self.configuration_values = strand_kwargs.get("configuration_values", None)
self.input_values = strand_kwargs.get("input_values", None)
Expand All @@ -85,22 +113,12 @@ def __init__(self, twine, handle_monitor_message=None, **kwargs):
self.children = strand_kwargs.get("children", None)

# Non-strands.
self.output_location = kwargs.pop("output_location", None)
self.use_signed_urls_for_output_datasets = kwargs.pop("use_signed_urls_for_output_datasets", False)
self.output_location = output_location
self.use_signed_urls_for_output_datasets = use_signed_urls_for_output_datasets

self._calculate_strand_hashes(strands=strand_kwargs)
self._periodic_monitor_message_sender_threads = []
self._finalised = False
super().__init__(**kwargs)

@property
def finalised(self):
"""Check whether the analysis has been finalised (i.e. whether its outputs have been validated and, if an output
manifest is produced, its datasets uploaded).

:return bool:
"""
return self._finalised

def send_monitor_message(self, data):
"""Send a monitor message to the parent that requested the analysis.
Expand Down
Loading
Loading