From 8f604d1da1428210b1792585bb1ab57a5de9b5b6 Mon Sep 17 00:00:00 2001
From: Elena Khaustova
Date: Wed, 11 Sep 2024 10:47:44 +0100
Subject: [PATCH 01/15] Updated release notes

Signed-off-by: Elena Khaustova
---
 RELEASE.md | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/RELEASE.md b/RELEASE.md
index 34e75ffb74..548b49a109 100644
--- a/RELEASE.md
+++ b/RELEASE.md
@@ -1,6 +1,8 @@
 # Upcoming Release
 
 ## Major features and improvements
+* Refactored `kedro run` and `kedro catalog` commands.
+* Moved pattern resolution logic from `DataCatalog` to a separate component - `CatalogConfigResolver`. Updated `DataCatalog` to use `CatalogConfigResolver` internally.
 * Made packaged Kedro projects return `session.run()` output to be used when running it in the interactive environment.
 * Enhanced `OmegaConfigLoader` configuration validation to detect duplicate keys at all parameter levels, ensuring comprehensive nested key checking.
 
 ## Bug fixes and other changes

From 9a4db1858473294c6439687359e5368d06a1a2e1 Mon Sep 17 00:00:00 2001
From: Elena Khaustova
Date: Wed, 11 Sep 2024 13:53:06 +0100
Subject: [PATCH 02/15] Returned DatasetError

Signed-off-by: Elena Khaustova
---
 kedro/io/catalog_config_resolver.py | 4 ++--
 tests/io/test_data_catalog.py       | 4 ++--
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/kedro/io/catalog_config_resolver.py b/kedro/io/catalog_config_resolver.py
index f3548e4dd5..ab679f1e4b 100644
--- a/kedro/io/catalog_config_resolver.py
+++ b/kedro/io/catalog_config_resolver.py
@@ -73,7 +73,7 @@ def _sort_patterns(cls, dataset_patterns: Patterns) -> Patterns:
             pattern for pattern in sorted_keys if cls._pattern_specificity(pattern) == 0
         ]
         if len(catch_all) > 1:
-            raise ValueError(
+            raise DatasetError(
                 f"Multiple catch-all patterns found in the catalog: {', '.join(catch_all)}. Only one catch-all pattern is allowed, remove the extras."
             )
         return {key: dataset_patterns[key] for key in sorted_keys}
@@ -150,7 +150,7 @@ def _resolve_dataset_config(
         try:
             config = config.format_map(resolved_vars.named)
         except KeyError as exc:
-            raise KeyError(
+            raise DatasetError(
                 f"Unable to resolve '{config}' from the pattern '{pattern}'. Keys used in the configuration "
                 f"should be present in the dataset factory pattern."
             ) from exc

diff --git a/tests/io/test_data_catalog.py b/tests/io/test_data_catalog.py
index be8ed0831e..db777cc634 100644
--- a/tests/io/test_data_catalog.py
+++ b/tests/io/test_data_catalog.py
@@ -925,7 +925,7 @@ def test_multiple_catch_all_patterns_not_allowed(
         }
 
         with pytest.raises(
-            ValueError, match="Multiple catch-all patterns found in the catalog"
+            DatasetError, match="Multiple catch-all patterns found in the catalog"
         ):
             DataCatalog.from_config(**config_with_dataset_factories)
 
@@ -1019,7 +1019,7 @@ def test_unmatched_key_error_when_parsing_config(
             "Unable to resolve 'data/01_raw/{brand}_plane.pq' from the pattern '{type}@planes'. "
             "Keys used in the configuration should be present in the dataset factory pattern."
         )
-        with pytest.raises(KeyError, match=re.escape(pattern)):
+        with pytest.raises(DatasetError, match=re.escape(pattern)):
             catalog._get_dataset("jet@planes")
 
     def test_factory_config_versioned(

From 0a6946ab4032ffb51c74c93e1bd35784cecddbb9 Mon Sep 17 00:00:00 2001
From: Elena Khaustova
Date: Wed, 11 Sep 2024 14:13:05 +0100
Subject: [PATCH 03/15] Added _dataset_patterns and _default_pattern to
 _config_resolver to avoid breaking change

Signed-off-by: Elena Khaustova
---
 kedro/io/data_catalog.py | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/kedro/io/data_catalog.py b/kedro/io/data_catalog.py
index 2b09c35e80..475c18a148 100644
--- a/kedro/io/data_catalog.py
+++ b/kedro/io/data_catalog.py
@@ -160,6 +160,12 @@ def __init__(  # noqa: PLR0913
         >>> catalog = DataCatalog(datasets={'cars': cars})
         """
         self._config_resolver = config_resolver or CatalogConfigResolver()
+
+        # Kept to avoid breaking changes
+        if not config_resolver:
+            self._config_resolver._dataset_patterns = dataset_patterns or {}
+            self._config_resolver._default_pattern = default_pattern or {}
+
         self._datasets: dict[str, AbstractDataset] = {}
         self.datasets: _FrozenDatasets | None = None

From fee7bd6d662045572d09a98a3038d3e39671ec6d Mon Sep 17 00:00:00 2001
From: Elena Khaustova
Date: Wed, 11 Sep 2024 15:26:12 +0100
Subject: [PATCH 04/15] Made resolve_dataset_pattern return just dict

Signed-off-by: Elena Khaustova
---
 kedro/framework/cli/catalog.py      | 12 ++++----
 kedro/io/catalog_config_resolver.py | 47 ++++++++++++-----------------
 kedro/io/data_catalog.py            |  4 +--
 3 files changed, 28 insertions(+), 35 deletions(-)

diff --git a/kedro/framework/cli/catalog.py b/kedro/framework/cli/catalog.py
index 4001b696f3..7bd0197e5b 100644
--- a/kedro/framework/cli/catalog.py
+++ b/kedro/framework/cli/catalog.py
@@ -93,12 +93,12 @@ def list_datasets(metadata: ProjectMetadata, pipeline: str, env: str) -> None:
 
         # resolve any factory datasets in the pipeline
         factory_ds_by_type = defaultdict(list)
-        resolved_configs = data_catalog.config_resolver.resolve_dataset_pattern(
-            default_ds
-        )
-        for ds_name, ds_config in zip(default_ds, resolved_configs):
+        for ds_name in default_ds:
             if data_catalog.config_resolver.match_pattern(ds_name):
-                factory_ds_by_type[ds_config.get("type", "DefaultDataset")].append(  # type: ignore[attr-defined]
+                ds_config = data_catalog.config_resolver.resolve_dataset_pattern(
+                    ds_name
+                )
+                factory_ds_by_type[ds_config.get("type", "DefaultDataset")].append(
                     ds_name
                 )
 
@@ -253,7 +253,7 @@ def resolve_patterns(metadata: ProjectMetadata, env: str) -> None:
             ds_config = data_catalog.config_resolver.resolve_dataset_pattern(ds_name)
 
             # Exclude MemoryDatasets not set in the catalog explicitly
-            if ds_config is not None:
+            if ds_config:
                 explicit_datasets[ds_name] = ds_config
 
     secho(yaml.dump(explicit_datasets))

diff --git a/kedro/io/catalog_config_resolver.py b/kedro/io/catalog_config_resolver.py
index ab679f1e4b..91218d030c 100644
--- a/kedro/io/catalog_config_resolver.py
+++ b/kedro/io/catalog_config_resolver.py
@@ -229,36 +229,29 @@ def _resolve_config_credentials(
 
         return resolved_configs
 
-    def resolve_dataset_pattern(
-        self, datasets: str | list[str]
-    ) -> dict[str, Any] | list[dict[str, Any]]:
+    def resolve_dataset_pattern(self, ds_name: str) -> dict[str, Any]:
         """Resolve dataset patterns and return resolved configurations based on the existing patterns."""
-        datasets_lst = [datasets] if isinstance(datasets, str) else datasets
-        resolved_configs = []
-
-        for ds_name in datasets_lst:
-            matched_pattern = self.match_pattern(ds_name)
-
-            if matched_pattern and ds_name not in self._resolved_configs:
-                pattern_config = self._get_pattern_config(matched_pattern)
-                ds_config = self._resolve_dataset_config(
-                    ds_name, matched_pattern, copy.deepcopy(pattern_config)
+        matched_pattern = self.match_pattern(ds_name)
+
+        if matched_pattern and ds_name not in self._resolved_configs:
+            pattern_config = self._get_pattern_config(matched_pattern)
+            ds_config = self._resolve_dataset_config(
+                ds_name, matched_pattern, copy.deepcopy(pattern_config)
+            )
+
+            if (
+                self._pattern_specificity(matched_pattern) == 0
+                and matched_pattern in self._default_pattern
+            ):
+                self._logger.warning(
+                    "Config from the dataset factory pattern '%s' in the catalog will be used to "
+                    "override the default dataset creation for '%s'",
+                    matched_pattern,
+                    ds_name,
                 )
+            return ds_config
 
-                if (
-                    self._pattern_specificity(matched_pattern) == 0
-                    and matched_pattern in self._default_pattern
-                ):
-                    self._logger.warning(
-                        "Config from the dataset factory pattern '%s' in the catalog will be used to "
-                        "override the default dataset creation for '%s'",
-                        matched_pattern,
-                        ds_name,
-                    )
-                resolved_configs.append(ds_config)
-            else:
-                resolved_configs.append(self._resolved_configs.get(ds_name, None))
-
-        return resolved_configs[0] if isinstance(datasets, str) else resolved_configs
+        return self._resolved_configs.get(ds_name, {})
 
     def add_runtime_patterns(self, dataset_patterns: Patterns) -> None:
         """Add new runtime patterns and re-sort them."""

diff --git a/kedro/io/data_catalog.py b/kedro/io/data_catalog.py
index 475c18a148..420f8857c8 100644
--- a/kedro/io/data_catalog.py
+++ b/kedro/io/data_catalog.py
@@ -326,10 +326,10 @@ def _get_dataset(
     ) -> AbstractDataset:
         ds_config = self._config_resolver.resolve_dataset_pattern(dataset_name)
 
-        if dataset_name not in self._datasets and ds_config is not None:
+        if dataset_name not in self._datasets and ds_config:
             ds = AbstractDataset.from_config(
                 dataset_name,
-                ds_config,  # type: ignore[arg-type]
+                ds_config,
                 self._load_versions.get(dataset_name),
                 self._save_version,
             )

From f5a7992a57b5407da8504be84eaf2a6888bce84f Mon Sep 17 00:00:00 2001
From: Elena Khaustova
Date: Wed, 11 Sep 2024 15:34:40 +0100
Subject: [PATCH 05/15] Fixed linter

Signed-off-by: Elena Khaustova
---
 kedro/io/catalog_config_resolver.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/kedro/io/catalog_config_resolver.py b/kedro/io/catalog_config_resolver.py
index 91218d030c..97ffbadd5f 100644
--- a/kedro/io/catalog_config_resolver.py
+++ b/kedro/io/catalog_config_resolver.py
@@ -249,7 +249,7 @@ def resolve_dataset_pattern(self, ds_name: str) -> dict[str, Any]:
                     matched_pattern,
                     ds_name,
                 )
-            return ds_config
+            return ds_config  # type: ignore[no-any-return]
 
         return self._resolved_configs.get(ds_name, {})
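After patches 04 and 05, `resolve_dataset_pattern` takes a single dataset name and always returns a plain `dict` (empty when no pattern matches), so callers branch on truthiness instead of handling `None` or lists. A minimal sketch of the resulting usage; the `{name}#csv` pattern and its config values are illustrative, not taken from this PR:

```python
from kedro.io import CatalogConfigResolver

# Illustrative dataset factory pattern, resolved per dataset name.
config = {
    "{name}#csv": {
        "type": "pandas.CSVDataset",
        "filepath": "data/01_raw/{name}.csv",
    }
}
resolver = CatalogConfigResolver(config)

# A matching name yields the fully resolved config dict...
assert resolver.resolve_dataset_pattern("companies#csv") == {
    "type": "pandas.CSVDataset",
    "filepath": "data/01_raw/companies.csv",
}
# ...while a non-matching name yields an empty (falsy) dict, which is what
# lets the `kedro catalog` code above write `if ds_config:` instead of
# `if ds_config is not None:`.
assert resolver.resolve_dataset_pattern("unknown") == {}
```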
From 1c981f33033243d0205c58df1ea3d807337bc0aa Mon Sep 17 00:00:00 2001
From: Elena Khaustova
Date: Wed, 11 Sep 2024 15:55:24 +0100
Subject: [PATCH 06/15] Added CatalogProtocol draft

Signed-off-by: Elena Khaustova
---
 kedro/io/core.py | 68 +++++++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 67 insertions(+), 1 deletion(-)

diff --git a/kedro/io/core.py b/kedro/io/core.py
index f3975c9c3c..5a62ead381 100644
--- a/kedro/io/core.py
+++ b/kedro/io/core.py
@@ -17,7 +17,7 @@
 from glob import iglob
 from operator import attrgetter
 from pathlib import Path, PurePath, PurePosixPath
-from typing import TYPE_CHECKING, Any, Callable, Generic, TypeVar
+from typing import TYPE_CHECKING, Any, Callable, Generic, Protocol, TypeVar
 from urllib.parse import urlsplit
 
 from cachetools import Cache, cachedmethod
@@ -29,6 +29,8 @@
 if TYPE_CHECKING:
     import os
 
+    from kedro.io.catalog_config_resolver import CatalogConfigResolver, Patterns
+
 VERSION_FORMAT = "%Y-%m-%dT%H.%M.%S.%fZ"
 VERSIONED_FLAG_KEY = "versioned"
 VERSION_KEY = "version"
@@ -871,3 +873,67 @@ def validate_on_forbidden_chars(**kwargs: Any) -> None:
         raise DatasetError(
             f"Neither white-space nor semicolon are allowed in '{key}'."
         )
+
+
+class CatalogProtocol(Protocol):
+    def __contains__(self, ds_name: str) -> bool:
+        """Check if a dataset is in the catalog."""
+        ...
+
+    @property
+    def config_resolver(self) -> CatalogConfigResolver:
+        """Return the catalog config resolver."""
+        ...
+
+    @classmethod
+    def from_config(cls, catalog: dict[str, dict[str, Any]] | None) -> Any:
+        """Create a ``KedroDataCatalog`` instance from configuration."""
+        ...
+
+    def _get_dataset(
+        self, ds_name: str, suggest: bool = True, version: Any = None | None
+    ) -> Any:
+        """Retrieve a dataset by its name."""
+        ...
+
+    def list(self, regex_search: str = None | None) -> list[str]:
+        """List all dataset names registered in the catalog."""
+        ...
+
+    def save(self, name: str, data: Any) -> None:
+        """Save data to a registered dataset."""
+        ...
+
+    def load(self, name: str, version: str = None | None) -> Any:
+        """Load data from a registered dataset."""
+        ...
+
+    def add(self, ds_name: str, dataset: Any, replace: bool = False) -> None:
+        """Add a new dataset to the catalog."""
+        ...
+
+    def add_all(self, datasets: dict[str, Any], replace: bool = False) -> None:
+        """Add new datasets to the catalog."""
+        ...
+
+    def add_feed_dict(self, datasets: dict[str, Any], replace: bool = False) -> None:
+        """Add datasets to the catalog using the data provided through the `feed_dict`."""
+        ...
+
+    def exists(self, name: str) -> bool:
+        """Checks whether registered data set exists by calling its `exists()` method."""
+        pass
+
+    def release(self, name: str) -> None:
+        """Release any cached data associated with a dataset."""
+        ...
+
+    def confirm(self, name: str) -> None:
+        """Confirm a dataset by its name."""
+        ...
+
+    def shallow_copy(
+        self, extra_dataset_patterns: Patterns | None = None
+    ) -> CatalogProtocol:
+        """Returns a shallow copy of the current object."""
+        ...

From 6128be72ce887932c2cc7728f69c51057bd6dfb3 Mon Sep 17 00:00:00 2001
From: Elena Khaustova
Date: Thu, 12 Sep 2024 11:41:04 +0100
Subject: [PATCH 07/15] Implemented CatalogProtocol

Signed-off-by: Elena Khaustova
---
 kedro/io/core.py | 32 ++++++++++++++++++++++----------
 1 file changed, 22 insertions(+), 10 deletions(-)

diff --git a/kedro/io/core.py b/kedro/io/core.py
index 5a62ead381..ecdb85d505 100644
--- a/kedro/io/core.py
+++ b/kedro/io/core.py
@@ -17,7 +17,15 @@
 from glob import iglob
 from operator import attrgetter
 from pathlib import Path, PurePath, PurePosixPath
-from typing import TYPE_CHECKING, Any, Callable, Generic, Protocol, TypeVar
+from typing import (
+    TYPE_CHECKING,
+    Any,
+    Callable,
+    Generic,
+    Protocol,
+    TypeVar,
+    runtime_checkable,
+)
 from urllib.parse import urlsplit
 
 from cachetools import Cache, cachedmethod
@@ -875,7 +883,13 @@ def validate_on_forbidden_chars(**kwargs: Any) -> None:
         )
 
 
-class CatalogProtocol(Protocol):
+_C = TypeVar("_C")
+
+
+@runtime_checkable
+class CatalogProtocol(Protocol["_C"]):
+    _datasets: dict[str, AbstractDataset]
+
     def __contains__(self, ds_name: str) -> bool:
         """Check if a dataset is in the catalog."""
         ...
@@ -886,17 +900,17 @@ def config_resolver(self) -> CatalogConfigResolver:
         """Return the catalog config resolver."""
         ...
 
     @classmethod
-    def from_config(cls, catalog: dict[str, dict[str, Any]] | None) -> Any:
+    def from_config(cls, catalog: dict[str, dict[str, Any]] | None) -> _C:
         """Create a ``KedroDataCatalog`` instance from configuration."""
         ...
 
     def _get_dataset(
-        self, ds_name: str, suggest: bool = True, version: Any = None | None
-    ) -> Any:
+        self, ds_name: str, suggest: bool = True, version: Any = None
+    ) -> AbstractDataset:
         """Retrieve a dataset by its name."""
         ...
 
-    def list(self, regex_search: str = None | None) -> list[str]:
+    def list(self, regex_search: str | None = None) -> list[str]:
         """List all dataset names registered in the catalog."""
         ...
 
@@ -904,7 +918,7 @@ def save(self, name: str, data: Any) -> None:
         """Save data to a registered dataset."""
         ...
 
-    def load(self, name: str, version: str = None | None) -> Any:
+    def load(self, name: str, version: str | None = None) -> _DO:
         """Load data from a registered dataset."""
         ...
 
@@ -929,8 +943,6 @@ def confirm(self, name: str) -> None:
         """Confirm a dataset by its name."""
         ...
 
-    def shallow_copy(
-        self, extra_dataset_patterns: Patterns | None = None
-    ) -> CatalogProtocol:
+    def shallow_copy(self, extra_dataset_patterns: Patterns | None = None) -> _C:
         """Returns a shallow copy of the current object."""
         ...
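The point of typing against the protocol, which patch 08 below rolls out across the framework, is that any object with the right structure can stand in for `DataCatalog`. A hypothetical sketch of downstream code written against the final form of the protocol from this series (`summarise` is an illustrative helper, not part of the PR; `CatalogProtocol` is imported from `kedro.io.core` because it is only re-exported from `kedro.io` in the next patch):

```python
from kedro.io import DataCatalog, MemoryDataset
from kedro.io.core import CatalogProtocol


def summarise(catalog: CatalogProtocol) -> None:
    """Works with DataCatalog or any other conforming implementation."""
    for ds_name in catalog.list():
        print(ds_name, "exists:", catalog.exists(ds_name))


# DataCatalog satisfies the protocol structurally; no inheritance involved.
summarise(DataCatalog(datasets={"cars": MemoryDataset([1, 2, 3])}))
```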
From 8c91d0e828ee6e49e909fa2f6bc5f1ec9f59d605 Mon Sep 17 00:00:00 2001
From: Elena Khaustova
Date: Thu, 12 Sep 2024 11:42:48 +0100
Subject: [PATCH 08/15] Updated types

Signed-off-by: Elena Khaustova
---
 kedro/framework/context/context.py |  8 ++---
 kedro/framework/hooks/specs.py     | 16 +++++-----
 kedro/io/__init__.py               |  2 ++
 kedro/runner/parallel_runner.py    | 18 ++++++-----
 kedro/runner/runner.py             | 50 +++++++++++++++---------------
 kedro/runner/sequential_runner.py  |  8 ++---
 kedro/runner/thread_runner.py      |  8 ++---
 7 files changed, 57 insertions(+), 53 deletions(-)

diff --git a/kedro/framework/context/context.py b/kedro/framework/context/context.py
index 3b61b747f6..25995eb63f 100644
--- a/kedro/framework/context/context.py
+++ b/kedro/framework/context/context.py
@@ -14,7 +14,7 @@
 
 from kedro.config import AbstractConfigLoader, MissingConfigException
 from kedro.framework.project import settings
-from kedro.io import DataCatalog  # noqa: TCH001
+from kedro.io import CatalogProtocol, DataCatalog  # noqa: TCH001
 from kedro.pipeline.transcoding import _transcode_split
 
 if TYPE_CHECKING:
@@ -123,7 +123,7 @@ def _convert_paths_to_absolute_posix(
     return conf_dictionary
 
 
-def _validate_transcoded_datasets(catalog: DataCatalog) -> None:
+def _validate_transcoded_datasets(catalog: CatalogProtocol) -> None:
     """Validates transcoded datasets are correctly named
 
     Args:
@@ -178,7 +178,7 @@ class KedroContext:
     )
 
     @property
-    def catalog(self) -> DataCatalog:
+    def catalog(self) -> CatalogProtocol:
        """Read-only property referring to Kedro's ``DataCatalog`` for this context.
 
         Returns:
             DataCatalog defined in `catalog.yml`.
@@ -213,7 +213,7 @@ def _get_catalog(
         self,
         save_version: str | None = None,
         load_versions: dict[str, str] | None = None,
-    ) -> DataCatalog:
+    ) -> CatalogProtocol:
         """A hook for changing the creation of a DataCatalog instance.
 
         Returns:

diff --git a/kedro/framework/hooks/specs.py b/kedro/framework/hooks/specs.py
index b0037a0878..3fd4871aee 100644
--- a/kedro/framework/hooks/specs.py
+++ b/kedro/framework/hooks/specs.py
@@ -11,7 +11,7 @@
 if TYPE_CHECKING:
     from kedro.framework.context import KedroContext
-    from kedro.io import DataCatalog
+    from kedro.io import CatalogProtocol
     from kedro.pipeline import Pipeline
     from kedro.pipeline.node import Node
 
@@ -22,7 +22,7 @@ class DataCatalogSpecs:
     @hook_spec
     def after_catalog_created(  # noqa: PLR0913
         self,
-        catalog: DataCatalog,
+        catalog: CatalogProtocol,
         conf_catalog: dict[str, Any],
         conf_creds: dict[str, Any],
         feed_dict: dict[str, Any],
@@ -53,7 +53,7 @@ class NodeSpecs:
     def before_node_run(
         self,
         node: Node,
-        catalog: DataCatalog,
+        catalog: CatalogProtocol,
         inputs: dict[str, Any],
         is_async: bool,
         session_id: str,
@@ -81,7 +81,7 @@ def before_node_run(
     def after_node_run(  # noqa: PLR0913
         self,
         node: Node,
-        catalog: DataCatalog,
+        catalog: CatalogProtocol,
         inputs: dict[str, Any],
         outputs: dict[str, Any],
         is_async: bool,
@@ -110,7 +110,7 @@ def on_node_error(  # noqa: PLR0913
         self,
         error: Exception,
         node: Node,
-        catalog: DataCatalog,
+        catalog: CatalogProtocol,
         inputs: dict[str, Any],
         is_async: bool,
         session_id: str,
@@ -137,7 +137,7 @@ class PipelineSpecs:
 
     @hook_spec
     def before_pipeline_run(
-        self, run_params: dict[str, Any], pipeline: Pipeline, catalog: DataCatalog
+        self, run_params: dict[str, Any], pipeline: Pipeline, catalog: CatalogProtocol
     ) -> None:
         """Hook to be invoked before a pipeline runs.
 
@@ -174,7 +174,7 @@ def after_pipeline_run(
         run_params: dict[str, Any],
         run_result: dict[str, Any],
         pipeline: Pipeline,
-        catalog: DataCatalog,
+        catalog: CatalogProtocol,
     ) -> None:
         """Hook to be invoked after a pipeline runs.
 
@@ -212,7 +212,7 @@ def on_pipeline_error(
         error: Exception,
         run_params: dict[str, Any],
         pipeline: Pipeline,
-        catalog: DataCatalog,
+        catalog: CatalogProtocol,
     ) -> None:
         """Hook to be invoked if a pipeline run throws an uncaught Exception.
         The signature of this error hook should match the signature of ``before_pipeline_run``

diff --git a/kedro/io/__init__.py b/kedro/io/__init__.py
index 4b4a2e1b52..c4d968c2ba 100644
--- a/kedro/io/__init__.py
+++ b/kedro/io/__init__.py
@@ -9,6 +9,7 @@
 from .core import (
     AbstractDataset,
     AbstractVersionedDataset,
+    CatalogProtocol,
     DatasetAlreadyExistsError,
     DatasetError,
     DatasetNotFoundError,
@@ -23,6 +24,7 @@
     "AbstractDataset",
     "AbstractVersionedDataset",
     "CachedDataset",
+    "CatalogProtocol",
     "DataCatalog",
     "CatalogConfigResolver",
     "DatasetAlreadyExistsError",

diff --git a/kedro/runner/parallel_runner.py b/kedro/runner/parallel_runner.py
index 62d7e1216b..903c9ece99 100644
--- a/kedro/runner/parallel_runner.py
+++ b/kedro/runner/parallel_runner.py
@@ -22,7 +22,7 @@
 )
 from kedro.framework.project import settings
 from kedro.io import (
-    DataCatalog,
+    CatalogProtocol,
     DatasetNotFoundError,
     MemoryDataset,
     SharedMemoryDataset,
@@ -60,7 +60,7 @@ def _bootstrap_subprocess(
 
 def _run_node_synchronization(  # noqa: PLR0913
     node: Node,
-    catalog: DataCatalog,
+    catalog: CatalogProtocol,
     is_async: bool = False,
     session_id: str | None = None,
     package_name: str | None = None,
@@ -73,7 +73,7 @@ def _run_node_synchronization(  # noqa: PLR0913
 
     Args:
         node: The ``Node`` to run.
-        catalog: A ``DataCatalog`` containing the node's inputs and outputs.
+        catalog: A ``CatalogProtocol`` containing the node's inputs and outputs.
         is_async: If True, the node inputs and outputs are loaded and saved
             asynchronously with threads. Defaults to False.
         session_id: The session id of the pipeline run.
@@ -118,7 +118,7 @@ def __init__(
             cannot be larger than 61 and will be set to min(61, max_workers).
             is_async: If True, the node inputs and outputs are loaded and saved
                 asynchronously with threads. Defaults to False.
-            extra_dataset_patterns: Extra dataset factory patterns to be added to the DataCatalog
+            extra_dataset_patterns: Extra dataset factory patterns to be added to the CatalogProtocol
                 during the run. This is used to set the default datasets to SharedMemoryDataset
                 for `ParallelRunner`.
 
@@ -168,7 +168,7 @@ def _validate_nodes(cls, nodes: Iterable[Node]) -> None:
         )
 
     @classmethod
-    def _validate_catalog(cls, catalog: DataCatalog, pipeline: Pipeline) -> None:
+    def _validate_catalog(cls, catalog: CatalogProtocol, pipeline: Pipeline) -> None:
         """Ensure that all data sets are serialisable and that we do not have
         any non proxied memory data sets being used as outputs as their content
         will not be synchronized across threads.
@@ -213,7 +213,9 @@ def _validate_catalog(cls, catalog: DataCatalog, pipeline: Pipeline) -> None:
             f"MemoryDatasets"
         )
 
-    def _set_manager_datasets(self, catalog: DataCatalog, pipeline: Pipeline) -> None:
+    def _set_manager_datasets(
+        self, catalog: CatalogProtocol, pipeline: Pipeline
+    ) -> None:
         for dataset in pipeline.datasets():
             try:
                 catalog.exists(dataset)
@@ -240,7 +242,7 @@ def _get_required_workers_count(self, pipeline: Pipeline) -> int:
     def _run(
         self,
         pipeline: Pipeline,
-        catalog: DataCatalog,
+        catalog: CatalogProtocol,
         hook_manager: PluginManager,
         session_id: str | None = None,
     ) -> None:
@@ -248,7 +250,7 @@ def _run(
 
         Args:
             pipeline: The ``Pipeline`` to run.
-            catalog: The ``DataCatalog`` from which to fetch data.
+            catalog: The ``CatalogProtocol`` from which to fetch data.
             hook_manager: The ``PluginManager`` to activate hooks.
             session_id: The id of the session.

diff --git a/kedro/runner/runner.py b/kedro/runner/runner.py
index 6f165e87c0..db397e5f84 100644
--- a/kedro/runner/runner.py
+++ b/kedro/runner/runner.py
@@ -21,7 +21,7 @@
 from more_itertools import interleave
 
 from kedro.framework.hooks.manager import _NullPluginManager
-from kedro.io import DataCatalog, MemoryDataset
+from kedro.io import CatalogProtocol, MemoryDataset
 from kedro.pipeline import Pipeline
 
 if TYPE_CHECKING:
@@ -45,7 +45,7 @@ def __init__(
         Args:
             is_async: If True, the node inputs and outputs are loaded and saved
                 asynchronously with threads. Defaults to False.
-            extra_dataset_patterns: Extra dataset factory patterns to be added to the DataCatalog
+            extra_dataset_patterns: Extra dataset factory patterns to be added to the CatalogProtocol
                 during the run. This is used to set the default datasets on the Runner
                 instances.
         """
@@ -59,7 +59,7 @@ def _logger(self) -> logging.Logger:
     def run(
         self,
         pipeline: Pipeline,
-        catalog: DataCatalog,
+        catalog: CatalogProtocol,
         hook_manager: PluginManager | None = None,
         session_id: str | None = None,
     ) -> dict[str, Any]:
@@ -68,7 +68,7 @@ def run(
 
         Args:
             pipeline: The ``Pipeline`` to run.
-            catalog: The ``DataCatalog`` from which to fetch data.
+            catalog: The ``CatalogProtocol`` from which to fetch data.
             hook_manager: The ``PluginManager`` to activate hooks.
             session_id: The id of the session.
 
@@ -76,7 +76,7 @@ def run(
             ValueError: Raised when ``Pipeline`` inputs cannot be satisfied.
 
         Returns:
-            Any node outputs that cannot be processed by the ``DataCatalog``.
+            Any node outputs that cannot be processed by the ``CatalogProtocol``.
             These are returned in a dictionary, where the keys are defined
             by the node outputs.
 
@@ -94,7 +94,7 @@ def run(
 
         if unsatisfied:
             raise ValueError(
-                f"Pipeline input(s) {unsatisfied} not found in the DataCatalog"
+                f"Pipeline input(s) {unsatisfied} not found in the CatalogProtocol"
             )
 
         # Identify MemoryDataset in the catalog
@@ -124,7 +124,7 @@ def run(
         return {ds_name: catalog.load(ds_name) for ds_name in free_outputs}
 
     def run_only_missing(
-        self, pipeline: Pipeline, catalog: DataCatalog, hook_manager: PluginManager
+        self, pipeline: Pipeline, catalog: CatalogProtocol, hook_manager: PluginManager
     ) -> dict[str, Any]:
         """Run only the missing outputs from the ``Pipeline`` using the
         datasets provided by ``catalog``, and save results back to the
@@ -132,7 +132,7 @@ def run_only_missing(
 
         Args:
             pipeline: The ``Pipeline`` to run.
-            catalog: The ``DataCatalog`` from which to fetch data.
+            catalog: The ``CatalogProtocol`` from which to fetch data.
             hook_manager: The ``PluginManager`` to activate hooks.
         Raises:
             ValueError: Raised when ``Pipeline`` inputs cannot be
@@ -140,7 +140,7 @@ def run_only_missing(
 
         Returns:
             Any node outputs that cannot be processed by the
-            ``DataCatalog``. These are returned in a dictionary, where
+            ``CatalogProtocol``. These are returned in a dictionary, where
             the keys are defined by the node outputs.
 
         """
@@ -164,7 +164,7 @@ def run_only_missing(
     def _run(
         self,
         pipeline: Pipeline,
-        catalog: DataCatalog,
+        catalog: CatalogProtocol,
         hook_manager: PluginManager,
         session_id: str | None = None,
     ) -> None:
@@ -173,7 +173,7 @@ def _run(
 
         Args:
             pipeline: The ``Pipeline`` to run.
-            catalog: The ``DataCatalog`` from which to fetch data.
+            catalog: The ``CatalogProtocol`` from which to fetch data.
             hook_manager: The ``PluginManager`` to activate hooks.
             session_id: The id of the session.
 
@@ -184,7 +184,7 @@ def _suggest_resume_scenario(
         self,
         pipeline: Pipeline,
         done_nodes: Iterable[Node],
-        catalog: DataCatalog,
+        catalog: CatalogProtocol,
     ) -> None:
         """
         Suggest a command to the user to resume a run after it fails.
@@ -194,7 +194,7 @@ def _suggest_resume_scenario(
         Args:
             pipeline: the ``Pipeline`` of the run.
             done_nodes: the ``Node``s that executed successfully.
-            catalog: the ``DataCatalog`` of the run.
+            catalog: the ``CatalogProtocol`` of the run.
         """
         remaining_nodes = set(pipeline.nodes) - set(done_nodes)
 
@@ -223,7 +223,7 @@ def _suggest_resume_scenario(
 
 
 def _find_nodes_to_resume_from(
-    pipeline: Pipeline, unfinished_nodes: Collection[Node], catalog: DataCatalog
+    pipeline: Pipeline, unfinished_nodes: Collection[Node], catalog: CatalogProtocol
 ) -> set[str]:
     """Given a collection of unfinished nodes in a pipeline using
     a certain catalog, find the node names to pass to pipeline.from_nodes()
@@ -233,7 +233,7 @@ def _find_nodes_to_resume_from(
     Args:
         pipeline: the ``Pipeline`` to find starting nodes for.
         unfinished_nodes: collection of ``Node``s that have not finished yet
-        catalog: the ``DataCatalog`` of the run.
+        catalog: the ``CatalogProtocol`` of the run.
 
     Returns:
         Set of node names to pass to pipeline.from_nodes() to continue
@@ -251,7 +251,7 @@ def _find_nodes_to_resume_from(
 
 
 def _find_all_nodes_for_resumed_pipeline(
-    pipeline: Pipeline, unfinished_nodes: Iterable[Node], catalog: DataCatalog
+    pipeline: Pipeline, unfinished_nodes: Iterable[Node], catalog: CatalogProtocol
 ) -> set[Node]:
     """Breadth-first search approach to finding the complete set of
     ``Node``s which need to run to cover all unfinished nodes,
@@ -261,7 +261,7 @@ def _find_all_nodes_for_resumed_pipeline(
     Args:
         pipeline: the ``Pipeline`` to analyze.
         unfinished_nodes: the iterable of ``Node``s which have not finished yet.
-        catalog: the ``DataCatalog`` of the run.
+        catalog: the ``CatalogProtocol`` of the run.
 
     Returns:
         A set containing all input unfinished ``Node``s and all remaining
@@ -309,12 +309,12 @@ def _nodes_with_external_inputs(nodes_of_interest: Iterable[Node]) -> set[Node]:
     return set(p_nodes_with_external_inputs.nodes)
 
 
-def _enumerate_non_persistent_inputs(node: Node, catalog: DataCatalog) -> set[str]:
+def _enumerate_non_persistent_inputs(node: Node, catalog: CatalogProtocol) -> set[str]:
     """Enumerate non-persistent input datasets of a ``Node``.
 
     Args:
         node: the ``Node`` to check the inputs of.
-        catalog: the ``DataCatalog`` of the run.
+        catalog: the ``CatalogProtocol`` of the run.
 
     Returns:
         Set of names of non-persistent inputs of given ``Node``.
@@ -379,7 +379,7 @@ def _find_initial_node_group(pipeline: Pipeline, nodes: Iterable[Node]) -> list[
 
 def run_node(
     node: Node,
-    catalog: DataCatalog,
+    catalog: CatalogProtocol,
     hook_manager: PluginManager,
     is_async: bool = False,
     session_id: str | None = None,
@@ -388,7 +388,7 @@ def run_node(
 
     Args:
         node: The ``Node`` to run.
-        catalog: A ``DataCatalog`` containing the node's inputs and outputs.
+        catalog: A ``CatalogProtocol`` containing the node's inputs and outputs.
         hook_manager: The ``PluginManager`` to activate hooks.
         is_async: If True, the node inputs and outputs are loaded and saved
             asynchronously with threads. Defaults to False.
@@ -422,7 +422,7 @@ def run_node(
 
 def _collect_inputs_from_hook(  # noqa: PLR0913
     node: Node,
-    catalog: DataCatalog,
+    catalog: CatalogProtocol,
     inputs: dict[str, Any],
     is_async: bool,
     hook_manager: PluginManager,
@@ -455,7 +455,7 @@ def _collect_inputs_from_hook(  # noqa: PLR0913
 
 def _call_node_run(  # noqa: PLR0913
     node: Node,
-    catalog: DataCatalog,
+    catalog: CatalogProtocol,
     inputs: dict[str, Any],
     is_async: bool,
     hook_manager: PluginManager,
@@ -486,7 +486,7 @@ def _call_node_run(  # noqa: PLR0913
 
 def _run_node_sequential(
     node: Node,
-    catalog: DataCatalog,
+    catalog: CatalogProtocol,
     hook_manager: PluginManager,
     session_id: str | None = None,
 ) -> Node:
@@ -533,7 +533,7 @@ def _run_node_sequential(
 
 def _run_node_async(
     node: Node,
-    catalog: DataCatalog,
+    catalog: CatalogProtocol,
     hook_manager: PluginManager,
     session_id: str | None = None,
 ) -> Node:

diff --git a/kedro/runner/sequential_runner.py b/kedro/runner/sequential_runner.py
index 48dac3cd54..fd5ec26834 100644
--- a/kedro/runner/sequential_runner.py
+++ b/kedro/runner/sequential_runner.py
@@ -14,7 +14,7 @@
 if TYPE_CHECKING:
     from pluggy import PluginManager
 
-    from kedro.io import DataCatalog
+    from kedro.io import CatalogProtocol
     from kedro.pipeline import Pipeline
 
@@ -34,7 +34,7 @@ def __init__(
         Args:
             is_async: If True, the node inputs and outputs are loaded and saved
                 asynchronously with threads. Defaults to False.
-            extra_dataset_patterns: Extra dataset factory patterns to be added to the DataCatalog
+            extra_dataset_patterns: Extra dataset factory patterns to be added to the CatalogProtocol
                 during the run. This is used to set the default datasets to MemoryDataset
                 for `SequentialRunner`.
 
@@ -48,7 +48,7 @@ def __init__(
     def _run(
         self,
         pipeline: Pipeline,
-        catalog: DataCatalog,
+        catalog: CatalogProtocol,
         hook_manager: PluginManager,
         session_id: str | None = None,
     ) -> None:
@@ -56,7 +56,7 @@ def _run(
 
         Args:
             pipeline: The ``Pipeline`` to run.
-            catalog: The ``DataCatalog`` from which to fetch data.
+            catalog: The ``CatalogProtocol`` from which to fetch data.
             hook_manager: The ``PluginManager`` to activate hooks.
             session_id: The id of the session.

diff --git a/kedro/runner/thread_runner.py b/kedro/runner/thread_runner.py
index b4751a602a..39a726db46 100644
--- a/kedro/runner/thread_runner.py
+++ b/kedro/runner/thread_runner.py
@@ -16,7 +16,7 @@
 if TYPE_CHECKING:
     from pluggy import PluginManager
 
-    from kedro.io import DataCatalog
+    from kedro.io import CatalogProtocol
     from kedro.pipeline import Pipeline
     from kedro.pipeline.node import Node
 
@@ -43,7 +43,7 @@ def __init__(
             is_async: If True, set to False, because `ThreadRunner` doesn't support
                 loading and saving the node inputs and outputs asynchronously with
                 threads. Defaults to False.
-            extra_dataset_patterns: Extra dataset factory patterns to be added to the DataCatalog
+            extra_dataset_patterns: Extra dataset factory patterns to be added to the CatalogProtocol
                 during the run. This is used to set the default datasets to MemoryDataset
                 for `ThreadRunner`.
 
@@ -87,7 +87,7 @@ def _get_required_workers_count(self, pipeline: Pipeline) -> int:
     def _run(
         self,
         pipeline: Pipeline,
-        catalog: DataCatalog,
+        catalog: CatalogProtocol,
         hook_manager: PluginManager,
         session_id: str | None = None,
     ) -> None:
@@ -95,7 +95,7 @@ def _run(
 
         Args:
             pipeline: The ``Pipeline`` to run.
-            catalog: The ``DataCatalog`` from which to fetch data.
+            catalog: The ``CatalogProtocol`` from which to fetch data.
             hook_manager: The ``PluginManager`` to activate hooks.
             session_id: The id of the session.

From 18d2ba0100595b70a54688e4b9d45f1b19ed8c5d Mon Sep 17 00:00:00 2001
From: Elena Khaustova
Date: Thu, 12 Sep 2024 11:47:58 +0100
Subject: [PATCH 09/15] Fixed linter

Signed-off-by: Elena Khaustova
---
 kedro/io/core.py | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/kedro/io/core.py b/kedro/io/core.py
index ecdb85d505..443081e2de 100644
--- a/kedro/io/core.py
+++ b/kedro/io/core.py
@@ -905,12 +905,15 @@ def from_config(cls, catalog: dict[str, dict[str, Any]] | None) -> _C:
         ...
 
     def _get_dataset(
-        self, ds_name: str, suggest: bool = True, version: Any = None
+        self,
+        dataset_name: str,
+        version: Any = None,
+        suggest: bool = True,
     ) -> AbstractDataset:
         """Retrieve a dataset by its name."""
         ...
 
     def list(self, regex_search: str | None = None) -> list[str]:
         """List all dataset names registered in the catalog."""
         ...
 
@@ -918,7 +921,7 @@ def save(self, name: str, data: Any) -> None:
         """Save data to a registered dataset."""
         ...
 
-    def load(self, name: str, version: str | None = None) -> _DO:
+    def load(self, name: str, version: str | None = None) -> Any:
         """Load data from a registered dataset."""
         ...
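Because the class is decorated with `@runtime_checkable`, conformance can be checked structurally at runtime with `isinstance()`, which is exactly what the next patch exploits to validate `DATA_CATALOG_CLASS`. A minimal sketch, assuming the `Protocol[_C]` fix that patch 10 below applies; note that `isinstance()` against a runtime-checkable protocol only verifies that the required attributes and methods exist, not their signatures:

```python
from kedro.io import DataCatalog
from kedro.io.core import CatalogProtocol


class NotACatalog:
    """Misses every attribute and method required by the protocol."""


# Structural check: an empty DataCatalog satisfies the protocol...
assert isinstance(DataCatalog(), CatalogProtocol)
# ...while an unrelated class does not.
assert not isinstance(NotACatalog(), CatalogProtocol)
```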
From d48c6d3e024bd924854c5e5fd0173089c6bf74b5 Mon Sep 17 00:00:00 2001
From: Elena Khaustova
Date: Thu, 12 Sep 2024 13:05:28 +0100
Subject: [PATCH 10/15] Added _ImplementsCatalogProtocolValidator

Signed-off-by: Elena Khaustova
---
 kedro/framework/project/__init__.py | 25 +++++++++++++++++++++++--
 kedro/io/core.py                    |  2 +-
 2 files changed, 24 insertions(+), 3 deletions(-)

diff --git a/kedro/framework/project/__init__.py b/kedro/framework/project/__init__.py
index a3248b9daf..195fa077f6 100644
--- a/kedro/framework/project/__init__.py
+++ b/kedro/framework/project/__init__.py
@@ -20,6 +20,7 @@
 from dynaconf import LazySettings
 from dynaconf.validator import ValidationError, Validator
 
+from kedro.io import CatalogProtocol
 from kedro.pipeline import Pipeline, pipeline
 
 if TYPE_CHECKING:
@@ -59,6 +60,25 @@ def validate(
         )
 
 
+class _ImplementsCatalogProtocolValidator(Validator):
+    """A validator to check that the supplied setting value implements ``CatalogProtocol``."""
+
+    def validate(
+        self, settings: dynaconf.base.Settings, *args: Any, **kwargs: Any
+    ) -> None:
+        super().validate(settings, *args, **kwargs)
+
+        protocol = CatalogProtocol
+        for name in self.names:
+            setting_value = getattr(settings, name)
+            if not isinstance(setting_value(), protocol):
+                raise ValidationError(
+                    f"Invalid value '{setting_value.__module__}.{setting_value.__qualname__}' "
+                    f"received for setting '{name}'. It must implement "
+                    f"'{protocol.__module__}.{protocol.__qualname__}'."
+                )
+
+
 class _HasSharedParentClassValidator(Validator):
     """A validator to check that the parent of the default class is an ancestor of
     the settings value."""
@@ -115,8 +135,9 @@ class _ProjectSettings(LazySettings):
     _CONFIG_LOADER_ARGS = Validator(
         "CONFIG_LOADER_ARGS", default={"base_env": "base", "default_run_env": "local"}
     )
-    _DATA_CATALOG_CLASS = _IsSubclassValidator(
-        "DATA_CATALOG_CLASS", default=_get_default_class("kedro.io.DataCatalog")
+    _DATA_CATALOG_CLASS = _ImplementsCatalogProtocolValidator(
+        "DATA_CATALOG_CLASS",
+        default=_get_default_class("kedro.io.DataCatalog"),
     )
 
     def __init__(self, *args: Any, **kwargs: Any):

diff --git a/kedro/io/core.py b/kedro/io/core.py
index 443081e2de..c0966ea984 100644
--- a/kedro/io/core.py
+++ b/kedro/io/core.py
@@ -887,7 +887,7 @@ def validate_on_forbidden_chars(**kwargs: Any) -> None:
 
 
 @runtime_checkable
-class CatalogProtocol(Protocol["_C"]):
+class CatalogProtocol(Protocol[_C]):
     _datasets: dict[str, AbstractDataset]
 
     def __contains__(self, ds_name: str) -> bool:
         """Check if a dataset is in the catalog."""

From 45ce6bcabaf7408cd8b92461ecbd10f7d9f5986f Mon Sep 17 00:00:00 2001
From: Elena Khaustova
Date: Thu, 12 Sep 2024 15:20:03 +0100
Subject: [PATCH 11/15] Updated docstrings

Signed-off-by: Elena Khaustova
---
 kedro/framework/context/context.py | 12 ++++++------
 kedro/runner/parallel_runner.py    |  6 +++---
 kedro/runner/runner.py             | 24 ++++++++++++------------
 kedro/runner/sequential_runner.py  |  4 ++--
 kedro/runner/thread_runner.py      |  4 ++--
 5 files changed, 25 insertions(+), 25 deletions(-)

diff --git a/kedro/framework/context/context.py b/kedro/framework/context/context.py
index 25995eb63f..5c14cbae38 100644
--- a/kedro/framework/context/context.py
+++ b/kedro/framework/context/context.py
@@ -179,12 +179,12 @@ class KedroContext:
 
     @property
     def catalog(self) -> CatalogProtocol:
-        """Read-only property referring to Kedro's ``DataCatalog`` for this context.
+        """Read-only property referring to Kedro's catalog for this context.
 
         Returns:
-            DataCatalog defined in `catalog.yml`.
+            catalog defined in `catalog.yml`.
 
         Raises:
-            KedroContextError: Incorrect ``DataCatalog`` registered for the project.
+            KedroContextError: Incorrect catalog registered for the project.
         """
         return self._get_catalog()
 
@@ -213,12 +213,12 @@ def _get_catalog(
         self,
         save_version: str | None = None,
         load_versions: dict[str, str] | None = None,
     ) -> CatalogProtocol:
-        """A hook for changing the creation of a DataCatalog instance.
+        """A hook for changing the creation of a catalog instance.
 
         Returns:
-            DataCatalog defined in `catalog.yml`.
+            catalog defined in `catalog.yml`.
 
         Raises:
-            KedroContextError: Incorrect ``DataCatalog`` registered for the project.
+            KedroContextError: Incorrect catalog registered for the project.
         """
 
         # '**/catalog*' reads modular pipeline configs

diff --git a/kedro/runner/parallel_runner.py b/kedro/runner/parallel_runner.py
index 903c9ece99..6c56a54b4e 100644
--- a/kedro/runner/parallel_runner.py
+++ b/kedro/runner/parallel_runner.py
@@ -73,7 +73,7 @@ def _run_node_synchronization(  # noqa: PLR0913
 
     Args:
         node: The ``Node`` to run.
-        catalog: A ``CatalogProtocol`` containing the node's inputs and outputs.
+        catalog: A catalog containing the node's inputs and outputs.
         is_async: If True, the node inputs and outputs are loaded and saved
             asynchronously with threads. Defaults to False.
         session_id: The session id of the pipeline run.
@@ -118,7 +118,7 @@ def __init__(
             cannot be larger than 61 and will be set to min(61, max_workers).
             is_async: If True, the node inputs and outputs are loaded and saved
                 asynchronously with threads. Defaults to False.
-            extra_dataset_patterns: Extra dataset factory patterns to be added to the CatalogProtocol
+            extra_dataset_patterns: Extra dataset factory patterns to be added to the catalog
                 during the run. This is used to set the default datasets to SharedMemoryDataset
                 for `ParallelRunner`.
 
@@ -250,7 +250,7 @@ def _run(
 
         Args:
             pipeline: The ``Pipeline`` to run.
-            catalog: The ``CatalogProtocol`` from which to fetch data.
+            catalog: The catalog from which to fetch data.
             hook_manager: The ``PluginManager`` to activate hooks.
             session_id: The id of the session.

diff --git a/kedro/runner/runner.py b/kedro/runner/runner.py
index db397e5f84..48cdef7d20 100644
--- a/kedro/runner/runner.py
+++ b/kedro/runner/runner.py
@@ -45,7 +45,7 @@ def __init__(
         Args:
             is_async: If True, the node inputs and outputs are loaded and saved
                 asynchronously with threads. Defaults to False.
-            extra_dataset_patterns: Extra dataset factory patterns to be added to the CatalogProtocol
+            extra_dataset_patterns: Extra dataset factory patterns to be added to the catalog
                 during the run. This is used to set the default datasets on the Runner
                 instances.
         """
@@ -68,7 +68,7 @@ def run(
 
         Args:
             pipeline: The ``Pipeline`` to run.
-            catalog: The ``CatalogProtocol`` from which to fetch data.
+            catalog: The catalog from which to fetch data.
             hook_manager: The ``PluginManager`` to activate hooks.
             session_id: The id of the session.
 
@@ -76,7 +76,7 @@ def run(
             ValueError: Raised when ``Pipeline`` inputs cannot be satisfied.
 
         Returns:
-            Any node outputs that cannot be processed by the ``CatalogProtocol``.
+            Any node outputs that cannot be processed by the catalog.
             These are returned in a dictionary, where the keys are defined
             by the node outputs.
 
@@ -94,7 +94,7 @@ def run(
 
         if unsatisfied:
             raise ValueError(
-                f"Pipeline input(s) {unsatisfied} not found in the CatalogProtocol"
+                f"Pipeline input(s) {unsatisfied} not found in the {catalog.__class__.__name__}"
             )
 
         # Identify MemoryDataset in the catalog
@@ -132,7 +132,7 @@ def run_only_missing(
 
         Args:
             pipeline: The ``Pipeline`` to run.
-            catalog: The ``CatalogProtocol`` from which to fetch data.
+            catalog: The `catalog from which to fetch data.
             hook_manager: The ``PluginManager`` to activate hooks.
         Raises:
             ValueError: Raised when ``Pipeline`` inputs cannot be
@@ -140,7 +140,7 @@ def run_only_missing(
 
         Returns:
             Any node outputs that cannot be processed by the
-            ``CatalogProtocol``. These are returned in a dictionary, where
+            catalog. These are returned in a dictionary, where
             the keys are defined by the node outputs.
 
         """
@@ -173,7 +173,7 @@ def _run(
 
         Args:
             pipeline: The ``Pipeline`` to run.
-            catalog: The ``CatalogProtocol`` from which to fetch data.
+            catalog: The catalog from which to fetch data.
             hook_manager: The ``PluginManager`` to activate hooks.
             session_id: The id of the session.
 
@@ -194,7 +194,7 @@ def _suggest_resume_scenario(
         Args:
             pipeline: the ``Pipeline`` of the run.
             done_nodes: the ``Node``s that executed successfully.
-            catalog: the ``CatalogProtocol`` of the run.
+            catalog: the catalog of the run.
         """
         remaining_nodes = set(pipeline.nodes) - set(done_nodes)
 
@@ -233,7 +233,7 @@ def _find_nodes_to_resume_from(
     Args:
         pipeline: the ``Pipeline`` to find starting nodes for.
         unfinished_nodes: collection of ``Node``s that have not finished yet
-        catalog: the ``CatalogProtocol`` of the run.
+        catalog: the catalog of the run.
 
     Returns:
         Set of node names to pass to pipeline.from_nodes() to continue
@@ -261,7 +261,7 @@ def _find_all_nodes_for_resumed_pipeline(
     Args:
         pipeline: the ``Pipeline`` to analyze.
         unfinished_nodes: the iterable of ``Node``s which have not finished yet.
-        catalog: the ``CatalogProtocol`` of the run.
+        catalog: the catalog of the run.
 
     Returns:
         A set containing all input unfinished ``Node``s and all remaining
@@ -314,7 +314,7 @@ def _enumerate_non_persistent_inputs(node: Node, catalog: CatalogProtocol) -> set[str]:
 
     Args:
         node: the ``Node`` to check the inputs of.
-        catalog: the ``CatalogProtocol`` of the run.
+        catalog: the catalog of the run.
 
     Returns:
         Set of names of non-persistent inputs of given ``Node``.
@@ -388,7 +388,7 @@ def run_node(
 
     Args:
         node: The ``Node`` to run.
-        catalog: A ``CatalogProtocol`` containing the node's inputs and outputs.
+        catalog: A catalog containing the node's inputs and outputs.
         hook_manager: The ``PluginManager`` to activate hooks.
         is_async: If True, the node inputs and outputs are loaded and saved
             asynchronously with threads. Defaults to False.

diff --git a/kedro/runner/sequential_runner.py b/kedro/runner/sequential_runner.py
index fd5ec26834..3f60414b2b 100644
--- a/kedro/runner/sequential_runner.py
+++ b/kedro/runner/sequential_runner.py
@@ -34,7 +34,7 @@ def __init__(
         Args:
             is_async: If True, the node inputs and outputs are loaded and saved
                 asynchronously with threads. Defaults to False.
-            extra_dataset_patterns: Extra dataset factory patterns to be added to the CatalogProtocol
+            extra_dataset_patterns: Extra dataset factory patterns to be added to the catalog
                 during the run. This is used to set the default datasets to MemoryDataset
                 for `SequentialRunner`.
 
@@ -56,7 +56,7 @@ def _run(
 
         Args:
             pipeline: The ``Pipeline`` to run.
-            catalog: The ``CatalogProtocol`` from which to fetch data.
+            catalog: The catalog from which to fetch data.
             hook_manager: The ``PluginManager`` to activate hooks.
             session_id: The id of the session.

diff --git a/kedro/runner/thread_runner.py b/kedro/runner/thread_runner.py
index 39a726db46..d2c0c251dc 100644
--- a/kedro/runner/thread_runner.py
+++ b/kedro/runner/thread_runner.py
@@ -43,7 +43,7 @@ def __init__(
             is_async: If True, set to False, because `ThreadRunner` doesn't support
                 loading and saving the node inputs and outputs asynchronously with
                 threads. Defaults to False.
-            extra_dataset_patterns: Extra dataset factory patterns to be added to the CatalogProtocol
+            extra_dataset_patterns: Extra dataset factory patterns to be added to the catalog
                 during the run. This is used to set the default datasets to MemoryDataset
                 for `ThreadRunner`.
 
@@ -95,7 +95,7 @@ def _run(
 
         Args:
             pipeline: The ``Pipeline`` to run.
-            catalog: The ``CatalogProtocol`` from which to fetch data.
+            catalog: The catalog from which to fetch data.
             hook_manager: The ``PluginManager`` to activate hooks.
             session_id: The id of the session.

From 6ca972f89a695943764da91dff6b1a792f3501f0 Mon Sep 17 00:00:00 2001
From: Elena Khaustova
Date: Thu, 12 Sep 2024 22:45:31 +0100
Subject: [PATCH 12/15] Fixed tests

Signed-off-by: Elena Khaustova
---
 tests/framework/context/test_context.py | 2 +-
 tests/runner/test_sequential_runner.py  | 4 +++-
 2 files changed, 4 insertions(+), 2 deletions(-)

diff --git a/tests/framework/context/test_context.py b/tests/framework/context/test_context.py
index 61e4bbaa6f..ea62cb04c9 100644
--- a/tests/framework/context/test_context.py
+++ b/tests/framework/context/test_context.py
@@ -261,7 +261,7 @@ def test_wrong_catalog_type(self, mock_settings_file_bad_data_catalog_class):
         pattern = (
             "Invalid value 'tests.framework.context.test_context.BadCatalog' received "
             "for setting 'DATA_CATALOG_CLASS'. "
-            "It must be a subclass of 'kedro.io.data_catalog.DataCatalog'."
+            "It must implement 'kedro.io.core.CatalogProtocol'."
         )
         mock_settings = _ProjectSettings(
             settings_file=str(mock_settings_file_bad_data_catalog_class)

diff --git a/tests/runner/test_sequential_runner.py b/tests/runner/test_sequential_runner.py
index dbc73a30f0..4f22bab296 100644
--- a/tests/runner/test_sequential_runner.py
+++ b/tests/runner/test_sequential_runner.py
@@ -130,7 +130,9 @@ def test_conflict_feed_catalog(
 
     def test_unsatisfied_inputs(self, is_async, unfinished_outputs_pipeline, catalog):
         """ds1, ds2 and ds3 were not specified."""
-        with pytest.raises(ValueError, match=r"not found in the DataCatalog"):
+        with pytest.raises(
+            ValueError, match=rf"not found in the {catalog.__class__.__name__}"
+        ):
             SequentialRunner(is_async=is_async).run(
                 unfinished_outputs_pipeline, catalog
             )

From fdce5eadeab58a70a063ba0ada21805a9369c38a Mon Sep 17 00:00:00 2001
From: Elena Khaustova
Date: Thu, 12 Sep 2024 22:46:44 +0100
Subject: [PATCH 13/15] Fixed docs

Signed-off-by: Elena Khaustova
---
 docs/source/conf.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/docs/source/conf.py b/docs/source/conf.py
index 2c3a2c4c00..a61ba1b08f 100644
--- a/docs/source/conf.py
+++ b/docs/source/conf.py
@@ -130,6 +130,7 @@
     "kedro.io.catalog_config_resolver.CatalogConfigResolver",
     "kedro.io.core.AbstractDataset",
     "kedro.io.core.AbstractVersionedDataset",
+    "kedro.io.core.CatalogProtocol",
     "kedro.io.core.DatasetError",
     "kedro.io.core.Version",
    "kedro.io.data_catalog.DataCatalog",
@@ -170,6 +171,7 @@
     "None. Update D from mapping/iterable E and F.",
     "Patterns",
     "CatalogConfigResolver",
+    "CatalogProtocol",
 ),
 "py:data": (
     "typing.Any",

From 3029963a250097e0f4f83a0f730a6d92ef27db97 Mon Sep 17 00:00:00 2001
From: Elena Khaustova
Date: Thu, 12 Sep 2024 22:47:39 +0100
Subject: [PATCH 14/15] Excluded Protocol from coverage

Signed-off-by: Elena Khaustova
---
 kedro/io/core.py | 2 +-
 pyproject.toml   | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/kedro/io/core.py b/kedro/io/core.py
index c0966ea984..1ed35a871b 100644
--- a/kedro/io/core.py
+++ b/kedro/io/core.py
@@ -939,7 +939,7 @@ def add_feed_dict(self, datasets: dict[str, Any], replace: bool = False) -> None:
 
     def exists(self, name: str) -> bool:
         """Checks whether registered data set exists by calling its `exists()` method."""
-        pass
+        ...
 
     def release(self, name: str) -> None:
         """Release any cached data associated with a dataset."""

diff --git a/pyproject.toml b/pyproject.toml
index 8b7b4cb09b..d9ebbfd70b 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -134,7 +134,7 @@ omit = [
     "kedro/runner/parallel_runner.py",
     "*/site-packages/*",
 ]
-exclude_also = ["raise NotImplementedError", "if TYPE_CHECKING:"]
+exclude_also = ["raise NotImplementedError", "if TYPE_CHECKING:", "class CatalogProtocol"]

From 0833a843fdf189ab10023bcc0d278b562405a6b8 Mon Sep 17 00:00:00 2001
From: Elena Khaustova
Date: Thu, 12 Sep 2024 23:00:37 +0100
Subject: [PATCH 15/15] Fixed docs

Signed-off-by: Elena Khaustova
---
 kedro/runner/runner.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/kedro/runner/runner.py b/kedro/runner/runner.py
index 48cdef7d20..bb680aefe6 100644
--- a/kedro/runner/runner.py
+++ b/kedro/runner/runner.py
@@ -132,7 +132,7 @@ def run_only_missing(
 
         Args:
             pipeline: The ``Pipeline`` to run.
-            catalog: The `catalog from which to fetch data.
+            catalog: The catalog from which to fetch data.
             hook_manager: The ``PluginManager`` to activate hooks.
         Raises:
             ValueError: Raised when ``Pipeline`` inputs cannot be
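With the series applied, the net effect for projects is that `DATA_CATALOG_CLASS` in `settings.py` accepts any structurally conforming catalog, not only `DataCatalog` subclasses as under the old `_IsSubclassValidator`. A hypothetical project-side sketch; `my_project.catalogs.MyCustomCatalog` is an illustrative name, not part of this PR:

```python
# settings.py of a Kedro project (hypothetical sketch).
# MyCustomCatalog does not need to inherit from DataCatalog; it only needs
# to provide the attributes and methods declared by
# kedro.io.core.CatalogProtocol. Note that the validator instantiates the
# class with no arguments (`setting_value()`), so it must be constructible
# without arguments. A non-conforming class fails at startup with:
#   "Invalid value '...' received for setting 'DATA_CATALOG_CLASS'.
#    It must implement 'kedro.io.core.CatalogProtocol'."
from my_project.catalogs import MyCustomCatalog  # illustrative import

DATA_CATALOG_CLASS = MyCustomCatalog
```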