Skip to content

Commit

Permalink
Addressing comments
Browse files Browse the repository at this point in the history
  • Loading branch information
mrchtr committed Mar 28, 2024
1 parent 135f951 commit 905dbd8
Show file tree
Hide file tree
Showing 10 changed files with 40 additions and 52 deletions.
8 changes: 0 additions & 8 deletions src/fondant/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -902,14 +902,6 @@ def dataset_from_module(module_str: str) -> Dataset:
msg = f"No dataset found in module {module_str}"
raise DatasetImportError(msg)

# Skip this one and choose the first dataset instance?
if len(dataset_instances) > 1:
msg = (
f"Found multiple instantiated datasets in {module_str}. Use the first dataset to start "
f"the execution."
)
logger.info(msg)

return dataset_instances[0]


Expand Down
8 changes: 4 additions & 4 deletions src/fondant/core/component_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from referencing import Registry, Resource
from referencing.jsonschema import DRAFT4

from fondant.core.exceptions import InvalidComponentSpec, InvalidWorkspaceDefinition
from fondant.core.exceptions import InvalidComponentSpec, InvalidDatasetDefinition
from fondant.core.schema import Field, Type


Expand Down Expand Up @@ -411,7 +411,7 @@ def _validate_mappings(self) -> None:
for key, value in mapping.items():
if not isinstance(value, (str, pa.DataType)):
msg = f"Unexpected type {type(value)} received for key {key} in {name} mapping"
raise InvalidWorkspaceDefinition(msg)
raise InvalidDatasetDefinition(msg)

