Skip to content

Commit

Permalink
Checkpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
morington committed Jul 1, 2024
1 parent c814e4e commit 421248e
Showing 1 changed file with 105 additions and 98 deletions.
203 changes: 105 additions & 98 deletions confhub/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,121 +12,128 @@
logger: structlog.BoundLogger = structlog.get_logger("confhub")


def has_configuration_fields(select_class: BlockCore) -> bool:
if any(isinstance(item, ConfigurationField) for item in [
value for name, value in select_class.__dict__.items()
]):
return True
else:
if any(isinstance(item, ConfigurationField) for item in [
value for name, value in select_class.__class__.__dict__.items()
]):
return False
else:
raise ConfhubError("Cannot find field in model object", select_class=select_class)


class ConfigurationBuilder:
def __init__(self, *blocks: BlockCore):
self.blocks = [*blocks]
self.datafiles: Dict[str, Any] = self.generate_filenames()

def generate_filenames(self) -> Dict[str, Any]:
_datafiles = {'settings': {}, '.secrets': {}}

def data_typing(field: ConfigurationField) -> Any:
if field.is_list:
if isinstance(field.data_type, BlockCore):
nested_block = {}
for nested_field_name, nested_field in field.data_type.__class__.__dict__.items():
if isinstance(nested_field, ConfigurationField):
nested_block[nested_field_name] = data_typing(nested_field)
return [nested_block]
else:
return [field.get_default_value()]
else:
if isinstance(field.data_type, BlockCore):
nested_block = {}
for nested_field_name, nested_field in field.data_type.__class__.__dict__.items():
if isinstance(nested_field, ConfigurationField):
nested_block[nested_field_name] = data_typing(nested_field)
return nested_block
else:
return field.get_default_value()

def add_field_to_datafiles(
field_name: str, field: ConfigurationField, parent_path: List[str]
) -> None:
if field.secret:
target = _datafiles['.secrets']
elif field.filename:
if field.filename not in _datafiles:
_datafiles[field.filename] = {}
target = _datafiles[field.filename]
else:
target = _datafiles['settings']

current = target
for part in parent_path:
if part not in current:
current[part] = {}
current = current[part]
self.datafiles: Dict[str, Any] = {'settings': {}, '.secrets': {}}
self.generate_filenames()

def new_nested(nested_model: Type[BlockCore]):
nested_block = {nested_model.__block__: {}}
for nested_field_name, nested_field in nested_model.__dict__.items():
if isinstance(nested_field, ConfigurationField):
if isinstance(nested_field.data_type, BlockCore):
nested_block[nested_model.__block__][nested_field_name] = [new_nested(nested_field.data_type.__class__)]
else:
nested_block[nested_model.__block__][nested_field_name] = data_typing(nested_field)
elif isinstance(nested_field, BlockCore):
nested_block[nested_model.__block__][nested_field_name] = [new_nested(nested_field.__class__)]
return nested_block

if field.is_list and isinstance(field.data_type, BlockCore):
data = new_nested(field.data_type.__class__)
current[field_name] = [data]
elif field.is_list:
current[field_name] = [field.get_default_value()]
elif isinstance(field.data_type, BlockCore):
def data_typing(self, field: ConfigurationField) -> Any:
if field.is_list:
if isinstance(field.data_type, BlockCore):
nested_block = {}
for nested_field_name, nested_field in field.data_type.__class__.__dict__.items():
if isinstance(nested_field, ConfigurationField):
nested_block[nested_field_name] = data_typing(nested_field)
current[field_name] = nested_block
nested_block[nested_field_name] = self.data_typing(nested_field)
return [nested_block]
else:
current[field_name] = field.get_default_value()

def has_configuration_fields(select_class: BlockCore) -> bool:
if any(isinstance(item, ConfigurationField) for item in [
value for name, value in select_class.__dict__.items()
]):
return True
return [field.get_default_value()]
else:
if isinstance(field.data_type, BlockCore):
nested_block = {}
for nested_field_name, nested_field in field.data_type.__class__.__dict__.items():
if isinstance(nested_field, ConfigurationField):
nested_block[nested_field_name] = self.data_typing(nested_field)
return nested_block
else:
if any(isinstance(item, ConfigurationField) for item in [
value for name, value in select_class.__class__.__dict__.items()
]):
return False
return field.get_default_value()

