Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 27 additions & 7 deletions verifiers/envs/environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,9 @@
Messages,
MessageType,
ModelResponse,
ProgressCallback,
TaskDoneCallback,
RolloutInput,
TaskStartCallback,
RolloutOutput,
RolloutTiming,
SamplingArgs,
Expand Down Expand Up @@ -975,8 +976,9 @@ async def generate(
independent_scoring: bool = False,
max_retries: int = 0,
on_start: StartCallback | None = None,
on_progress: ProgressCallback | None = None,
on_task_done: TaskDoneCallback | None = None,
on_log: LogCallback | None = None,
on_task_start: TaskStartCallback | None = None,
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renamed parameter breaks existing test caller

High Severity

Renaming the on_progress parameter to on_task_done in generate() breaks an existing test in tests/test_environment_extra.py (line 327) that still calls generate(on_progress=no_op). This will raise a TypeError at runtime since on_progress is no longer a valid keyword argument. The refactoring is incomplete — all callers need to be updated to use the new name.

Fix in Cursor Fix in Web

) -> GenerateOutputs:
"""
Generate rollouts for a set of inputs.
Expand Down Expand Up @@ -1029,7 +1031,7 @@ def default_on_start(
postfix=dict(reward="?"),
)

def default_on_progress(
def default_on_task_done(
all_outputs: list[RolloutOutput],
new_outputs: list[RolloutOutput],
new_metadata: GenerateMetadata,
Expand All @@ -1045,7 +1047,7 @@ def default_on_log(message: str) -> None:
self.logger.info(message)

on_start = on_start or cast(StartCallback, default_on_start)
on_progress = on_progress or cast(ProgressCallback, default_on_progress)
on_task_done = on_task_done or cast(TaskDoneCallback, default_on_task_done)
on_log = on_log or cast(LogCallback, default_on_log)

if isinstance(inputs, Dataset):
Expand Down Expand Up @@ -1142,6 +1144,20 @@ def get_client_for_group() -> AsyncOpenAI | ClientConfig:
on_log(f"Saving results to {builder.results_path}")

tasks: dict[asyncio.Task, int] = {}

def make_on_acquire(num_rollouts: int):
    """Build a zero-argument hook that reports ``num_rollouts`` as started.

    The returned hook forwards ``num_rollouts`` to ``on_task_start`` when
    that callback was supplied and is a no-op otherwise. Callers pass 1 for
    independent scoring and ``len(group)`` for grouped scoring.
    """

    def on_acquire() -> None:
        # Nothing to report when the caller did not register a start callback.
        if on_task_start is None:
            return
        on_task_start(num_rollouts)

    return on_acquire

try:
# create tasks based on mode
if independent_scoring:
Expand All @@ -1158,6 +1174,7 @@ def get_client_for_group() -> AsyncOpenAI | ClientConfig:
max_retries=max_retries,
state_columns=state_columns,
),
on_acquire=make_on_acquire(1),
),
)
tasks[task] = i
Expand Down Expand Up @@ -1186,6 +1203,7 @@ def get_client_for_group() -> AsyncOpenAI | ClientConfig:
max_retries=max_retries,
state_columns=state_columns,
),
on_acquire=make_on_acquire(len(group_input)),
),
)
tasks[task] = i
Expand All @@ -1198,7 +1216,7 @@ def get_client_for_group() -> AsyncOpenAI | ClientConfig:
builder.add_outputs(new_outputs)
metadata = builder.build_metadata()

on_progress(builder.outputs, new_outputs, metadata)
on_task_done(builder.outputs, new_outputs, metadata)

# incrementally save outputs
if save_results:
Expand Down Expand Up @@ -1299,8 +1317,9 @@ async def evaluate(
independent_scoring: bool = False,
max_retries: int = 0,
on_start: StartCallback | None = None,
on_progress: ProgressCallback | None = None,
on_task_done: TaskDoneCallback | None = None,
on_log: LogCallback | None = None,
on_task_start: TaskStartCallback | None = None,
**kwargs,
) -> GenerateOutputs:
"""
Expand All @@ -1321,8 +1340,9 @@ async def evaluate(
independent_scoring=independent_scoring,
max_retries=max_retries,
on_start=on_start,
on_progress=on_progress,
on_task_done=on_task_done,
on_log=on_log,
on_task_start=on_task_start,
**kwargs,
)

