Skip to content

Commit

Permalink
Checkpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
morington committed Jul 1, 2024
1 parent 421248e commit fc056b0
Showing 1 changed file with 40 additions and 71 deletions.
111 changes: 40 additions & 71 deletions confhub/builder.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from pathlib import Path
from typing import List, Any, Dict, Optional, Type
from typing import List, Any, Dict, Type

import yaml
import structlog
Expand All @@ -13,53 +13,37 @@


def has_configuration_fields(select_class: BlockCore) -> bool:
if any(isinstance(item, ConfigurationField) for item in [
value for name, value in select_class.__dict__.items()
]):
if any(isinstance(item, ConfigurationField) for item in select_class.__dict__.values()):
return True
else:
if any(isinstance(item, ConfigurationField) for item in [
value for name, value in select_class.__class__.__dict__.items()
]):
return False
else:
raise ConfhubError("Cannot find field in model object", select_class=select_class)
if any(isinstance(item, ConfigurationField) for item in select_class.__class__.__dict__.values()):
return False
raise ConfhubError("Cannot find field in model object", select_class=select_class)


class ConfigurationBuilder:
def __init__(self, *blocks: BlockCore):
self.blocks = [*blocks]
self.blocks = list(blocks)
self.datafiles: Dict[str, Any] = {'settings': {}, '.secrets': {}}
self.generate_filenames()

def data_typing(self, field: ConfigurationField) -> Any:
if field.is_list:
if isinstance(field.data_type, BlockCore):
nested_block = {}
for nested_field_name, nested_field in field.data_type.__class__.__dict__.items():
if isinstance(nested_field, ConfigurationField):
nested_block[nested_field_name] = self.data_typing(nested_field)
return [nested_block]
else:
return [field.get_default_value()]
else:
if isinstance(field.data_type, BlockCore):
nested_block = {}
for nested_field_name, nested_field in field.data_type.__class__.__dict__.items():
if isinstance(nested_field, ConfigurationField):
nested_block[nested_field_name] = self.data_typing(nested_field)
return nested_block
else:
return field.get_default_value()
if isinstance(field.data_type, BlockCore):
nested_block = {
nested_field_name: self.data_typing(nested_field)
for nested_field_name, nested_field in field.data_type.__class__.__dict__.items()
if isinstance(nested_field, ConfigurationField)
}
return [nested_block] if field.is_list else nested_block
return [field.get_default_value()] if field.is_list else field.get_default_value()