def _dataset_schema_to_operation_schema(self, name: str) -> t.Mapping[str, Field]:
"""Calculate the operations schema based on dataset schema.
Expand Down Expand Up @@ -450,7 +450,7 @@ def _dataset_schema_to_operation_schema(self, name: str) -> t.Mapping[str, Field
f"already defined in the `{name}` section of the component spec "
f"with type {spec_type}"
)
raise InvalidWorkspaceDefinition(msg)
raise InvalidDatasetDefinition(msg)

return types.MappingProxyType(mapping)

Expand Down Expand Up @@ -492,7 +492,7 @@ def _operation_schema_to_dataset_schema(self, name: str) -> t.Mapping[str, Field
f"argument passed to the operation, but `{operations_column_name}` is not "
f"defined in the `{name}` section of the component spec."
)
raise InvalidWorkspaceDefinition(msg)
raise InvalidDatasetDefinition(msg)

return types.MappingProxyType(mapping)

Expand Down
4 changes: 2 additions & 2 deletions src/fondant/core/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ class InvalidComponentSpec(ValidationError, FondantException):
"""Thrown when a component spec cannot be validated against the schema."""


class InvalidWorkspaceDefinition(ValidationError, FondantException):
"""Thrown when a pipeline definition is invalid."""
class InvalidDatasetDefinition(ValidationError, FondantException):
"""Thrown when a dataset definition is invalid."""


class InvalidTypeSchema(ValidationError, FondantException):
Expand Down
4 changes: 2 additions & 2 deletions src/fondant/core/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ class Metadata:
Class representing the Metadata of the manifest.
Args:
dataset_name: the name of the pipeline
run_id: the run id of the pipeline
dataset_name: the name of the dataset
run_id: the run id of the dataset
component_id: the name of the component
cache_key: the cache key of the component.
manifest_location: path to the manifest file itself
Expand Down
8 changes: 4 additions & 4 deletions src/fondant/dataset/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from fsspec.registry import known_implementations

from fondant.core.component_spec import ComponentSpec
from fondant.core.exceptions import InvalidWorkspaceDefinition
from fondant.core.exceptions import InvalidDatasetDefinition
from fondant.core.manifest import Metadata
from fondant.core.schema import CloudCredentialsMount, DockerVolume
from fondant.dataset import (
Expand Down Expand Up @@ -336,7 +336,7 @@ def _set_configuration(self, services, fondant_component_operation, component_id
f" is not a valid accelerator type for Docker Compose compiler."
f" Available options: {VALID_VERTEX_ACCELERATOR_TYPES}"
)
raise InvalidWorkspaceDefinition(msg)
raise InvalidDatasetDefinition(msg)

if accelerator_name == "GPU":
services[component_id]["deploy"] = {
Expand Down Expand Up @@ -660,7 +660,7 @@ def _set_configuration(self, task, fondant_component_operation):
f"Configured accelerator `{accelerator_name}` is not a valid accelerator type"
f"for Kubeflow compiler. Available options: {VALID_ACCELERATOR_TYPES}"
)
raise InvalidWorkspaceDefinition(msg)
raise InvalidDatasetDefinition(msg)

task.set_accelerator_limit(accelerator_number)
if accelerator_name == "GPU":
Expand Down Expand Up @@ -719,7 +719,7 @@ def _set_configuration(self, task, fondant_component_operation):
f"Configured accelerator `{accelerator_name}` is not a valid accelerator type"
f"for Vertex compiler. Available options: {VALID_VERTEX_ACCELERATOR_TYPES}"
)
raise InvalidWorkspaceDefinition(msg)
raise InvalidDatasetDefinition(msg)

task.set_accelerator_type(accelerator_name)

Expand Down
28 changes: 12 additions & 16 deletions src/fondant/dataset/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
from fondant.component import BaseComponent
from fondant.core.component_spec import ComponentSpec, OperationSpec
from fondant.core.exceptions import (
InvalidDatasetDefinition,
InvalidLightweightComponent,
InvalidWorkspaceDefinition,
)
from fondant.core.manifest import Manifest
from fondant.core.schema import Field
Expand Down Expand Up @@ -98,7 +98,7 @@ def __post_init__(self):
"""Validate the resources."""
if bool(self.node_pool_label) != bool(self.node_pool_name):
msg = "Both node_pool_label and node_pool_name must be specified or both must be None."
raise InvalidWorkspaceDefinition(
raise InvalidDatasetDefinition(
msg,
)

Expand All @@ -107,7 +107,7 @@ def __post_init__(self):
"Both number of accelerators and accelerator name must be specified or both must"
" be None."
)
raise InvalidWorkspaceDefinition(
raise InvalidDatasetDefinition(
msg,
)

Expand Down Expand Up @@ -267,7 +267,7 @@ def _validate_consumes(
f"The dataset does not contain the column {dataset_column_name_or_type} "
f"required by the component {component_spec.name}."
)
raise InvalidWorkspaceDefinition(msg)
raise InvalidDatasetDefinition(msg)

# If operations column name is not in the component spec, but additional properties
# are true we will infer the correct type from the dataset fields
Expand All @@ -286,7 +286,7 @@ def _validate_consumes(
f"but `{operations_column_name}` is not defined in the `consumes` "
f"section of the component spec."
)
raise InvalidWorkspaceDefinition(msg)
raise InvalidDatasetDefinition(msg)

return validated_consumes

Expand Down Expand Up @@ -458,7 +458,7 @@ def _validate_dataset_name(name: str) -> str:
pattern = r"^[a-z0-9][a-z0-9_-]*$"
if not re.match(pattern, name):
msg = f"The dataset name violates the pattern {pattern}"
raise InvalidWorkspaceDefinition(msg)
raise InvalidDatasetDefinition(msg)
return name

@staticmethod
Expand All @@ -485,9 +485,7 @@ def register_operation(
input_dataset: t.Optional["Dataset"],
output_dataset: t.Optional["Dataset"],
) -> None:
if self._graph is None:
self._graph = OrderedDict()

"""Register an operation in the dataset graph."""
dependencies = []
for component_name, info in self._graph.items():
if info["output_dataset"] == input_dataset:
Expand Down Expand Up @@ -594,15 +592,13 @@ def validate(self):

def _validate_dataset_definition(self):
"""
Validates the workspace definition by ensuring that the consumed and produced subsets and
Validates the dataset definition by ensuring that the consumed and produced subsets and
their associated fields match and are invoked in the correct order.
Raises:
InvalidPipelineDefinition: If a component is trying to invoke a subset that is not
InvalidDatasetDefinition: If a component is trying to invoke a subset that is not
defined or created in previous components, or if an invoked subset's schema does not
match the previously created subset definition.
base_path: the base path where to store the pipelines artifacts
run_id: the run id of the component
match the previously created subset definition
"""
run_id = self.manifest.run_id
if len(self._graph.keys()) == 0:
Expand Down Expand Up @@ -636,7 +632,7 @@ def _validate_dataset_definition(self):
f"in the previous components. \n"
f"Available field names: {list(manifest.fields.keys())}"
)
raise InvalidWorkspaceDefinition(
raise InvalidDatasetDefinition(
msg,
)

Expand All @@ -653,7 +649,7 @@ def _validate_dataset_definition(self):
f"{manifest_field.type}\nThe current component to "
f"trying to invoke it with this type:\n{component_field.type}"
)
raise InvalidWorkspaceDefinition(
raise InvalidDatasetDefinition(
msg,
)

Expand Down
4 changes: 2 additions & 2 deletions src/fondant/testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import yaml

from fondant.core.exceptions import InvalidWorkspaceDefinition
from fondant.core.exceptions import InvalidDatasetDefinition


@dataclass
Expand Down Expand Up @@ -196,7 +196,7 @@ def from_spec(cls, spec_path: str) -> "KubeflowPipelineConfigs":

if not specification:
msg = "No component specification found in the pipeline specification"
raise InvalidWorkspaceDefinition(msg)
raise InvalidDatasetDefinition(msg)
components_configs_dict = {}

# Iterate through each service
Expand Down
4 changes: 2 additions & 2 deletions tests/core/test_manifest_evolution.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import pytest
import yaml
from fondant.core.component_spec import ComponentSpec, OperationSpec
from fondant.core.exceptions import InvalidWorkspaceDefinition
from fondant.core.exceptions import InvalidDatasetDefinition
from fondant.core.manifest import Manifest

EXAMPLES_PATH = Path(__file__).parent / "examples/evolution_examples"
Expand Down Expand Up @@ -107,7 +107,7 @@ def test_invalid_evolution_examples(
component_spec = ComponentSpec.from_dict(component_spec)
for test_condition in test_conditions:
produces = test_condition["produces"]
with pytest.raises(InvalidWorkspaceDefinition): # noqa: PT012
with pytest.raises(InvalidDatasetDefinition): # noqa: PT012
operation_spec = OperationSpec(component_spec, produces=produces)
manifest.evolve(
operation_spec=operation_spec,
Expand Down
8 changes: 4 additions & 4 deletions tests/pipeline/test_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import yaml
from fondant.component import DaskLoadComponent
from fondant.core.component_spec import ComponentSpec
from fondant.core.exceptions import InvalidWorkspaceDefinition
from fondant.core.exceptions import InvalidDatasetDefinition
from fondant.core.manifest import Manifest, Metadata
from fondant.core.schema import CloudCredentialsMount
from fondant.dataset import (
Expand Down Expand Up @@ -387,7 +387,7 @@ def test_invalid_docker_configuration(tmp_path_factory):

compiler = DockerCompiler()
with tmp_path_factory.mktemp("temp") as fn, pytest.raises( # noqa PT012
InvalidWorkspaceDefinition,
InvalidDatasetDefinition,
):
working_directory = str(fn)
compiler.compile(
Expand Down Expand Up @@ -565,7 +565,7 @@ def test_invalid_kubeflow_configuration(tmp_path_factory):
)

compiler = KubeFlowCompiler()
with pytest.raises(InvalidWorkspaceDefinition):
with pytest.raises(InvalidDatasetDefinition):
compiler.compile(
dataset=dataset,
working_directory="/foo/bar",
Expand Down Expand Up @@ -666,7 +666,7 @@ def test_invalid_vertex_configuration(tmp_path_factory):
dataset_name="test_pipeline",
)
compiler = VertexCompiler()
with pytest.raises(InvalidWorkspaceDefinition):
with pytest.raises(InvalidDatasetDefinition):
compiler.compile(
dataset=dataset,
working_directory="/foo/bar",
Expand Down
16 changes: 8 additions & 8 deletions tests/pipeline/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import yaml
from fondant.component import DaskLoadComponent
from fondant.core.component_spec import ComponentSpec
from fondant.core.exceptions import InvalidWorkspaceDefinition
from fondant.core.exceptions import InvalidDatasetDefinition
from fondant.core.schema import Field, Type
from fondant.dataset import (
ComponentOp,
Expand Down Expand Up @@ -58,7 +58,7 @@ def test_component_op(
arguments=component_args,
)

with pytest.raises(InvalidWorkspaceDefinition):
with pytest.raises(InvalidDatasetDefinition):
ComponentOp(
Path(components_path / component_names[0]),
arguments=component_args,
Expand All @@ -67,7 +67,7 @@ def test_component_op(
),
)

with pytest.raises(InvalidWorkspaceDefinition):
with pytest.raises(InvalidDatasetDefinition):
ComponentOp(
Path(components_path / component_names[0]),
arguments=component_args,
Expand Down Expand Up @@ -270,15 +270,15 @@ def test_invalid_pipeline_schema(
)

# "images_pictures" does not exist in the dataset
with pytest.raises(InvalidWorkspaceDefinition):
with pytest.raises(InvalidDatasetDefinition):
dataset.apply(
Path(components_path / "second_component"),
arguments=component_args,
consumes={"images_pictures": "images_array"},
)

# "images_array" does not exist in the component spec
with pytest.raises(InvalidWorkspaceDefinition):
with pytest.raises(InvalidDatasetDefinition):
dataset.apply(
Path(components_path / "second_component"),
arguments=component_args,
Expand All @@ -287,7 +287,7 @@ def test_invalid_pipeline_schema(

# Extra field in the consumes mapping that does not have a corresponding field
# in the dataset
with pytest.raises(InvalidWorkspaceDefinition):
with pytest.raises(InvalidDatasetDefinition):
dataset.apply(
Path(components_path / "second_component"),
arguments=component_args,
Expand Down Expand Up @@ -360,7 +360,7 @@ def test_invalid_pipeline_declaration(
arguments=component_args,
)

with pytest.raises(InvalidWorkspaceDefinition):
with pytest.raises(InvalidDatasetDefinition):
dataset._validate_dataset_definition()


Expand Down Expand Up @@ -504,7 +504,7 @@ def test_invoked_field_schema_raise_exception():
"it with this type:\nType(DataType(string))",
)

with pytest.raises(InvalidWorkspaceDefinition, match=expected_error_msg):
with pytest.raises(InvalidDatasetDefinition, match=expected_error_msg):
dataset.validate()


Expand Down

0 comments on commit 905dbd8

Please sign in to comment.