From 905dbd8da198e6f6afda89afb805f8fb4ee188a5 Mon Sep 17 00:00:00 2001 From: Matthias Richter Date: Thu, 28 Mar 2024 11:47:28 +0100 Subject: [PATCH] Addressing comments --- src/fondant/cli.py | 8 -------- src/fondant/core/component_spec.py | 8 ++++---- src/fondant/core/exceptions.py | 4 ++-- src/fondant/core/manifest.py | 4 ++-- src/fondant/dataset/compiler.py | 8 ++++---- src/fondant/dataset/dataset.py | 28 ++++++++++++--------------- src/fondant/testing.py | 4 ++-- tests/core/test_manifest_evolution.py | 4 ++-- tests/pipeline/test_compiler.py | 8 ++++---- tests/pipeline/test_pipeline.py | 16 +++++++-------- 10 files changed, 40 insertions(+), 52 deletions(-) diff --git a/src/fondant/cli.py b/src/fondant/cli.py index 27173c0b..1aad2760 100644 --- a/src/fondant/cli.py +++ b/src/fondant/cli.py @@ -902,14 +902,6 @@ def dataset_from_module(module_str: str) -> Dataset: msg = f"No dataset found in module {module_str}" raise DatasetImportError(msg) - # Skip this one and choose the first dataset instance? - if len(dataset_instances) > 1: - msg = ( - f"Found multiple instantiated datasets in {module_str}. Use the first dataset to start " - f"the execution." - ) - logger.info(msg) - return dataset_instances[0] diff --git a/src/fondant/core/component_spec.py b/src/fondant/core/component_spec.py index c9647795..272e4403 100644 --- a/src/fondant/core/component_spec.py +++ b/src/fondant/core/component_spec.py @@ -15,7 +15,7 @@ from referencing import Registry, Resource from referencing.jsonschema import DRAFT4 -from fondant.core.exceptions import InvalidComponentSpec, InvalidWorkspaceDefinition +from fondant.core.exceptions import InvalidComponentSpec, InvalidDatasetDefinition from fondant.core.schema import Field, Type @@ -411,7 +411,7 @@ def _validate_mappings(self) -> None: for key, value in mapping.items(): if not isinstance(value, (str, pa.DataType)): msg = f"Unexpected type {type(value)} received for key {key} in {name} mapping" - raise InvalidWorkspaceDefinition(msg) + raise InvalidDatasetDefinition(msg) def _dataset_schema_to_operation_schema(self, name: str) -> t.Mapping[str, Field]: """Calculate the operations schema based on dataset schema. @@ -450,7 +450,7 @@ def _dataset_schema_to_operation_schema(self, name: str) -> t.Mapping[str, Field f"already defined in the `{name}` section of the component spec " f"with type {spec_type}" ) - raise InvalidWorkspaceDefinition(msg) + raise InvalidDatasetDefinition(msg) return types.MappingProxyType(mapping) @@ -492,7 +492,7 @@ def _operation_schema_to_dataset_schema(self, name: str) -> t.Mapping[str, Field f"argument passed to the operation, but `{operations_column_name}` is not " f"defined in the `{name}` section of the component spec." ) - raise InvalidWorkspaceDefinition(msg) + raise InvalidDatasetDefinition(msg) return types.MappingProxyType(mapping) diff --git a/src/fondant/core/exceptions.py b/src/fondant/core/exceptions.py index fb1a3ff1..927f8f0a 100644 --- a/src/fondant/core/exceptions.py +++ b/src/fondant/core/exceptions.py @@ -15,8 +15,8 @@ class InvalidComponentSpec(ValidationError, FondantException): """Thrown when a component spec cannot be validated against the schema.""" -class InvalidWorkspaceDefinition(ValidationError, FondantException): - """Thrown when a pipeline definition is invalid.""" +class InvalidDatasetDefinition(ValidationError, FondantException): + """Thrown when a dataset definition is invalid.""" class InvalidTypeSchema(ValidationError, FondantException): diff --git a/src/fondant/core/manifest.py b/src/fondant/core/manifest.py index eb143409..ba86cbc3 100644 --- a/src/fondant/core/manifest.py +++ b/src/fondant/core/manifest.py @@ -24,8 +24,8 @@ class Metadata: Class representing the Metadata of the manifest. Args: - dataset_name: the name of the pipeline - run_id: the run id of the pipeline + dataset_name: the name of the dataset + run_id: the run id of the dataset component_id: the name of the component cache_key: the cache key of the component. manifest_location: path to the manifest file itself diff --git a/src/fondant/dataset/compiler.py b/src/fondant/dataset/compiler.py index 254b82ab..8b3fa1be 100644 --- a/src/fondant/dataset/compiler.py +++ b/src/fondant/dataset/compiler.py @@ -15,7 +15,7 @@ from fsspec.registry import known_implementations from fondant.core.component_spec import ComponentSpec -from fondant.core.exceptions import InvalidWorkspaceDefinition +from fondant.core.exceptions import InvalidDatasetDefinition from fondant.core.manifest import Metadata from fondant.core.schema import CloudCredentialsMount, DockerVolume from fondant.dataset import ( @@ -336,7 +336,7 @@ def _set_configuration(self, services, fondant_component_operation, component_id f" is not a valid accelerator type for Docker Compose compiler." f" Available options: {VALID_VERTEX_ACCELERATOR_TYPES}" ) - raise InvalidWorkspaceDefinition(msg) + raise InvalidDatasetDefinition(msg) if accelerator_name == "GPU": services[component_id]["deploy"] = { @@ -660,7 +660,7 @@ def _set_configuration(self, task, fondant_component_operation): f"Configured accelerator `{accelerator_name}` is not a valid accelerator type" f"for Kubeflow compiler. Available options: {VALID_ACCELERATOR_TYPES}" ) - raise InvalidWorkspaceDefinition(msg) + raise InvalidDatasetDefinition(msg) task.set_accelerator_limit(accelerator_number) if accelerator_name == "GPU": @@ -719,7 +719,7 @@ def _set_configuration(self, task, fondant_component_operation): f"Configured accelerator `{accelerator_name}` is not a valid accelerator type" f"for Vertex compiler. Available options: {VALID_VERTEX_ACCELERATOR_TYPES}" ) - raise InvalidWorkspaceDefinition(msg) + raise InvalidDatasetDefinition(msg) task.set_accelerator_type(accelerator_name) diff --git a/src/fondant/dataset/dataset.py b/src/fondant/dataset/dataset.py index f680db57..c23343df 100644 --- a/src/fondant/dataset/dataset.py +++ b/src/fondant/dataset/dataset.py @@ -23,8 +23,8 @@ from fondant.component import BaseComponent from fondant.core.component_spec import ComponentSpec, OperationSpec from fondant.core.exceptions import ( + InvalidDatasetDefinition, InvalidLightweightComponent, - InvalidWorkspaceDefinition, ) from fondant.core.manifest import Manifest from fondant.core.schema import Field @@ -98,7 +98,7 @@ def __post_init__(self): """Validate the resources.""" if bool(self.node_pool_label) != bool(self.node_pool_name): msg = "Both node_pool_label and node_pool_name must be specified or both must be None." - raise InvalidWorkspaceDefinition( + raise InvalidDatasetDefinition( msg, ) @@ -107,7 +107,7 @@ def __post_init__(self): "Both number of accelerators and accelerator name must be specified or both must" " be None." ) - raise InvalidWorkspaceDefinition( + raise InvalidDatasetDefinition( msg, ) @@ -267,7 +267,7 @@ def _validate_consumes( f"The dataset does not contain the column {dataset_column_name_or_type} " f"required by the component {component_spec.name}." ) - raise InvalidWorkspaceDefinition(msg) + raise InvalidDatasetDefinition(msg) # If operations column name is not in the component spec, but additional properties # are true we will infer the correct type from the dataset fields @@ -286,7 +286,7 @@ def _validate_consumes( f"but `{operations_column_name}` is not defined in the `consumes` " f"section of the component spec." ) - raise InvalidWorkspaceDefinition(msg) + raise InvalidDatasetDefinition(msg) return validated_consumes @@ -458,7 +458,7 @@ def _validate_dataset_name(name: str) -> str: pattern = r"^[a-z0-9][a-z0-9_-]*$" if not re.match(pattern, name): msg = f"The dataset name violates the pattern {pattern}" - raise InvalidWorkspaceDefinition(msg) + raise InvalidDatasetDefinition(msg) return name @staticmethod @@ -485,9 +485,7 @@ def register_operation( input_dataset: t.Optional["Dataset"], output_dataset: t.Optional["Dataset"], ) -> None: - if self._graph is None: - self._graph = OrderedDict() - + """Register an operation in the dataset graph.""" dependencies = [] for component_name, info in self._graph.items(): if info["output_dataset"] == input_dataset: @@ -594,15 +592,13 @@ def validate(self): def _validate_dataset_definition(self): """ - Validates the workspace definition by ensuring that the consumed and produced subsets and + Validates the dataset definition by ensuring that the consumed and produced subsets and their associated fields match and are invoked in the correct order. Raises: - InvalidPipelineDefinition: If a component is trying to invoke a subset that is not + InvalidDatasetDefinition: If a component is trying to invoke a subset that is not defined or created in previous components, or if an invoked subset's schema does not - match the previously created subset definition. - base_path: the base path where to store the pipelines artifacts - run_id: the run id of the component + match the previously created subset definition """ run_id = self.manifest.run_id if len(self._graph.keys()) == 0: @@ -636,7 +632,7 @@ def _validate_dataset_definition(self): f"in the previous components. \n" f"Available field names: {list(manifest.fields.keys())}" ) - raise InvalidWorkspaceDefinition( + raise InvalidDatasetDefinition( msg, ) @@ -653,7 +649,7 @@ def _validate_dataset_definition(self): f"{manifest_field.type}\nThe current component to " f"trying to invoke it with this type:\n{component_field.type}" ) - raise InvalidWorkspaceDefinition( + raise InvalidDatasetDefinition( msg, ) diff --git a/src/fondant/testing.py b/src/fondant/testing.py index 7b7ddca4..9c7102a1 100644 --- a/src/fondant/testing.py +++ b/src/fondant/testing.py @@ -5,7 +5,7 @@ import yaml -from fondant.core.exceptions import InvalidWorkspaceDefinition +from fondant.core.exceptions import InvalidDatasetDefinition @dataclass @@ -196,7 +196,7 @@ def from_spec(cls, spec_path: str) -> "KubeflowPipelineConfigs": if not specification: msg = "No component specification found in the pipeline specification" - raise InvalidWorkspaceDefinition(msg) + raise InvalidDatasetDefinition(msg) components_configs_dict = {} # Iterate through each service diff --git a/tests/core/test_manifest_evolution.py b/tests/core/test_manifest_evolution.py index b2a3e630..3bd0344a 100644 --- a/tests/core/test_manifest_evolution.py +++ b/tests/core/test_manifest_evolution.py @@ -5,7 +5,7 @@ import pytest import yaml from fondant.core.component_spec import ComponentSpec, OperationSpec -from fondant.core.exceptions import InvalidWorkspaceDefinition +from fondant.core.exceptions import InvalidDatasetDefinition from fondant.core.manifest import Manifest EXAMPLES_PATH = Path(__file__).parent / "examples/evolution_examples" @@ -107,7 +107,7 @@ def test_invalid_evolution_examples( component_spec = ComponentSpec.from_dict(component_spec) for test_condition in test_conditions: produces = test_condition["produces"] - with pytest.raises(InvalidWorkspaceDefinition): # noqa: PT012 + with pytest.raises(InvalidDatasetDefinition): # noqa: PT012 operation_spec = OperationSpec(component_spec, produces=produces) manifest.evolve( operation_spec=operation_spec, diff --git a/tests/pipeline/test_compiler.py b/tests/pipeline/test_compiler.py index 3c8c4ecf..f421a472 100644 --- a/tests/pipeline/test_compiler.py +++ b/tests/pipeline/test_compiler.py @@ -15,7 +15,7 @@ import yaml from fondant.component import DaskLoadComponent from fondant.core.component_spec import ComponentSpec -from fondant.core.exceptions import InvalidWorkspaceDefinition +from fondant.core.exceptions import InvalidDatasetDefinition from fondant.core.manifest import Manifest, Metadata from fondant.core.schema import CloudCredentialsMount from fondant.dataset import ( @@ -387,7 +387,7 @@ def test_invalid_docker_configuration(tmp_path_factory): compiler = DockerCompiler() with tmp_path_factory.mktemp("temp") as fn, pytest.raises( # noqa PT012 - InvalidWorkspaceDefinition, + InvalidDatasetDefinition, ): working_directory = str(fn) compiler.compile( @@ -565,7 +565,7 @@ def test_invalid_kubeflow_configuration(tmp_path_factory): ) compiler = KubeFlowCompiler() - with pytest.raises(InvalidWorkspaceDefinition): + with pytest.raises(InvalidDatasetDefinition): compiler.compile( dataset=dataset, working_directory="/foo/bar", @@ -666,7 +666,7 @@ def test_invalid_vertex_configuration(tmp_path_factory): dataset_name="test_pipeline", ) compiler = VertexCompiler() - with pytest.raises(InvalidWorkspaceDefinition): + with pytest.raises(InvalidDatasetDefinition): compiler.compile( dataset=dataset, working_directory="/foo/bar", diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py index 06a0e53c..0c7eed11 100644 --- a/tests/pipeline/test_pipeline.py +++ b/tests/pipeline/test_pipeline.py @@ -10,7 +10,7 @@ import yaml from fondant.component import DaskLoadComponent from fondant.core.component_spec import ComponentSpec -from fondant.core.exceptions import InvalidWorkspaceDefinition +from fondant.core.exceptions import InvalidDatasetDefinition from fondant.core.schema import Field, Type from fondant.dataset import ( ComponentOp, @@ -58,7 +58,7 @@ def test_component_op( arguments=component_args, ) - with pytest.raises(InvalidWorkspaceDefinition): + with pytest.raises(InvalidDatasetDefinition): ComponentOp( Path(components_path / component_names[0]), arguments=component_args, @@ -67,7 +67,7 @@ def test_component_op( ), ) - with pytest.raises(InvalidWorkspaceDefinition): + with pytest.raises(InvalidDatasetDefinition): ComponentOp( Path(components_path / component_names[0]), arguments=component_args, @@ -270,7 +270,7 @@ def test_invalid_pipeline_schema( ) # "images_pictures" does not exist in the dataset - with pytest.raises(InvalidWorkspaceDefinition): + with pytest.raises(InvalidDatasetDefinition): dataset.apply( Path(components_path / "second_component"), arguments=component_args, @@ -278,7 +278,7 @@ def test_invalid_pipeline_schema( ) # "images_array" does not exist in the component spec - with pytest.raises(InvalidWorkspaceDefinition): + with pytest.raises(InvalidDatasetDefinition): dataset.apply( Path(components_path / "second_component"), arguments=component_args, @@ -287,7 +287,7 @@ def test_invalid_pipeline_schema( # Extra field in the consumes mapping that does not have a corresponding field # in the dataset - with pytest.raises(InvalidWorkspaceDefinition): + with pytest.raises(InvalidDatasetDefinition): dataset.apply( Path(components_path / "second_component"), arguments=component_args, @@ -360,7 +360,7 @@ def test_invalid_pipeline_declaration( arguments=component_args, ) - with pytest.raises(InvalidWorkspaceDefinition): + with pytest.raises(InvalidDatasetDefinition): dataset._validate_dataset_definition() @@ -504,7 +504,7 @@ def test_invoked_field_schema_raise_exception(): "it with this type:\nType(DataType(string))", ) - with pytest.raises(InvalidWorkspaceDefinition, match=expected_error_msg): + with pytest.raises(InvalidDatasetDefinition, match=expected_error_msg): dataset.validate()