Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
256 changes: 256 additions & 0 deletions components/clp-py-utils/clp_py_utils/profiling_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,256 @@
"""
Profiling utilities for CLP query execution performance analysis.
This module provides lightweight profiling decorators using pyinstrument.
Profile outputs include:
- HTML files with interactive flame graphs and call trees.
- Text summaries showing call hierarchy and timing.
"""

import datetime
import functools
import inspect
import logging
import os
from pathlib import Path
from typing import Any, Callable, Tuple, TypeVar

from pyinstrument import Profiler

logger = logging.getLogger(__name__)

F = TypeVar("F", bound=Callable[..., Any])

Comment on lines +20 to +24
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

Type‑hints modernisation and stricter types (Ruff UP035, annotations)

  • Import Callable from collections.abc.
  • Prefer built‑in generic tuple[str, str] over typing.Tuple.

Apply this diff:

-from typing import Any, Callable, Optional, Tuple, TypeVar
+from typing import Any, Optional, TypeVar
+from collections.abc import Callable
@@
-) -> Tuple[str, str]:
+) -> tuple[str, str]:

Also applies to: 17-17, 128-128

🤖 Prompt for AI Agents
In components/clp-py-utils/clp_py_utils/profiling_utils.py around lines 17,
20-24 and 128, update type imports and tuple annotations: replace Callable
imported from typing with Callable from collections.abc, remove typing.Tuple
usage and use built-in generic tuple[str, str] where applicable, and adjust any
related imports (drop unused typing names) so the file uses modern annotations
consistent with Ruff UP035; ensure F = TypeVar("F", bound=Callable[..., Any])
still references the new Callable import.

PROFILING_INTERVAL = 0.001


def profile(
section_name: str | None = None,
job_id_param: str = "job_id",
task_id_param: str = "task_id",
) -> Callable[[F], F]:
"""
Profiles function execution as decorator with automatic context extraction.
Output files are written to $CLP_LOGS_DIR/profiles/ (e.g., clp-package/var/log/query_worker/
profiles/).
:param section_name: Override for profile section name. If None, uses function name.
:param job_id_param: Parameter name to extract job_id from (default: "job_id").
Can use dot notation for attributes, e.g., "job.id".
:param task_id_param: Parameter name to extract task_id from (default: "task_id").
Can use dot notation for attributes, e.g., "task.id".
:return: Decorated function with profiling capabilities.
"""

def decorator(func: F) -> F:
name = section_name or func.__name__
is_async = inspect.iscoroutinefunction(func)

if is_async:

@functools.wraps(func)
async def async_wrapper(*args, **kwargs):
if not _is_profiling_enabled():
return await func(*args, **kwargs)

# Profiling enabled: extract context and profile execution
job_id, task_id = _extract_context_from_args(
func, args, kwargs, job_id_param, task_id_param
)

profiler = Profiler(interval=PROFILING_INTERVAL)
try:
profiler.start()
except RuntimeError as e:
# Skip profiling this function to avoid conflicts
if "already a profiler running" in str(e):
logger.debug(
f"Skipping nested profiling for {name} "
f"(parent profiler already active)"
)
return await func(*args, **kwargs)
raise

try:
result = await func(*args, **kwargs)
return result
finally:
profiler.stop()
_save_profile(profiler, name, job_id, task_id)

return async_wrapper # type: ignore

@functools.wraps(func)
def sync_wrapper(*args, **kwargs):
if not _is_profiling_enabled():
return func(*args, **kwargs)

# Profiling enabled: extract context and profile execution
job_id, task_id = _extract_context_from_args(
func, args, kwargs, job_id_param, task_id_param
)

profiler = Profiler(interval=PROFILING_INTERVAL)
try:
profiler.start()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

CRITICAL: Black formatting failure blocks CI.

The pipeline reports a Black formatting check failed at this line. Run black components/clp-py-utils/clp_py_utils/profiling_utils.py locally to auto-format the file and commit the result.

🤖 Prompt for AI Agents
In components/clp-py-utils/clp_py_utils/profiling_utils.py around line 97, Black
formatting failed at the call to profiler.start(); run the Black formatter on
this file (e.g., black components/clp-py-utils/clp_py_utils/profiling_utils.py),
ensure the file is auto-formatted and any resulting changes are committed, then
push the commit to satisfy the CI formatting check.

except RuntimeError as e:
# Skip profiling this function to avoid conflicts
if "already a profiler running" in str(e):
logger.debug(
f"Skipping nested profiling for {name} (parent profiler already active)"
)
return func(*args, **kwargs)
raise

