Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions verifiers/envs/experimental/cli_agent_env.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
BackgroundJob,
BackgroundJobStatus,
CreateSandboxRequest,
SandboxClient,
)
from prime_sandboxes.core import APIClient
from prime_tunnel import Tunnel

import verifiers as vf
Expand Down Expand Up @@ -86,6 +88,7 @@ def __init__(
self.labels = labels
self.active_rollouts: dict[str, dict[str, Any]] = {}
self.intercepts: dict[str, dict[str, Any]] = {} # request_id -> intercept data
self.active_sandboxes: set[str] = set()
self.interception_server: Any = None
self.server_lock = asyncio.Lock()
self.server_runner: Any = None
Expand Down Expand Up @@ -151,6 +154,7 @@ async def setup_state(self, state: State) -> State:
)
sandbox = await sandbox_client.create(sandbox_request)
state["sandbox_id"] = sandbox.id
self.active_sandboxes.add(sandbox.id)
logger.debug(f"Created sandbox {sandbox.id}")
await sandbox_client.wait_for_creation(sandbox.id)

Expand Down Expand Up @@ -704,6 +708,31 @@ async def teardown_tunnel(self):
self.server_site = None
self.interception_server = None

@vf.teardown
async def teardown_sandboxes(self):
"""Delete all active sandboxes on teardown.

Uses the synchronous SandboxClient for teardown to avoid event loop issues
during signal handling and interpreter shutdown.
"""
if len(self.active_sandboxes) == 0:
return
logger.info(f"Deleting {len(self.active_sandboxes)} remaining sandboxes")

sync_client = SandboxClient(APIClient())
sandbox_ids = list(self.active_sandboxes)

batch_size = 100
for i in range(0, len(sandbox_ids), batch_size):
batch = sandbox_ids[i : i + batch_size]
try:
sync_client.bulk_delete(sandbox_ids=batch)
for sandbox_id in batch:
self.active_sandboxes.discard(sandbox_id)
logger.debug(f"Bulk deleted batch of {len(batch)} sandboxes")
except Exception as e:
logger.warning(f"Bulk delete failed for batch: {e}")

@vf.cleanup
async def cleanup_interception_context(self, state: State):
"""Cleanup interception context for rollout"""
Expand Down Expand Up @@ -766,6 +795,7 @@ async def destroy_sandbox(self, state: State):
try:
sandbox_client = AsyncSandboxClient()
await sandbox_client.delete(sandbox_id)
self.active_sandboxes.discard(sandbox_id)
logger.debug(f"Deleted sandbox {sandbox_id}")
except Exception as e:
logger.warning(f"Failed to delete sandbox {sandbox_id}: {e}")
Expand Down
Loading