1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -20,6 +20,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Use typescript autogen for backend types in frontend.
- Try to enforce using metric tools rather than downloading assets.
- Rule to avoid overvalidating.
- Adapt everything to vercel v5.

## [v0.10.0] - 2.10.2025

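For context on the "Adapt everything to vercel v5" entry: the AI SDK's v4 data stream used single-character prefix frames (`0:` for text, `9:`/`a:` for tool calls and results, `e:`/`d:` for finish events), while v5 switches to typed SSE events with explicit start/delta/end lifecycles. A minimal sketch of the framing difference, inferred from the diff below rather than from the official spec:

```python
import json
import uuid

# v4-style frame (prefix + JSON payload, one per line):
#   0:"Hello"\n
# v5-style frame (SSE event whose JSON body carries a 'type' field):
#   data: {"type": "text-delta", "id": "text_...", "delta": "Hello"}\n\n


def v4_text_frame(delta: str) -> str:
    """Old prefix framing: '0:' marks a text delta."""
    return f"0:{json.dumps(delta, separators=(',', ':'))}\n"


def v5_text_frames(delta: str, text_id: str | None = None) -> list[str]:
    """New SSE framing: a text part is bracketed by start/end events."""
    text_id = text_id or f"text_{uuid.uuid4().hex}"
    return [
        f"data: {json.dumps({'type': 'text-start', 'id': text_id})}\n\n",
        f"data: {json.dumps({'type': 'text-delta', 'id': text_id, 'delta': delta})}\n\n",
        f"data: {json.dumps({'type': 'text-end', 'id': text_id})}\n\n",
    ]
```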
89 changes: 47 additions & 42 deletions backend/src/neuroagent/agent_routine.py
@@ -231,10 +231,17 @@ async def astream(
active_agent = agent
content = await messages_to_openai_content(messages)
history = copy.deepcopy(content)
tool_map = {tool.name: tool for tool in agent.tools}

turns = 0
metadata_data = []

# In the HIL case, re-emitting the start step breaks Vercel and adds a new part,
# so only emit it when the last message comes from the user.
if messages[-1].entity == Entity.USER:
yield f"data: {json.dumps({'type': 'start', 'messageId': f'msg_{uuid.uuid4().hex}'})}\n\n"

while turns <= max_turns:
# We need to redefine the tool map since the tools can change on agent switch.
tool_map = {tool.name: tool for tool in active_agent.tools}
# Force an AI message once max_turns is reached,
# i.e. we do a total of max_turns + 1 turns,
# the +1 being the final AI message.
@@ -269,11 +276,19 @@ async def astream(
turns += 1
draft_tool_calls: list[dict[str, str]] = []
draft_tool_calls_index = -1
text_id = f"text_{uuid.uuid4().hex}"
text_started = False
reasoning_id = f"text_{uuid.uuid4().hex}"
reasoning_started = False
async for chunk in completion:
for choice in chunk.choices:
if choice.finish_reason == "stop":
if choice.delta.content:
yield f"0:{json.dumps(choice.delta.content, separators=(',', ':'))}\n"
if not text_started:
yield f"data: {json.dumps({'type': 'text-start', 'id': text_id})}\n\n"
text_started = True

yield f"data: {json.dumps({'type': 'text-delta', 'id': text_id, 'delta': choice.delta.content})}\n\n"

elif choice.finish_reason == "tool_calls":
# Some models stream the whole tool call in one chunk.
@@ -306,12 +321,7 @@ async def astream(
)
except ValidationError:
args = input_args
tool_call_data = {
"toolCallId": draft_tool_call["id"],
"toolName": draft_tool_call["name"],
"args": args,
}
yield f"9:{json.dumps(tool_call_data, separators=(',', ':'))}\n"
yield f"data: {json.dumps({'type': 'tool-input-available', 'toolCallId': draft_tool_call['id'], 'toolName': draft_tool_call['name'], 'input': args})}\n\n"

# Check for tool calls
elif choice.delta.tool_calls:
@@ -333,11 +343,7 @@ async def astream(
draft_tool_calls.append(
{"id": id, "name": name, "arguments": ""} # type: ignore
)
tool_begin_data = {
"toolCallId": id,
"toolName": name,
}
yield f"b:{json.dumps(tool_begin_data, separators=(',', ':'))}\n"
yield f"data: {json.dumps({'type': 'tool-input-start', 'toolCallId': id, 'toolName': name})}\n\n"

if arguments:
current_id = (
@@ -346,36 +352,39 @@ async def astream(
"id"
]
)
args_data = {
"toolCallId": current_id,
"argsTextDelta": arguments,
}
yield f"c:{json.dumps(args_data, separators=(',', ':'))}\n"
yield f"data: {json.dumps({'type': 'tool-input-delta', 'toolCallId': current_id, 'inputTextDelta': arguments})}\n\n"
draft_tool_calls[draft_tool_calls_index][
"arguments"
] += arguments
elif (
hasattr(choice.delta, "reasoning")
and choice.delta.reasoning
):
yield f"g:{json.dumps(choice.delta.reasoning, separators=(',', ':'))}\n\n"
if not reasoning_started:
yield f"data: {json.dumps({'type': 'reasoning-start', 'id': reasoning_id})}\n\n"
reasoning_started = True

yield f"data: {json.dumps({'type': 'reasoning-delta', 'id': reasoning_id, 'delta': choice.delta.reasoning})}\n\n"

else:
if choice.delta.content is not None:
yield f"0:{json.dumps(choice.delta.content, separators=(',', ':'))}\n"
if not text_started:
yield f"data: {json.dumps({'type': 'text-start', 'id': text_id})}\n\n"
text_started = True

yield f"data: {json.dumps({'type': 'text-delta', 'id': text_id, 'delta': choice.delta.content})}\n\n"

delta_json = choice.delta.model_dump()
delta_json.pop("role", None)
merge_chunk(message, delta_json)

if chunk.choices == []:
finish_data = {
"finishReason": "tool-calls"
if len(draft_tool_calls) > 0
else "stop",
}
else:
finish_data = {"finishReason": "stop"}
if reasoning_started:
yield f"data: {json.dumps({'type': 'reasoning-end', 'id': reasoning_id})}\n\n"
reasoning_started = False

if text_started:
yield f"data: {json.dumps({'type': 'text-end', 'id': text_id})}\n\n"
text_started = False

message["tool_calls"] = list(message.get("tool_calls", {}).values())
if not message["tool_calls"]:
@@ -431,7 +440,7 @@ async def astream(
)

if not messages[-1].tool_calls:
yield f"e:{json.dumps(finish_data)}\n"
yield f"data: {json.dumps({'type': 'finish-step'})}\n\n"
break

# kick out tool calls that require HIL
@@ -472,13 +481,9 @@ async def astream(

# Before extending history, yield each tool response
for tool_response in tool_calls_executed.messages:
response_data = {
"toolCallId": tool_response["tool_call_id"],
"result": tool_response["content"],
}
yield f"a:{json.dumps(response_data, separators=(',', ':'))}\n"
yield f"data: {json.dumps({'type': 'tool-output-available', 'toolCallId': tool_response['tool_call_id'], 'output': tool_response['content']})}\n\n"

yield f"e:{json.dumps(finish_data)}\n"
yield f"data: {json.dumps({'type': 'finish-step'})}\n\n"

for tool_response in tool_calls_executed.messages:
# Check if an LLM has been called inside of the tool
@@ -529,24 +534,24 @@ async def astream(

# If the tool call response contains HIL validation, do not update anything and return
if tool_calls_with_hil:
annotation_data = [
metadata_data = [
{"toolCallId": msg.tool_call_id, "validated": "pending"}
for msg in tool_calls_with_hil
]

yield f"8:{json.dumps(annotation_data, separators=(',', ':'))}\n"
yield f"e:{json.dumps(finish_data)}\n"
yield f"data: {json.dumps({'type': 'finish-step'})}\n\n"
break

history.extend(tool_calls_executed.messages)
context_variables.update(tool_calls_executed.context_variables)
if tool_calls_executed.agent:
active_agent = tool_calls_executed.agent

done_data = {
"finishReason": "stop",
}
yield f"d:{json.dumps(done_data)}\n"
if metadata_data:
yield f"data: {json.dumps({'type': 'finish', 'messageMetadata': {'hil': metadata_data}})}\n\n"
else:
yield f"data: {json.dumps({'type': 'finish'})}\n\n"
yield "data: [DONE]\n\n"

# User interrupts streaming
except asyncio.exceptions.CancelledError:
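Taken together, the rewritten `astream` now speaks the v5 data-stream protocol end to end: `start`, `text-*`, `reasoning-*`, `tool-input-*`, `tool-output-available`, `finish-step`, `finish`, and a terminal `[DONE]`. As a hedged sketch (not part of the PR), a consumer that reassembles one assistant turn from those events might look like this; the helper name and frame handling are assumptions:

```python
import json
from typing import Any, Iterable


def consume_v5_stream(frames: Iterable[str]) -> dict[str, Any]:
    """Reassemble one assistant message from v5 SSE frames.

    Assumes each frame looks like 'data: {...}\n\n' and that the
    stream terminates with 'data: [DONE]'. Illustration only.
    """
    message: dict[str, Any] = {"text": "", "reasoning": "", "tool_calls": {}}
    for frame in frames:
        payload = frame.removeprefix("data: ").strip()
        if payload == "[DONE]":
            break
        event = json.loads(payload)
        match event["type"]:
            case "text-delta":
                message["text"] += event["delta"]
            case "reasoning-delta":
                message["reasoning"] += event["delta"]
            case "tool-input-available":
                message["tool_calls"][event["toolCallId"]] = {
                    "name": event["toolName"],
                    "input": event["input"],
                }
            case "tool-output-available":
                message["tool_calls"][event["toolCallId"]]["output"] = event["output"]
            case "finish":
                # HIL validations, when present, arrive as messageMetadata.
                message["metadata"] = event.get("messageMetadata")
            # start/finish-step and the *-start/*-end brackets carry no content.
    return message
```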
105 changes: 50 additions & 55 deletions backend/src/neuroagent/app/app_utils.py
@@ -26,16 +26,15 @@
utc_now,
)
from neuroagent.app.schemas import (
AnnotationMessageVercel,
AnnotationToolCallVercel,
MessagesRead,
MessagesReadVercel,
MetadataToolCallVercel,
PaginatedResponse,
RateLimitInfo,
ReasoningPartVercel,
TextPartVercel,
ToolCallPartVercel,
ToolCallVercel,
ToolMetadataDict,
)
from neuroagent.tools.base_tool import BaseTool
from neuroagent.utils import get_token_count, messages_to_openai_content
@@ -253,53 +252,50 @@ def format_messages_vercel(
"""Format db messages to Vercel schema."""
messages: list[MessagesReadVercel] = []
parts: list[TextPartVercel | ToolCallPartVercel | ReasoningPartVercel] = []
annotations: list[AnnotationMessageVercel | AnnotationToolCallVercel] = []
metadata: list[MetadataToolCallVercel] = []

for msg in reversed(db_messages):
if msg.entity in [Entity.USER, Entity.AI_MESSAGE]:
content = json.loads(msg.content)
text_content = content.get("content")
reasoning_content = content.get("reasoning")

# Optional reasoning
if reasoning_content:
parts.append(ReasoningPartVercel(reasoning=reasoning_content))

message_data = {
message_data: dict[str, Any] = {
"id": msg.message_id,
"role": "user" if msg.entity == Entity.USER else "assistant",
"createdAt": msg.creation_date,
"content": text_content,
"isComplete": msg.is_complete,
}

# add tool calls and reset buffer after attaching
if msg.entity == Entity.AI_MESSAGE:
if text_content:
parts.append(TextPartVercel(text=text_content))
if reasoning_content:
parts.append(ReasoningPartVercel(text=reasoning_content))

annotations.append(
AnnotationMessageVercel(
messageId=msg.message_id, isComplete=msg.is_complete
)
)
message_data["metadata"] = {"toolCalls": metadata}

message_data["parts"] = parts
message_data["annotations"] = annotations

# If we encounter a user message with a non-empty buffer, we have to add a dummy AI message.
elif parts:
messages.append(
MessagesReadVercel(
id=uuid.uuid4(),
role="assistant",
createdAt=msg.creation_date,
content="",
parts=parts,
annotations=annotations,
else:
if parts:
# If we encounter a user message with a non-empty buffer, we have to add a dummy AI message.
messages.append(
MessagesReadVercel(
id=uuid.uuid4(),
role="assistant",
createdAt=msg.creation_date,
parts=parts,
metadata=ToolMetadataDict(toolCalls=metadata),
isComplete=False,
)
)
)
# Normal User message (with empty buffer)
if text_content:
parts.append(TextPartVercel(text=text_content))

message_data["parts"] = parts
parts = []
annotations = []
metadata = []
messages.append(MessagesReadVercel(**message_data))

# Buffer tool calls until the next AI_MESSAGE
@@ -310,7 +306,7 @@ def format_messages_vercel(

# Add optional reasoning
if reasoning_content:
parts.append(ReasoningPartVercel(reasoning=reasoning_content))
parts.append(ReasoningPartVercel(text=reasoning_content))

for tc in msg.tool_calls:
requires_validation = tool_hil_mapping.get(tc.name, False)
@@ -323,19 +319,18 @@ def format_messages_vercel(
else:
status = "pending"

parts.append(TextPartVercel(text=text_content or ""))
if text_content:
parts.append(TextPartVercel(text=text_content))
parts.append(
ToolCallPartVercel(
toolInvocation=ToolCallVercel(
toolCallId=tc.tool_call_id,
toolName=tc.name,
args=json.loads(tc.arguments),
state="call",
)
toolCallId=tc.tool_call_id,
type=f"tool-{tc.name}",
input=json.loads(tc.arguments),
state="input-available",
)
)
annotations.append(
AnnotationToolCallVercel(
metadata.append(
MetadataToolCallVercel(
toolCallId=tc.tool_call_id,
validated=status, # type: ignore
isComplete=msg.is_complete,
@@ -347,28 +342,28 @@ def format_messages_vercel(
tool_call_id = json.loads(msg.content).get("tool_call_id")
tool_call = next(
(
part.toolInvocation
part
for part in parts
if isinstance(part, ToolCallPartVercel)
and part.toolInvocation.toolCallId == tool_call_id
and part.toolCallId == tool_call_id
),
None,
)
annotation = next(
if tool_call:
tool_call.output = json.loads(msg.content).get("content")
tool_call.state = "output-available"

met = next(
(
annotation
for annotation in annotations
if isinstance(annotation, AnnotationToolCallVercel)
and annotation.toolCallId == tool_call_id
met
for met in metadata
if isinstance(met, MetadataToolCallVercel)
and met.toolCallId == tool_call_id
),
None,
)
if tool_call:
tool_call.result = json.loads(msg.content).get("content")
tool_call.state = "result"

if annotation:
annotation.isComplete = msg.is_complete
if met:
met.isComplete = msg.is_complete

# If the tool call buffer is not empty, we need to add a dummy AI message.
if parts:
@@ -377,9 +372,9 @@ def format_messages_vercel(
id=uuid.uuid4(),
role="assistant",
createdAt=msg.creation_date,
content="",
parts=parts,
annotations=annotations,
metadata=ToolMetadataDict(toolCalls=metadata),
isComplete=False,
)
)

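A hedged sketch of the part schemas `format_messages_vercel` appears to assume; the field names are inferred from their usage above (`toolCallId`, `input`, `output`, `state`), but the authoritative definitions live in `neuroagent/app/schemas.py`:

```python
from typing import Any, Literal

from pydantic import BaseModel


class TextPartVercel(BaseModel):
    type: Literal["text"] = "text"
    text: str


class ReasoningPartVercel(BaseModel):
    # v5 renames the old `reasoning` field to `text`.
    type: Literal["reasoning"] = "reasoning"
    text: str


class ToolCallPartVercel(BaseModel):
    # v5 flattens the old nested `toolInvocation` object.
    type: str  # built as f"tool-{tc.name}" above
    toolCallId: str
    input: dict[str, Any]
    output: Any | None = None
    state: str = "input-available"  # becomes "output-available" once the result lands


class MetadataToolCallVercel(BaseModel):
    toolCallId: str
    validated: str  # a Literal in the real schema; "pending" appears in the diff
    isComplete: bool


class ToolMetadataDict(BaseModel):
    toolCalls: list[MetadataToolCallVercel]
```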
3 changes: 2 additions & 1 deletion backend/src/neuroagent/app/dependencies.py
@@ -100,6 +100,7 @@
StrainGetOneTool,
SubjectGetAllTool,
SubjectGetOneTool,
WeatherTool,
)
from neuroagent.tools.base_tool import BaseTool

@@ -442,7 +443,7 @@ def get_tool_list(
SubjectGetAllTool,
SubjectGetOneTool,
# NowTool,
# WeatherTool,
WeatherTool,
# RandomPlotGeneratorTool,
]