try:
result = func(*args, **kwargs)
return result
finally:
profiler.stop()
_save_profile(profiler, name, job_id, task_id)

return sync_wrapper # type: ignore

return decorator


def _extract_context_from_args(
func: Callable,
args: tuple,
kwargs: dict,
job_id_param: str = "job_id",
task_id_param: str = "task_id",
) -> Tuple[str, str]:
"""
Extracts job_id and task_id from function arguments.
:param func: The function being profiled.
:param args: Positional arguments passed to the function.
:param kwargs: Keyword arguments passed to the function.
:param job_id_param: Name/path of the parameter containing job_id (default: "job_id").
:param task_id_param: Name/path of the parameter containing task_id (default: "task_id").
:return: Tuple of (job_id, task_id) as strings. Empty strings if not found.
"""
job_id = ""
task_id = ""

try:
# Get function signature
sig = inspect.signature(func)
param_names = list(sig.parameters.keys())

def extract_value(param_spec: str) -> str:
"""Extract value from parameter, supporting dot notation for attributes."""
if not param_spec:
return ""

# Split on '.' to handle attribute access
parts = param_spec.split(".")
param_name = parts[0]

# Find the parameter value
value = None
if param_name in kwargs:
value = kwargs[param_name]
elif param_name in param_names:
idx = param_names.index(param_name)
if idx < len(args):
value = args[idx]

if value is None:
return ""

# Navigate through attributes if dot notation was used
for attr_name in parts[1:]:
if hasattr(value, attr_name):
value = getattr(value, attr_name)
else:
return ""

return str(value)

Comment on lines +144 to +173
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

Optional: support dict-style context extraction

If callers pass dict-like objects, walk keys as well as attributes.

Apply this diff:

-            for attr_name in parts[1:]:
-                if hasattr(value, attr_name):
-                    value = getattr(value, attr_name)
-                else:
-                    return ""
+            for attr_name in parts[1:]:
+                if isinstance(value, dict) and attr_name in value:
+                    value = value[attr_name]
+                elif hasattr(value, attr_name):
+                    value = getattr(value, attr_name)
+                else:
+                    return ""
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def extract_value(param_spec: str) -> str:
"""Extract value from parameter, supporting dot notation for attributes."""
if not param_spec:
return ""
# Split on '.' to handle attribute access
parts = param_spec.split(".")
param_name = parts[0]
# Find the parameter value
value = None
if param_name in kwargs:
value = kwargs[param_name]
elif param_name in param_names:
idx = param_names.index(param_name)
if idx < len(args):
value = args[idx]
if value is None:
return ""
# Navigate through attributes if dot notation was used
for attr_name in parts[1:]:
if hasattr(value, attr_name):
value = getattr(value, attr_name)
else:
return ""
return str(value)
def extract_value(param_spec: str) -> str:
"""Extract value from parameter, supporting dot notation for attributes."""
if not param_spec:
return ""
# Split on '.' to handle attribute access
parts = param_spec.split(".")
param_name = parts[0]
# Find the parameter value
value = None
if param_name in kwargs:
value = kwargs[param_name]
elif param_name in param_names:
idx = param_names.index(param_name)
if idx < len(args):
value = args[idx]
if value is None:
return ""
# Navigate through attributes if dot notation was used
- for attr_name in parts[1:]:
- if hasattr(value, attr_name):
- value = getattr(value, attr_name)
- else:
for attr_name in parts[1:]:
if isinstance(value, dict) and attr_name in value:
value = value[attr_name]
elif hasattr(value, attr_name):
value = getattr(value, attr_name)
else:
return ""
return str(value)

# Extract job_id and task_id
job_id = extract_value(job_id_param)
task_id = extract_value(task_id_param)

except Exception as e:
logger.debug(f"Failed to extract context from {func.__name__}: {e}")

return job_id, task_id


def _is_profiling_enabled() -> bool:
"""
Checks if profiling is enabled.
TODO: Add `CLPConfig` mechanism to enable/disable profiling for each component.
:return: Whether the profiler is enabled.
"""
return False


def _save_profile(
profiler: Profiler, section_name: str, job_id: str = "", task_id: str = ""
) -> None:
"""
Saves profiler output to HTML and text formats. Generates .html and .txt files.
:param profiler: The pyinstrument Profiler object containing profiling data.
:param section_name: Name identifying this profiling section.
:param job_id: Optional job identifier for filename.
:param task_id: Optional task identifier for filename.
"""
try:
# Get the session for logging
session = profiler.last_session
if not session:
logger.debug(f"No profiling session for {section_name}")
return

