Skip to content

Commit

Permalink
Add FlyteNonRecoverableSystemException (#2700)
Browse files Browse the repository at this point in the history
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Yee Hing Tong <wild-endeavor@users.noreply.github.com>
Co-authored-by: Yee Hing Tong <wild-endeavor@users.noreply.github.com>
  • Loading branch information
pingsutw and wild-endeavor authored Sep 25, 2024
1 parent f394bc9 commit e60c152
Show file tree
Hide file tree
Showing 8 changed files with 85 additions and 28 deletions.
18 changes: 17 additions & 1 deletion flytekit/bin/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
from flytekit.core.data_persistence import FileAccessProvider
from flytekit.core.promise import VoidPromise
from flytekit.deck.deck import _output_deck
from flytekit.exceptions.system import FlyteNonRecoverableSystemException
from flytekit.exceptions.user import FlyteRecoverableException, FlyteUserRuntimeException
from flytekit.interfaces.stats.taggable import get_stats as _get_stats
from flytekit.loggers import logger, user_space_logger
Expand Down Expand Up @@ -159,7 +160,22 @@ def _dispatch_execute(
logger.error(exc_str)
logger.error("!! End Error Captured by Flyte !!")

# All the Non-user errors are captured here, and are considered system errors
except FlyteNonRecoverableSystemException as e:
exc_str = get_traceback_str(e.value)
output_file_dict[_constants.ERROR_FILE_NAME] = _error_models.ErrorDocument(
_error_models.ContainerError(
"SYSTEM",
exc_str,
_error_models.ContainerError.Kind.NON_RECOVERABLE,
_execution_models.ExecutionError.ErrorKind.SYSTEM,
)
)

logger.error("!! Begin Non-recoverable System Error Captured by Flyte !!")
logger.error(exc_str)
logger.error("!! End Error Captured by Flyte !!")

# All other errors are captured here, and are considered system errors
except Exception as e:
exc_str = get_traceback_str(e)
output_file_dict[_constants.ERROR_FILE_NAME] = _error_models.ErrorDocument(
Expand Down
39 changes: 27 additions & 12 deletions flytekit/core/base_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@
from flytekit.core.type_engine import TypeEngine, TypeTransformerFailedError
from flytekit.core.utils import timeit
from flytekit.deck import DeckField
from flytekit.exceptions.system import (
FlyteDownloadDataException,
FlyteNonRecoverableSystemException,
FlyteUploadDataException,
)
from flytekit.exceptions.user import FlyteUserRuntimeException
from flytekit.loggers import logger
from flytekit.models import dynamic_job as _dynamic_job
Expand Down Expand Up @@ -636,13 +641,12 @@ def _output_to_literal_map(self, native_outputs: Dict[int, Any], ctx: FlyteConte
except Exception as e:
# only show the name of output key if it's user-defined (by default Flyte names these as "o<n>")
key = k if k != f"o{i}" else i
msg = (
e.args = (
f"Failed to convert outputs of task '{self.name}' at position {key}.\n"
f"Failed to convert type {type(native_outputs_as_map[expected_output_names[i]])} to type {py_type}.\n"
f"Error Message: {e}."
f"Error Message: {e.args[0]}.",
)
logger.error(msg)
raise TypeError(msg) from e
raise
# Now check if there is any output metadata associated with this output variable and attach it to the
# literal
if omt is not None:
Expand Down Expand Up @@ -721,23 +725,26 @@ def dispatch_execute(
)
# type: ignore
) as exec_ctx:
is_local_execution = cast(ExecutionState, exec_ctx.execution_state).is_local_execution()
# TODO We could support default values here too - but not part of the plan right now
# Translate the input literals to Python native
try:
native_inputs = self._literal_map_to_python_input(input_literal_map, exec_ctx)
except Exception as exc:
exc.args = (f"Error encountered while converting inputs of '{self.name}':\n {exc.args[0]}",)
except (FlyteUploadDataException, FlyteDownloadDataException):
raise

except Exception as e:
if is_local_execution:
e.args = (f"Error encountered while converting inputs of '{self.name}':\n {e.args[0]}",)
raise
raise FlyteNonRecoverableSystemException(e) from e
# TODO: Logger should auto inject the current context information to indicate if the task is running within
# a workflow or a subworkflow etc
logger.info(f"Invoking {self.name} with inputs: {native_inputs}")
with timeit("Execute user level code"):
try:
native_outputs = self.execute(**native_inputs)
except Exception as e:
ctx = FlyteContextManager().current_context()
if ctx.execution_state and ctx.execution_state.is_local_execution():
if is_local_execution:
# If the task is being executed locally, we want to raise the original exception
e.args = (f"Error encountered while executing '{self.name}':\n {e.args[0]}",)
raise
Expand Down Expand Up @@ -779,9 +786,17 @@ def dispatch_execute(
):
return native_outputs

literals_map, native_outputs_as_map = self._output_to_literal_map(native_outputs, exec_ctx)
self._write_decks(native_inputs, native_outputs_as_map, ctx, new_user_params)
# After the execute has been successfully completed
try:
literals_map, native_outputs_as_map = self._output_to_literal_map(native_outputs, exec_ctx)
self._write_decks(native_inputs, native_outputs_as_map, ctx, new_user_params)
except (FlyteUploadDataException, FlyteDownloadDataException):
raise
except Exception as e:
if is_local_execution:
raise
raise FlyteNonRecoverableSystemException(e) from e

# After the execution has been successfully completed
return literals_map

def pre_execute(self, user_params: Optional[ExecutionParameters]) -> Optional[ExecutionParameters]: # type: ignore
Expand Down
5 changes: 3 additions & 2 deletions flytekit/core/data_persistence.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
from flytekit.configuration import DataConfig
from flytekit.core.local_fsspec import FlyteLocalFileSystem
from flytekit.core.utils import timeit
from flytekit.exceptions.system import FlyteDownloadDataException, FlyteUploadDataException
from flytekit.exceptions.user import FlyteAssertion, FlyteDataNotFoundException
from flytekit.interfaces.random import random
from flytekit.loggers import logger
Expand Down Expand Up @@ -561,7 +562,7 @@ def get_data(self, remote_path: str, local_path: str, is_multipart: bool = False
except FlyteDataNotFoundException:
raise
except Exception as ex:
raise FlyteAssertion(
raise FlyteDownloadDataException(
f"Failed to get data from {remote_path} to {local_path} (recursive={is_multipart}).\n\n"
f"Original exception: {str(ex)}"
)
Expand Down Expand Up @@ -589,7 +590,7 @@ def put_data(
return put_result
return remote_path
except Exception as ex:
raise FlyteAssertion(
raise FlyteUploadDataException(
f"Failed to put data from {local_path} to {remote_path} (recursive={is_multipart}).\n\n"
f"Original exception: {str(ex)}"
) from ex
Expand Down
26 changes: 26 additions & 0 deletions flytekit/exceptions/system.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from flytekit.exceptions import base as _base_exceptions
from flytekit.exceptions.base import FlyteException


class FlyteSystemException(_base_exceptions.FlyteRecoverableException):
Expand Down Expand Up @@ -48,3 +49,28 @@ class FlyteSystemAssertion(FlyteSystemException, AssertionError):

class FlyteAgentNotFound(FlyteSystemException, AssertionError):
_ERROR_CODE = "SYSTEM:AgentNotFound"


class FlyteDownloadDataException(FlyteSystemException):
    """System-level failure reported under the ``SYSTEM:DownloadDataError`` code.

    Subclasses ``FlyteSystemException``; the only thing this class adds is
    its own error code.
    """

    _ERROR_CODE = "SYSTEM:DownloadDataError"


class FlyteUploadDataException(FlyteSystemException):
    """System-level failure reported under the ``SYSTEM:UploadDataError`` code.

    Subclasses ``FlyteSystemException``; the only thing this class adds is
    its own error code.
    """

    _ERROR_CODE = "SYSTEM:UploadDataError"


class FlyteNonRecoverableSystemException(FlyteException):
    """Wrapper for an exception raised by system (non-user) code.

    The wrapped exception is kept on the instance and exposed through
    :attr:`value`, so a handler can still reach the original exception
    (for example to format its traceback).
    """

    # This is a system error, so the code carries the ``SYSTEM:`` prefix like
    # the other exception codes in this module (it previously read ``USER:``).
    _ERROR_CODE = "SYSTEM:NonRecoverableSystemError"

    def __init__(self, exc_value: Exception) -> None:
        """
        FlyteNonRecoverableSystemException is thrown when system code raises an exception.

        :param exc_value: The exception that was raised from system code.
        """
        self._exc_value = exc_value
        # The message of this exception is the string form of the wrapped one.
        super().__init__(str(exc_value))

    @property
    def value(self) -> Exception:
        """The original exception that was passed to the constructor."""
        return self._exc_value
5 changes: 2 additions & 3 deletions plugins/flytekit-pandera/tests/test_plugin.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import os

import pandas
import pandera
import pytest

from flytekitplugins.pandera import schema # noqa: F401

from flytekit import task, workflow
Expand Down Expand Up @@ -73,7 +72,7 @@ def wf_invalid_output(df: pandera.typing.DataFrame[InSchema]) -> pandera.typing.
return transform2_noop(df=transform1(df=df))

with pytest.raises(
TypeError,
pandera.errors.SchemaError,
match=f"Failed to convert type <class 'pandas.core.frame.DataFrame'> to type pandera.typing.pandas.DataFrame",
):
wf_invalid_output(df=valid_df)
Expand Down
8 changes: 4 additions & 4 deletions tests/flytekit/unit/core/test_flyte_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ def my_wf(path: FlyteFile[typing.TypeVar("txt")]) -> FlyteFile[typing.TypeVar("j
f = t1(path=path)
return f

with pytest.raises(TypeError) as excinfo:
with pytest.raises(ValueError) as excinfo:
my_wf(path=local_dummy_txt_file)
assert "Incorrect file type, expected image/jpeg, got text/plain" in str(excinfo.value)

Expand Down Expand Up @@ -200,7 +200,7 @@ def wf(path: str) -> None:
ff = t1(path=path)
t2(ff=ff)

with pytest.raises(TypeError) as excinfo:
with pytest.raises(ValueError) as excinfo:
wf(path=local_dummy_file)
assert "Incorrect file type, expected image/jpeg, got text/plain" in str(excinfo.value)

Expand Down Expand Up @@ -509,7 +509,7 @@ def t1() -> FlyteFile:
def wf1() -> FlyteFile:
return t1()

with pytest.raises(TypeError):
with pytest.raises(ValueError):
wf1()

@task
Expand All @@ -521,7 +521,7 @@ def t2() -> FlyteFile:
def wf2() -> FlyteFile:
return t2()

with pytest.raises(TypeError):
with pytest.raises(ValueError):
wf2()


Expand Down
10 changes: 5 additions & 5 deletions tests/flytekit/unit/core/test_type_hints.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from flytekit.core.resources import Resources
from flytekit.core.task import TaskMetadata, task
from flytekit.core.testing import patch, task_mock
from flytekit.core.type_engine import RestrictedTypeError, SimpleTransformer, TypeEngine
from flytekit.core.type_engine import RestrictedTypeError, SimpleTransformer, TypeEngine, TypeTransformerFailedError
from flytekit.core.workflow import workflow
from flytekit.exceptions.user import FlyteValidationException, FlyteFailureNodeInputMismatchException
from flytekit.models import literals as _literal_models
Expand Down Expand Up @@ -1596,7 +1596,7 @@ def foo4(input: DC1=DC1(1, 'a')) -> DC2:
return input # type: ignore

with pytest.raises(
TypeError,
TypeTransformerFailedError,
match=(
f"Failed to convert inputs of task '{exec_prefix}tests.flytekit.unit.core.test_type_hints.foo':\n"
" Failed argument 'a': Expected value of type <class 'int'> but got 'hello' of type <class 'str'>"
Expand All @@ -1605,7 +1605,7 @@ def foo4(input: DC1=DC1(1, 'a')) -> DC2:
foo(a="hello", b=10) # type: ignore

with pytest.raises(
TypeError,
ValueError,
match=(
f"Failed to convert outputs of task '{exec_prefix}tests.flytekit.unit.core.test_type_hints.foo2' at position 0.\n"
f"Failed to convert type <class 'str'> to type <class 'int'>.\n"
Expand All @@ -1615,14 +1615,14 @@ def foo4(input: DC1=DC1(1, 'a')) -> DC2:
foo2(a=10, b="hello")

with pytest.raises(
TypeError,
TypeTransformerFailedError,
match=f"Failed to convert inputs of task '{exec_prefix}tests.flytekit.unit.core.test_type_hints.foo3':\n "
f"Failed argument 'a': Expected a dict",
):
foo3(a=[{"hello": 2}])

with pytest.raises(
TypeError,
AttributeError,
match=(
f"Failed to convert outputs of task '{exec_prefix}tests.flytekit.unit.core.test_type_hints.foo4' at position 0.\n"
f"Failed to convert type <class 'tests.flytekit.unit.core.test_type_hints.test_error_messages.<locals>.DC1'> to type <class 'tests.flytekit.unit.core.test_type_hints.test_error_messages.<locals>.DC2'>.\n"
Expand Down
2 changes: 1 addition & 1 deletion tests/flytekit/unit/types/iterator/test_json_iterator.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def test_jsons_tasks():
next(iterator)

# 2
with pytest.raises(TypeError, match="The iterator is empty."):
with pytest.raises(ValueError, match="The iterator is empty."):
jsons_loop_task(x=jsons())

# 3
Expand Down

0 comments on commit e60c152

Please sign in to comment.