Python: Introduce the chat history reducer #10190

Merged · 26 commits into main from py-chat-history-reduce · Jan 23, 2025
Changes from 4 commits
Commits (26)
84fabe0
wip: chat history reducer
moonbox3 Jan 13, 2025
8d9102f
Merge branch 'main' into py-chat-history-reduce
moonbox3 Jan 14, 2025
a5b712e
Implement agent chat history reducer. Add unit tests. Add sample.
moonbox3 Jan 15, 2025
14ab2f4
Update readme
moonbox3 Jan 15, 2025
98108bf
Merge branch 'main' into py-chat-history-reduce
moonbox3 Jan 15, 2025
6f132b9
Move chat history reducer to extend chat history. Break circular depe…
moonbox3 Jan 16, 2025
6d4f617
Address PR feedback
moonbox3 Jan 17, 2025
8564f23
Repair uv.lock
moonbox3 Jan 17, 2025
58f1218
Fix uv lock again
moonbox3 Jan 17, 2025
66abe04
Upgrade to uv 0.5.20 locally. Re sync uv lock
moonbox3 Jan 17, 2025
93f788f
Merge branch 'main' into py-chat-history-reduce
moonbox3 Jan 17, 2025
0aebeee
Address PR feedback
moonbox3 Jan 20, 2025
429ddf8
Sample update.
moonbox3 Jan 20, 2025
fdc7af7
Work on chat history reducer.
moonbox3 Jan 21, 2025
8ad615b
Merge main to branch
moonbox3 Jan 21, 2025
e5d5d22
wip on history reducer
moonbox3 Jan 21, 2025
3277e39
wip: instruction role handling
moonbox3 Jan 21, 2025
0002b9a
Final updates to chat history reducer and samples. Update tests.
moonbox3 Jan 22, 2025
daa685c
Improve AzureAI inference instruction role field. Add tests.
moonbox3 Jan 22, 2025
3b11a7e
Merge branch 'main' into py-chat-history-reduce
moonbox3 Jan 22, 2025
58547df
Cleanup
moonbox3 Jan 23, 2025
57b5e04
Merge branch 'main' into py-chat-history-reduce
moonbox3 Jan 23, 2025
4c21cc0
Merge main to branch
moonbox3 Jan 23, 2025
91f72ea
PR feedback
moonbox3 Jan 23, 2025
78065d0
Merge branch 'py-chat-history-reduce' of github.com:moonbox3/semantic…
moonbox3 Jan 23, 2025
307efd8
Update sample to use kernel invoke function call
moonbox3 Jan 23, 2025
1 change: 1 addition & 0 deletions python/samples/concepts/README.md
@@ -10,6 +10,7 @@
- [Assistant Agent Retrieval](./agents/assistant_agent_retrieval.py)
- [Assistant Agent Streaming](./agents/assistant_agent_streaming.py)
- [Chat Completion Function Termination](./agents/chat_completion_function_termination.py)
- [Chat Completion History Reducer](./agents/chat_completion_history_reducer.py)
- [Mixed Chat Agents](./agents/mixed_chat_agents.py)
- [Mixed Chat Agents Plugins](./agents/mixed_chat_agents_plugins.py)
- [Mixed Chat Files](./agents/mixed_chat_files.py)
285 changes: 285 additions & 0 deletions python/samples/concepts/agents/chat_completion_history_reducer.py
@@ -0,0 +1,285 @@
# Copyright (c) Microsoft. All rights reserved.

import asyncio
import logging

from semantic_kernel.agents import (
    AgentGroupChat,
    ChatCompletionAgent,
    ChatHistorySummarizationReducer,
    ChatHistoryTruncationReducer,
)
from semantic_kernel.connectors.ai.open_ai import AzureChatCompletion, OpenAIChatCompletion
from semantic_kernel.contents import AuthorRole, ChatHistory, ChatMessageContent
from semantic_kernel.kernel import Kernel

#####################################################################
# The following sample demonstrates how to create and use the      #
# Semantic Kernel Agent Framework Chat History Reducer. The sample #
# covers two types of reducers: summarization reduction and a      #
# truncation reduction. For this sample, the ChatCompletionAgent   #
# is used.                                                          #
#####################################################################


# Initialize the logger for debugging and information messages
logger = logging.getLogger(__name__)

# Flag to determine whether to use Azure OpenAI services or OpenAI
# Set this to True if using Azure OpenAI (requires appropriate configuration)
use_azure_openai = True


# Helper function to create and configure a Kernel with the desired chat completion service
def _create_kernel_with_chat_completion(service_id: str) -> Kernel:
    """A helper function to create a kernel with a chat completion service."""
    kernel = Kernel()
    if use_azure_openai:
        # Add Azure OpenAI service to the kernel
        kernel.add_service(AzureChatCompletion(service_id=service_id))
    else:
        # Add OpenAI service to the kernel
        kernel.add_service(OpenAIChatCompletion(service_id=service_id))
    return kernel


class HistoryReducerExample:
    """
    Demonstrates how to create a ChatCompletionAgent with a ChatHistoryReducer
    (either truncation or summarization) and how to invoke that agent
    multiple times while applying the history reduction.
    """

    # Agent-specific settings
    TRANSLATOR_NAME = "NumeroTranslator"  # Name of the agent
    TRANSLATOR_INSTRUCTIONS = "Add one to the latest user number and spell it in Spanish without explanation."

    def create_truncating_agent(self, reducer_msg_count: int, reducer_threshold: int) -> ChatCompletionAgent:
        """
        Creates a ChatCompletionAgent with a truncation-based history reducer.

        Parameters:
        - reducer_msg_count: Target number of messages to retain after truncation.
        - reducer_threshold: Threshold number of messages to trigger truncation.

        Returns:
        - A configured ChatCompletionAgent instance with truncation enabled.
        """
        truncation_reducer = ChatHistoryTruncationReducer(
            target_count=reducer_msg_count, threshold_count=reducer_threshold
        )

        agent = ChatCompletionAgent(
            name=self.TRANSLATOR_NAME,
            instructions=self.TRANSLATOR_INSTRUCTIONS,
            kernel=_create_kernel_with_chat_completion("truncate_agent"),
        )
        agent.history_reducer = truncation_reducer
        return agent

    def create_summarizing_agent(self, reducer_msg_count: int, reducer_threshold: int) -> ChatCompletionAgent:
        """
        Creates a ChatCompletionAgent with a summarization-based history reducer.

        Parameters:
        - reducer_msg_count: Target number of messages to retain after summarization.
        - reducer_threshold: Threshold number of messages to trigger summarization.

        Returns:
        - A configured ChatCompletionAgent instance with summarization enabled.
        """
        kernel = _create_kernel_with_chat_completion("summarize_agent")

        summarization_reducer = ChatHistorySummarizationReducer(
            service=kernel.get_service(service_id="summarize_agent"),
            target_count=reducer_msg_count,
            threshold_count=reducer_threshold,
            summarization_instructions=("Add one to user number, but in Spanish. Then summarize context."),
        )

        agent = ChatCompletionAgent(
            name=self.TRANSLATOR_NAME,
            instructions=self.TRANSLATOR_INSTRUCTIONS,
            kernel=kernel,
        )
        agent.history_reducer = summarization_reducer
        return agent

    async def invoke_agent(self, agent: ChatCompletionAgent, message_count: int):
        """
        Demonstrates agent invocation with direct history management and reduction.

        Parameters:
        - agent: The ChatCompletionAgent to invoke.
        - message_count: The number of messages to simulate in the conversation.
        """
        chat_history = ChatHistory()  # Initialize a new chat history

        index = 1
        while index <= message_count:
            # Provide user input
            user_message = ChatMessageContent(role=AuthorRole.USER, content=str(index))
            chat_history.messages.append(user_message)
            print(f"# User: '{index}'")

            # Attempt history reduction if a reducer is present
            is_reduced = False
            if agent.history_reducer is not None:
                reduced = await agent.history_reducer.reduce(chat_history.messages)
                if reduced is not None:
                    chat_history.messages.clear()
                    chat_history.messages.extend(reduced)
                    is_reduced = True
                    print("@ (History was reduced!)")

            # Invoke the agent and display its response
            async for response in agent.invoke(chat_history):
                chat_history.messages.append(response)
                print(f"# {response.role} - {response.name}: '{response.content}'")

            index += 2
print(f"@ Message Count: {len(chat_history.messages)}\n")

# If history was reduced, print summaries
if is_reduced:
self._print_summaries_from_front(chat_history.messages)

    async def invoke_chat(self, agent: ChatCompletionAgent, message_count: int):
        """
        Demonstrates agent invocation within a group chat.

        Parameters:
        - agent: The ChatCompletionAgent to invoke.
        - message_count: The number of messages to simulate in the conversation.
        """
        chat = AgentGroupChat()  # Initialize a new group chat
        last_history_count = 0

        index = 1
        while index <= message_count:
            # Add user message to the chat
            user_msg = ChatMessageContent(role=AuthorRole.USER, content=str(index))
            await chat.add_chat_message(user_msg)
            print(f"# User: '{index}'")

            # Invoke the agent and display its response
            async for message in chat.invoke(agent):
                print(f"# {message.role} - {message.name or '*'}: '{message.content}'")

            index += 2

            # Retrieve chat messages in descending order (newest first)
            msgs = []
            async for m in chat.get_chat_messages(agent):
                msgs.append(m)

            print(f"@ Message Count: {len(msgs)}\n")

            # Check for reduction in message count and print summaries
            if len(msgs) < last_history_count:
                self._print_summaries_from_back(msgs)

            last_history_count = len(msgs)

    def _print_summaries_from_front(self, messages: list[ChatMessageContent]):
        """
        Prints summaries from the front of the message list.

        Parameters:
        - messages: List of chat messages to process.
        """
        summary_index = 0
        while summary_index < len(messages):
            msg = messages[summary_index]
            if msg.metadata and msg.metadata.get("__summary__"):
                print(f"\tSummary: {msg.content}")
                summary_index += 1
            else:
                break

    def _print_summaries_from_back(self, messages: list[ChatMessageContent]):
        """
        Prints summaries from the back of the message list.

        Parameters:
        - messages: List of chat messages to process.
        """
        summary_index = len(messages) - 1
        while summary_index >= 0:
            msg = messages[summary_index]
            if msg.metadata and msg.metadata.get("__summary__"):
                print(f"\tSummary: {msg.content}")
                summary_index -= 1
            else:
                break


# Main entry point for the script
async def main():
    # Initialize the example class
    example = HistoryReducerExample()

    # Demonstrate truncation-based reduction
    trunc_agent = example.create_truncating_agent(
        # reducer_msg_count:
        # Purpose: Defines the target number of messages to retain after applying truncation or summarization.
        # What it controls: This parameter determines how much of the most recent conversation history
        #   is preserved while discarding or summarizing older messages.
        # Why change it?:
        # - Smaller values: Use when memory constraints are tight, or the assistant only needs a brief history
        #   to maintain context.
        # - Larger values: Use when retaining more conversational context is critical for accurate responses
        #   or maintaining a richer dialogue.
        reducer_msg_count=10,
        # reducer_threshold:
        # Purpose: Acts as a buffer to avoid reducing history prematurely when the current message count exceeds
        #   reducer_msg_count by a small margin.
        # What it controls: Helps ensure that essential paired messages (like a user query and the assistant’s response)
        #   are not "orphaned" or lost during truncation or summarization.
        # Why change it?:
        # - Smaller values: Use when you want stricter reduction criteria and are okay with possibly cutting older
        #   pairs of messages sooner.
        # - Larger values: Use when you want to minimize the risk of cutting a critical part of the conversation,
        #   especially for sensitive interactions like API function calls or complex responses.
        reducer_threshold=10,
    )
    print("===TruncatedAgentReduction Demo===")
    await example.invoke_agent(trunc_agent, message_count=50)

    # Demonstrate summarization-based reduction
    sum_agent = example.create_summarizing_agent(
        # Same configuration for summarization-based reduction
        reducer_msg_count=10,  # Target number of messages to retain
        reducer_threshold=10,  # Buffer to avoid premature reduction
    )
    print("\n===SummarizedAgentReduction Demo===")
    await example.invoke_agent(sum_agent, message_count=50)

    # Demonstrate group chat with truncation
    print("\n===TruncatedChatReduction Demo===")
    await example.invoke_chat(trunc_agent, message_count=50)

    # Demonstrate group chat with summarization
    print("\n===SummarizedChatReduction Demo===")
    await example.invoke_chat(sum_agent, message_count=50)


# Interaction between reducer_msg_count and reducer_threshold:
# The combination of these values determines when reduction occurs and how much history is kept.
# Example:
# If reducer_msg_count = 10 and reducer_threshold = 5, history will not be truncated until the total message count
# exceeds 15. This approach ensures flexibility in retaining conversational context while still adhering to memory
# constraints.

# Recommendations:
# - Adjust for performance: Use a lower reducer_msg_count in environments with limited memory or when the assistant
# needs faster processing times.
# - Context sensitivity: Increase reducer_msg_count and reducer_threshold in use cases where maintaining continuity
# across multiple interactions is essential (e.g., multi-turn conversations or complex workflows).
# - Experiment: Start with the default values (10 and 10) and refine based on your application's behavior and the
# assistant's response quality.


# Execute the main function if the script is run directly
if __name__ == "__main__":
    asyncio.run(main())
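For quick reference, below is a minimal standalone sketch of the truncation reducer outside the agent classes. It only reuses names introduced by this PR (ChatHistoryTruncationReducer and its reduce method, as exercised in the sample above); the target_count=10 / threshold_count=5 values and the expectation that reduction kicks in once the count exceeds their sum are taken from the explanatory comments in the sample, not verified against the reducer internals.

import asyncio

from semantic_kernel.agents import ChatHistoryTruncationReducer
from semantic_kernel.contents import AuthorRole, ChatHistory, ChatMessageContent


async def demo_truncation() -> None:
    # Build a history of 20 simulated user turns.
    history = ChatHistory()
    for i in range(1, 21):
        history.messages.append(ChatMessageContent(role=AuthorRole.USER, content=str(i)))

    # Per the sample's comments, reduction should not trigger until the message count
    # exceeds target_count + threshold_count (here 10 + 5 = 15).
    reducer = ChatHistoryTruncationReducer(target_count=10, threshold_count=5)
    reduced = await reducer.reduce(history.messages)

    if reduced is not None:  # None means no reduction was necessary
        history.messages.clear()
        history.messages.extend(reduced)

    print(f"Messages after reduction attempt: {len(history.messages)}")


if __name__ == "__main__":
    asyncio.run(demo_truncation())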
4 changes: 4 additions & 0 deletions python/semantic_kernel/agents/__init__.py
@@ -4,10 +4,14 @@
from semantic_kernel.agents.chat_completion.chat_completion_agent import ChatCompletionAgent
from semantic_kernel.agents.group_chat.agent_chat import AgentChat
from semantic_kernel.agents.group_chat.agent_group_chat import AgentGroupChat
from semantic_kernel.agents.history.chat_history_summarization_reducer import ChatHistorySummarizationReducer
from semantic_kernel.agents.history.chat_history_truncation_reducer import ChatHistoryTruncationReducer

__all__ = [
    "Agent",
    "AgentChat",
    "AgentGroupChat",
    "ChatCompletionAgent",
    "ChatHistorySummarizationReducer",
    "ChatHistoryTruncationReducer",
]
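With these exports in place, both reducers can be imported directly from semantic_kernel.agents. A hedged construction sketch mirroring create_summarizing_agent in the sample above; the service_id and instruction text are illustrative assumptions, not values prescribed by this PR.

from semantic_kernel.agents import ChatHistorySummarizationReducer, ChatHistoryTruncationReducer
from semantic_kernel.connectors.ai.open_ai import OpenAIChatCompletion
from semantic_kernel.kernel import Kernel

kernel = Kernel()
kernel.add_service(OpenAIChatCompletion(service_id="summarize"))

# Truncation drops older messages down to the target once the threshold is crossed.
truncation_reducer = ChatHistoryTruncationReducer(target_count=10, threshold_count=10)

# Summarization additionally needs a chat completion service to produce the summary message.
summarization_reducer = ChatHistorySummarizationReducer(
    service=kernel.get_service(service_id="summarize"),
    target_count=10,
    threshold_count=10,
    summarization_instructions="Summarize the earlier turns of this conversation in two sentences.",
)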
26 changes: 24 additions & 2 deletions python/semantic_kernel/agents/agent.py
@@ -2,17 +2,21 @@

import uuid
from collections.abc import Iterable
from typing import ClassVar
from typing import TYPE_CHECKING, ClassVar

from pydantic import Field

from semantic_kernel.agents.channels.agent_channel import AgentChannel
from semantic_kernel.agents.history.chat_history_reducer import ChatHistoryReducer
from semantic_kernel.kernel import Kernel
from semantic_kernel.kernel_pydantic import KernelBaseModel
from semantic_kernel.utils.experimental_decorator import experimental_class
from semantic_kernel.utils.naming import generate_random_ascii_name
from semantic_kernel.utils.validation import AGENT_NAME_REGEX

if TYPE_CHECKING:
    from semantic_kernel.contents.chat_history import ChatHistory


@experimental_class
class Agent(KernelBaseModel):
@@ -37,6 +41,20 @@ class Agent(KernelBaseModel):
    instructions: str | None = None
    kernel: Kernel = Field(default_factory=Kernel)
    channel_type: ClassVar[type[AgentChannel] | None] = None
    history_reducer: ChatHistoryReducer | None = None

    async def reduce_history(self, history: "ChatHistory") -> bool:
        """Perform the reduction on the provided history, returning True if reduction occurred."""
        if self.history_reducer is None:
            return False

        new_messages = await self.history_reducer.reduce(history.messages)
        if new_messages is not None:
            history.messages.clear()
            history.messages.extend(new_messages)
            return True

        return False

    def get_channel_keys(self) -> Iterable[str]:
        """Get the channel keys.
@@ -46,7 +64,11 @@ def get_channel_keys(self) -> Iterable[str]:
"""
if not self.channel_type:
raise NotImplementedError("Unable to get channel keys. Channel type not configured.")
return [self.channel_type.__name__]
yield self.channel_type.__name__

if self.history_reducer is not None:
yield self.history_reducer.__class__.__name__
yield str(self.history_reducer.__hash__)

    async def create_channel(self) -> AgentChannel:
        """Create a channel.
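A short, hedged sketch of how the new Agent.reduce_history helper above could be used when driving an agent manually; the agent construction and the OpenAI service setup are borrowed from the sample file and are assumptions, not part of this diff.

from semantic_kernel.agents import ChatCompletionAgent, ChatHistoryTruncationReducer
from semantic_kernel.connectors.ai.open_ai import OpenAIChatCompletion
from semantic_kernel.contents import ChatHistory
from semantic_kernel.kernel import Kernel


async def invoke_with_reduction(history: ChatHistory) -> None:
    kernel = Kernel()
    kernel.add_service(OpenAIChatCompletion(service_id="chat"))

    agent = ChatCompletionAgent(name="Assistant", instructions="Answer briefly.", kernel=kernel)
    agent.history_reducer = ChatHistoryTruncationReducer(target_count=10, threshold_count=10)

    # reduce_history mutates history.messages in place and reports whether a reduction happened.
    was_reduced = await agent.reduce_history(history)
    if was_reduced:
        print("History was reduced before invocation.")

    async for response in agent.invoke(history):
        history.messages.append(response)
        print(f"# {response.role} - {response.name}: '{response.content}'")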
@@ -64,6 +64,9 @@ async def invoke(
f"Invalid channel binding for agent with id: `{id}` with name: ({type(agent).__name__})"
)

# pre-process history reduction
await agent.reduce_history(self)

message_count = len(self.messages)
mutated_history = set()
message_queue: Deque[ChatMessageContent] = deque()
@@ -119,6 +122,9 @@ async def invoke_stream(
f"Invalid channel binding for agent with id: `{id}` with name: ({type(agent).__name__})"
)

# pre-process history reduction
await agent.reduce_history(self)

message_count = len(self.messages)

async for response_message in agent.invoke_stream(self):
Empty file.