docs/evaluation.md (22 changes: 21 additions & 1 deletion)

@@ -95,7 +95,25 @@ api_client_type = "anthropic_messages"

Each endpoint entry supports an optional `api_client_type` field to select the client implementation (defaults to `"openai_chat_completions"`). Use `"anthropic_messages"` for Anthropic models when calling the Anthropic API directly.

To define equivalent replicas, add multiple `[[endpoint]]` entries with the same `endpoint_id`.
To define equivalent replicas, add multiple `[[endpoint]]` entries with the same `endpoint_id`. You can optionally set `max_concurrent` on each variant to limit how many requests it handles simultaneously:

```toml
[[endpoint]]
endpoint_id = "my-model"
model = "my-org/my-model"
url = "https://fast-host.example.com/v1"
key = "API_KEY"
max_concurrent = 64

[[endpoint]]
endpoint_id = "my-model"
model = "my-org/my-model"
url = "https://slow-host.example.com/v1"
key = "API_KEY"
max_concurrent = 16
```

When variants have `max_concurrent` set, the evaluator uses least-loaded dispatch: each request is routed to the variant with the most available capacity. Per-variant concurrency is all-or-nothing — either every variant in an endpoint group sets `max_concurrent`, or none does. When no variant has `max_concurrent`, requests are distributed round-robin and the global `--max-concurrent` flag controls concurrency.
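The dispatch rule above can be sketched in plain Python (illustrative only; `Variant` and `pick_least_loaded` are hypothetical names, not the evaluator's internals):

```python
# Illustrative sketch of least-loaded dispatch; names are hypothetical,
# not the evaluator's actual implementation.
from dataclasses import dataclass


@dataclass
class Variant:
    url: str
    max_concurrent: int
    active: int = 0  # requests currently in flight


def pick_least_loaded(variants: list[Variant]) -> Variant:
    # Route each request to the variant with the most available capacity.
    return max(variants, key=lambda v: v.max_concurrent - v.active)


fast = Variant("https://fast-host.example.com/v1", max_concurrent=64)
slow = Variant("https://slow-host.example.com/v1", max_concurrent=16)

print(pick_least_loaded([fast, slow]).url)  # fast host: 64 free slots vs 16
```

Under this rule, the slow host only receives traffic once the fast host's headroom drops below the slow host's.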

Then use the alias directly:

@@ -145,6 +163,8 @@ Multiple rollouts per example enable metrics like pass@k and help measure varian

By default, scoring runs interleaved with generation. Use `--no-interleave-scoring` to score all rollouts after generation completes.

When per-variant `max_concurrent` limits are configured in the endpoint registry, the endpoint dispatcher manages concurrency globally across all variants and the `--max-concurrent` flag is ignored.
> **BugBot (Low Severity):** Elastic mode feature undocumented in evaluation docs and skills
>
> The PR adds a new user-facing elastic endpoint pool feature (with `elastic`, `elastic_poll_interval`, and `endpoints_path` config fields), but neither `docs/evaluation.md` nor `skills/evaluate-environments/SKILL.md` documents the elastic mode itself. Only per-variant `max_concurrent` is documented. Users have no documentation for how to enable or configure elastic polling.

The `--max-retries` flag enables automatic retry with exponential backoff when rollouts fail due to transient infrastructure errors (e.g., sandbox timeouts, API failures).
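The retry behavior can be sketched generically (a simplified model, not the evaluator's implementation; error type, delays, and helper names are illustrative):

```python
import random
import time


class TransientError(Exception):
    """Stand-in for a sandbox timeout or transient API failure."""


def with_retries(fn, max_retries=3, base_delay=0.01, cap=60.0):
    # Generic exponential backoff with jitter; illustrative only.
    for attempt in range(max_retries + 1):
        try:
            return fn()
        except TransientError:
            if attempt == max_retries:
                raise
            delay = min(cap, base_delay * 2**attempt)  # 0.01, 0.02, 0.04, ...
            time.sleep(delay * random.uniform(0.5, 1.0))  # jitter spreads retries


calls = {"n": 0}


def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise TransientError("simulated sandbox timeout")
    return "ok"


print(with_retries(flaky))  # succeeds on the third attempt
```

Capping the delay and adding jitter keeps a burst of failing rollouts from retrying in lockstep against the same endpoint.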

### Output and Saving
docs/reference.md (13 changes: 10 additions & 3 deletions)

@@ -701,9 +701,10 @@ class EndpointClientConfig(BaseModel):
max_keepalive_connections: int = 28000
max_retries: int = 10
extra_headers: dict[str, str] = {}
max_concurrent: int | None = None
```

Leaf endpoint configuration used inside `ClientConfig.endpoint_configs`. Has the same fields as `ClientConfig` except `endpoint_configs` itself, preventing recursive nesting.
Leaf endpoint configuration used inside `ClientConfig.endpoint_configs`. Has the same fields as `ClientConfig` except `endpoint_configs` itself, preventing recursive nesting. The optional `max_concurrent` field limits how many concurrent requests this variant handles; see [Per-Variant Concurrency](evaluation.md#concurrency).
> **BugBot (Low Severity):** EvalConfig docs missing new elastic pool fields
>
> The `EvalConfig` section in `docs/reference.md` is missing the three new fields added by this PR: `elastic`, `elastic_poll_interval`, and `endpoints_path`. These are user-facing configuration options for the elastic endpoint pool feature. The documentation rule requires updating reference docs when core user-facing functionality is modified.


### EvalConfig

@@ -733,11 +734,17 @@ class EvalConfig(BaseModel):
### Endpoint

```python
Endpoint = TypedDict("Endpoint", {"key": str, "url": str, "model": str})
class Endpoint(TypedDict, total=False):
key: str # required
url: str # required
model: str # required
api_client_type: ClientType
max_concurrent: int

Endpoints = dict[str, list[Endpoint]]
```

`Endpoints` maps an endpoint id to one or more endpoint variants. A single variant is represented as a one-item list.
`Endpoints` maps an endpoint id to one or more endpoint variants. A single variant is represented as a one-item list. The `max_concurrent` field enables per-variant concurrency limiting with least-loaded dispatch; if set on any variant in a group, it must be set on all.
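The shape and the all-or-nothing rule can be exercised with plain Python (the `check_group` helper is hypothetical, not part of the library, and `ClientType` is simplified to `str` here):

```python
from typing import TypedDict


class Endpoint(TypedDict, total=False):
    # Mirrors the reference definition above; api_client_type simplified to str.
    key: str
    url: str
    model: str
    api_client_type: str
    max_concurrent: int


Endpoints = dict[str, list[Endpoint]]

endpoints: Endpoints = {
    "my-model": [
        {"key": "API_KEY", "url": "https://fast-host.example.com/v1",
         "model": "my-org/my-model", "max_concurrent": 64},
        {"key": "API_KEY", "url": "https://slow-host.example.com/v1",
         "model": "my-org/my-model", "max_concurrent": 16},
    ],
}


def check_group(variants: list[Endpoint]) -> bool:
    # All-or-nothing rule: max_concurrent on every variant, or on none.
    flags = ["max_concurrent" in v for v in variants]
    return all(flags) or not any(flags)


print(check_group(endpoints["my-model"]))  # True
```

A group that sets `max_concurrent` on only some variants would fail this check, which matches the constraint stated above.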

---

skills/evaluate-environments/SKILL.md (16 changes: 16 additions & 0 deletions)

@@ -43,6 +43,22 @@ model = "qwen/qwen3-32b-instruct"
url = "https://api.pinference.ai/api/v1"
key = "PRIME_API_KEY"
```
7. For multi-host setups, set `max_concurrent` on every variant to enable least-loaded dispatch:
```toml
[[endpoint]]
endpoint_id = "qwen3-235b"
model = "qwen/qwen3-235b"
url = "https://fast-host.example/v1"
key = "API_KEY"
max_concurrent = 64

[[endpoint]]
endpoint_id = "qwen3-235b"
model = "qwen/qwen3-235b"
url = "https://slow-host.example/v1"
key = "API_KEY"
max_concurrent = 16
```

## Publish Gate Before Large Runs
1. After smoke tests pass and results look stable, proactively suggest pushing the environment to Hub before large eval sweeps or RL work.
tests/test_elastic_pool.py (287 changes: 287 additions & 0 deletions)

@@ -0,0 +1,287 @@
"""Tests for update_variants() and ElasticEndpointPool."""

from __future__ import annotations

import asyncio
from pathlib import Path

import pytest

from verifiers.types import ClientConfig, EndpointClientConfig
from verifiers.utils.async_utils import EndpointSlot, LeastLoadedDispatcher
from verifiers.utils.elastic import ElasticEndpointPool


def _make_config(url: str = "https://a.example/v1") -> ClientConfig:
    return ClientConfig(api_base_url=url)


def _make_slot(url: str, max_concurrent: int = 4) -> EndpointSlot:
    return EndpointSlot(config=_make_config(url), max_concurrent=max_concurrent)


# ---------------------------------------------------------------------------
# update_variants tests
# ---------------------------------------------------------------------------


class TestUpdateVariants:
    @pytest.mark.asyncio
    async def test_adds_new(self):
        slot_a = _make_slot("https://a.example/v1")
        dispatcher = LeastLoadedDispatcher([slot_a])

        slot_b = _make_slot("https://b.example/v1")
        added, removed = await dispatcher.update_variants(
            [_make_slot("https://a.example/v1"), slot_b]
        )

        assert added == 1
        assert removed == 0

    @pytest.mark.asyncio
    async def test_removes(self):
        slot_a = _make_slot("https://a.example/v1")
        slot_b = _make_slot("https://b.example/v1")
        dispatcher = LeastLoadedDispatcher([slot_a, slot_b])

        added, removed = await dispatcher.update_variants(
            [_make_slot("https://a.example/v1")]
        )

        assert added == 0
        assert removed == 1

    @pytest.mark.asyncio
    async def test_preserves_active(self):
        slot_a = _make_slot("https://a.example/v1", max_concurrent=4)
        dispatcher = LeastLoadedDispatcher([slot_a])

        # Simulate in-flight request
        async with dispatcher.acquire() as got:
            assert got.active == 1

            # Update with same URL — should preserve the slot object and active count
            added, removed = await dispatcher.update_variants(
                [_make_slot("https://a.example/v1", max_concurrent=8)]
            )
            assert added == 0
            assert removed == 0
            # Active count preserved on the original object
            assert got.active == 1
            # max_concurrent updated
            assert got.max_concurrent == 8

    @pytest.mark.asyncio
    async def test_wakes_waiters(self):
        slot_a = _make_slot("https://a.example/v1", max_concurrent=1)
        dispatcher = LeastLoadedDispatcher([slot_a])

        acquired = asyncio.Event()
        unblocked = asyncio.Event()

        async def holder():
            async with dispatcher.acquire():
                acquired.set()
                # Hold slot while waiter tries to acquire
                await unblocked.wait()

        async def waiter():
            await acquired.wait()
            # This blocks because slot_a is full, but adding slot_b should unblock it
            async with dispatcher.acquire() as got:
                assert got.config.api_base_url == "https://b.example/v1"
                unblocked.set()

        holder_task = asyncio.create_task(holder())
        waiter_task = asyncio.create_task(waiter())

        await acquired.wait()
        await asyncio.sleep(0.05)  # let waiter block

        # Add a new endpoint — should wake the waiter
        await dispatcher.update_variants(
            [
                _make_slot("https://a.example/v1", max_concurrent=1),
                _make_slot("https://b.example/v1", max_concurrent=1),
            ]
        )

        await asyncio.wait_for(waiter_task, timeout=2.0)
        await holder_task

    @pytest.mark.asyncio
    async def test_rejects_empty(self):
        dispatcher = LeastLoadedDispatcher([_make_slot("https://a.example/v1")])
        with pytest.raises(ValueError, match="at least one variant"):
            await dispatcher.update_variants([])


# ---------------------------------------------------------------------------
# ElasticEndpointPool tests
# ---------------------------------------------------------------------------


def _write_endpoints_toml(path: Path, entries: list[dict]) -> None:
    """Write a minimal endpoints.toml file."""
    lines: list[str] = []
    for entry in entries:
        lines.append("[[endpoint]]")
        for k, v in entry.items():
            if isinstance(v, int):
                lines.append(f"{k} = {v}")
            else:
                lines.append(f'{k} = "{v}"')
        lines.append("")
    path.write_text("\n".join(lines))


class TestElasticEndpointPool:
    @pytest.mark.asyncio
    async def test_reload_updates_dispatcher(self, tmp_path: Path):
        toml_file = tmp_path / "endpoints.toml"
        _write_endpoints_toml(
            toml_file,
            [
                {
                    "endpoint_id": "my-ep",
                    "url": "https://a.example/v1",
                    "key": "KEY",
                    "model": "m1",
                    "max_concurrent": 4,
                },
            ],
        )

        # Build initial dispatcher
        slot = _make_slot("https://a.example/v1", max_concurrent=4)
        dispatcher = LeastLoadedDispatcher([slot])

        base_config = ClientConfig(
            api_key_var="KEY",
            api_base_url="https://a.example/v1",
            endpoint_configs=[
                EndpointClientConfig(
                    api_key_var="KEY",
                    api_base_url="https://a.example/v1",
                    max_concurrent=4,
                )
            ],
        )

        pool = ElasticEndpointPool(
            dispatcher=dispatcher,
            endpoints_path=str(toml_file),
            endpoint_id="my-ep",
            poll_interval=1.0,
            base_client_config=base_config,
        )

        # Now update the file with a second endpoint
        _write_endpoints_toml(
            toml_file,
            [
                {
                    "endpoint_id": "my-ep",
                    "url": "https://a.example/v1",
                    "key": "KEY",
                    "model": "m1",
                    "max_concurrent": 4,
                },
                {
                    "endpoint_id": "my-ep",
                    "url": "https://b.example/v1",
                    "key": "KEY",
                    "model": "m1",
                    "max_concurrent": 8,
                },
            ],
        )

        await pool._reload()

        # Dispatcher should now have 2 variants
        assert len(dispatcher._variants) == 2

    @pytest.mark.asyncio
    async def test_reload_failure_keeps_previous(self, tmp_path: Path):
        toml_file = tmp_path / "endpoints.toml"
        _write_endpoints_toml(
            toml_file,
            [
                {
                    "endpoint_id": "my-ep",
                    "url": "https://a.example/v1",
                    "key": "KEY",
                    "model": "m1",
                    "max_concurrent": 4,
                },
            ],
        )

        slot = _make_slot("https://a.example/v1", max_concurrent=4)
        dispatcher = LeastLoadedDispatcher([slot])

        base_config = ClientConfig(
            api_key_var="KEY",
            api_base_url="https://a.example/v1",
            endpoint_configs=[
                EndpointClientConfig(
                    api_key_var="KEY",
                    api_base_url="https://a.example/v1",
                    max_concurrent=4,
                )
            ],
        )

        pool = ElasticEndpointPool(
            dispatcher=dispatcher,
            endpoints_path=str(tmp_path / "nonexistent.toml"),
            endpoint_id="my-ep",
            poll_interval=1.0,
            base_client_config=base_config,
        )

        # _reload should not raise — it logs a warning and keeps previous
        await pool._reload()
        assert len(dispatcher._variants) == 1

    @pytest.mark.asyncio
    async def test_start_stop(self, tmp_path: Path):
        toml_file = tmp_path / "endpoints.toml"
        _write_endpoints_toml(
            toml_file,
            [
                {
                    "endpoint_id": "my-ep",
                    "url": "https://a.example/v1",
                    "key": "KEY",
                    "model": "m1",
                    "max_concurrent": 4,
                },
            ],
        )

        slot = _make_slot("https://a.example/v1", max_concurrent=4)
        dispatcher = LeastLoadedDispatcher([slot])

        base_config = ClientConfig(
            api_key_var="KEY",
            api_base_url="https://a.example/v1",
        )

        pool = ElasticEndpointPool(
            dispatcher=dispatcher,
            endpoints_path=str(toml_file),
            endpoint_id="my-ep",
            poll_interval=0.05,
            base_client_config=base_config,
        )

        assert pool._task is None
        pool.start()
        assert pool._task is not None
        assert not pool._task.done()

        await pool.stop()
        assert pool._task is None