Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
497 changes: 497 additions & 0 deletions examples/vault_migration.py

Large diffs are not rendered by default.

243 changes: 199 additions & 44 deletions navconfig/kardex.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from typing import (
Any,
Dict,
List,
)
import os
import contextlib
Expand All @@ -24,22 +26,19 @@
## memcache:
try:
from .readers.memcache import mcache

MEMCACHE_LOADER = mcache
except ModuleNotFoundError:
MEMCACHE_LOADER = None
## redis:
try:
from .readers.redis import mredis

REDIS_LOADER = mredis
except ModuleNotFoundError:
REDIS_LOADER = None

## Hashicorp Vault:
try:
from .readers.vault import VaultReader

HVAULT_LOADER = VaultReader
except ModuleNotFoundError:
HVAULT_LOADER = None
Expand Down Expand Up @@ -69,16 +68,22 @@ def __init__(
# create the required directories:
self._create: bool = strtobool(os.getenv("CONFIG_CREATE", False))
self._auto_env: bool = strtobool(os.getenv("AUTO_DISCOVERY", "True"))

# Core components
self._site_path: Path = None
self._env_loader: Callable = None
self._ini: Callable = None
lazy_load = strtobool(os.getenv('LAZY_LOAD', 'False'))
self._current_env: str = None
# Cache for multiple environments
self._env_cache: Dict[str, Dict] = {}

# asyncio loop
try:
self._loop = asyncio.get_running_loop()
except RuntimeError:
self._loop = asyncio.new_event_loop()
asyncio.set_event_loop(self._loop)

# this only load at first time
if not site_root:
# TODO: better discovery of Project Root
Expand All @@ -89,45 +94,66 @@ def __init__(
else:
self._site_path = site_root
# then: configure the instance:
lazy_load = strtobool(os.getenv('LAZY_LOAD', 'False'))
if lazy_load is False:
self.configure(env, **kwargs)

def configure(
self,
env: str = None,
env_type: str = "file",
env_type: str = "vault",
override: bool = False
):
"""_summary_
"""
Configure Kardex with enhanced vault + file loading.

Args:
env (str, optional): Environment name (dev, prod).
Defaults to None.
env_type (str, optional): type of enviroment.
Defaults to "file".
override (bool, optional): override current .env variables.
Defaults to False.
env (str, optional): Environment name (dev, prod, staging).
env_type (str, optional): Loader type - defaults to "vault" (unified vault+file).
override (bool, optional): Override current environment variables.

Raises:
ConfigError: Error on Configuration.
"""
# Environment Configuration:
if env is not None:
self.ENV = env
self._current_env = env
else:
environment = os.getenv("ENV", "")
self.ENV = environment
self._current_env = environment
# getting type of environment consumer:
try:
self.load_environment(
env_type,
override=override
)
except FileNotFoundError:
logging.error(
"NavConfig Error: Environment (.env) File is Missing."
)
# Get External Readers:
logging.error("NavConfig Error: Environment configuration is missing.")
# Try fallback to file-only loading
if env_type == "vault":
logging.info("Falling back to file-only loading...")
try:
self.load_environment("file", override=override)
except Exception as err:
logging.error(f"Fallback loading also failed: {err}")
raise ConfigError(
"NavConfig Error: Unable to load environment configuration"
) from err
# Initialize external readers (redis, memcache, vault as reader)
self._init_external_readers()
# Load INI configuration
self._load_ini_config()
# Running Load PyProject:
self.load_pyproject()
# Defined as initialized:
self.__initialized__ = True

def _init_external_readers(self):
"""Initialize external readers (redis, memcache, vault as reader)."""

# Redis reader
self._use_redis: bool = strtobool(os.environ.get("USE_REDIS", False))
if self._use_redis and REDIS_LOADER:
try:
Expand All @@ -138,9 +164,9 @@ def configure(
except Exception as err:
logging.warning(f"Redis error: {err}")
raise ConfigError(str(err)) from err
self._use_memcache: bool = strtobool(
os.environ.get("USE_MEMCACHED", False)
)

# Memcache reader
self._use_memcache: bool = strtobool(os.environ.get("USE_MEMCACHED", False))
if self._use_memcache and MEMCACHE_LOADER:
try:
self._readers["memcache"] = MEMCACHE_LOADER()
Expand All @@ -149,52 +175,43 @@ def configure(
self._use_memcache = False
except Exception as err:
raise ConfigError(str(err)) from err
## Hashicorp Vault:
self._use_vault: bool = strtobool(os.environ.get("USE_VAULT", False))

# Vault as external reader (different from vault loader)
self._use_vault: bool = strtobool(os.environ.get("VAULT_ENABLED", False))
if self._use_vault and HVAULT_LOADER:
try:
self._readers["vault"] = HVAULT_LOADER(
env=self.ENV
)
self._readers["vault"] = HVAULT_LOADER(env=self.ENV)
except ReaderNotSet as err:
logging.error(f"{err}")
except Exception as err:
logging.warning(f"Vault error: {err}")
raise ConfigError(str(err)) from err
# define debug

def _load_ini_config(self):
"""Load INI configuration file."""
self._debug = bool(self.getboolean("DEBUG", fallback=False))
# and get the config file declared in the environment file
config_file = self.get("CONFIG_FILE", fallback=self._conffile)
self._ini = ConfigParser()

cf = Path(config_file)
if not cf.is_absolute():
cf = self._site_path.joinpath(config_file)
if not cf.exists():
# try ini file from etc/ directory.
cf = self._site_path.joinpath(self._conffile)

self._ini_path = cf
if cf.exists():
try:
self._ini.read(cf)
except IOError as err:
logging.exception(
f"NavConfig: INI file doesn't exist: {err}"
)
logging.exception(f"NavConfig: INI file doesn't exist: {err}")
except ParsingError as ex:
logging.exception(
f"Navconfig: unable to parse INI file: {ex}"
)
logging.exception(f"Navconfig: unable to parse INI file: {ex}")
else:
logging.warning(
f"Navconfig: INI file doesn't exists on path: {cf!s}"
)
logging.warning(f"Navconfig: INI file doesn't exists on path: {cf!s}")
if self._create is True:
with contextlib.suppress(IOError):
cf.mkdir(parents=True, exist_ok=True)
# Running Load PyProject:
self.load_pyproject()
# Defined as initialized:
self.__initialized__ = True
cf.parent.mkdir(parents=True, exist_ok=True)

@property
def initialized(self) -> bool:
Expand Down Expand Up @@ -251,7 +268,7 @@ def save_environment(self, env_type: str = "drive"):
if self._env_loader.downloadable is True:
self._env_loader.save_enviroment(env_path)

def load_environment(self, env_type: str = "file", override: bool = False):
def load_environment(self, env_type: str = "vault", override: bool = False):
"""load_environment.
Load an environment from a File or any pluggable Origin.
"""
Expand Down Expand Up @@ -546,8 +563,8 @@ def _unserialize(self, value: Any) -> str:
def set(self, key: str, value: Any) -> None:
"""
set.
Set an enviroment variable on REDIS, based on Strategy
TODO: add cloudpickle to serialize and unserialize data first.
Set an enviroment variable on REDIS, based on Strategy
TODO: add cloudpickle to serialize and unserialize data first.
"""
if key in self._mapping_:
self._mapping_[key] = value
Expand Down Expand Up @@ -601,3 +618,141 @@ def setext(
)
else:
return False

def set_env(self, new_env: str, reload: bool = True) -> bool:
"""
Switch environment at runtime.

Args:
new_env: Target environment (dev, prod, staging, etc.)
reload: Whether to reload configuration immediately

Returns:
bool: True if switch was successful
"""
if new_env == self._current_env:
logging.debug(f"Already in environment: {new_env}")
return True

old_env = self._current_env

try:
# Check cache first
if new_env in self._env_cache and not reload:
self._mapping_ = self._env_cache[new_env].copy()
self._current_env = new_env
self.ENV = new_env
logging.info(f"Switched to cached environment: {new_env}")
return True

# Switch environment in loader if supported
if hasattr(self._env_loader, 'set_environment'):
self._env_loader.set_environment(new_env)
self._current_env = new_env
self.ENV = new_env
else:
# Reinitialize loader for new environment
self._current_env = new_env
self.ENV = new_env
self.load_environment(override=False)

logging.info(f"Environment switched from {old_env} to {new_env}")
return True

except Exception as e:
# Rollback on error
self._current_env = old_env
self.ENV = old_env
logging.error(f"Failed to switch to environment {new_env}: {e}")
raise RuntimeError(f"Environment switch failed: {e}") from e

def get_current_env(self) -> str:
"""Get currently active environment."""
return self._current_env

def list_available_envs(self) -> List[str]:
"""List all available environments from filesystem."""
envs = set()

try:
env_base = self.site_root / "env"
if env_base.exists():
envs.update(d.name for d in env_base.iterdir() if d.is_dir())
except Exception as e:
logging.debug(f"Error scanning filesystem environments: {e}")

# Add any cached environments
envs.update(self._env_cache.keys())

return sorted(envs)

def get_env_info(self) -> Dict[str, Any]:
"""Get comprehensive information about current environment."""
info = {
'current_env': self._current_env,
'loader_type': type(self._env_loader).__name__ if self._env_loader else None,
'available_envs': self.list_available_envs(),
'cached_envs': list(self._env_cache.keys()),
'site_root': str(self.site_root),
'total_variables': len(self._mapping_),
}

# Add vault-specific information if available
if hasattr(self._env_loader, 'get_vault_status'):
info['vault_status'] = self._env_loader.get_vault_status()

# Add file loading information if available
if hasattr(self._env_loader, 'get_loaded_files'):
loaded_files = self._env_loader.get_loaded_files()
info['loaded_files'] = [str(f) for f in loaded_files]
info['file_count'] = len(loaded_files)

return info

def get_with_env(self, key: str, env: str = None, fallback: Any = None) -> Any:
"""
Get a variable from a specific environment without switching.

Usage:
prod_db = config.get_with_env('DATABASE_URL', 'prod')
"""
if env is None or env == self._current_env:
return self.get(key, fallback=fallback)

# Check cache first
if env in self._env_cache:
return self._env_cache[env].get(key, fallback)

# Load environment temporarily (simplified version)
try:
temp_env_path = self.site_root.joinpath("env", env)
if temp_env_path.exists():
from .loaders.vault import vaultLoader
temp_loader = vaultLoader(
env_path=temp_env_path,
env=env,
override=False,
create=False,
)
if temp_data := temp_loader.load_environment():
# Cache for future use
self._env_cache[env] = temp_data.copy()
return temp_data.get(key, fallback)

except Exception as e:
logging.debug(f"Failed to load environment {env}: {e}")

return fallback

def clear_env_cache(self, env: str = None):
"""Clear cached environment data."""
if env:
self._env_cache.pop(env, None)
logging.debug(f"Cleared cache for environment: {env}")
else:
self._env_cache.clear()
logging.debug("Cleared all environment cache")

def reload_current_env(self):
"""Reload current environment from source."""
self.set_env(self._current_env, reload=True)
Loading