Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
The following sample demonstrates how to create an Azure AI Agent
and use it with streaming responses. The agent is configured to use
a plugin that provides a list of specials from the menu and the price
of the requested menu item.
of the requested menu item. The thread message ID is also printed as each
message is processed.
"""


Expand Down Expand Up @@ -72,12 +73,22 @@ async def main() -> None:
]

try:
last_thread_msg_id = None
for user_input in user_inputs:
print(f"# User: '{user_input}'")
first_chunk = True
async for response in agent.invoke_stream(messages=user_input, thread=thread):
async for response in agent.invoke_stream(
messages=user_input,
thread=thread,
):
if first_chunk:
print(f"# {response.role}: ", end="", flush=True)
# Show the thread message id before the first text chunk
if "thread_message_id" in response.content.metadata:
current_id = response.content.metadata["thread_message_id"]
if current_id != last_thread_msg_id:
print(f"(thread message id: {current_id}) ", end="", flush=True)
last_thread_msg_id = current_id
first_chunk = False
print(response.content, end="", flush=True)
thread = response.thread
Expand All @@ -87,6 +98,23 @@ async def main() -> None:
await thread.delete() if thread else None
await client.agents.delete_agent(agent.id)

"""
Sample Output:

# User: 'Hello'
# AuthorRole.ASSISTANT: (thread message id: msg_HZ2h4Wzbj7GEcnVCjnyEuYWT) Hello! How can I assist you with
the menu today?
# User: 'What is the special soup?'
# AuthorRole.ASSISTANT: (thread message id: msg_TSjkJK6hHJojIkPvF6uUofHD) The special soup today is
Clam Chowder. Would you like to know more about it or anything else from the menu?
# User: 'How much does that cost?'
# AuthorRole.ASSISTANT: (thread message id: msg_liwTpBFrB9JpCM1oM9EXKiwq) The Clam Chowder costs $9.99.
Is there anything else you'd like to know?
# User: 'Thank you'
# AuthorRole.ASSISTANT: (thread message id: msg_K6lpR3gYIHethXq17T6gJcxi) You're welcome!
If you have any more questions or need assistance, feel free to ask. Enjoy your meal!
"""


if __name__ == "__main__":
asyncio.run(main())
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@
RunStepDeltaToolCallObject,
)

THREAD_MESSAGE_ID = "thread_message_id"

"""
The methods in this file are used with Azure AI Agent
related code. They are used to invoke, create chat messages,
Expand Down Expand Up @@ -115,6 +117,7 @@ def generate_message_content(
{
"created_at": completed_step.created_at,
"message_id": message.id, # message needs to be defined in context
"thread_message_id": message.id, # Add `thread_message_id` to avoid breaking the existing `message_id` key
"step_id": completed_step.id,
"run_id": completed_step.run_id,
"thread_id": completed_step.thread_id,
Expand Down Expand Up @@ -150,7 +153,9 @@ def generate_message_content(

@experimental
def generate_streaming_message_content(
assistant_name: str, message_delta_event: "MessageDeltaChunk"
assistant_name: str,
message_delta_event: "MessageDeltaChunk",
thread_msg_id: str | None = None,
) -> StreamingChatMessageContent:
"""Generate streaming message content from a MessageDeltaEvent."""
delta = message_delta_event.delta
Expand Down Expand Up @@ -196,7 +201,11 @@ def generate_streaming_message_content(
)
)

return StreamingChatMessageContent(role=role, name=assistant_name, items=items, choice_index=0) # type: ignore
metadata: dict[str, Any] | None = None
if thread_msg_id:
metadata = {THREAD_MESSAGE_ID: thread_msg_id}

return StreamingChatMessageContent(role=role, name=assistant_name, items=items, choice_index=0, metadata=metadata) # type: ignore


@experimental
Expand Down
16 changes: 15 additions & 1 deletion python/semantic_kernel/agents/azure_ai/agent_thread_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
from azure.ai.agents.models._enums import MessageRole

from semantic_kernel.agents.azure_ai.agent_content_generation import (
THREAD_MESSAGE_ID,
generate_azure_ai_search_content,
generate_bing_grounding_content,
generate_code_interpreter_content,
Expand Down Expand Up @@ -550,6 +551,7 @@ async def _process_stream_events(
output_messages: "list[ChatMessageContent] | None" = None,
) -> AsyncIterable["StreamingChatMessageContent"]:
"""Process events from the main stream and delegate tool output handling as needed."""
thread_msg_id = None
while True:
# Use 'async with' only if the stream supports async context management (main agent stream).
# Tool output handlers only support async iteration, not context management.
Expand All @@ -567,8 +569,14 @@ async def _process_stream_events(
run_step = cast(RunStep, event_data)
logger.info(f"Assistant run in progress with ID: {run_step.id}")

elif event_type == AgentStreamEvent.THREAD_MESSAGE_CREATED:
# Keep the current message id stable unless a new one arrives
if thread_msg_id != event_data.id:
thread_msg_id = event_data.id
logger.info(f"Assistant message created with ID: {thread_msg_id}")

elif event_type == AgentStreamEvent.THREAD_MESSAGE_DELTA:
yield generate_streaming_message_content(agent.name, event_data)
yield generate_streaming_message_content(agent.name, event_data, thread_msg_id)

elif event_type == AgentStreamEvent.THREAD_RUN_STEP_COMPLETED:
step_completed = cast(RunStep, event_data)
Expand Down Expand Up @@ -622,6 +630,8 @@ async def _process_stream_events(
agent_name=agent.name, step_details=details
)
if content:
if thread_msg_id and THREAD_MESSAGE_ID not in content.metadata:
content.metadata[THREAD_MESSAGE_ID] = thread_msg_id
if output_messages is not None:
output_messages.append(content)
if content_is_visible:
Expand Down Expand Up @@ -654,6 +664,8 @@ async def _process_stream_events(
action_result.function_result_streaming_content,
):
if content and output_messages is not None:
if thread_msg_id and THREAD_MESSAGE_ID not in content.metadata:
content.metadata[THREAD_MESSAGE_ID] = thread_msg_id
output_messages.append(content)

handler: BaseAsyncAgentEventHandler = AsyncAgentEventHandler()
Expand Down Expand Up @@ -692,6 +704,8 @@ async def _process_stream_events(
agent_name=agent.name, mcp_tool_calls=mcp_tool_calls
)
if content:
if thread_msg_id and THREAD_MESSAGE_ID not in content.metadata:
content.metadata[THREAD_MESSAGE_ID] = thread_msg_id
output_messages.append(content)

# Create tool approvals for MCP calls
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
)

from semantic_kernel.agents.azure_ai.agent_content_generation import (
THREAD_MESSAGE_ID,
generate_annotation_content,
generate_bing_grounding_content,
generate_code_interpreter_content,
Expand Down Expand Up @@ -295,11 +296,12 @@ def test_generate_streaming_message_content_text_only_no_annotations():
],
),
)
out = generate_streaming_message_content("assistant", delta)
out = generate_streaming_message_content("assistant", delta, thread_msg_id="thread_1")
assert out.content == "just text"
assert len(out.items) == 1
assert isinstance(out.items[0], StreamingTextContent)
assert out.items[0].text == "just text"
assert out.metadata.get(THREAD_MESSAGE_ID) == "thread_1"


def test_generate_annotation_content_empty_title_and_url_only():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,9 +295,10 @@ def __iter__(self):


class MockRunData:
def __init__(self, id, status):
def __init__(self, id, status, content: str | None = None):
self.id = id
self.status = status
self.content = content


class MockAsyncIterable:
Expand Down Expand Up @@ -332,6 +333,7 @@ async def test_agent_thread_actions_invoke_stream(ai_project_client, ai_agent_de

events = [
MockEvent("thread.run.created", MockRunData(id="run_1", status="queued")),
MockEvent("thread.message.created", MockRunData(id="msg_1", status="created", content="Hello")),
MockEvent("thread.run.in_progress", MockRunData(id="run_1", status="in_progress")),
MockEvent("thread.run.completed", MockRunData(id="run_1", status="completed")),
]
Expand All @@ -350,3 +352,5 @@ async def test_agent_thread_actions_invoke_stream(ai_project_client, ai_agent_de
kernel=AsyncMock(spec=Kernel),
):
collected_messages.append(content)
assert isinstance(content, ChatMessageContent)
assert content.metadata.get("message_id") == "msg_1"
Loading