Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DRAFT] - Refactor the OmegaConfigLoader load_and_merge_dir_config method #2591

Closed
wants to merge 12 commits into from
85 changes: 52 additions & 33 deletions kedro/config/omegaconf_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,48 +236,23 @@ def load_and_merge_dir_config( # pylint: disable=too-many-arguments
"""
# pylint: disable=too-many-locals

if not self._fs.isdir(Path(conf_path).as_posix()):
raise MissingConfigException(
f"Given configuration path either does not exist "
f"or is not a valid directory: {conf_path}"
)

paths = [
Path(each)
for pattern in patterns
for each in self._fs.glob(Path(f"{str(conf_path)}/{pattern}").as_posix())
]
deduplicated_paths = set(paths)
config_files_filtered = [
path for path in deduplicated_paths if self._is_valid_config_path(path)
]
config_files_filtered = self._build_conf_paths(conf_path, patterns)

config_per_file = {}
for config_filepath in config_files_filtered:
try:
with self._fs.open(str(config_filepath.as_posix())) as open_config:
# As fsspec doesn't allow the file to be read as StringIO,
# this is a workaround to read it as a binary file and decode it back to utf8.
tmp_fo = io.StringIO(open_config.read().decode("utf8"))
config = OmegaConf.load(tmp_fo)
processed_files.add(config_filepath)
if read_environment_variables:
self._resolve_environment_variables(config)
config_per_file[config_filepath] = config
except (ParserError, ScannerError) as exc:
line = exc.problem_mark.line # type: ignore
cursor = exc.problem_mark.column # type: ignore
raise ParserError(
f"Invalid YAML or JSON file {Path(conf_path, config_filepath.name).as_posix()},"
f" unable to read line {line}, position {cursor}."
) from exc
config_per_file[config_filepath] = self._load_omega_config(
conf_path, processed_files, read_environment_variables, config_filepath
)

seen_file_to_keys = {
file: set(config.keys()) for file, config in config_per_file.items()
}
aggregate_config = config_per_file.values()
self._check_duplicates(seen_file_to_keys)

aggregate_config = config_per_file.values()
return self._merge_and_resolve_config(key, aggregate_config)

def _merge_and_resolve_config(self, key, aggregate_config):
if not aggregate_config:
return {}

Expand All @@ -288,6 +263,50 @@ def load_and_merge_dir_config( # pylint: disable=too-many-arguments
)
return OmegaConf.to_container(OmegaConf.merge(*aggregate_config), resolve=True)

def _load_omega_config(
self,
conf_path,
processed_files,
read_environment_variables,
config_filepath,
):
try:
with self._fs.open(str(config_filepath.as_posix())) as open_config:
# As fsspec doesn't allow the file to be read as StringIO,
# this is a workaround to read it as a binary file and decode it back to utf8.
tmp_fo = io.StringIO(open_config.read().decode("utf8"))
config = OmegaConf.load(tmp_fo)
processed_files.add(config_filepath)
if read_environment_variables:
self._resolve_environment_variables(config)
except (ParserError, ScannerError) as exc:
line = exc.problem_mark.line # type: ignore
cursor = exc.problem_mark.column # type: ignore
raise ParserError(
f"Invalid YAML or JSON file {Path(conf_path, config_filepath.name).as_posix()},"
f" unable to read line {line}, position {cursor}."
) from exc
return config

def _build_conf_paths(self, conf_path, patterns):
if not self._fs.isdir(Path(conf_path).as_posix()):
raise MissingConfigException(
f"Given configuration path either does not exist "
f"or is not a valid directory: {conf_path}"
)

paths = [
Path(each)
for pattern in patterns
for each in self._fs.glob(Path(f"{str(conf_path)}/{pattern}").as_posix())
]
deduplicated_paths = set(paths)
config_files_filtered = [
path for path in deduplicated_paths if self._is_valid_config_path(path)
]

return config_files_filtered
Comment on lines +291 to +308
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am happy with this as this is quite clean already


def _is_valid_config_path(self, path):
"""Check if given path is a file path and file type is yaml or json."""
posix_path = path.as_posix()
Expand Down