diff --git a/src/west/configuration.py b/src/west/configuration.py index 8c87e75a..2676b779 100644 --- a/src/west/configuration.py +++ b/src/west/configuration.py @@ -40,20 +40,77 @@ from pathlib import PureWindowsPath, Path import platform from enum import Enum -from typing import Any, Optional, List +from typing import Any, Dict, Iterable, List, Optional, Tuple, TYPE_CHECKING from west.util import west_dir, WestNotFound, PathType def _configparser(): # for internal use return configparser.ConfigParser(allow_no_value=True) -# Configuration values. -# -# Initially empty, populated in read_config(). Always having this available is -# nice in case something checks configuration values before the configuration -# file has been read (e.g. the log.py functions, to check color settings, and -# tests). -config = _configparser() +class _InternalCF: + # For internal use only; convenience interface for reading and + # writing INI-style [section] key = value configuration files, + # but presenting a west-style section.key = value style API. + + @staticmethod + def from_path(path: Optional[Path]) -> Optional['_InternalCF']: + return _InternalCF(path) if path and path.exists() else None + + def __init__(self, path: Path): + self.path = path + self.cp = _configparser() + read_files = self.cp.read(path, encoding='utf-8') + if len(read_files) != 1: + raise FileNotFoundError(path) + + def __contains__(self, option: str) -> bool: + section, key = option.split('.', 1) + + return section in self.cp and key in self.cp[section] + + def get(self, option: str): + return self._get(option, self.cp.get) + + def getboolean(self, option: str): + return self._get(option, self.cp.getboolean) + + def getint(self, option: str): + return self._get(option, self.cp.getint) + + def getfloat(self, option: str): + return self._get(option, self.cp.getfloat) + + def _get(self, option, getter): + section, key = option.split('.', 1) + + try: + return getter(section, key) + except (configparser.NoOptionError, configparser.NoSectionError): + raise KeyError(option) + + def set(self, option: str, value: Any): + section, key = option.split('.', 1) + + if section not in self.cp: + self.cp[section] = {} + + self.cp[section][key] = value + + with open(self.path, 'w', encoding='utf-8') as f: + self.cp.write(f) + + def delete(self, option: str): + section, key = option.split('.', 1) + + if section not in self.cp: + raise KeyError(option) + + del self.cp[section][key] + if not self.cp[section].items(): + del self.cp[section] + + with open(self.path, 'w', encoding='utf-8') as f: + self.cp.write(f) class ConfigFile(Enum): '''Types of west configuration file. @@ -70,6 +127,278 @@ class ConfigFile(Enum): GLOBAL = 3 LOCAL = 4 +class Configuration: + '''Represents the available configuration options and their values. + + Allows getting, setting, and deleting configuration options + in the system, global, and local files. + + Sets take effect immediately and are not protected against + concurrent gets. The caller is responsible for any necessary + mutual exclusion. + ''' + + def __init__(self, topdir: Optional[PathType] = None): + '''Load the system, global, and workspace configurations and + make them available for the user. + + :param topdir: workspace location; may be None + ''' + + local_path = _location(ConfigFile.LOCAL, topdir=topdir, + search_for_local=False) or None + + self._system_path = Path(_location(ConfigFile.SYSTEM)) + self._global_path = Path(_location(ConfigFile.GLOBAL)) + self._local_path = Path(local_path) if local_path is not None else None + + self._system = _InternalCF.from_path(self._system_path) + self._global = _InternalCF.from_path(self._global_path) + self._local = _InternalCF.from_path(self._local_path) + + def get(self, option: str, + default: Optional[str] = None, + configfile: ConfigFile = ConfigFile.ALL) -> Optional[str]: + '''Get a configuration option's value as a string. + + :param option: option to get, in 'foo.bar' form + :param default: default value to return if option is missing + :param configfile: type of config file look for the value in + ''' + return self._get(lambda cf: cf.get(option), default, configfile) + + def getboolean(self, option: str, + default: bool = False, + configfile: ConfigFile = ConfigFile.ALL) -> bool: + '''Get a configuration option's value as a bool. + + The configparser module's conversion to boolean is applied + to any value discovered. Invalid values raise ValueError. + + :param option: option to get, in 'foo.bar' form + :param default: default value to return if option is missing + :param configfile: type of config file to look for the value in + ''' + return self._get(lambda cf: cf.getboolean(option), default, configfile) + + def getint(self, option: str, + default: Optional[int] = None, + configfile: ConfigFile = ConfigFile.ALL) -> Optional[int]: + '''Get a configuration option's value as an int. + + :param option: option to get, in 'foo.bar' form + :param default: default value to return if option is missing + :param configfile: type of config file to look for the value in + ''' + return self._get(lambda cf: cf.getint(option), default, configfile) + + def getfloat(self, option: str, + default: Optional[float] = None, + configfile: ConfigFile = ConfigFile.ALL) -> Optional[float]: + '''Get a configuration option's value as a float. + + :param option: option to get, in 'foo.bar' form + :param default: default value to return if option is missing + :param configfile: type of config file to look for the value in + ''' + return self._get(lambda cf: cf.getfloat(option), default, configfile) + + def _get(self, getter, default, configfile): + for cf in self._whence(configfile): + if cf is None: + continue + try: + return getter(cf) + except KeyError: + pass + + return default + + def _whence(self, configfile): + if configfile == ConfigFile.ALL: + if self._local is not None: + return [self._local, self._global, self._system] + return [self._global, self._system] + elif configfile == ConfigFile.SYSTEM: + return [self._system] + elif configfile == ConfigFile.GLOBAL: + return [self._global] + elif configfile == ConfigFile.LOCAL: + if self._local is None: + raise RuntimeError('local configuration file not found') + return [self._local] + else: + raise ValueError(configfile) + + def set(self, option: str, value: Any, + configfile: ConfigFile = ConfigFile.LOCAL) -> None: + '''Set a configuration option's value. + + The write to the configuration file takes effect + immediately. No concurrency protection is performed against + concurrent access from the time that this Configuration object + was created. If the file may have been modified since that + time, either create a new Configuration object before using + this method or lose the intervening modifications. + + :param option: option to set, in 'foo.bar' form + :param value: value to set option to + :param configfile: type of config file to set the value in + ''' + + if configfile == ConfigFile.ALL: + # We need a real configuration file; ALL doesn't make sense here. + raise ValueError(configfile) + elif configfile == ConfigFile.LOCAL: + if self._local_path is None: + raise ValueError(f'{configfile}: file not found; retry in a ' + 'workspace or set WEST_CONFIG_LOCAL') + if not self._local_path.exists(): + self._local = self._create(self._local_path) + if TYPE_CHECKING: + assert self._local + self._local.set(option, value) + elif configfile == ConfigFile.GLOBAL: + if not self._global_path.exists(): + self._global = self._create(self._global_path) + if TYPE_CHECKING: + assert self._global + self._global.set(option, value) + elif configfile == ConfigFile.SYSTEM: + if not self._system_path.exists(): + self._system = self._create(self._system_path) + if TYPE_CHECKING: + assert self._system + self._system.set(option, value) + else: + # Shouldn't happen. + assert False, configfile + + @staticmethod + def _create(path: Path) -> _InternalCF: + path.parent.mkdir(parents=True, exist_ok=True) + path.touch(exist_ok=True) + ret = _InternalCF.from_path(path) + if TYPE_CHECKING: + assert ret + return ret + + def delete(self, option: str, + configfile: Optional[ConfigFile] = None) -> None: + '''Delete an option from the given file or files. + + If *option* is not set in the given *configfile*, KeyError is raised. + + :param option: option to delete, in 'foo.bar' form + :param configfile: If ConfigFile.ALL, delete *option* in all files + where it is set. + + If None, delete *option* only in the highest + precedence file where it is set. + + Otherwise, delete from the given ConfigFile. + ''' + + if configfile == ConfigFile.ALL or configfile is None: + found = False + for cf in [self._local, self._global, self._system]: + if cf and option in cf: + cf.delete(option) + if configfile is None: + return + found = True + if not found: + raise KeyError(option) + elif configfile == ConfigFile.LOCAL: + if not self._local: + raise KeyError(option) + self._local.delete(option) + elif configfile == ConfigFile.GLOBAL: + if not self._global: + raise KeyError(option) + self._global.delete(option) + elif configfile == ConfigFile.SYSTEM: + if not self._system: + raise KeyError(option) + self._system.delete(option) + else: + raise RuntimeError(f'bad configfile {configfile}') + + def _copy_to_configparser(self, cp: configparser.ConfigParser) -> None: + # Internal API for main to use to maintain backwards + # compatibility for existing extensions using the legacy + # function-and-global-state APIs. + + def load(cf: _InternalCF): + for section, contents in cf.cp.items(): + if section == 'DEFAULT': + continue + if section not in cp: + cp.add_section(section) + for key, value in contents.items(): + cp[section][key] = value + + if self._system: + load(self._system) + if self._global: + load(self._global) + if self._local: + load(self._local) + + def items(self, configfile: ConfigFile = ConfigFile.ALL + ) -> Iterable[Tuple[str, Any]]: + '''Iterator of option, value pairs.''' + if configfile == ConfigFile.ALL: + ret = {} + ret.update(self._system_as_dict) + ret.update(self._global_as_dict) + ret.update(self._local_as_dict) + return ret.items() + + if configfile == ConfigFile.SYSTEM: + return self._system_as_dict.items() + + if configfile == ConfigFile.GLOBAL: + return self._global_as_dict.items() + + if configfile == ConfigFile.LOCAL: + return self._local_as_dict.items() + + raise RuntimeError(configfile) + + @property + def _system_as_dict(self): + return self._cf_to_dict(self._system) + + @property + def _global_as_dict(self): + return self._cf_to_dict(self._global) + + @property + def _local_as_dict(self): + return self._cf_to_dict(self._local) + + @staticmethod + def _cf_to_dict(cf: Optional[_InternalCF]) -> Dict[str, Any]: + ret: Dict[str, Any] = {} + if cf is None: + return ret + for section, contents in cf.cp.items(): + if section == 'DEFAULT': + continue + for key, value in contents.items(): + ret[f'{section}.{key}'] = value + return ret + + +# Configuration values. +# +# Initially empty, populated in read_config(). Always having this available is +# nice in case something checks configuration values before the configuration +# file has been read (e.g. the log.py functions, to check color settings, and +# tests). +config = _configparser() + def read_config(configfile: Optional[ConfigFile] = None, config: configparser.ConfigParser = config, topdir: Optional[PathType] = None) -> None: