Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 52 additions & 1 deletion verifiers/utils/async_utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import asyncio
import inspect
from typing import AsyncContextManager, Callable, Optional
import logging
from time import perf_counter
from typing import Any, AsyncContextManager, Callable, Optional


async def maybe_await(func: Callable, *args, **kwargs):
Expand Down Expand Up @@ -34,3 +36,52 @@ async def maybe_semaphore(
return asyncio.Semaphore(limit)
else:
return NullAsyncContext()


class EventLoopLagMonitor:
"""A class to monitor how busy the main event loop is."""

def __init__(
self,
measure_interval: float = 0.1,
max_measurements: int = int(1e5),
logger: Any | None = None,
):
assert measure_interval > 0 and max_measurements > 0
self.measure_interval = measure_interval
self.max_measurements = max_measurements
self.logger = logger or logging.getLogger(
f"{__name__}.{self.__class__.__name__}"
)
self.lags = []
self.logger.info(
f"Event loop lag monitor initialized with measure_interval={self.measure_interval} and max_measurements={self.max_measurements}"
)

async def measure_lag(self):
"""Measures event loop lag by asynchronously sleeping for interval seconds"""
next_time = perf_counter() + self.measure_interval
await asyncio.sleep(self.measure_interval)
now = perf_counter()
lag = now - next_time
return lag

def get_lags(self) -> list[float]:
"""Get the list of measured event loop lags."""
return self.lags

def reset_lags(self):
"""Reset the list of measured event loop lags."""
self.lags = []

async def run(self):
"""Loop to measure event loop lag. Should be started as background task."""
while True:
lag = await self.measure_lag()
self.lags.append(lag)
if len(self.lags) > self.max_measurements:
self.lags.pop(0)

def run_in_background(self):
"""Run the event loop lag monitor as a background task."""
return asyncio.create_task(self.run())
28 changes: 25 additions & 3 deletions verifiers/utils/eval_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import verifiers as vf
from verifiers.types import Endpoints, EvalConfig, GenerateMetadata, GenerateOutputs
from verifiers.utils.async_utils import EventLoopLagMonitor
from verifiers.utils.client_utils import setup_client
from verifiers.utils.error_utils import ErrorChain
from verifiers.utils.logging_utils import print_prompt_completions_sample, print_time
Expand Down Expand Up @@ -58,7 +59,11 @@ def load_endpoints(endpoints_path: str):
return endpoints


def print_results(results: GenerateOutputs, num_samples: int = 1):
def print_results(
results: GenerateOutputs,
event_loop_lags: list[float] | None = None,
num_samples: int = 1,
):
assert results["metadata"] is not None
print("--- Evaluation ---")
print(f"Environment: {results['metadata']['env_id']}")
Expand Down Expand Up @@ -117,7 +122,6 @@ def print_results(results: GenerateOutputs, num_samples: int = 1):
counter = Counter(error_chains)
for error_chain, count in counter.items():
print(f" - {repr(error_chain)}: {count / counter.total():.3f}")

generation_ms_arr = np.array(
[s["timing"]["generation_ms"] for s in results["state"]]
)
Expand All @@ -136,6 +140,17 @@ def print_results(results: GenerateOutputs, num_samples: int = 1):
print(
f"total: min - {print_time(float(np.min(total_arr)))}, mean - {print_time(float(np.mean(total_arr)))}, max - {print_time(float(np.max(total_arr)))}"
)
if event_loop_lags is not None:
print("Performance:")
event_loop_lags_arr = np.array(event_loop_lags)
med_lag, p90_lag, max_lag = (
np.median(event_loop_lags_arr),
np.percentile(event_loop_lags_arr, 90),
np.max(event_loop_lags_arr),
)
print(
f"event_loop_lag: med - {print_time(float(med_lag))}, p90 - {print_time(float(p90_lag))}, max - {print_time(float(max_lag))}"
)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Empty event loop lags array causes crash

The code checks `if event_loop_lags is not None:` but does not handle the case where `event_loop_lags` is an empty list. If the evaluation completes in less than the 0.1-second `measure_interval`, no lag measurement has been recorded yet and `get_lags()` returns an empty list. Calling `np.max()` on an empty NumPy array raises a `ValueError` because the reduction has no identity for zero-size arrays, and `np.median()` / `np.percentile()` cannot produce a meaningful statistic from zero samples either. The condition needs to also check that the list is non-empty before computing statistics.

Fix in Cursor Fix in Web



async def run_evaluation(config: EvalConfig) -> GenerateOutputs:
Expand All @@ -150,6 +165,10 @@ async def run_evaluation(config: EvalConfig) -> GenerateOutputs:
# load environment
vf_env = vf.load_environment(env_id=config.env_id, **config.env_args)

# load event loop lag monitor
event_loop_lag_monitor = EventLoopLagMonitor()
event_loop_lag_monitor.run_in_background()

# set extra environment kwargs
if config.extra_env_kwargs:
logger.info(f"Setting extra environment kwargs: {config.extra_env_kwargs}")
Expand Down Expand Up @@ -178,8 +197,11 @@ async def run_evaluation(config: EvalConfig) -> GenerateOutputs:
)
end_time = time.time()
logger.info(f"Evaluation completed in {end_time - start_time:.2f} seconds")

event_loop_lags = event_loop_lag_monitor.get_lags()

if config.print_results:
print_results(results)
print_results(results, event_loop_lags)
if config.save_results:
save_rollout_results(results, config.save_to_hf_hub, config.hf_hub_dataset_name)
return results
Expand Down
Loading