Skip to content

Commit

Permalink
Enrich logging through context vars (#452)
Browse files Browse the repository at this point in the history
The implementation uses Python's `contextvars.ContextVar` to store
experiment-specific state. That state is then used to dynamically modify
experiment-level logging.

For example, this driver:

```py
exp1 = smartsim.Experiment('exp-1')
rs1 = exp1.create_run_settings(...)
model1 = exp1.create_model(..., rs1)

exp2 = smartsim.Experiment('other-exp')
rs2 = exp2.create_run_settings(...)
model2 = exp2.create_model(..., rs2)

exp1.start(model1)
exp2.start(model2)
```

Results in each experiment dynamically registering `logging.FileHandler`
instances that write logs to separate files:

- `/exp-1/.smartsim/telemetry/smartsim.out`
- `/other-exp/.smartsim/telemetry/smartsim.out`

### Key changes:

1. Decorated the experiment API with a contextualizer to enrich the log context
2. Create/Use `ContextThread` to ensure threads include current context
information
3. Create/Use `ContextAwareLogger` to dynamically add file handlers for
experiment logs
4. Updated manifest serialization to include paths to
experiment-specific log files
5. Added `LowPassFilter` to enable splitting experiment logs across
`xxx.out` and `xxx.err`

### Additional minor changes:

1. Moved `serialize.TELMON_SUBDIR` constant to `Config.telemetry_subdir`
to make it more universally available

---------

Co-authored-by: Matt Drozt <drozt@hpe.com>
Co-authored-by: Matt Drozt <matthew.drozt@gmail.com>

[ committed by @ankona ]
[ reviewed by @al-rigazzi @MattToast  ]
  • Loading branch information
ankona authored Jan 19, 2024
1 parent c38f73f commit f683521
Show file tree
Hide file tree
Showing 16 changed files with 544 additions and 81 deletions.
4 changes: 4 additions & 0 deletions conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -698,3 +698,7 @@ def setup_test_colo(
assert colo_model.colocated
# Check to make sure that limit_db_cpus made it into the colo settings
return colo_model

@pytest.fixture
def config() -> smartsim._core.config.Config:
    """Pytest fixture that hands the requesting test the ``CONFIG`` object
    that is in scope in this conftest module.

    :return: the ``CONFIG`` object
    :rtype: smartsim._core.config.Config
    """
    return CONFIG
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ def has_ext_modules(_placeholder):
"types-tqdm",
"types-tensorflow==2.12.0.9",
"types-setuptools",
"typing_extensions>=4.9.0",
],
# see smartsim/_core/_install/buildenv.py for more details
**versions.ml_extras_required()
Expand Down
2 changes: 1 addition & 1 deletion smartsim/_core/_cli/build.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ def check_py_torch_version(versions: Versioner, device_in: _TDeviceStr = "cpu")
"Torch version not found in python environment. "
"Attempting to install via `pip`"
)
wheel_device = device if device == "cpu" else device_suffix.replace("+","")
wheel_device = device if device == "cpu" else device_suffix.replace("+", "")
pip(
"install",
"--extra-index-url",
Expand Down
4 changes: 4 additions & 0 deletions smartsim/_core/config/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,10 @@ def telemetry_enabled(self) -> bool:
def telemetry_cooldown(self) -> int:
return int(os.environ.get("SMARTSIM_TELEMETRY_COOLDOWN", 90))

@property
def telemetry_subdir(self) -> str:
    """Relative path of the telemetry subdirectory.

    Callers join this value onto an experiment directory to locate
    telemetry artifacts such as the manifest file.

    :return: the fixed relative path ``.smartsim/telemetry``
    :rtype: str
    """
    return ".smartsim/telemetry"


@lru_cache(maxsize=128, typed=False)
def get_config() -> Config:
Expand Down
3 changes: 2 additions & 1 deletion smartsim/_core/control/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -842,11 +842,11 @@ def _start_telemetry_monitor(self, exp_dir: str) -> None:
:param exp_dir: An experiment directory
:type exp_dir: str
"""
logger.debug("Starting telemetry monitor process")
if (
self._telemetry_monitor is None
or self._telemetry_monitor.returncode is not None
):
logger.debug("Starting telemetry monitor process")
cmd = [
sys.executable,
"-m",
Expand All @@ -866,6 +866,7 @@ def _start_telemetry_monitor(self, exp_dir: str) -> None:
cwd=str(pathlib.Path(__file__).parent.parent.parent),
shell=False,
)
logger.debug("Telemetry monitor started")


class _AnonymousBatchJob(EntityList[Model]):
Expand Down
4 changes: 2 additions & 2 deletions smartsim/_core/control/jobmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@

from ...database import Orchestrator
from ...entity import DBNode, EntitySequence, SmartSimEntity
from ...log import get_logger
from ...log import ContextThread, get_logger
from ...status import STATUS_NEVER_STARTED, TERMINAL_STATUSES
from ..config import CONFIG
from ..launcher import Launcher, LocalLauncher
Expand Down Expand Up @@ -80,7 +80,7 @@ def __init__(self, lock: RLock, launcher: t.Optional[Launcher] = None) -> None:

def start(self) -> None:
"""Start a thread for the job manager"""
self.monitor = Thread(name="JobManager", daemon=True, target=self.run)
self.monitor = ContextThread(name="JobManager", daemon=True, target=self.run)
self.monitor.start()

def run(self) -> None:
Expand Down
3 changes: 2 additions & 1 deletion smartsim/_core/control/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from ...database import Orchestrator
from ...entity import DBNode, Ensemble, EntitySequence, Model, SmartSimEntity
from ...error import SmartSimError
from ..config import CONFIG
from ..utils import helpers as _helpers
from ..utils import serialize as _serialize

Expand Down Expand Up @@ -343,7 +344,7 @@ def finalize(self) -> LaunchedManifest[_T]:
def _format_exp_telemetry_path(
exp_path: t.Union[str, "os.PathLike[str]"]
) -> pathlib.Path:
return pathlib.Path(exp_path, _serialize.TELMON_SUBDIR)
return pathlib.Path(exp_path, CONFIG.telemetry_subdir)


def _format_run_telemetry_path(
Expand Down
8 changes: 5 additions & 3 deletions smartsim/_core/entrypoints/telemetrymonitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
from smartsim._core.launcher.slurm.slurmLauncher import SlurmLauncher
from smartsim._core.launcher.stepInfo import StepInfo
from smartsim._core.utils.helpers import get_ts
from smartsim._core.utils.serialize import MANIFEST_FILENAME, TELMON_SUBDIR
from smartsim._core.utils.serialize import MANIFEST_FILENAME
from smartsim.error.errors import SmartSimError
from smartsim.status import STATUS_COMPLETED, TERMINAL_STATUSES

Expand Down Expand Up @@ -582,7 +582,7 @@ def main(
poll for new jobs before attempting to shutdown
:type cooldown_duration: int
"""
manifest_relpath = pathlib.Path(TELMON_SUBDIR) / MANIFEST_FILENAME
manifest_relpath = pathlib.Path(CONFIG.telemetry_subdir) / MANIFEST_FILENAME
manifest_path = experiment_dir / manifest_relpath
monitor_pattern = str(manifest_relpath)

Expand Down Expand Up @@ -667,7 +667,9 @@ def get_parser() -> argparse.ArgumentParser:
log.setLevel(logging.DEBUG)
log.propagate = False

log_path = os.path.join(args.exp_dir, TELMON_SUBDIR, "telemetrymonitor.log")
log_path = os.path.join(
args.exp_dir, CONFIG.telemetry_subdir, "telemetrymonitor.log"
)
fh = logging.FileHandler(log_path, "a")
log.addHandler(fh)

Expand Down
6 changes: 3 additions & 3 deletions smartsim/_core/launcher/taskManager.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@
import time
import typing as t
from subprocess import PIPE
from threading import RLock, Thread
from threading import RLock

import psutil

from ...error import LauncherError
from ...log import get_logger
from ...log import ContextThread, get_logger
from ..utils.helpers import check_dev_log_level
from .util.shell import execute_async_cmd, execute_cmd

Expand Down Expand Up @@ -74,7 +74,7 @@ def start(self) -> None:
The TaskManager is run as a daemon thread meaning
that it will die when the main thread dies.
"""
monitor = Thread(name="TaskManager", daemon=True, target=self.run)
monitor = ContextThread(name="TaskManager", daemon=True, target=self.run)
monitor.start()

def run(self) -> None:
Expand Down
5 changes: 4 additions & 1 deletion smartsim/_core/utils/serialize.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
TStepLaunchMetaData = t.Tuple[
t.Optional[str], t.Optional[str], t.Optional[bool], str, str, Path
]
TELMON_SUBDIR: t.Final[str] = ".smartsim/telemetry"

MANIFEST_FILENAME: t.Final[str] = "manifest.json"

_LOGGER = smartsim.log.get_logger(__name__)
Expand All @@ -58,6 +58,7 @@ def save_launch_manifest(manifest: _Manifest[TStepLaunchMetaData]) -> None:
return

manifest.metadata.run_telemetry_subdirectory.mkdir(parents=True, exist_ok=True)
exp_out, exp_err = smartsim.log.get_exp_log_paths()

new_run = {
"run_id": manifest.metadata.run_id,
Expand Down Expand Up @@ -87,6 +88,8 @@ def save_launch_manifest(manifest: _Manifest[TStepLaunchMetaData]) -> None:
"name": manifest.metadata.exp_name,
"path": manifest.metadata.exp_path,
"launcher": manifest.metadata.launcher_name,
"out_file": str(exp_out),
"err_file": str(exp_err),
},
"runs": [new_run],
}
Expand Down
32 changes: 27 additions & 5 deletions smartsim/experiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,23 @@
from .database import Orchestrator
from .entity import Ensemble, Model, SmartSimEntity
from .error import SmartSimError
from .log import get_logger
from .log import ctx_exp_path, get_logger, method_contextualizer
from .settings import Container, base, settings
from .wlm import detect_launcher

logger = get_logger(__name__)


def _exp_path_map(exp: "Experiment") -> str:
    """Return the ``exp_path`` of the given experiment.

    Supplied to the method contextualizer as its mapping function so that
    the path of the currently-executing experiment can be placed into
    context for log enrichment.

    :param exp: the experiment whose path is requested
    :type exp: Experiment
    :return: the value of ``exp.exp_path``
    :rtype: str
    """
    return exp.exp_path


_contextualize = method_contextualizer(ctx_exp_path, _exp_path_map)


# pylint: disable=no-self-use
class Experiment:
"""Experiments are the Python user interface for SmartSim.
Expand Down Expand Up @@ -123,7 +133,7 @@ def __init__(
if not osp.isdir(osp.abspath(exp_path)):
raise NotADirectoryError("Experiment path provided does not exist")
exp_path = osp.abspath(exp_path)
self.exp_path = init_default(osp.join(getcwd(), name), exp_path, str)
self.exp_path: str = init_default(osp.join(getcwd(), name), exp_path, str)

if launcher == "auto":
launcher = detect_launcher()
Expand All @@ -132,6 +142,7 @@ def __init__(
self._launcher = launcher.lower()
self.db_identifiers: t.Set[str] = set()

@_contextualize
def start(
self,
*args: t.Any,
Expand Down Expand Up @@ -205,6 +216,7 @@ def start(
logger.error(e)
raise

@_contextualize
def stop(self, *args: t.Any) -> None:
"""Stop specific instances launched by this ``Experiment``
Expand Down Expand Up @@ -241,6 +253,7 @@ def stop(self, *args: t.Any) -> None:
logger.error(e)
raise

@_contextualize
def generate(
self,
*args: t.Any,
Expand Down Expand Up @@ -278,6 +291,7 @@ def generate(
logger.error(e)
raise

@_contextualize
def poll(
self, interval: int = 10, verbose: bool = True, kill_on_interrupt: bool = True
) -> None:
Expand Down Expand Up @@ -321,6 +335,7 @@ def poll(
logger.error(e)
raise

@_contextualize
def finished(self, entity: SmartSimEntity) -> bool:
"""Query if a job has completed.
Expand All @@ -344,6 +359,7 @@ def finished(self, entity: SmartSimEntity) -> bool:
logger.error(e)
raise

@_contextualize
def get_status(self, *args: t.Any) -> t.List[str]:
"""Query the status of launched instances
Expand Down Expand Up @@ -382,8 +398,9 @@ def get_status(self, *args: t.Any) -> t.List[str]:
logger.error(e)
raise

@staticmethod
@_contextualize
def create_ensemble(
self,
name: str,
params: t.Optional[t.Dict[str, t.Any]] = None,
batch_settings: t.Optional[base.BatchSettings] = None,
Expand Down Expand Up @@ -456,8 +473,9 @@ def create_ensemble(
logger.error(e)
raise

@staticmethod
@_contextualize
def create_model(
self,
name: str,
run_settings: base.RunSettings,
params: t.Optional[t.Dict[str, t.Any]] = None,
Expand Down Expand Up @@ -553,7 +571,6 @@ def create_model(
"""
path = init_default(getcwd(), path, str)

# mcb
if path is None:
path = getcwd()
if params is None:
Expand All @@ -570,6 +587,7 @@ def create_model(
logger.error(e)
raise

@_contextualize
def create_run_settings(
self,
exe: str,
Expand Down Expand Up @@ -634,6 +652,7 @@ class in SmartSim. If found, the class corresponding
logger.error(e)
raise

@_contextualize
def create_batch_settings(
self,
nodes: int = 1,
Expand Down Expand Up @@ -694,6 +713,7 @@ def create_batch_settings(
logger.error(e)
raise

@_contextualize
def create_database(
self,
port: int = 6379,
Expand Down Expand Up @@ -777,6 +797,7 @@ def create_database(
**kwargs,
)

@_contextualize
def reconnect_orchestrator(self, checkpoint: str) -> Orchestrator:
"""Reconnect to a running ``Orchestrator``
Expand All @@ -797,6 +818,7 @@ def reconnect_orchestrator(self, checkpoint: str) -> Orchestrator:
logger.error(e)
raise

@_contextualize
def summary(self, style: str = "github") -> str:
"""Return a summary of the ``Experiment``
Expand Down
Loading

0 comments on commit f683521

Please sign in to comment.