Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 90 additions & 0 deletions ccproxy/llms/formatters/openai_to_anthropic/requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,96 @@
from typing import Any

from ccproxy.core.constants import DEFAULT_MAX_TOKENS
from ccproxy.core.logging import get_logger
from ccproxy.llms.models import anthropic as anthropic_models
from ccproxy.llms.models import openai as openai_models


logger = get_logger(__name__)


def _sanitize_tool_results(messages: list[dict[str, Any]]) -> list[dict[str, Any]]:
"""Remove orphaned tool_result blocks that don't have matching tool_use blocks.

The Anthropic API requires that each tool_result block must have a corresponding
tool_use block in the immediately preceding assistant message. This function removes
tool_result blocks that don't meet this requirement, converting them to text to
preserve information.

Args:
messages: List of Anthropic format messages

Returns:
Sanitized messages with orphaned tool_results removed or converted to text
"""
if not messages:
return messages

sanitized = []
for i, msg in enumerate(messages):
if msg.get("role") == "user" and isinstance(msg.get("content"), list):
# Find tool_use_ids from the immediately preceding assistant message
valid_tool_use_ids: set[str] = set()
if i > 0 and messages[i - 1].get("role") == "assistant":
prev_content = messages[i - 1].get("content", [])
if isinstance(prev_content, list):
for block in prev_content:
if isinstance(block, dict) and block.get("type") == "tool_use":
tool_id = block.get("id")
if tool_id:
valid_tool_use_ids.add(tool_id)

# Filter content blocks
new_content = []
orphaned_results = []
for block in msg["content"]:
if isinstance(block, dict) and block.get("type") == "tool_result":
tool_use_id = block.get("tool_use_id")
if tool_use_id in valid_tool_use_ids:
new_content.append(block)
else:
# Track orphaned tool_result for conversion to text
orphaned_results.append(block)
logger.warning(
"orphaned_tool_result_removed",
tool_use_id=tool_use_id,
valid_ids=list(valid_tool_use_ids),
message_index=i,
category="message_sanitization",
)
else:
new_content.append(block)

# Convert orphaned results to text block to preserve information
if orphaned_results:
orphan_text = "[Previous tool results from compacted history]\n"
for orphan in orphaned_results:
content = orphan.get("content", "")
if isinstance(content, list):
text_parts = []
for c in content:
if isinstance(c, dict) and c.get("type") == "text":
text_parts.append(c.get("text", ""))
content = "\n".join(text_parts)
# Truncate long content
content_str = str(content)
if len(content_str) > 500:
content_str = content_str[:500] + "..."
orphan_text += f"- Tool {orphan.get('tool_use_id', 'unknown')}: {content_str}\n"

# Add as text block at the beginning
new_content.insert(0, {"type": "text", "text": orphan_text})

# Update message content (only if we have content left)
if new_content:
sanitized.append({**msg, "content": new_content})
# If no content left, skip this message entirely
else:
sanitized.append(msg)

return sanitized


