diff --git a/confhub/builder.py b/confhub/builder.py index 1f8e79e..6b8376f 100644 --- a/confhub/builder.py +++ b/confhub/builder.py @@ -1,5 +1,5 @@ from pathlib import Path -from typing import List, Any, Dict, Optional, Type +from typing import List, Any, Dict, Type import yaml import structlog @@ -13,53 +13,37 @@ def has_configuration_fields(select_class: BlockCore) -> bool: - if any(isinstance(item, ConfigurationField) for item in [ - value for name, value in select_class.__dict__.items() - ]): + if any(isinstance(item, ConfigurationField) for item in select_class.__dict__.values()): return True - else: - if any(isinstance(item, ConfigurationField) for item in [ - value for name, value in select_class.__class__.__dict__.items() - ]): - return False - else: - raise ConfhubError("Cannot find field in model object", select_class=select_class) + if any(isinstance(item, ConfigurationField) for item in select_class.__class__.__dict__.values()): + return False + raise ConfhubError("Cannot find field in model object", select_class=select_class) class ConfigurationBuilder: def __init__(self, *blocks: BlockCore): - self.blocks = [*blocks] + self.blocks = list(blocks) self.datafiles: Dict[str, Any] = {'settings': {}, '.secrets': {}} self.generate_filenames() def data_typing(self, field: ConfigurationField) -> Any: - if field.is_list: - if isinstance(field.data_type, BlockCore): - nested_block = {} - for nested_field_name, nested_field in field.data_type.__class__.__dict__.items(): - if isinstance(nested_field, ConfigurationField): - nested_block[nested_field_name] = self.data_typing(nested_field) - return [nested_block] - else: - return [field.get_default_value()] - else: - if isinstance(field.data_type, BlockCore): - nested_block = {} - for nested_field_name, nested_field in field.data_type.__class__.__dict__.items(): - if isinstance(nested_field, ConfigurationField): - nested_block[nested_field_name] = self.data_typing(nested_field) - return nested_block - else: - return field.get_default_value() + if isinstance(field.data_type, BlockCore): + nested_block = { + nested_field_name: self.data_typing(nested_field) + for nested_field_name, nested_field in field.data_type.__class__.__dict__.items() + if isinstance(nested_field, ConfigurationField) + } + return [nested_block] if field.is_list else nested_block + return [field.get_default_value()] if field.is_list else field.get_default_value() def new_nested(self, nested_model: Type[BlockCore]): nested_block = {nested_model.__block__: {}} for nested_field_name, nested_field in nested_model.__dict__.items(): if isinstance(nested_field, ConfigurationField): - if isinstance(nested_field.data_type, BlockCore): - nested_block[nested_model.__block__][nested_field_name] = [self.new_nested(nested_field.data_type.__class__)] - else: - nested_block[nested_model.__block__][nested_field_name] = self.data_typing(nested_field) + nested_block[nested_model.__block__][nested_field_name] = ( + [self.new_nested(nested_field.data_type.__class__)] if isinstance(nested_field.data_type, BlockCore) else + self.data_typing(nested_field) + ) elif isinstance(nested_field, BlockCore): nested_block[nested_model.__block__][nested_field_name] = [self.new_nested(nested_field.__class__)] return nested_block @@ -70,29 +54,24 @@ def add_field_to_datafiles( if field.secret: target = self.datafiles['.secrets'] elif field.filename: - if field.filename not in self.datafiles: - self.datafiles[field.filename] = {} - target = self.datafiles[field.filename] + target = self.datafiles.setdefault(field.filename, {}) else: target = self.datafiles['settings'] current = target for part in parent_path: - if part not in current: - current[part] = {} - current = current[part] + current = current.setdefault(part, {}) if field.is_list and isinstance(field.data_type, BlockCore): - data = self.new_nested(field.data_type.__class__) - current[field_name] = [data] + current[field_name] = [self.new_nested(field.data_type.__class__)] elif field.is_list: current[field_name] = [field.get_default_value()] elif isinstance(field.data_type, BlockCore): - nested_block = {} - for nested_field_name, nested_field in field.data_type.__class__.__dict__.items(): - if isinstance(nested_field, ConfigurationField): - nested_block[nested_field_name] = self.data_typing(nested_field) - current[field_name] = nested_block + current[field_name] = { + nested_field_name: self.data_typing(nested_field) + for nested_field_name, nested_field in field.data_type.__class__.__dict__.items() + if isinstance(nested_field, ConfigurationField) + } else: current[field_name] = field.get_default_value() @@ -100,19 +79,13 @@ def process_block(self, _block: BlockCore, parent_path: List[str], parent: str = if hasattr(_block, '__exclude__') and _block.__exclude__ and not parent: return - current_path = parent_path + [_block.__block__ if not parent else parent] - if parent: - current_path += [_block.__block__] + current_path = parent_path + ([_block.__block__] if not parent else [parent]) for field_name, field in (_block.__dict__.items() if has_configuration_fields(_block) else _block.__class__.__dict__.items()): if isinstance(field, ConfigurationField): if isinstance(field.data_type, BlockCore): - if field.is_list: - self.add_field_to_datafiles(field_name, field, current_path) - self.process_block(field.data_type, current_path + [field_name]) - else: - self.add_field_to_datafiles(field_name, field, current_path) - self.process_block(field.data_type, current_path, parent=field_name) + self.add_field_to_datafiles(field_name, field, current_path) + self.process_block(field.data_type, current_path + [field_name] if field.is_list else current_path, parent=field_name if not field.is_list else None) else: self.add_field_to_datafiles(field_name, field, current_path) elif isinstance(field, BlockCore): @@ -123,31 +96,27 @@ def generate_filenames(self): if block != BlockCore: self.process_block(block, []) - def remove_empty_dicts(self, data): + @staticmethod + def remove_empty_dicts(data): if isinstance(data, dict): - return {k: self.remove_empty_dicts(v) for k, v in data.items() if v and self.remove_empty_dicts(v)} + return {k: ConfigurationBuilder.remove_empty_dicts(v) for k, v in data.items() if v and ConfigurationBuilder.remove_empty_dicts(v)} elif isinstance(data, list): - return [self.remove_empty_dicts(v) for v in data if v and self.remove_empty_dicts(v)] - else: - return data + return [ConfigurationBuilder.remove_empty_dicts(v) for v in data if v and ConfigurationBuilder.remove_empty_dicts(v)] + return data def create_files(self, config_path: Path) -> None: datafiles = self.remove_empty_dicts(self.datafiles) for filename, data in datafiles.items(): - file_path = config_path / Path(f'{filename}.yml') + file_path = config_path / f'{filename}.yml' if file_path.exists(): with open(file_path, 'r', encoding='utf-8') as file: yaml_data = yaml.safe_load(file) - if yaml_data: - for key in data: - if key in yaml_data: - if not isinstance(yaml_data[key], Dict): - for inner_key in data[key]: - if inner_key in yaml_data[key]: - if isinstance(data[key][inner_key], type(yaml_data[key][inner_key])): - data[key][inner_key] = yaml_data[key][inner_key] + for key, value in data.items(): + if key in yaml_data and isinstance(yaml_data[key], dict): + yaml_data[key].update(value) + data[key] = yaml_data[key] with open(file_path, 'w', encoding='utf-8') as file: yaml.dump(data, file, default_flow_style=False) @@ -155,4 +124,4 @@ def create_files(self, config_path: Path) -> None: if filename.startswith('.'): add_to_gitignore(f"{filename}.*") - logger.info("Create file", path=file_path) \ No newline at end of file + logger.info("Create file", path=file_path)