Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft - Resolves configuration earlier & separate runtimes_params and meta_params #2504

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions RELEASE.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,15 @@
# Upcoming Release 0.18.8

## Major features and improvements
* Added support for overiding global parameters via `kedro run --params`
*

## Bug fixes and other changes
* Improvements to documentation about configuration.
* Improvements to Jupyter E2E tests.
* Improvements to documentation on visualising Kedro projects on Databricks.
* Push OmegaConfigLoader variable interpolation earlier in the resolving process,
OmegaConfigLoader["parameters"] now return a `Dict` instead of `DictConfig`.

## Breaking changes to the API

Expand Down
4 changes: 2 additions & 2 deletions kedro/config/abstract_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ def __init__(
conf_source: str,
env: str = None,
runtime_params: Dict[str, Any] = None,
**kwargs
**kwargs,
):
super().__init__()
self.conf_source = conf_source
self.env = env
self.runtime_params = runtime_params
self.runtime_params = runtime_params or {}


class BadConfigException(Exception):
Expand Down
29 changes: 22 additions & 7 deletions kedro/config/omegaconf_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from typing import Any, Dict, Iterable, List, Optional, Set # noqa

import fsspec
from omegaconf import OmegaConf
from omegaconf import DictConfig, OmegaConf
from omegaconf.resolvers import oc
from yaml.parser import ParserError
from yaml.scanner import ScannerError
Expand Down Expand Up @@ -76,6 +76,7 @@ def __init__(
conf_source: str,
env: str = None,
runtime_params: Dict[str, Any] = None,
meta_params: Dict[str, Any] = None,
*,
config_patterns: Dict[str, List[str]] = None,
base_env: str = "base",
Expand Down Expand Up @@ -124,6 +125,7 @@ def __init__(
self._protocol = "file"
self._fs = fsspec.filesystem(protocol=self._protocol, fo=conf_source)

self.meta_params = meta_params
super().__init__(
conf_source=conf_source,
env=env,
Expand Down Expand Up @@ -166,7 +168,7 @@ def __getitem__(self, key) -> Dict[str, Any]:
else:
base_path = str(Path(self._fs.ls("", detail=False)[-1]) / self.base_env)
base_config = self.load_and_merge_dir_config(
base_path, patterns, read_environment_variables
base_path, patterns, read_environment_variables, key
)
config = base_config

Expand All @@ -177,7 +179,7 @@ def __getitem__(self, key) -> Dict[str, Any]:
else:
env_path = str(Path(self._fs.ls("", detail=False)[-1]) / run_env)
env_config = self.load_and_merge_dir_config(
env_path, patterns, read_environment_variables
env_path, patterns, read_environment_variables, key
)

# Destructively merge the two env dirs. The chosen env will override base.
Expand Down Expand Up @@ -210,6 +212,7 @@ def load_and_merge_dir_config(
conf_path: str,
patterns: Iterable[str],
read_environment_variables: Optional[bool] = False,
key=None,
) -> Dict[str, Any]:
"""Recursively load and merge all configuration files in a directory using OmegaConf,
which satisfy a given list of glob patterns from a specific path.
Expand Down Expand Up @@ -254,6 +257,8 @@ def load_and_merge_dir_config(
# this is a workaround to read it as a binary file and decode it back to utf8.
tmp_fo = io.StringIO(open_config.read().decode("utf8"))
config = OmegaConf.load(tmp_fo)
if not OmegaConf.is_dict(config):
raise TypeError("Only DictConfig is supported")
if read_environment_variables:
self._resolve_environment_variables(config)
config_per_file[config_filepath] = config
Expand All @@ -264,6 +269,8 @@ def load_and_merge_dir_config(
f"Invalid YAML or JSON file {Path(conf_path, config_filepath.name).as_posix()},"
f" unable to read line {line}, position {cursor}."
) from exc
if not OmegaConf.is_dict(config):
raise TypeError("Only DictConfig is supported")

seen_file_to_keys = {
file: set(config.keys()) for file, config in config_per_file.items()
Expand All @@ -273,9 +280,17 @@ def load_and_merge_dir_config(

if not aggregate_config:
return {}
if len(aggregate_config) == 1:
return list(aggregate_config)[0]
return dict(OmegaConf.merge(*aggregate_config))

if key == "parameters":
if self.meta_params:
return OmegaConf.to_container(
OmegaConf.merge(*aggregate_config, self.meta_params), resolve=True
)
else:
return OmegaConf.to_container(
OmegaConf.merge(*aggregate_config, self.runtime_params),
resolve=True,
)

def _is_valid_config_path(self, path):
"""Check if given path is a file path and file type is yaml or json."""
Expand Down Expand Up @@ -311,7 +326,7 @@ def _check_duplicates(seen_files_to_keys: Dict[Path, Set[Any]]):
raise ValueError(f"{dup_str}")

@staticmethod
def _resolve_environment_variables(config: Dict[str, Any]) -> None:
def _resolve_environment_variables(config: DictConfig) -> None:
"""Use the ``oc.env`` resolver to read environment variables and replace
them in-place, clearing the resolver after the operation is complete if
it was not registered beforehand.
Expand Down
13 changes: 12 additions & 1 deletion kedro/framework/cli/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,13 @@ def activate_nbstripout(
help=PARAMS_ARG_HELP,
callback=_split_params,
)
@click.option(
"--meta-params",
type=click.UNPROCESSED,
default="",
help=PARAMS_ARG_HELP,
callback=_split_params,
)
# pylint: disable=too-many-arguments,unused-argument,too-many-locals
def run(
tag,
Expand All @@ -450,6 +457,7 @@ def run(
conf_source,
params,
namespace,
meta_params,
):
"""Run the pipeline."""

Expand All @@ -467,7 +475,10 @@ def run(
load_version = {**load_version, **load_versions}

with KedroSession.create(
env=env, conf_source=conf_source, extra_params=params
env=env,
conf_source=conf_source,
extra_params=params,
meta_params=meta_params,
) as session:
session.run(
tags=tag,
Expand Down
14 changes: 13 additions & 1 deletion kedro/framework/session/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ def create( # pylint: disable=too-many-arguments
save_on_close: bool = True,
env: str = None,
extra_params: Dict[str, Any] = None,
meta_params: Dict[str, Any] = None,
conf_source: Optional[str] = None,
) -> "KedroSession":
"""Create a new instance of ``KedroSession`` with the session data.
Expand Down Expand Up @@ -178,7 +179,8 @@ def create( # pylint: disable=too-many-arguments

if extra_params:
session_data["extra_params"] = extra_params

if meta_params:
session_data["meta_params"] = meta_params
try:
session_data["username"] = getpass.getuser()
except Exception as exc: # pylint: disable=broad-except
Expand Down Expand Up @@ -282,12 +284,22 @@ def _get_config_loader(self) -> ConfigLoader:
"""An instance of the config loader."""
env = self.store.get("env")
extra_params = self.store.get("extra_params")
meta_params = self.store.get("meta_params")

config_loader_class = settings.CONFIG_LOADER_CLASS
if not meta_params:
return config_loader_class(
conf_source=self._conf_source,
env=env,
runtime_params=extra_params,
**settings.CONFIG_LOADER_ARGS,
)

return config_loader_class(
conf_source=self._conf_source,
env=env,
runtime_params=extra_params,
meta_params=meta_params,
**settings.CONFIG_LOADER_ARGS,
)

Expand Down