diff --git a/flytekit/bin/entrypoint.py b/flytekit/bin/entrypoint.py index ee5904fdf1..36c5994421 100644 --- a/flytekit/bin/entrypoint.py +++ b/flytekit/bin/entrypoint.py @@ -36,6 +36,7 @@ from flytekit.core.data_persistence import FileAccessProvider from flytekit.core.promise import VoidPromise from flytekit.deck.deck import _output_deck +from flytekit.exceptions.system import FlyteNonRecoverableSystemException from flytekit.exceptions.user import FlyteRecoverableException, FlyteUserRuntimeException from flytekit.interfaces.stats.taggable import get_stats as _get_stats from flytekit.loggers import logger, user_space_logger @@ -159,7 +160,22 @@ def _dispatch_execute( logger.error(exc_str) logger.error("!! End Error Captured by Flyte !!") - # All the Non-user errors are captured here, and are considered system errors + except FlyteNonRecoverableSystemException as e: + exc_str = get_traceback_str(e.value) + output_file_dict[_constants.ERROR_FILE_NAME] = _error_models.ErrorDocument( + _error_models.ContainerError( + "SYSTEM", + exc_str, + _error_models.ContainerError.Kind.NON_RECOVERABLE, + _execution_models.ExecutionError.ErrorKind.SYSTEM, + ) + ) + + logger.error("!! Begin Non-recoverable System Error Captured by Flyte !!") + logger.error(exc_str) + logger.error("!! 
End Error Captured by Flyte !!") + + # All other errors are captured here, and are considered system errors except Exception as e: exc_str = get_traceback_str(e) output_file_dict[_constants.ERROR_FILE_NAME] = _error_models.ErrorDocument( diff --git a/flytekit/core/base_task.py b/flytekit/core/base_task.py index 060077a65a..682749c273 100644 --- a/flytekit/core/base_task.py +++ b/flytekit/core/base_task.py @@ -71,6 +71,11 @@ from flytekit.core.type_engine import TypeEngine, TypeTransformerFailedError from flytekit.core.utils import timeit from flytekit.deck import DeckField +from flytekit.exceptions.system import ( + FlyteDownloadDataException, + FlyteNonRecoverableSystemException, + FlyteUploadDataException, +) from flytekit.exceptions.user import FlyteUserRuntimeException from flytekit.loggers import logger from flytekit.models import dynamic_job as _dynamic_job @@ -636,13 +641,12 @@ def _output_to_literal_map(self, native_outputs: Dict[int, Any], ctx: FlyteConte except Exception as e: # only show the name of output key if it's user-defined (by default Flyte names these as "o") key = k if k != f"o{i}" else i - msg = ( + e.args = ( f"Failed to convert outputs of task '{self.name}' at position {key}.\n" f"Failed to convert type {type(native_outputs_as_map[expected_output_names[i]])} to type {py_type}.\n" - f"Error Message: {e}." 
+ f"Error Message: {e.args[0]}.", ) - logger.error(msg) - raise TypeError(msg) from e + raise # Now check if there is any output metadata associated with this output variable and attach it to the # literal if omt is not None: @@ -721,14 +725,18 @@ def dispatch_execute( ) # type: ignore ) as exec_ctx: + is_local_execution = cast(ExecutionState, exec_ctx.execution_state).is_local_execution() # TODO We could support default values here too - but not part of the plan right now # Translate the input literals to Python native try: native_inputs = self._literal_map_to_python_input(input_literal_map, exec_ctx) - except Exception as exc: - exc.args = (f"Error encountered while converting inputs of '{self.name}':\n {exc.args[0]}",) + except (FlyteUploadDataException, FlyteDownloadDataException): raise - + except Exception as e: + if is_local_execution: + e.args = (f"Error encountered while converting inputs of '{self.name}':\n {e.args[0]}",) + raise + raise FlyteNonRecoverableSystemException(e) from e # TODO: Logger should auto inject the current context information to indicate if the task is running within # a workflow or a subworkflow etc logger.info(f"Invoking {self.name} with inputs: {native_inputs}") @@ -736,8 +744,7 @@ def dispatch_execute( try: native_outputs = self.execute(**native_inputs) except Exception as e: - ctx = FlyteContextManager().current_context() - if ctx.execution_state and ctx.execution_state.is_local_execution(): + if is_local_execution: # If the task is being executed locally, we want to raise the original exception e.args = (f"Error encountered while executing '{self.name}':\n {e.args[0]}",) raise @@ -779,9 +786,17 @@ def dispatch_execute( ): return native_outputs - literals_map, native_outputs_as_map = self._output_to_literal_map(native_outputs, exec_ctx) - self._write_decks(native_inputs, native_outputs_as_map, ctx, new_user_params) - # After the execute has been successfully completed + try: + literals_map, native_outputs_as_map = 
self._output_to_literal_map(native_outputs, exec_ctx) + self._write_decks(native_inputs, native_outputs_as_map, ctx, new_user_params) + except (FlyteUploadDataException, FlyteDownloadDataException): + raise + except Exception as e: + if is_local_execution: + raise + raise FlyteNonRecoverableSystemException(e) from e + + # After the execution has been successfully completed return literals_map def pre_execute(self, user_params: Optional[ExecutionParameters]) -> Optional[ExecutionParameters]: # type: ignore diff --git a/flytekit/core/data_persistence.py b/flytekit/core/data_persistence.py index 89556a53d0..cdd07afba7 100644 --- a/flytekit/core/data_persistence.py +++ b/flytekit/core/data_persistence.py @@ -36,6 +36,7 @@ from flytekit.configuration import DataConfig from flytekit.core.local_fsspec import FlyteLocalFileSystem from flytekit.core.utils import timeit +from flytekit.exceptions.system import FlyteDownloadDataException, FlyteUploadDataException from flytekit.exceptions.user import FlyteAssertion, FlyteDataNotFoundException from flytekit.interfaces.random import random from flytekit.loggers import logger @@ -561,7 +562,7 @@ def get_data(self, remote_path: str, local_path: str, is_multipart: bool = False except FlyteDataNotFoundException: raise except Exception as ex: - raise FlyteAssertion( + raise FlyteDownloadDataException( f"Failed to get data from {remote_path} to {local_path} (recursive={is_multipart}).\n\n" f"Original exception: {str(ex)}" ) @@ -589,7 +590,7 @@ def put_data( return put_result return remote_path except Exception as ex: - raise FlyteAssertion( + raise FlyteUploadDataException( f"Failed to put data from {local_path} to {remote_path} (recursive={is_multipart}).\n\n" f"Original exception: {str(ex)}" ) from ex diff --git a/flytekit/exceptions/system.py b/flytekit/exceptions/system.py index d965d129d7..c1e3c6010c 100644 --- a/flytekit/exceptions/system.py +++ b/flytekit/exceptions/system.py @@ -1,4 +1,5 @@ from flytekit.exceptions import base 
as _base_exceptions +from flytekit.exceptions.base import FlyteException class FlyteSystemException(_base_exceptions.FlyteRecoverableException): @@ -48,3 +49,28 @@ class FlyteSystemAssertion(FlyteSystemException, AssertionError): class FlyteAgentNotFound(FlyteSystemException, AssertionError): _ERROR_CODE = "SYSTEM:AgentNotFound" + + +class FlyteDownloadDataException(FlyteSystemException): + _ERROR_CODE = "SYSTEM:DownloadDataError" + + +class FlyteUploadDataException(FlyteSystemException): + _ERROR_CODE = "SYSTEM:UploadDataError" + + +class FlyteNonRecoverableSystemException(FlyteException): + _ERROR_CODE = "USER:NonRecoverableSystemError" + + def __init__(self, exc_value: Exception): + """ + FlyteNonRecoverableSystemException is raised when system code raises an exception. + + :param exc_value: The exception that was raised from system code. + """ + self._exc_value = exc_value + super().__init__(str(exc_value)) + + @property + def value(self): + return self._exc_value diff --git a/plugins/flytekit-pandera/tests/test_plugin.py b/plugins/flytekit-pandera/tests/test_plugin.py index 3c7a5107d4..f3a97c395e 100644 --- a/plugins/flytekit-pandera/tests/test_plugin.py +++ b/plugins/flytekit-pandera/tests/test_plugin.py @@ -1,8 +1,7 @@ -import os - import pandas import pandera import pytest + from flytekitplugins.pandera import schema # noqa: F401 from flytekit import task, workflow @@ -73,7 +72,7 @@ def wf_invalid_output(df: pandera.typing.DataFrame[InSchema]) -> pandera.typing. 
return transform2_noop(df=transform1(df=df)) with pytest.raises( - TypeError, + pandera.errors.SchemaError, match=f"Failed to convert type to type pandera.typing.pandas.DataFrame", ): wf_invalid_output(df=valid_df) diff --git a/tests/flytekit/unit/core/test_flyte_file.py b/tests/flytekit/unit/core/test_flyte_file.py index d17464c1e9..cce4f35afc 100644 --- a/tests/flytekit/unit/core/test_flyte_file.py +++ b/tests/flytekit/unit/core/test_flyte_file.py @@ -137,7 +137,7 @@ def my_wf(path: FlyteFile[typing.TypeVar("txt")]) -> FlyteFile[typing.TypeVar("j f = t1(path=path) return f - with pytest.raises(TypeError) as excinfo: + with pytest.raises(ValueError) as excinfo: my_wf(path=local_dummy_txt_file) assert "Incorrect file type, expected image/jpeg, got text/plain" in str(excinfo.value) @@ -200,7 +200,7 @@ def wf(path: str) -> None: ff = t1(path=path) t2(ff=ff) - with pytest.raises(TypeError) as excinfo: + with pytest.raises(ValueError) as excinfo: wf(path=local_dummy_file) assert "Incorrect file type, expected image/jpeg, got text/plain" in str(excinfo.value) @@ -509,7 +509,7 @@ def t1() -> FlyteFile: def wf1() -> FlyteFile: return t1() - with pytest.raises(TypeError): + with pytest.raises(ValueError): wf1() @task @@ -521,7 +521,7 @@ def t2() -> FlyteFile: def wf2() -> FlyteFile: return t2() - with pytest.raises(TypeError): + with pytest.raises(ValueError): wf2() diff --git a/tests/flytekit/unit/core/test_type_hints.py b/tests/flytekit/unit/core/test_type_hints.py index 0e7b88bd08..4e5c8b6fb0 100644 --- a/tests/flytekit/unit/core/test_type_hints.py +++ b/tests/flytekit/unit/core/test_type_hints.py @@ -31,7 +31,7 @@ from flytekit.core.resources import Resources from flytekit.core.task import TaskMetadata, task from flytekit.core.testing import patch, task_mock -from flytekit.core.type_engine import RestrictedTypeError, SimpleTransformer, TypeEngine +from flytekit.core.type_engine import RestrictedTypeError, SimpleTransformer, TypeEngine, TypeTransformerFailedError from 
flytekit.core.workflow import workflow from flytekit.exceptions.user import FlyteValidationException, FlyteFailureNodeInputMismatchException from flytekit.models import literals as _literal_models @@ -1596,7 +1596,7 @@ def foo4(input: DC1=DC1(1, 'a')) -> DC2: return input # type: ignore with pytest.raises( - TypeError, + TypeTransformerFailedError, match=( f"Failed to convert inputs of task '{exec_prefix}tests.flytekit.unit.core.test_type_hints.foo':\n" " Failed argument 'a': Expected value of type but got 'hello' of type " @@ -1605,7 +1605,7 @@ def foo4(input: DC1=DC1(1, 'a')) -> DC2: foo(a="hello", b=10) # type: ignore with pytest.raises( - TypeError, + ValueError, match=( f"Failed to convert outputs of task '{exec_prefix}tests.flytekit.unit.core.test_type_hints.foo2' at position 0.\n" f"Failed to convert type to type .\n" @@ -1615,14 +1615,14 @@ def foo4(input: DC1=DC1(1, 'a')) -> DC2: foo2(a=10, b="hello") with pytest.raises( - TypeError, + TypeTransformerFailedError, match=f"Failed to convert inputs of task '{exec_prefix}tests.flytekit.unit.core.test_type_hints.foo3':\n " f"Failed argument 'a': Expected a dict", ): foo3(a=[{"hello": 2}]) with pytest.raises( - TypeError, + AttributeError, match=( f"Failed to convert outputs of task '{exec_prefix}tests.flytekit.unit.core.test_type_hints.foo4' at position 0.\n" f"Failed to convert type .DC1'> to type .DC2'>.\n" diff --git a/tests/flytekit/unit/types/iterator/test_json_iterator.py b/tests/flytekit/unit/types/iterator/test_json_iterator.py index fbef86d791..fba58cb9f6 100644 --- a/tests/flytekit/unit/types/iterator/test_json_iterator.py +++ b/tests/flytekit/unit/types/iterator/test_json_iterator.py @@ -74,7 +74,7 @@ def test_jsons_tasks(): next(iterator) # 2 - with pytest.raises(TypeError, match="The iterator is empty."): + with pytest.raises(ValueError, match="The iterator is empty."): jsons_loop_task(x=jsons()) # 3