duration = session.duration
sample_count = session.sample_count

timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f")
filename_parts = [section_name]

if job_id:
filename_parts.append(f"job{job_id}")
if task_id:
filename_parts.append(f"task{task_id}")

filename_parts.append(timestamp)
base_filename = "_".join(filename_parts)

output_dir = Path(os.getenv("CLP_LOGS_DIR")) / "profiles"
output_dir.mkdir(exist_ok=True, parents=True)

Comment on lines +226 to +228
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Critical: handle missing CLP_LOGS_DIR to avoid crash when enabled

Path(os.getenv("CLP_LOGS_DIR")) raises a TypeError if the env var is unset. Provide a safe fallback and warn.

Apply this diff:

-        output_dir = Path(os.getenv("CLP_LOGS_DIR")) / "profiles"
+        logs_dir = os.getenv("CLP_LOGS_DIR")
+        if not logs_dir:
+            logger.warning("CLP_LOGS_DIR is not set; writing profiles to CWD ./profiles")
+            output_dir = Path.cwd() / "profiles"
+        else:
+            output_dir = Path(logs_dir) / "profiles"
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
output_dir = Path(os.getenv("CLP_LOGS_DIR")) / "profiles"
output_dir.mkdir(exist_ok=True, parents=True)
logs_dir = os.getenv("CLP_LOGS_DIR")
if not logs_dir:
logger.warning("CLP_LOGS_DIR is not set; writing profiles to CWD ./profiles")
output_dir = Path.cwd() / "profiles"
else:
output_dir = Path(logs_dir) / "profiles"
output_dir.mkdir(exist_ok=True, parents=True)
🤖 Prompt for AI Agents
components/clp-py-utils/clp_py_utils/profiling_utils.py around lines 233-235:
currently Path(os.getenv("CLP_LOGS_DIR")) will raise a TypeError if the env var
is unset; instead read the env var into a variable, check if it's None or empty,
log a warning (use the module logger or warnings.warn) and choose a safe
fallback directory (e.g. tempfile.mkdtemp(prefix="clp_logs_") or
os.getcwd()/".clp_logs"), then construct a Path from that string and call
output_dir.mkdir(exist_ok=True, parents=True); ensure the code uses the
checked/fallback path variable rather than calling Path on a possibly None
value.

# Save HTML with interactive visualization
html_path = output_dir / f"{base_filename}.html"
with open(html_path, "w", encoding="utf-8") as f:
f.write(profiler.output_html())

# Save human-readable text summary with call hierarchy
txt_path = output_dir / f"{base_filename}.txt"
with open(txt_path, "w", encoding="utf-8") as f:
# Header
f.write("=" * 80 + "\n")
f.write(f"CLP Query Profiling Report (pyinstrument)\n")
f.write(f"Section: {section_name}\n")
if job_id:
f.write(f"Job ID: {job_id}\n")
if task_id:
f.write(f"Task ID: {task_id}\n")
f.write(f"Timestamp: {timestamp}\n")
f.write("=" * 80 + "\n\n")
f.write(profiler.output_text(unicode=True, color=False))

Comment on lines +239 to +248
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

Logging/text nits: remove stray f, use logger.exception, timezone

  • Remove the stray f-string with no placeholders.
  • Prefer logger.exception over error(..., exc_info=True).
  • Consider timezone-aware timestamps or time.time() for filenames.

Apply this diff:

-            f.write(f"CLP Query Profiling Report (pyinstrument)\n")
+            f.write("CLP Query Profiling Report (pyinstrument)\n")
@@
-        logger.info(
+        logger.info(
             f"Profile saved: {section_name} "
             f"(duration={duration:.6f}s, samples={sample_count}) "
             f"HTML={html_path}, TXT={txt_path}"
         )
@@
-    except Exception as e:
-        logger.error(f"Failed to save profile for {section_name}: {e}", exc_info=True)
+    except Exception:
+        logger.exception("Failed to save profile for %s", section_name)

Optionally:

-        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f")
+        timestamp = datetime.datetime.now(datetime.timezone.utc).strftime("%Y%m%d_%H%M%S_%fZ")

Also applies to: 256-260, 263-263, 222-222

🧰 Tools
🪛 Ruff (0.14.0)

246-246: f-string without any placeholders

Remove extraneous f prefix

(F541)

logger.info(
f"Profile saved: {section_name} "
f"(duration={duration:.6f}s, samples={sample_count}) "
f"HTML={html_path}, TXT={txt_path}"
)

except Exception as e:
logger.error(f"Failed to save profile for {section_name}: {e}", exc_info=True)
Loading
Loading