20 changes: 18 additions & 2 deletions jaf/core/engine.py
@@ -556,6 +556,8 @@ async def _run_internal(
aggregated_text = ""
# Working array of partial tool calls
partial_tool_calls: List[Dict[str, Any]] = []
# Track usage data from streaming chunks
usage_data: Optional[Dict[str, int]] = None

async for chunk in get_stream(state, current_agent, config): # type: ignore[arg-type]
# Text deltas
@@ -589,6 +591,16 @@ async def _run_internal(
if args_delta:
target["function"]["arguments"] += args_delta

# Extract usage from raw chunk data (OpenAI sends it in the last chunk)
raw = getattr(chunk, "raw", None)
print(f"[JAF DEBUG] Chunk raw: {raw}")
if raw and isinstance(raw, dict):
chunk_usage = raw.get("usage")
print(f"[JAF DEBUG] Chunk usage: {chunk_usage}")
if chunk_usage:
usage_data = chunk_usage
print(f"[JAF DEBUG] Usage data captured: {usage_data}")

# Emit partial assistant message when something changed
if delta_text or tcd is not None:
assistant_event_streamed = True
@@ -653,8 +665,10 @@ async def _run_internal(
"message": {
"content": aggregated_text or None,
"tool_calls": final_tool_calls
}
},
"usage": usage_data
}
print(f"[JAF DEBUG] Final llm_response usage: {usage_data}")
except Exception:
# Fallback to non-streaming on error
assistant_event_streamed = False
@@ -664,11 +678,13 @@ async def _run_internal(

# Emit LLM call end event
if config.on_event:
event_usage = llm_response.get("usage")
print(f"[JAF DEBUG] Emitting llm_call_end with usage: {event_usage}")
config.on_event(LLMCallEndEvent(data=to_event_data(LLMCallEndEventData(
choice=llm_response,
trace_id=state.trace_id,
run_id=state.run_id,
usage=llm_response.get("usage")
usage=event_usage
))))

# Check if response has message
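The engine-side change accumulates text deltas as before and now also holds on to the `usage` payload that OpenAI-compatible backends attach to the final streaming chunk when `stream_options={"include_usage": True}` is requested. A minimal sketch of that capture pattern, outside the JAF engine and with illustrative chunk shapes (the `collect_stream` helper is not part of the codebase):

```python
from typing import Any, Dict, Iterable, Optional

def collect_stream(chunks: Iterable[Dict[str, Any]]) -> Dict[str, Any]:
    """Aggregate text deltas and capture the usage dict from the final chunk."""
    aggregated_text = ""
    usage_data: Optional[Dict[str, int]] = None

    for chunk in chunks:
        # Text deltas arrive under choices[*].delta.content on most chunks.
        for choice in chunk.get("choices", []):
            delta = choice.get("delta") or {}
            aggregated_text += delta.get("content") or ""

        # With include_usage enabled, the last chunk carries a usage dict
        # (and typically an empty choices list).
        if chunk.get("usage"):
            usage_data = chunk["usage"]

    return {"message": {"content": aggregated_text or None}, "usage": usage_data}

# Illustrative chunk sequence in the OpenAI streaming shape:
chunks = [
    {"choices": [{"delta": {"content": "Hel"}}]},
    {"choices": [{"delta": {"content": "lo"}}]},
    {"choices": [], "usage": {"prompt_tokens": 12, "completion_tokens": 2, "total_tokens": 14}},
]
print(collect_stream(chunks))
```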
1 change: 1 addition & 0 deletions jaf/core/tracing.py
@@ -846,6 +846,7 @@ def collect(self, event: TraceEvent) -> None:

# Extract usage from the event data
usage = self._get_event_data(event, "usage", {})
print(f"[LANGFUSE DEBUG] Received usage from event: {usage}")

# Extract model information from choice data or event data
model = choice.get("model", "unknown")
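The collector now logs the usage dict it receives from `LLMCallEndEvent`. For reference, that dict usually carries OpenAI-style keys; a hypothetical normalizer (not part of JAF or the Langfuse SDK) that maps it to a generic input/output/total shape could look like this:

```python
from typing import Dict, Optional

def normalize_usage(usage: Optional[Dict[str, int]]) -> Dict[str, int]:
    # Assumes OpenAI-style keys; the target key names are illustrative.
    usage = usage or {}
    return {
        "input": usage.get("prompt_tokens", 0),
        "output": usage.get("completion_tokens", 0),
        "total": usage.get("total_tokens", 0),
    }

print(normalize_usage({"prompt_tokens": 12, "completion_tokens": 2, "total_tokens": 14}))
# {'input': 12, 'output': 2, 'total': 14}
```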
9 changes: 8 additions & 1 deletion jaf/providers/model.py
@@ -347,8 +347,10 @@ async def get_completion_stream(
if agent.output_codec:
request_params["response_format"] = {"type": "json_object"}

# Enable streaming
# Enable streaming with usage data
request_params["stream"] = True
request_params["stream_options"] = {"include_usage": True}
print(f"[LITELLM DEBUG] Streaming request with stream_options: {request_params.get('stream_options')}")

loop = asyncio.get_running_loop()
queue: asyncio.Queue = asyncio.Queue(maxsize=256)
@@ -382,6 +384,10 @@ def _producer():
delta = getattr(choice, "delta", None)
finish_reason = getattr(choice, "finish_reason", None)

# Debug: Check if chunk has usage
if raw_obj and 'usage' in raw_obj:
print(f"[LITELLM DEBUG] Found usage in chunk: {raw_obj.get('usage')}")

# Text content delta
if delta is not None:
content_delta = getattr(delta, "content", None)
@@ -672,6 +678,7 @@ async def get_completion_stream(
"model": model_name,
"messages": messages,
"stream": True,
"stream_options": {"include_usage": True},
**self.litellm_kwargs
}

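On the provider side, the request now asks the backend to include usage in the stream. A minimal end-to-end sketch using LiteLLM directly, assuming the chosen backend honors `stream_options` and attaches `usage` to the last chunk (the model name is illustrative):

```python
import litellm

def stream_with_usage(messages):
    text, usage = "", None
    response = litellm.completion(
        model="gpt-4o-mini",  # illustrative model name
        messages=messages,
        stream=True,
        stream_options={"include_usage": True},
    )
    for chunk in response:
        # Accumulate text deltas from each streamed choice.
        for choice in getattr(chunk, "choices", None) or []:
            delta = getattr(choice, "delta", None)
            text += getattr(delta, "content", None) or ""
        # Capture the usage payload, typically present only on the final chunk.
        chunk_usage = getattr(chunk, "usage", None)
        if chunk_usage:
            usage = chunk_usage
    return text, usage

text, usage = stream_with_usage([{"role": "user", "content": "Say hi"}])
print(text, usage)
```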