Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
255 changes: 193 additions & 62 deletions src/nullscope/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,19 @@
metrics when enabled via environment flags.
"""

import inspect
import logging
import os
import re
import time
from collections import deque
from collections.abc import Iterator
from contextlib import AbstractContextManager, contextmanager
from collections.abc import Callable
from contextlib import AbstractContextManager
from contextvars import ContextVar
from dataclasses import dataclass
from functools import wraps
from types import TracebackType
from typing import Any, Final, Protocol, TypeAlias, TypedDict, runtime_checkable
from typing import Any, Final, Protocol, TypeAlias, TypedDict, TypeVar, cast, runtime_checkable

log = logging.getLogger(__name__)

Expand All @@ -41,6 +43,14 @@
END_WALL_TIME_S: Final[str] = "end_wall_time_s"

_SCOPE_NAME_PATTERN = re.compile(r"^[a-z][a-z0-9_]*(?:\.[a-z][a-z0-9_]*)*$")
_REQUIRED_REPORTER_METHODS: Final[tuple[str, ...]] = ("record_timing", "record_metric")

F = TypeVar("F", bound=Callable[..., Any])


def _identity(func: F) -> F:
"""Identity function for no-op decorator (avoids lambda allocation)."""
return func


class _TelemetryMetadata(TypedDict, total=False):
Expand All @@ -61,6 +71,36 @@ def record_timing(self, scope: str, duration: float, **metadata: Any) -> None: .
def record_metric(self, scope: str, value: Any, **metadata: Any) -> None: ... # noqa: D102


def _validate_scope_name(name: str, *, kind: str) -> str:
"""Validate user-provided scope names with explicit errors."""
if not isinstance(name, str):
raise TypeError(f"{kind} must be a string, got {type(name).__name__}")
if not name:
raise ValueError(f"{kind} must be a non-empty string")
if _STRICT_SCOPES and not _SCOPE_NAME_PATTERN.fullmatch(name):
raise ValueError(
f"Invalid {kind} '{name}'. Expected dot-separated lowercase segments "
"using [a-z0-9_], e.g. 'http.request'.",
)
return name


def _validate_reporters(reporters: tuple[Any, ...]) -> None:
"""Check reporter shape early so failures are obvious at setup time."""
for index, reporter in enumerate(reporters):
missing = [
method
for method in _REQUIRED_REPORTER_METHODS
if not callable(getattr(reporter, method, None))
]
if missing:
missing_list = ", ".join(missing)
raise TypeError(
f"Reporter at position {index} ({type(reporter).__name__}) is invalid. "
f"Missing required callable method(s): {missing_list}.",
)


@dataclass(frozen=True, slots=True)
class _NoOpTelemetryContext:
"""An immutable and stateless no-op context, optimized for negligible overhead."""
Expand Down Expand Up @@ -95,6 +135,97 @@ def count(self, name: str, increment: int = 1, **metadata: Any) -> None:
def gauge(self, name: str, value: float, **metadata: Any) -> None:
pass

def timed(self, name: str, **metadata: Any) -> Callable[[F], F]: # noqa: ARG002
"""No-op decorator that returns the original function unchanged."""
return _identity

def flush(self) -> None:
pass

def shutdown(self) -> None:
pass


class _Scope:
"""Explicit context manager for scope timing (replaces @contextmanager for performance)."""

__slots__ = (
"_ctx",
"_scope_path",
"_metadata",
"_start_monotonic_s",
"_start_wall_time_s",
"_scope_token",
"_count_token",
)

def __init__(
self, ctx: "_EnabledTelemetryContext", name: str, metadata: dict[str, Any]
) -> None:
_validate_scope_name(name, kind="Scope name")
self._ctx = ctx
self._metadata = metadata

# Capture current state and compute new stack once
scope_stack = _scope_stack_var.get()
new_stack = (*scope_stack, name)
self._scope_path = ".".join(new_stack)

# Set up context for entry
self._scope_token = _scope_stack_var.set(new_stack)
self._count_token = _call_count_var.set(_call_count_var.get() + 1)

# Capture timing at construction (will be finalized in __enter__)
self._start_monotonic_s: float = 0.0
self._start_wall_time_s: float = 0.0

def __enter__(self) -> "_EnabledTelemetryContext":
self._start_monotonic_s = time.perf_counter()
self._start_wall_time_s = time.time()
return self._ctx

def __exit__(
self,
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> None:
end_monotonic_s = time.perf_counter()
duration = end_monotonic_s - self._start_monotonic_s
end_wall_time_s = self._start_wall_time_s + duration

# Reset context vars
_scope_stack_var.reset(self._scope_token)
_call_count_var.reset(self._count_token)

# Capture final state for metadata
final_stack = _scope_stack_var.get()
final_count = _call_count_var.get()

# Build metadata (user metadata overrides built-in)
built: _TelemetryMetadata = {
"depth": len(final_stack),
"call_count": final_count,
"parent_scope": ".".join(final_stack) if final_stack else None,
"start_monotonic_s": self._start_monotonic_s,
"end_monotonic_s": end_monotonic_s,
"start_wall_time_s": self._start_wall_time_s,
"end_wall_time_s": end_wall_time_s,
}
enhanced_metadata: dict[str, Any] = {**built, **self._metadata}

# Report to all reporters
for reporter in self._ctx.reporters:
try:
reporter.record_timing(self._scope_path, duration, **enhanced_metadata)
except Exception as e:
log.error(
"Telemetry reporter '%s' failed: %s",
type(reporter).__name__,
e,
exc_info=True,
)


class _EnabledTelemetryContext:
"""Full-featured telemetry context when enabled."""
Expand All @@ -107,71 +238,16 @@ def __init__(self, *reporters: TelemetryReporter):
def __call__(
self, name: str, **metadata: Any
) -> AbstractContextManager["_EnabledTelemetryContext"]:
return self._create_scope(name, **metadata)

@contextmanager
def _create_scope(
self, name: str, **metadata: Any
) -> Iterator["_EnabledTelemetryContext"]:
"""The actual scope implementation when enabled."""
if not name or not isinstance(name, str):
raise ValueError("Scope name must be a non-empty string")
if _STRICT_SCOPES and not _SCOPE_NAME_PATTERN.fullmatch(name):
raise ValueError(
"Invalid scope name. Use lowercase letters, digits, and underscores; "
"segments separated by dots.",
)

scope_stack = _scope_stack_var.get()
call_count = _call_count_var.get()
scope_path = ".".join((*scope_stack, name))
start_monotonic_s = time.perf_counter()
start_wall_time_s = time.time()

scope_token = _scope_stack_var.set((*scope_stack, name))
count_token = _call_count_var.set(call_count + 1)

try:
yield self # Return self so chained methods work
finally:
end_monotonic_s = time.perf_counter()
duration = end_monotonic_s - start_monotonic_s
end_wall_time_s = start_wall_time_s + duration
_scope_stack_var.reset(scope_token)
_call_count_var.reset(count_token)

final_stack = _scope_stack_var.get()
final_count = _call_count_var.get()

built: _TelemetryMetadata = {
"depth": len(final_stack),
"call_count": final_count,
"parent_scope": ".".join(final_stack) if final_stack else None,
"start_monotonic_s": start_monotonic_s,
"end_monotonic_s": end_monotonic_s,
"start_wall_time_s": start_wall_time_s,
"end_wall_time_s": end_wall_time_s,
}
enhanced_metadata: dict[str, Any] = {**built, **metadata}

for reporter in self.reporters:
try:
reporter.record_timing(scope_path, duration, **enhanced_metadata)
except Exception as e:
log.error(
"Telemetry reporter '%s' failed: %s",
type(reporter).__name__,
e,
exc_info=True,
)
return _Scope(self, name, metadata)

def metric(self, name: str, value: Any, **metadata: Any) -> None:
"""Record a metric within current scope context."""
scope_stack = _scope_stack_var.get()
scope_path = ".".join((*scope_stack, name))
parent_scope = ".".join(scope_stack) if scope_stack else None
scope_path = f"{parent_scope}.{name}" if parent_scope else name
built: _TelemetryMetadata = {
"depth": len(scope_stack),
"parent_scope": ".".join(scope_stack) if scope_stack else None,
"parent_scope": parent_scope,
}
enhanced_metadata: dict[str, Any] = {**built, **metadata}
for reporter in self.reporters:
Expand Down Expand Up @@ -200,6 +276,53 @@ def gauge(self, name: str, value: float, **metadata: Any) -> None:
"""Record a gauge metric."""
self.metric(name, value, metric_type="gauge", **metadata)

def timed(self, name: str, **metadata: Any) -> Callable[[F], F]:
"""Decorator that times function execution under a fixed scope name."""
scope_name = _validate_scope_name(name, kind="Decorator scope name")

def decorator(func: F) -> F:
if inspect.iscoroutinefunction(func):
@wraps(func)
async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
with self(scope_name, **metadata):
return await func(*args, **kwargs)

return cast("F", async_wrapper)

@wraps(func)
def wrapper(*args: Any, **kwargs: Any) -> Any:
with self(scope_name, **metadata):
return func(*args, **kwargs)

return cast("F", wrapper)

return decorator

def _call_reporter_hook(self, hook_name: str) -> None:
"""Call an optional lifecycle hook on all reporters."""
for reporter in self.reporters:
hook = getattr(reporter, hook_name, None)
if hook is None or not callable(hook):
continue
try:
hook()
except Exception as e:
log.error(
"Reporter '%s' failed during %s: %s",
type(reporter).__name__,
hook_name,
e,
exc_info=True,
)

def flush(self) -> None:
"""Flush reporters that implement optional lifecycle hooks."""
self._call_reporter_hook("flush")

def shutdown(self) -> None:
"""Shutdown reporters that implement optional lifecycle hooks."""
self._call_reporter_hook("shutdown")

@property
def is_enabled(self) -> bool:
return True
Expand All @@ -221,6 +344,8 @@ def TelemetryContext(*reporters: TelemetryReporter) -> TelemetryContextProtocol:
overhead.
"""
if _NULLSCOPE_ENABLED:
if reporters:
_validate_reporters(reporters)
reps = reporters or (SimpleReporter(),)
return _EnabledTelemetryContext(*reps)
# Always return the same, pre-existing no-op instance.
Expand Down Expand Up @@ -259,6 +384,12 @@ def reset(self) -> None:
self.timings.clear()
self.metrics.clear()

def flush(self) -> None:
"""No-op lifecycle method for API compatibility."""

def shutdown(self) -> None:
"""No-op lifecycle method for API compatibility."""

def as_dict(self) -> dict[str, Any]:
"""Return a JSON-serializable snapshot of collected data (testing)."""
return {
Expand Down