Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 73 additions & 3 deletions verifiers/envs/sandbox_env.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,15 @@
import asyncio
import functools
import logging
import sys
import time
from typing import Any
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable

from verifiers.utils.thread_utils import (
get_or_create_thread_attr,
get_or_create_thread_loop,
)

if sys.version_info < (3, 12):
from typing_extensions import TypedDict
Expand All @@ -28,6 +36,57 @@
)


class ThreadedAsyncSandboxClient:
"""
Mirrors AsyncSandboxClient's interface but dispatches each method call to a
ThreadPoolExecutor where each thread maintains its own client via
thread-local storage.
"""

def __init__(
self,
max_workers: int = 100,
max_connections: int = 100,
max_keepalive_connections: int = 50,
**client_kwargs,
):
"""Initialize the threaded sandbox client."""
self.logger = logging.getLogger(f"{__name__}.{self.__class__.__name__}")
self.executor = ThreadPoolExecutor(
max_workers=max_workers,
thread_name_prefix="sandbox-client-executor",
)
self.client_kwargs = {
"max_connections": max_connections,
"max_keepalive_connections": max_keepalive_connections,
**client_kwargs,
}

def __getattr__(self, name: str) -> Callable[..., Any]:
"""Dynamically proxy attribute access to dispatch method calls to the thread pool."""

@functools.wraps(getattr(AsyncSandboxClient, name, lambda: None))
async def wrapper(*args, **kwargs):
def run_in_thread():
loop = get_or_create_thread_loop()
sandbox_client = get_or_create_thread_attr(
"sandbox_client",
AsyncSandboxClient,
**self.client_kwargs,
)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Thread-local storage key collision ignores different client configurations

The hardcoded key "sandbox_client" in get_or_create_thread_attr causes all ThreadedAsyncSandboxClient instances to share the same thread-local AsyncSandboxClient. When multiple instances are created with different client_kwargs (e.g., different max_connections values), the first instance to make a call on a thread creates the client with its config, and subsequent instances silently reuse that same client, ignoring their own configurations. The factory kwargs are only used on creation, not on retrieval.

Additional Locations (1)

Fix in Cursor Fix in Web

method = getattr(sandbox_client, name)
return loop.run_until_complete(method(*args, **kwargs))

loop = asyncio.get_event_loop()
return await loop.run_in_executor(self.executor, run_in_thread)

return wrapper

def teardown(self, wait: bool = True) -> None:
"""Teardown the thread pool executor."""
self.executor.shutdown(wait=wait)


class SandboxState(TypedDict):
ready: bool

Expand Down Expand Up @@ -59,14 +118,21 @@ def __init__(
max_backoff_seconds: float = 30.0,
jitter: float = 1e-3,
stop_errors: list[type[Exception]] | None = None,
sandbox_client_max_workers: int = 10,
sandbox_client_max_connections: int = 100,
sandbox_client_max_keepalive_connections: int = 50,
**kwargs,
):
super().__init__(
stop_errors=stop_errors if stop_errors is not None else [vf.SandboxError],
**kwargs,
)
self.timeout_per_command_seconds = timeout_per_command_seconds
self.sandbox_client = AsyncSandboxClient()
self.sandbox_client = ThreadedAsyncSandboxClient(
max_workers=sandbox_client_max_workers,
max_connections=sandbox_client_max_connections,
max_keepalive_connections=sandbox_client_max_keepalive_connections,
)
self.sandbox_request = CreateSandboxRequest(
name=sandbox_name,
docker_image=docker_image,
Expand Down Expand Up @@ -203,7 +269,7 @@ async def bulk_delete_sandboxes(self, global_ids: list[str]) -> None:
except Exception as e:
self.logger.error(f"Failed to bulk delete sandboxes {global_ids}: {e}")

@vf.teardown # type: ignore
@vf.teardown
async def teardown_sandboxes(self):
"""Delete all active sandboxes using sync client.

Expand All @@ -229,3 +295,7 @@ async def teardown_sandboxes(self):
self.logger.debug(f"Bulk deleted batch of {len(batch)} sandboxes")
except Exception as e:
self.logger.warning(f"Bulk delete failed for batch: {e}")

@vf.teardown
async def teardown_sandbox_client(self):
self.sandbox_client.teardown()
29 changes: 29 additions & 0 deletions verifiers/utils/thread_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import asyncio
import threading
from typing import Any, Callable

THREAD_LOCAL_STORAGE = threading.local()


def get_thread_local_storage() -> threading.local:
"""Get the thread-local storage for the current thread."""
return THREAD_LOCAL_STORAGE


def get_or_create_thread_attr(
key: str, factory: Callable[..., Any], *args, **kwargs
) -> Any:
"""Get value from thread-local storage, creating it if it doesn't exist."""
thread_local = get_thread_local_storage()
value = getattr(thread_local, key, None)
if value is None:
value = factory(*args, **kwargs)
setattr(thread_local, key, value)
return value


def get_or_create_thread_loop() -> asyncio.AbstractEventLoop:
"""Get or create event loop for current thread. Reuses loop to avoid closing it."""
thread_local_loop = get_or_create_thread_attr("loop", asyncio.new_event_loop)
asyncio.set_event_loop(thread_local_loop)
return thread_local_loop
Loading