-
Notifications
You must be signed in to change notification settings - Fork 504
add elastic endpoint pool for dynamic GPU scavenging #957
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
51b05c7
a39017a
2338043
58dcaa5
05e7f0b
06c9ec0
042aea7
47e8607
a95ddd5
fbf26b2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -701,9 +701,10 @@ class EndpointClientConfig(BaseModel): | |
| max_keepalive_connections: int = 28000 | ||
| max_retries: int = 10 | ||
| extra_headers: dict[str, str] = {} | ||
| max_concurrent: int | None = None | ||
| ``` | ||
|
|
||
| Leaf endpoint configuration used inside `ClientConfig.endpoint_configs`. Has the same fields as `ClientConfig` except `endpoint_configs` itself, preventing recursive nesting. | ||
| Leaf endpoint configuration used inside `ClientConfig.endpoint_configs`. Has the same fields as `ClientConfig` except `endpoint_configs` itself, preventing recursive nesting. The optional `max_concurrent` field limits how many concurrent requests this variant handles; see [Per-Variant Concurrency](evaluation.md#concurrency). | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. **EvalConfig docs missing new elastic pool fields** — Low Severity. Additional Locations (1). Triggered by project rule: BugBot Instructions |
||
|
|
||
| ### EvalConfig | ||
|
|
||
|
|
@@ -733,11 +734,17 @@ class EvalConfig(BaseModel): | |
| ### Endpoint | ||
|
|
||
| ```python | ||
| Endpoint = TypedDict("Endpoint", {"key": str, "url": str, "model": str}) | ||
| class Endpoint(TypedDict, total=False): | ||
| key: str # required | ||
| url: str # required | ||
| model: str # required | ||
| api_client_type: ClientType | ||
| max_concurrent: int | ||
|
|
||
| Endpoints = dict[str, list[Endpoint]] | ||
| ``` | ||
|
|
||
| `Endpoints` maps an endpoint id to one or more endpoint variants. A single variant is represented as a one-item list. | ||
| `Endpoints` maps an endpoint id to one or more endpoint variants. A single variant is represented as a one-item list. The `max_concurrent` field enables per-variant concurrency limiting with least-loaded dispatch; if set on any variant in a group, it must be set on all. | ||
|
|
||
| --- | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,287 @@ | ||
| """Tests for update_variants() and ElasticEndpointPool.""" | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| import asyncio | ||
| from pathlib import Path | ||
|
|
||
| import pytest | ||
|
|
||
| from verifiers.types import ClientConfig, EndpointClientConfig | ||
| from verifiers.utils.async_utils import EndpointSlot, LeastLoadedDispatcher | ||
| from verifiers.utils.elastic import ElasticEndpointPool | ||
|
|
||
|
|
||
def _make_config(url: str = "https://a.example/v1") -> ClientConfig:
    """Build a minimal ClientConfig whose only populated field is *url*."""
    config = ClientConfig(api_base_url=url)
    return config
|
|
||
|
|
||
def _make_slot(url: str, max_concurrent: int = 4) -> EndpointSlot:
    """Wrap *url* in an EndpointSlot carrying the given concurrency cap."""
    cfg = _make_config(url)
    return EndpointSlot(config=cfg, max_concurrent=max_concurrent)
|
|
||
|
|
||
| # --------------------------------------------------------------------------- | ||
| # update_variants tests | ||
| # --------------------------------------------------------------------------- | ||
|
|
||
|
|
||
class TestUpdateVariants:
    """Behavioral tests for ``LeastLoadedDispatcher.update_variants()``.

    Covers the (added, removed) bookkeeping, preservation of in-flight
    state across an update, waking of blocked waiters when capacity
    appears, and rejection of an empty variant list.
    """

    @pytest.mark.asyncio
    async def test_adds_new(self) -> None:
        # One existing variant; updating with the same URL plus a new one
        # should report exactly one addition and no removals.
        slot_a = _make_slot("https://a.example/v1")
        dispatcher = LeastLoadedDispatcher([slot_a])

        slot_b = _make_slot("https://b.example/v1")
        added, removed = await dispatcher.update_variants(
            [_make_slot("https://a.example/v1"), slot_b]
        )

        assert added == 1
        assert removed == 0

    @pytest.mark.asyncio
    async def test_removes(self) -> None:
        # Two existing variants; shrinking the list to one URL should
        # report one removal and no additions.
        slot_a = _make_slot("https://a.example/v1")
        slot_b = _make_slot("https://b.example/v1")
        dispatcher = LeastLoadedDispatcher([slot_a, slot_b])

        added, removed = await dispatcher.update_variants(
            [_make_slot("https://a.example/v1")]
        )

        assert added == 0
        assert removed == 1

    @pytest.mark.asyncio
    async def test_preserves_active(self) -> None:
        # An update that keeps an existing URL must reuse the live slot
        # object (so its in-flight counter survives) while still applying
        # the new max_concurrent value.
        slot_a = _make_slot("https://a.example/v1", max_concurrent=4)
        dispatcher = LeastLoadedDispatcher([slot_a])

        # Simulate in-flight request
        async with dispatcher.acquire() as got:
            assert got.active == 1

            # Update with same URL — should preserve the slot object and active count
            added, removed = await dispatcher.update_variants(
                [_make_slot("https://a.example/v1", max_concurrent=8)]
            )
            assert added == 0
            assert removed == 0
            # Active count preserved on the original object
            assert got.active == 1
            # max_concurrent updated
            assert got.max_concurrent == 8

    @pytest.mark.asyncio
    async def test_wakes_waiters(self) -> None:
        # A single slot with capacity 1: a second acquire() must block
        # until update_variants() adds endpoint B, which should wake the
        # blocked waiter and hand it the new slot.
        slot_a = _make_slot("https://a.example/v1", max_concurrent=1)
        dispatcher = LeastLoadedDispatcher([slot_a])

        acquired = asyncio.Event()
        unblocked = asyncio.Event()

        async def holder():
            async with dispatcher.acquire():
                acquired.set()
                # Hold slot while waiter tries to acquire
                await unblocked.wait()

        async def waiter():
            await acquired.wait()
            # This blocks because slot_a is full, but adding slot_b should unblock it
            async with dispatcher.acquire() as got:
                assert got.config.api_base_url == "https://b.example/v1"
                unblocked.set()

        holder_task = asyncio.create_task(holder())
        waiter_task = asyncio.create_task(waiter())

        await acquired.wait()
        await asyncio.sleep(0.05)  # let waiter block

        # Add a new endpoint — should wake the waiter
        await dispatcher.update_variants(
            [
                _make_slot("https://a.example/v1", max_concurrent=1),
                _make_slot("https://b.example/v1", max_concurrent=1),
            ]
        )

        # wait_for bounds the test: if the waiter is never woken, this
        # raises TimeoutError instead of hanging the suite.
        await asyncio.wait_for(waiter_task, timeout=2.0)
        await holder_task

    @pytest.mark.asyncio
    async def test_rejects_empty(self) -> None:
        # An empty variant list is invalid — the dispatcher must refuse it
        # rather than silently dropping all endpoints.
        dispatcher = LeastLoadedDispatcher([_make_slot("https://a.example/v1")])
        with pytest.raises(ValueError, match="at least one variant"):
            await dispatcher.update_variants([])
|
|
||
|
|
||
| # --------------------------------------------------------------------------- | ||
| # ElasticEndpointPool tests | ||
| # --------------------------------------------------------------------------- | ||
|
|
||
|
|
||
| def _write_endpoints_toml(path: Path, entries: list[dict]) -> None: | ||
| """Write a minimal endpoints.toml file.""" | ||
| lines: list[str] = [] | ||
| for entry in entries: | ||
| lines.append("[[endpoint]]") | ||
| for k, v in entry.items(): | ||
| if isinstance(v, int): | ||
| lines.append(f'{k} = {v}') | ||
| else: | ||
| lines.append(f'{k} = "{v}"') | ||
| lines.append("") | ||
| path.write_text("\n".join(lines)) | ||
|
|
||
|
|
||
class TestElasticEndpointPool:
    """Tests for ElasticEndpointPool's reload and lifecycle behavior.

    The pool polls an endpoints.toml file and pushes variant changes into
    an existing LeastLoadedDispatcher. These tests drive the private
    ``_reload()`` hook directly instead of waiting out the poll interval.
    """

    @pytest.mark.asyncio
    async def test_reload_updates_dispatcher(self, tmp_path: Path) -> None:
        # Start with one variant on disk, then rewrite the file with two
        # and verify a reload propagates the addition to the dispatcher.
        toml_file = tmp_path / "endpoints.toml"
        _write_endpoints_toml(
            toml_file,
            [
                {
                    "endpoint_id": "my-ep",
                    "url": "https://a.example/v1",
                    "key": "KEY",
                    "model": "m1",
                    "max_concurrent": 4,
                },
            ],
        )

        # Build initial dispatcher
        slot = _make_slot("https://a.example/v1", max_concurrent=4)
        dispatcher = LeastLoadedDispatcher([slot])

        base_config = ClientConfig(
            api_key_var="KEY",
            api_base_url="https://a.example/v1",
            endpoint_configs=[
                EndpointClientConfig(
                    api_key_var="KEY",
                    api_base_url="https://a.example/v1",
                    max_concurrent=4,
                )
            ],
        )

        pool = ElasticEndpointPool(
            dispatcher=dispatcher,
            endpoints_path=str(toml_file),
            endpoint_id="my-ep",
            poll_interval=1.0,
            base_client_config=base_config,
        )

        # Now update the file with a second endpoint
        _write_endpoints_toml(
            toml_file,
            [
                {
                    "endpoint_id": "my-ep",
                    "url": "https://a.example/v1",
                    "key": "KEY",
                    "model": "m1",
                    "max_concurrent": 4,
                },
                {
                    "endpoint_id": "my-ep",
                    "url": "https://b.example/v1",
                    "key": "KEY",
                    "model": "m1",
                    "max_concurrent": 8,
                },
            ],
        )

        await pool._reload()

        # Dispatcher should now have 2 variants
        assert len(dispatcher._variants) == 2

    @pytest.mark.asyncio
    async def test_reload_failure_keeps_previous(self, tmp_path: Path) -> None:
        # Point the pool at a nonexistent file: _reload must swallow the
        # failure (log-and-continue) and leave the dispatcher untouched.
        toml_file = tmp_path / "endpoints.toml"
        _write_endpoints_toml(
            toml_file,
            [
                {
                    "endpoint_id": "my-ep",
                    "url": "https://a.example/v1",
                    "key": "KEY",
                    "model": "m1",
                    "max_concurrent": 4,
                },
            ],
        )

        slot = _make_slot("https://a.example/v1", max_concurrent=4)
        dispatcher = LeastLoadedDispatcher([slot])

        base_config = ClientConfig(
            api_key_var="KEY",
            api_base_url="https://a.example/v1",
            endpoint_configs=[
                EndpointClientConfig(
                    api_key_var="KEY",
                    api_base_url="https://a.example/v1",
                    max_concurrent=4,
                )
            ],
        )

        pool = ElasticEndpointPool(
            dispatcher=dispatcher,
            endpoints_path=str(tmp_path / "nonexistent.toml"),
            endpoint_id="my-ep",
            poll_interval=1.0,
            base_client_config=base_config,
        )

        # _reload should not raise — it logs a warning and keeps previous
        await pool._reload()
        assert len(dispatcher._variants) == 1

    @pytest.mark.asyncio
    async def test_start_stop(self, tmp_path: Path) -> None:
        # start() must spawn the background polling task exactly once and
        # stop() must cancel it and clear the handle.
        toml_file = tmp_path / "endpoints.toml"
        _write_endpoints_toml(
            toml_file,
            [
                {
                    "endpoint_id": "my-ep",
                    "url": "https://a.example/v1",
                    "key": "KEY",
                    "model": "m1",
                    "max_concurrent": 4,
                },
            ],
        )

        slot = _make_slot("https://a.example/v1", max_concurrent=4)
        dispatcher = LeastLoadedDispatcher([slot])

        base_config = ClientConfig(
            api_key_var="KEY",
            api_base_url="https://a.example/v1",
        )

        pool = ElasticEndpointPool(
            dispatcher=dispatcher,
            endpoints_path=str(toml_file),
            endpoint_id="my-ep",
            poll_interval=0.05,
            base_client_config=base_config,
        )

        assert pool._task is None
        pool.start()
        assert pool._task is not None
        assert not pool._task.done()

        await pool.stop()
        assert pool._task is None


There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Elastic mode feature undocumented in evaluation docs and skills
Low Severity
The PR adds a new user-facing
elastic endpoint pool feature (with `elastic`, `elastic_poll_interval`, and `endpoints_path` config fields), but neither `docs/evaluation.md` nor `skills/evaluate-environments/SKILL.md` documents the elastic mode itself. Only per-variant `max_concurrent` is documented. Users have no documentation for how to enable or configure elastic polling. Additional Locations (1)
skills/evaluate-environments/SKILL.md#L45-L61Triggered by project rule: BugBot Instructions