async def convert__openai_chat_to_anthropic_message__request(
request: openai_models.ChatCompletionRequest,
) -> anthropic_models.CreateMessageRequest:
Expand Down Expand Up @@ -165,6 +251,9 @@ async def convert__openai_chat_to_anthropic_message__request(
else:
out_messages.append({"role": "user", "content": content})

# Sanitize tool_result blocks to ensure they have matching tool_use blocks
out_messages = _sanitize_tool_results(out_messages)

payload_data: dict[str, Any] = {
"model": model,
"messages": out_messages,
Expand Down Expand Up @@ -581,4 +670,5 @@ def derive_thinking_config(
__all__ = [
    "convert__openai_chat_to_anthropic_message__request",
    "convert__openai_responses_to_anthropic_message__request",
    # NOTE(review): underscore-prefixed names are conventionally private and
    # kept out of __all__; tests can import the module attribute directly
    # (`from ...requests import _sanitize_tool_results`) without exporting it.
    # Consider removing this entry once tests are updated.
    "_sanitize_tool_results",  # Exposed for testing
]
39 changes: 31 additions & 8 deletions ccproxy/llms/streaming/processors.py
Original file line number Diff line number Diff line change
Expand Up @@ -421,18 +421,33 @@ async def _process_chunk(
self.current_thinking_text = ""
self.current_thinking_signature = None

elif (
self.tool_calls
and self.enable_tool_calls
and self.output_format == "sse"
):
# Send completed tool calls (only for SSE format, dict format sends immediately)
for tool_call in self.tool_calls.values():
elif self.tool_calls and self.enable_tool_calls:
# Send completed tool calls for both SSE and dict formats
# Previous bug: Only sent for SSE format, causing dict format (SDK mode) to miss tool calls
logger.trace(
"openai_stream_sending_tool_calls",
tool_count=len(self.tool_calls),
output_format=self.output_format,
category="streaming_conversion",
)

for tool_call_index, (tool_call_id, tool_call) in enumerate(
self.tool_calls.items()
):
logger.trace(
"openai_stream_tool_call_yielding",
tool_call_id=tool_call_id,
tool_name=tool_call["name"],
has_arguments=bool(tool_call["arguments"]),
index=tool_call_index,
category="streaming_conversion",
)

yield self._format_chunk_output(
delta={
"tool_calls": [
{
"index": 0,
"index": tool_call_index,
"id": tool_call["id"],
"type": "function",
"function": {
Expand All @@ -446,6 +461,14 @@ async def _process_chunk(
}
)

# Clear tool_calls after yielding to prevent duplicates
logger.trace(
"openai_stream_clearing_tool_calls",
cleared_count=len(self.tool_calls),
category="streaming_conversion",
)
self.tool_calls.clear()

elif chunk_type == "message_delta":
# Usage information
usage = chunk_data.get("usage", {})
Expand Down
61 changes: 29 additions & 32 deletions ccproxy/plugins/claude_sdk/stream_handle.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,26 +74,49 @@ def __init__(
async def create_listener(self) -> AsyncIterator[Any]:
"""Create a new listener for this stream.

This method starts the worker on first listener and returns
an async iterator for consuming messages.
This method creates the worker if needed, pre-registers the listener,
then starts the worker. This ordering prevents race conditions where
fast STDIO tools could return results before the listener was ready.

Yields:
Messages from the stream
"""
# Start worker if needed
await self._ensure_worker_started()
# Create worker if needed (but don't start yet)
async with self._worker_lock:
if self._worker is None:
worker_id = f"{self.handle_id}-worker"
self._worker = StreamWorker(
worker_id=worker_id,
message_iterator=self._message_iterator,
session_id=self.session_id,
request_id=self.request_id,
session_client=self._session_client,
stream_handle=self,
)
logger.debug(
"stream_handle_worker_created",
handle_id=self.handle_id,
worker_id=worker_id,
session_id=self.session_id,
category="streaming",
)

if not self._worker:
raise RuntimeError("Failed to start stream worker")
raise RuntimeError("Failed to create stream worker")

# Create listener
# Pre-register listener BEFORE starting worker
# This fixes the race condition where fast STDIO tools could
# return results before the listener was ready
queue = self._worker.get_message_queue()
listener = await queue.create_listener()
self._listeners[listener.listener_id] = listener

if self._first_listener_at is None:
self._first_listener_at = time.time()

# NOW start the worker (after listener is registered)
await self._worker.start()

logger.debug(
"stream_handle_listener_created",
handle_id=self.handle_id,
Expand Down Expand Up @@ -135,32 +158,6 @@ async def create_listener(self) -> AsyncIterator[Any]:
# Check if we should trigger cleanup
await self._check_cleanup()

async def _ensure_worker_started(self) -> None:
    """Ensure the worker is started, creating it if needed.

    Lazily instantiates the StreamWorker on first call (guarded by
    ``self._worker_lock`` so concurrent listeners create at most one
    worker) and immediately starts it.

    NOTE(review): because the worker is started here, before any listener
    has registered on its message queue, messages produced very early can
    arrive while no listener exists — confirm whether listener
    registration should happen before ``start()``.
    """
    async with self._worker_lock:
        if self._worker is None:
            # Create worker — id is derived from the handle so logs can be
            # correlated back to this stream handle.
            worker_id = f"{self.handle_id}-worker"
            self._worker = StreamWorker(
                worker_id=worker_id,
                message_iterator=self._message_iterator,
                session_id=self.session_id,
                request_id=self.request_id,
                session_client=self._session_client,
                stream_handle=self,  # Pass self for message tracking
            )

            # Start worker as soon as it is created (see NOTE above).
            await self._worker.start()

            logger.debug(
                "stream_handle_worker_created",
                handle_id=self.handle_id,
                worker_id=worker_id,
                session_id=self.session_id,
                category="streaming",
            )

async def _remove_listener(self, listener_id: str) -> None:
"""Remove a listener and clean it up.

Expand Down
13 changes: 6 additions & 7 deletions ccproxy/plugins/claude_sdk/stream_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,12 +194,12 @@ async def _run_worker(self) -> None:
self._total_messages += 1
self._last_message_time = time.time()

# Check if we have listeners
if await self._message_queue.has_listeners():
# Broadcast to all listeners
delivered_count = await self._message_queue.broadcast(message)
self._messages_delivered += delivered_count
# Always broadcast - the queue handles no-listeners case atomically
# Previous bug: Separate has_listeners() check was racy with fast STDIO tools
delivered_count = await self._message_queue.broadcast(message)

if delivered_count > 0:
self._messages_delivered += delivered_count
logger.trace(
"stream_worker_message_delivered",
worker_id=self.worker_id,
Expand All @@ -208,9 +208,8 @@ async def _run_worker(self) -> None:
total_messages=self._total_messages,
)
else:
# No listeners - discard message
# No listeners at broadcast time - message discarded
self._messages_discarded += 1

logger.trace(
"stream_worker_message_discarded",
worker_id=self.worker_id,
Expand Down
Loading