def new_nested(self, nested_model: Type[BlockCore]):
nested_block = {nested_model.__block__: {}}
for nested_field_name, nested_field in nested_model.__dict__.items():
if isinstance(nested_field, ConfigurationField):
if isinstance(nested_field.data_type, BlockCore):
nested_block[nested_model.__block__][nested_field_name] = [self.new_nested(nested_field.data_type.__class__)]
else:
nested_block[nested_model.__block__][nested_field_name] = self.data_typing(nested_field)
nested_block[nested_model.__block__][nested_field_name] = (
[self.new_nested(nested_field.data_type.__class__)] if isinstance(nested_field.data_type, BlockCore) else
self.data_typing(nested_field)
)
elif isinstance(nested_field, BlockCore):
nested_block[nested_model.__block__][nested_field_name] = [self.new_nested(nested_field.__class__)]
return nested_block
Expand All @@ -70,49 +54,38 @@ def add_field_to_datafiles(
if field.secret:
target = self.datafiles['.secrets']
elif field.filename:
if field.filename not in self.datafiles:
self.datafiles[field.filename] = {}
target = self.datafiles[field.filename]
target = self.datafiles.setdefault(field.filename, {})
else:
target = self.datafiles['settings']

current = target
for part in parent_path:
if part not in current:
current[part] = {}
current = current[part]
current = current.setdefault(part, {})

if field.is_list and isinstance(field.data_type, BlockCore):
data = self.new_nested(field.data_type.__class__)
current[field_name] = [data]
current[field_name] = [self.new_nested(field.data_type.__class__)]
elif field.is_list:
current[field_name] = [field.get_default_value()]
elif isinstance(field.data_type, BlockCore):
nested_block = {}
for nested_field_name, nested_field in field.data_type.__class__.__dict__.items():
if isinstance(nested_field, ConfigurationField):
nested_block[nested_field_name] = self.data_typing(nested_field)
current[field_name] = nested_block
current[field_name] = {
nested_field_name: self.data_typing(nested_field)
for nested_field_name, nested_field in field.data_type.__class__.__dict__.items()
if isinstance(nested_field, ConfigurationField)
}
else:
current[field_name] = field.get_default_value()

def process_block(self, _block: BlockCore, parent_path: List[str], parent: str = None) -> None:
if hasattr(_block, '__exclude__') and _block.__exclude__ and not parent:
return

current_path = parent_path + [_block.__block__ if not parent else parent]
if parent:
current_path += [_block.__block__]
current_path = parent_path + ([_block.__block__] if not parent else [parent])

for field_name, field in (_block.__dict__.items() if has_configuration_fields(_block) else _block.__class__.__dict__.items()):
if isinstance(field, ConfigurationField):
if isinstance(field.data_type, BlockCore):
if field.is_list:
self.add_field_to_datafiles(field_name, field, current_path)
self.process_block(field.data_type, current_path + [field_name])
else:
self.add_field_to_datafiles(field_name, field, current_path)
self.process_block(field.data_type, current_path, parent=field_name)
self.add_field_to_datafiles(field_name, field, current_path)
self.process_block(field.data_type, current_path + [field_name] if field.is_list else current_path, parent=field_name if not field.is_list else None)
else:
self.add_field_to_datafiles(field_name, field, current_path)
elif isinstance(field, BlockCore):
Expand All @@ -123,36 +96,32 @@ def generate_filenames(self):
if block != BlockCore:
self.process_block(block, [])

def remove_empty_dicts(self, data):
@staticmethod
def remove_empty_dicts(data):
if isinstance(data, dict):
return {k: self.remove_empty_dicts(v) for k, v in data.items() if v and self.remove_empty_dicts(v)}
return {k: ConfigurationBuilder.remove_empty_dicts(v) for k, v in data.items() if v and ConfigurationBuilder.remove_empty_dicts(v)}
elif isinstance(data, list):
return [self.remove_empty_dicts(v) for v in data if v and self.remove_empty_dicts(v)]
else:
return data
return [ConfigurationBuilder.remove_empty_dicts(v) for v in data if v and ConfigurationBuilder.remove_empty_dicts(v)]
return data

def create_files(self, config_path: Path) -> None:
datafiles = self.remove_empty_dicts(self.datafiles)
for filename, data in datafiles.items():
file_path = config_path / Path(f'{filename}.yml')
file_path = config_path / f'{filename}.yml'

if file_path.exists():
with open(file_path, 'r', encoding='utf-8') as file:
yaml_data = yaml.safe_load(file)

if yaml_data:
for key in data:
if key in yaml_data:
if not isinstance(yaml_data[key], Dict):
for inner_key in data[key]:
if inner_key in yaml_data[key]:
if isinstance(data[key][inner_key], type(yaml_data[key][inner_key])):
data[key][inner_key] = yaml_data[key][inner_key]
for key, value in data.items():
if key in yaml_data and isinstance(yaml_data[key], dict):
yaml_data[key].update(value)
data[key] = yaml_data[key]

with open(file_path, 'w', encoding='utf-8') as file:
yaml.dump(data, file, default_flow_style=False)

if filename.startswith('.'):
add_to_gitignore(f"{filename}.*")

logger.info("Create file", path=file_path)
logger.info("Create file", path=file_path)

0 comments on commit fc056b0

Please sign in to comment.