Expand Down
2 changes: 1 addition & 1 deletion verifiers/gepa/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ def do_nothing(*args, **kwargs) -> None:
max_concurrent=self.max_concurrent,
state_columns=self.state_columns,
on_start=do_nothing,
on_progress=do_nothing,
on_task_done=do_nothing,
)
)

Expand Down
5 changes: 4 additions & 1 deletion verifiers/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,10 +205,13 @@ def get(self, key: str, default: Any = None) -> Any:
StartCallback = Callable[
[list[RolloutInput], list[RolloutInput] | list[list[RolloutInput]]], None
]
ProgressCallback = Callable[
TaskDoneCallback = Callable[
[list[RolloutOutput], list[RolloutOutput], "GenerateMetadata"], None
] # all_outputs, new_outputs, new_metadata
LogCallback = Callable[[str], None] # log messages
TaskStartCallback = Callable[
[int], None
] # called with number of rollouts that just began executing


class GenerateMetadata(TypedDict):
Expand Down
17 changes: 15 additions & 2 deletions verifiers/utils/async_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,23 @@
T = TypeVar("T")


async def with_sem(sem: AsyncContextManager, coro: Coroutine[Any, Any, T]) -> T:
"""Wrap a coroutine with a context manager (typically a semaphore)."""
async def with_sem(
sem: AsyncContextManager,
coro: Coroutine[Any, Any, T],
on_acquire: Optional[Callable[[], None]] = None,
) -> T:
"""Wrap a coroutine with a context manager (typically a semaphore).

Args:
sem: Context manager (typically an asyncio.Semaphore) to acquire before running.
coro: The coroutine to run once the semaphore is acquired.
on_acquire: Optional callback invoked immediately after the semaphore is acquired,
before the wrapped coroutine starts executing. Useful for tracking in-flight work.
"""
try:
async with sem:
if on_acquire is not None:
on_acquire()
return await coro
finally:
# closes the coroutine if it was never awaited (e.g. cancelled while acquiring sem)
Expand Down
164 changes: 152 additions & 12 deletions verifiers/utils/eval_display.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,142 @@
from typing import Literal

from rich.columns import Columns
from rich.console import Group
from rich.console import Console, ConsoleOptions, Group, RenderResult
from rich.panel import Panel
from rich.progress import BarColumn, Progress, SpinnerColumn, TextColumn
from rich.table import Table
from rich.progress import Progress, ProgressColumn, SpinnerColumn, Task, TextColumn
from rich.progress_bar import ProgressBar
from rich.segment import Segment
from rich.style import StyleType
from rich.table import Column, Table
from rich.text import Text

from verifiers.types import EvalConfig, GenerateOutputs, TokenUsage
from verifiers.utils.display_utils import BaseDisplay, format_numeric, make_aligned_row
from verifiers.utils.message_utils import format_messages

# Status colors — used for panel borders and progress bar segments
COLOR_PENDING = "dim"  # "pending" border; also the fallback for unknown statuses
COLOR_RUNNING = "yellow"  # "running" border and default in-progress bar segment
COLOR_COMPLETED = "green"  # "completed" border and default completed bar segment
COLOR_FAILED = "red"  # "failed" border


class _DualBar(ProgressBar):
    """Three-segment progress bar extending Rich's ProgressBar.

    Draws a completed segment, then an in-progress segment, then the
    remaining region. Pulse rendering is delegated to the parent class via
    ``_render_pulse``; everything else is rendered here.
    """

    def __init__(
        self,
        in_progress: float = 0,
        in_progress_style: StyleType = COLOR_RUNNING,
        **kwargs,
    ) -> None:
        """Create the bar.

        Args:
            in_progress: Amount of work currently running, in the same units
                as ``total`` and ``completed``.
            in_progress_style: Style applied to the in-progress segment.
            **kwargs: Forwarded unchanged to ``ProgressBar.__init__``.
        """
        super().__init__(**kwargs)
        self.in_progress = in_progress
        self.in_progress_style = in_progress_style

    @staticmethod
    def _segment_cells(
        width: int, total: float, completed: float, in_progress: float
    ) -> tuple[int, int, int, int]:
        """Split ``width`` cells between the completed and in-progress segments.

        Returns ``(c_full, c_half, p_full, p_half)``: the number of full
        cells and trailing half cells (0 or 1) for the completed and
        in-progress segments. Their sum never exceeds ``width``.
        """
        total_halves = width * 2

        # Half-char precision (2x resolution), matching ProgressBar's approach
        c_halves = int(total_halves * completed / total)
        p_halves = int(total_halves * in_progress / total)

        # Ensure at least 1 half-char for non-zero segments
        if completed > 0 and c_halves == 0:
            c_halves = 1
        if in_progress > 0 and p_halves == 0:
            p_halves = 1

        # Clamp to the bar width; never let the in-progress share go negative
        c_halves = min(c_halves, total_halves)
        p_halves = max(0, min(p_halves, total_halves - c_halves))

        c_full, c_half = divmod(c_halves, 2)
        p_full, p_half = divmod(p_halves, 2)

        # A trailing half glyph still occupies a whole terminal cell. When
        # both segments end on a half and together fill the bar, the two
        # glyphs would take two cells where the half-unit budget only allows
        # one (e.g. width=40, total=100, completed=99, in_progress=1 gives
        # 39 + 1 + 0 + 1 = 41 cells). Drop exactly one trailing half cell,
        # preferring whichever choice keeps the in-progress segment visible.
        if c_full + c_half + p_full + p_half > width:
            if p_full or not c_full:
                p_half = 0
            else:
                c_half = 0

        return c_full, c_half, p_full, p_half

    def __rich_console__(
        self, console: Console, options: ConsoleOptions
    ) -> RenderResult:
        """Yield the styled segments that make up the bar."""
        width = min(self.width or options.max_width, options.max_width)
        use_ascii = options.legacy_windows or options.ascii_only

        if self.pulse or self.total is None:
            yield from self._render_pulse(console, width, ascii=use_ascii)
            return

        bar = "-" if use_ascii else "━"
        half_bar_right = " " if use_ascii else "╸"

        total = self.total or 0
        if total <= 0:
            # No meaningful total: draw the whole bar in the background style.
            yield Segment(bar * width, console.get_style(self.style))
            return

        completed = min(total, max(0, self.completed))
        in_prog = max(0, self.in_progress)

        c_full, c_half, p_full, p_half = self._segment_cells(
            width, total, completed, in_prog
        )

        c_style = console.get_style(self.complete_style)
        p_style = console.get_style(self.in_progress_style)
        r_style = console.get_style(self.style)

        # Completed segment
        if c_full:
            yield Segment(bar * c_full, c_style)
        if c_half:
            yield Segment(half_bar_right, c_style)

        # In-progress segment
        if p_full:
            yield Segment(bar * p_full, p_style)
        if p_half:
            yield Segment(half_bar_right, p_style)

        # Remaining segment (only drawn when the console can render color)
        used = c_full + c_half + p_full + p_half
        remaining = width - used
        if remaining > 0 and not console.no_color and console.color_system is not None:
            yield Segment(bar * remaining, r_style)


class DualBarColumn(ProgressColumn):
    """Progress column that draws finished, active, and outstanding work.

    The constructor mirrors the pattern of Rich's BarColumn and adds
    ``in_progress_style`` for the segment representing active work.
    """

    def __init__(
        self,
        bar_width: int | None = None,
        style: StyleType = "bar.back",
        complete_style: StyleType = COLOR_COMPLETED,
        in_progress_style: StyleType = COLOR_RUNNING,
        table_column: Column | None = None,
    ) -> None:
        # Fall back to a non-wrapping, ratio-2 column when none is supplied.
        column = (
            table_column
            if table_column is not None
            else Column(no_wrap=True, ratio=2)
        )
        super().__init__(table_column=column)
        self.in_progress_style = in_progress_style
        self.complete_style = complete_style
        self.style = style
        self.bar_width = bar_width

    def render(self, task: Task) -> _DualBar:
        """Build the three-segment bar for ``task``."""
        # Negative totals are clamped to zero; an unknown total stays None.
        bar_total = None
        if task.total is not None:
            bar_total = max(0, task.total)

        # A configured width is forced to at least one cell; None means auto.
        bar_cells = None
        if self.bar_width is not None:
            bar_cells = max(1, self.bar_width)

        # In-flight work is read from the task's custom "in_progress" field.
        active = max(0, task.fields.get("in_progress", 0))

        return _DualBar(
            total=bar_total,
            completed=max(0, task.completed),
            in_progress=active,
            width=bar_cells,
            pulse=not task.started,
            animation_time=task.get_time(),
            style=self.style,
            complete_style=self.complete_style,
            in_progress_style=self.in_progress_style,
        )


@dataclass
class EnvEvalState:
Expand All @@ -35,8 +161,11 @@ class EnvEvalState:
start_time: float | None = None
end_time: float | None = None

# updated by on_progress callback
# updated by on_progress / on_rollout_start callbacks
progress: int = 0 # completed rollouts
started: int = (
0 # rollouts that have acquired the semaphore (in-progress + completed)
)
total: int = 0 # total rollouts
num_examples: int = -1 # num examples (-1 means "all", updated by on_start)
rollouts_per_example: int = 1 # rollouts per example (from config)
Expand Down Expand Up @@ -204,6 +333,7 @@ def update_env_state(
env_idx: int,
status: Literal["pending", "running", "completed", "failed"] | None = None,
progress: int | None = None,
started: int | None = None,
total: int | None = None,
num_examples: int | None = None,
reward: float | None = None,
Expand All @@ -229,6 +359,9 @@ def update_env_state(
if progress is not None:
env_state.progress = progress

if started is not None:
env_state.started = started

if total is not None:
env_state.total = total

Expand Down Expand Up @@ -402,6 +535,7 @@ def fmt_concurrency(val: int) -> str:
# use env_state.total which gets updated by on_start callback
total_rollouts = env_state.total
completed_rollouts = env_state.progress # always rollout-based
in_progress_rollouts = max(0, env_state.started - env_state.progress)
pct = (completed_rollouts / total_rollouts * 100) if total_rollouts > 0 else 0

# format elapsed time
Expand All @@ -411,17 +545,23 @@ def fmt_concurrency(val: int) -> str:

# show "..." for total if not yet known
total_str = "..." if total_rollouts <= 0 else str(total_rollouts)

rollout_count_text = f"({completed_rollouts}/{total_str} rollouts)"

progress = Progress(
SpinnerColumn() if env_state.status == "running" else TextColumn(""),
BarColumn(bar_width=None),
DualBarColumn(bar_width=None),
TextColumn(f"[bold]{pct:.0f}%"),
TextColumn(f"({completed_rollouts}/{total_str} rollouts)"),
TextColumn(rollout_count_text),
TextColumn(f"| {time_str}"),
console=self.console,
expand=True,
)
task = progress.add_task(
"env", total=total_rollouts, completed=completed_rollouts
"env",
total=total_rollouts,
completed=completed_rollouts,
in_progress=in_progress_rollouts,
)
progress.update(task, completed=completed_rollouts)

Expand Down Expand Up @@ -467,12 +607,12 @@ def fmt_concurrency(val: int) -> str:

# border style based on status
border_styles = {
"pending": "dim",
"running": "yellow",
"completed": "green",
"failed": "red",
"pending": COLOR_PENDING,
"running": COLOR_RUNNING,
"completed": COLOR_COMPLETED,
"failed": COLOR_FAILED,
}
border_style = border_styles.get(env_state.status, "dim")
border_style = border_styles.get(env_state.status, COLOR_PENDING)

# build title with env name only
title = Text()
Expand Down
Loading
Loading