183 changes: 182 additions & 1 deletion src/praisonai-agents/praisonaiagents/agent/agent.py
@@ -1937,7 +1937,188 @@ def run(self):

def start(self, prompt: str, **kwargs):
"""Start the agent with a prompt. This is a convenience method that wraps chat()."""
return self.chat(prompt, **kwargs)
# Check if streaming is enabled and user wants streaming chunks
if self.stream and kwargs.get('stream', True):
return self._start_stream(prompt, **kwargs)
else:
return self.chat(prompt, **kwargs)

def _start_stream(self, prompt: str, **kwargs):
"""Generator method that yields streaming chunks from the agent."""
# Reset the final display flag for each new conversation
self._final_display_shown = False

# Search for existing knowledge if any knowledge is provided
if self.knowledge:
search_results = self.knowledge.search(prompt, agent_id=self.agent_id)
if search_results:
# Check if search_results is a list of dictionaries or strings
if isinstance(search_results, dict) and 'results' in search_results:
# Extract memory content from the results
knowledge_content = "\n".join([result['memory'] for result in search_results['results']])
else:
# If search_results is a list of strings, join them directly
knowledge_content = "\n".join(search_results)

# Append found knowledge to the prompt
prompt = f"{prompt}\n\nKnowledge: {knowledge_content}"

Comment on lines +1952 to +1965
Contributor

🛠️ Refactor suggestion

Extract knowledge search logic to avoid duplication.

This knowledge search logic is duplicated from the chat method (lines 1213-1226). Consider extracting it to a shared method.

Add a shared method:

def _enrich_prompt_with_knowledge(self, prompt: str) -> str:
    """Enrich prompt with knowledge search results if available."""
    if self.knowledge:
        search_results = self.knowledge.search(prompt, agent_id=self.agent_id)
        if search_results:
            # Check if search_results is a list of dictionaries or strings
            if isinstance(search_results, dict) and 'results' in search_results:
                # Extract memory content from the results
                knowledge_content = "\n".join([result['memory'] for result in search_results['results']])
            else:
                # If search_results is a list of strings, join them directly
                knowledge_content = "\n".join(search_results)
            
            # Append found knowledge to the prompt
            prompt = f"{prompt}\n\nKnowledge: {knowledge_content}"
    return prompt

Then use it in both _start_stream and chat methods:

-        # Search for existing knowledge if any knowledge is provided
-        if self.knowledge:
-            search_results = self.knowledge.search(prompt, agent_id=self.agent_id)
-            if search_results:
-                # Check if search_results is a list of dictionaries or strings
-                if isinstance(search_results, dict) and 'results' in search_results:
-                    # Extract memory content from the results
-                    knowledge_content = "\n".join([result['memory'] for result in search_results['results']])
-                else:
-                    # If search_results is a list of strings, join them directly
-                    knowledge_content = "\n".join(search_results)
-                
-                # Append found knowledge to the prompt
-                prompt = f"{prompt}\n\nKnowledge: {knowledge_content}"
+        prompt = self._enrich_prompt_with_knowledge(prompt)
🤖 Prompt for AI Agents
In src/praisonai-agents/praisonaiagents/agent/agent.py around lines 1955 to 1968, the knowledge search logic is duplicated from the chat method (lines 1213-1226). To fix this, extract the knowledge search and prompt enrichment code into a new shared method named _enrich_prompt_with_knowledge that takes a prompt string, performs the knowledge search, and appends the knowledge content if found, then returns the enriched prompt. Replace the duplicated code in both the _start_stream and chat methods by calling this new shared method to avoid redundancy.

# Get streaming response using the internal streaming method
for chunk in self._chat_stream(prompt, **kwargs):
yield chunk

def _chat_stream(self, prompt, temperature=0.2, tools=None, output_json=None, output_pydantic=None, reasoning_steps=False, **kwargs):
"""Internal streaming method that yields chunks from the LLM response."""

# Use the same logic as chat() but yield chunks instead of returning final response
if self._using_custom_llm:
# For custom LLM, yield chunks from the LLM instance
for chunk in self._custom_llm_stream(prompt, temperature, tools, output_json, output_pydantic, reasoning_steps, **kwargs):
yield chunk
else:
# For standard OpenAI client, yield chunks from the streaming response
for chunk in self._openai_stream(prompt, temperature, tools, output_json, output_pydantic, reasoning_steps, **kwargs):
yield chunk

def _custom_llm_stream(self, prompt, temperature=0.2, tools=None, output_json=None, output_pydantic=None, reasoning_steps=False, **kwargs):
"""Handle streaming for custom LLM instances."""
# Store chat history length for potential rollback
chat_history_length = len(self.chat_history)

try:
# Special handling for MCP tools when using provider/model format
if tools is None or (isinstance(tools, list) and len(tools) == 0):
tool_param = self.tools
else:
tool_param = tools

# Convert MCP tool objects to OpenAI format if needed
if tool_param is not None:
from ..mcp.mcp import MCP
if isinstance(tool_param, MCP) and hasattr(tool_param, 'to_openai_tool'):
openai_tool = tool_param.to_openai_tool()
if openai_tool:
if isinstance(openai_tool, list):
tool_param = openai_tool
else:
tool_param = [openai_tool]

# Normalize prompt content for consistent chat history storage
normalized_content = prompt
if isinstance(prompt, list):
normalized_content = next((item["text"] for item in prompt if item.get("type") == "text"), "")

# Prevent duplicate messages
if not (self.chat_history and
self.chat_history[-1].get("role") == "user" and
self.chat_history[-1].get("content") == normalized_content):
self.chat_history.append({"role": "user", "content": normalized_content})

# Get streaming response from LLM instance
if hasattr(self.llm_instance, 'get_response_stream'):
# Use streaming method if available
stream_response = self.llm_instance.get_response_stream(
prompt=prompt,
system_prompt=self._build_system_prompt(tools),
chat_history=self.chat_history,
temperature=temperature,
tools=tool_param,
output_json=output_json,
output_pydantic=output_pydantic,
verbose=self.verbose,
markdown=self.markdown,
console=self.console,
agent_name=self.name,
agent_role=self.role,
agent_tools=[t.__name__ if hasattr(t, '__name__') else str(t) for t in (tools if tools is not None else self.tools)],
reasoning_steps=reasoning_steps,
execute_tool_fn=self.execute_tool
)

accumulated_response = ""
for chunk in stream_response:
accumulated_response += chunk
yield chunk

# Add final response to chat history
self.chat_history.append({"role": "assistant", "content": accumulated_response})

else:
# Fallback to regular response if streaming not available
response_text = self.llm_instance.get_response(
prompt=prompt,
system_prompt=self._build_system_prompt(tools),
chat_history=self.chat_history,
temperature=temperature,
tools=tool_param,
output_json=output_json,
output_pydantic=output_pydantic,
verbose=self.verbose,
markdown=self.markdown,
console=self.console,
agent_name=self.name,
agent_role=self.role,
agent_tools=[t.__name__ if hasattr(t, '__name__') else str(t) for t in (tools if tools is not None else self.tools)],
reasoning_steps=reasoning_steps,
execute_tool_fn=self.execute_tool,
stream=True
)

self.chat_history.append({"role": "assistant", "content": response_text})
# Yield the complete response as a single chunk
yield response_text

except Exception as e:
# Rollback chat history on error
self.chat_history = self.chat_history[:chat_history_length]
yield f"Error: {str(e)}"

def _openai_stream(self, prompt, temperature=0.2, tools=None, output_json=None, output_pydantic=None, reasoning_steps=False, **kwargs):
"""Handle streaming for standard OpenAI client."""
# Store chat history length for potential rollback
chat_history_length = len(self.chat_history)

try:
# Use the new _build_messages helper method
messages, original_prompt = self._build_messages(prompt, temperature, output_json, output_pydantic)

# Normalize original_prompt for consistent chat history storage
normalized_content = original_prompt
if isinstance(original_prompt, list):
normalized_content = next((item["text"] for item in original_prompt if item.get("type") == "text"), "")

# Prevent duplicate messages
if not (self.chat_history and
self.chat_history[-1].get("role") == "user" and
self.chat_history[-1].get("content") == normalized_content):
self.chat_history.append({"role": "user", "content": normalized_content})

# Get streaming response from OpenAI client
if self._openai_client is None:
raise ValueError("OpenAI client is not initialized. Please provide OPENAI_API_KEY or use a custom LLM provider.")

# Stream the response using OpenAI client
accumulated_response = ""
for chunk in self._openai_client.chat_completion_with_tools_stream(
messages=messages,
model=self.llm,
temperature=temperature,
tools=self._format_tools_for_completion(tools),
execute_tool_fn=self.execute_tool,
reasoning_steps=reasoning_steps,
verbose=self.verbose,
max_iterations=10
):
accumulated_response += chunk
yield chunk

# Add the accumulated response to chat history
self.chat_history.append({"role": "assistant", "content": accumulated_response})

except Exception as e:
# Rollback chat history on error
self.chat_history = self.chat_history[:chat_history_length]
yield f"Error: {str(e)}"

def execute(self, task, context=None):
"""Execute a task synchronously - backward compatibility method"""
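For context, a minimal sketch of how a caller might consume the new behavior of start(). The import path, the instructions argument, and the stream constructor flag are assumptions for illustration, not taken from this diff:

from praisonaiagents import Agent  # assumed import path

# Assumed constructor arguments; the diff only shows that the agent has a `stream` attribute.
agent = Agent(instructions="You are a helpful assistant", stream=True)

# With streaming enabled, start() now returns a generator of text chunks
# instead of a single string; without it, start() falls back to chat().
for chunk in agent.start("Write a short haiku about streaming"):
    print(chunk, end="", flush=True)
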
144 changes: 144 additions & 0 deletions src/praisonai-agents/praisonaiagents/llm/openai_client.py
@@ -1049,6 +1049,150 @@ async def achat_completion_with_tools(
break

return final_response

def chat_completion_with_tools_stream(
self,
messages: List[Dict[str, Any]],
model: str = "gpt-4o",
temperature: float = 0.7,
tools: Optional[List[Any]] = None,
execute_tool_fn: Optional[Callable] = None,
reasoning_steps: bool = False,
verbose: bool = True,
max_iterations: int = 10,
**kwargs
):
"""
Create a streaming chat completion with tool support.

This method yields chunks of the response as they are generated,
enabling real-time streaming to the user.

Args:
messages: List of message dictionaries
model: Model to use
temperature: Temperature for generation
tools: List of tools (can be callables, dicts, or strings)
execute_tool_fn: Function to execute tools
reasoning_steps: Whether to show reasoning
verbose: Whether to show verbose output
max_iterations: Maximum tool calling iterations
**kwargs: Additional API parameters

Yields:
String chunks of the response as they are generated
"""
# Format tools for OpenAI API
formatted_tools = self.format_tools(tools)

# Continue tool execution loop until no more tool calls are needed
iteration_count = 0

while iteration_count < max_iterations:
try:
# Create streaming response
response_stream = self._sync_client.chat.completions.create(
model=model,
messages=messages,
temperature=temperature,
tools=formatted_tools if formatted_tools else None,
stream=True,
**kwargs
)

full_response_text = ""
reasoning_content = ""
chunks = []

# Stream the response chunk by chunk
for chunk in response_stream:
chunks.append(chunk)
if chunk.choices and chunk.choices[0].delta.content:
content = chunk.choices[0].delta.content
full_response_text += content
yield content

# Handle reasoning content if enabled
if reasoning_steps and chunk.choices and hasattr(chunk.choices[0].delta, "reasoning_content"):
rc = chunk.choices[0].delta.reasoning_content
if rc:
reasoning_content += rc
yield f"[Reasoning: {rc}]"

# Process the complete response to check for tool calls
final_response = process_stream_chunks(chunks)

if not final_response:
return

# Check for tool calls
tool_calls = getattr(final_response.choices[0].message, 'tool_calls', None)

if tool_calls and execute_tool_fn:
# Convert ToolCall dataclass objects to dict for JSON serialization
serializable_tool_calls = []
for tc in tool_calls:
if isinstance(tc, ToolCall):
# Convert dataclass to dict
serializable_tool_calls.append({
"id": tc.id,
"type": tc.type,
"function": tc.function
})
else:
# Already an OpenAI object, keep as is
serializable_tool_calls.append(tc)

messages.append({
"role": "assistant",
"content": final_response.choices[0].message.content,
"tool_calls": serializable_tool_calls
})

for tool_call in tool_calls:
# Handle both ToolCall dataclass and OpenAI object
try:
if isinstance(tool_call, ToolCall):
function_name = tool_call.function["name"]
arguments = json.loads(tool_call.function["arguments"])
else:
function_name = tool_call.function.name
arguments = json.loads(tool_call.function.arguments)
except json.JSONDecodeError as e:
if verbose:
yield f"\n[Error parsing arguments for {function_name if 'function_name' in locals() else 'unknown function'}: {str(e)}]"
continue

if verbose:
yield f"\n[Calling function: {function_name}]"

# Execute the tool with error handling
try:
tool_result = execute_tool_fn(function_name, arguments)
results_str = json.dumps(tool_result) if tool_result else "Function returned an empty output"
except Exception as e:
results_str = f"Error executing function: {str(e)}"
if verbose:
yield f"\n[Function error: {str(e)}]"

if verbose:
yield f"\n[Function result: {results_str}]"

messages.append({
"role": "tool",
"tool_call_id": tool_call.id if hasattr(tool_call, 'id') else tool_call['id'],
"content": results_str
})

# Continue the loop to allow more tool calls
iteration_count += 1
else:
# No tool calls, we're done
break

except Exception as e:
yield f"Error: {str(e)}"
break
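
A rough sketch of how this streaming helper could be exercised on its own with tool calling. The example tool, the dispatch function, and the already-constructed client instance (assumed to be an OpenAIClient) are illustrations, not part of this diff:

# Hypothetical tool used only for illustration.
def get_weather(city: str) -> dict:
    return {"city": city, "temperature_c": 21}

def run_tool(name: str, args: dict):
    # Stands in for the agent's execute_tool machinery; receives the
    # function name and parsed arguments, as the streaming loop expects.
    return {"get_weather": get_weather}[name](**args)

# `client` is assumed to be an already-constructed client instance.
for chunk in client.chat_completion_with_tools_stream(
    messages=[{"role": "user", "content": "What is the weather in Paris?"}],
    model="gpt-4o",
    tools=[get_weather],
    execute_tool_fn=run_tool,
    verbose=False,
):
    print(chunk, end="", flush=True)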

def parse_structured_output(
self,