def new_nested(self, nested_model: Type[BlockCore]):
nested_block = {nested_model.__block__: {}}
for nested_field_name, nested_field in nested_model.__dict__.items():
if isinstance(nested_field, ConfigurationField):
if isinstance(nested_field.data_type, BlockCore):
nested_block[nested_model.__block__][nested_field_name] = [self.new_nested(nested_field.data_type.__class__)]
else:
raise ConfhubError("Cannot find field in model object", select_class=select_class)

def process_block(_block: BlockCore, parent_path: List[str], parent: str = None) -> None:
if hasattr(_block, '__exclude__') and _block.__exclude__ and not parent:
return

current_path = parent_path + [_block.__block__ if not parent else parent]
if parent:
current_path += [_block.__block__]

for field_name, field in (_block.__dict__.items() if has_configuration_fields(_block) else _block.__class__.__dict__.items()):
if isinstance(field, ConfigurationField):
if isinstance(field.data_type, BlockCore):
if field.is_list:
add_field_to_datafiles(field_name, field, current_path)
process_block(field.data_type, current_path + [field_name])
else:
add_field_to_datafiles(field_name, field, current_path)
process_block(field.data_type, current_path, parent=field_name)
nested_block[nested_model.__block__][nested_field_name] = self.data_typing(nested_field)
elif isinstance(nested_field, BlockCore):
nested_block[nested_model.__block__][nested_field_name] = [self.new_nested(nested_field.__class__)]
return nested_block

def add_field_to_datafiles(
self, field_name: str, field: ConfigurationField, parent_path: List[str]
) -> None:
if field.secret:
target = self.datafiles['.secrets']
elif field.filename:
if field.filename not in self.datafiles:
self.datafiles[field.filename] = {}
target = self.datafiles[field.filename]
else:
target = self.datafiles['settings']

current = target
for part in parent_path:
if part not in current:
current[part] = {}
current = current[part]

if field.is_list and isinstance(field.data_type, BlockCore):
data = self.new_nested(field.data_type.__class__)
current[field_name] = [data]
elif field.is_list:
current[field_name] = [field.get_default_value()]
elif isinstance(field.data_type, BlockCore):
nested_block = {}
for nested_field_name, nested_field in field.data_type.__class__.__dict__.items():
if isinstance(nested_field, ConfigurationField):
nested_block[nested_field_name] = self.data_typing(nested_field)
current[field_name] = nested_block
else:
current[field_name] = field.get_default_value()

def process_block(self, _block: BlockCore, parent_path: List[str], parent: str = None) -> None:
if hasattr(_block, '__exclude__') and _block.__exclude__ and not parent:
return

current_path = parent_path + [_block.__block__ if not parent else parent]
if parent:
current_path += [_block.__block__]

for field_name, field in (_block.__dict__.items() if has_configuration_fields(_block) else _block.__class__.__dict__.items()):
if isinstance(field, ConfigurationField):
if isinstance(field.data_type, BlockCore):
if field.is_list:
self.add_field_to_datafiles(field_name, field, current_path)
self.process_block(field.data_type, current_path + [field_name])
else:
add_field_to_datafiles(field_name, field, current_path)
elif isinstance(field, BlockCore):
process_block(field, current_path, parent=field_name)
self.add_field_to_datafiles(field_name, field, current_path)
self.process_block(field.data_type, current_path, parent=field_name)
else:
self.add_field_to_datafiles(field_name, field, current_path)
elif isinstance(field, BlockCore):
self.process_block(field, current_path, parent=field_name)

def generate_filenames(self):
for block in self.blocks:
if block != BlockCore:
process_block(block, [])
self.process_block(block, [])

return _datafiles
def remove_empty_dicts(self, data):
if isinstance(data, dict):
return {k: self.remove_empty_dicts(v) for k, v in data.items() if v and self.remove_empty_dicts(v)}
elif isinstance(data, list):
return [self.remove_empty_dicts(v) for v in data if v and self.remove_empty_dicts(v)]
else:
return data

def create_files(self, config_path: Path) -> None:
for filename, data in self.datafiles.items():
datafiles = self.remove_empty_dicts(self.datafiles)
for filename, data in datafiles.items():
file_path = config_path / Path(f'{filename}.yml')

if file_path.exists():
Expand Down

0 comments on commit 421248e

Please sign in to comment.