
Commit 929d073

Merge pull request #1028 from MervinPraison/claude/fix-real-time-streaming-20250722
fix: implement real-time streaming for Agent.start() method
2 parents ca420fe + 662e155 commit 929d073

5 files changed: +571 additions, −23 deletions


src/praisonai-agents/praisonaiagents/agent/agent.py

Lines changed: 103 additions & 23 deletions
@@ -206,7 +206,7 @@ def __init__(
         knowledge_config: Optional[Dict[str, Any]] = None,
         use_system_prompt: Optional[bool] = True,
         markdown: bool = True,
-        stream: bool = True,
+        stream: bool = False,
         self_reflect: bool = False,
         max_reflect: int = 3,
         min_reflect: int = 1,
@@ -281,8 +281,8 @@ def __init__(
                 conversations to establish agent behavior and context. Defaults to True.
             markdown (bool, optional): Enable markdown formatting in agent responses for better
                 readability and structure. Defaults to True.
-            stream (bool, optional): Enable streaming responses from the language model. Set to False
-                for LLM providers that don't support streaming. Defaults to True.
+            stream (bool, optional): Enable streaming responses from the language model for real-time
+                output when using Agent.start() method. Defaults to False for backward compatibility.
             self_reflect (bool, optional): Enable self-reflection capabilities where the agent
                 evaluates and improves its own responses. Defaults to False.
             max_reflect (int, optional): Maximum number of self-reflection iterations to prevent
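With this hunk the constructor default flips from stream=True to stream=False, so existing callers keep the blocking, full-response behavior and real-time output becomes opt-in. A minimal sketch of the opt-in, assuming the usual top-level import and the instructions keyword (illustrative, not taken from this diff):

from praisonaiagents import Agent

# Default is now stream=False: behaves as before and returns one complete response.
agent = Agent(instructions="You are a helpful assistant")

# Opt in to real-time streaming explicitly.
streaming_agent = Agent(instructions="You are a helpful assistant", stream=True)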
@@ -1953,34 +1953,114 @@ def _start_stream(self, prompt: str, **kwargs) -> Generator[str, None, None]:
             # Reset the final display flag for each new conversation
             self._final_display_shown = False
 
-            # Temporarily disable verbose mode to prevent console output during streaming
+            # Temporarily disable verbose mode to prevent console output conflicts during streaming
             original_verbose = self.verbose
             self.verbose = False
 
-            # Use the existing chat logic but capture and yield chunks
-            # This approach reuses all existing logic without duplication
-            response = self.chat(prompt, **kwargs)
-
-            # Restore original verbose mode
-            self.verbose = original_verbose
-
-            if response:
-                # Simulate streaming by yielding the response in word chunks
-                # This provides a consistent streaming experience regardless of LLM type
-                words = str(response).split()
-                chunk_size = max(1, len(words) // 20)  # Split into ~20 chunks for smooth streaming
+            # For custom LLM path, use the new get_response_stream generator
+            if self._using_custom_llm:
+                # Handle knowledge search
+                actual_prompt = prompt
+                if self.knowledge:
+                    search_results = self.knowledge.search(prompt, agent_id=self.agent_id)
+                    if search_results:
+                        if isinstance(search_results, dict) and 'results' in search_results:
+                            knowledge_content = "\n".join([result['memory'] for result in search_results['results']])
+                        else:
+                            knowledge_content = "\n".join(search_results)
+                        actual_prompt = f"{prompt}\n\nKnowledge: {knowledge_content}"
+
+                # Handle tools properly
+                tools = kwargs.get('tools', self.tools)
+                if tools is None or (isinstance(tools, list) and len(tools) == 0):
+                    tool_param = self.tools
+                else:
+                    tool_param = tools
 
-                for i in range(0, len(words), chunk_size):
-                    chunk_words = words[i:i + chunk_size]
-                    chunk = ' '.join(chunk_words)
+                # Convert MCP tools if needed
+                if tool_param is not None:
+                    from ..mcp.mcp import MCP
+                    if isinstance(tool_param, MCP) and hasattr(tool_param, 'to_openai_tool'):
+                        openai_tool = tool_param.to_openai_tool()
+                        if openai_tool:
+                            if isinstance(openai_tool, list):
+                                tool_param = openai_tool
+                            else:
+                                tool_param = [openai_tool]
+
+                # Store chat history length for potential rollback
+                chat_history_length = len(self.chat_history)
+
+                # Normalize prompt content for chat history
+                normalized_content = actual_prompt
+                if isinstance(actual_prompt, list):
+                    normalized_content = next((item["text"] for item in actual_prompt if item.get("type") == "text"), "")
+
+                # Prevent duplicate messages in chat history
+                if not (self.chat_history and
+                        self.chat_history[-1].get("role") == "user" and
+                        self.chat_history[-1].get("content") == normalized_content):
+                    self.chat_history.append({"role": "user", "content": normalized_content})
+
+                try:
+                    # Use the new streaming generator from LLM class
+                    response_content = ""
+                    for chunk in self.llm_instance.get_response_stream(
+                        prompt=actual_prompt,
+                        system_prompt=self._build_system_prompt(tool_param),
+                        chat_history=self.chat_history,
+                        temperature=kwargs.get('temperature', 0.2),
+                        tools=tool_param,
+                        output_json=kwargs.get('output_json'),
+                        output_pydantic=kwargs.get('output_pydantic'),
+                        verbose=False,  # Keep verbose false for streaming
+                        markdown=self.markdown,
+                        agent_name=self.name,
+                        agent_role=self.role,
+                        agent_tools=[t.__name__ if hasattr(t, '__name__') else str(t) for t in (tool_param or [])],
+                        task_name=kwargs.get('task_name'),
+                        task_description=kwargs.get('task_description'),
+                        task_id=kwargs.get('task_id'),
+                        execute_tool_fn=self.execute_tool
+                    ):
+                        response_content += chunk
+                        yield chunk
 
-                    # Add space after chunk unless it's the last one
-                    if i + chunk_size < len(words):
-                        chunk += ' '
+                    # Add complete response to chat history
+                    if response_content:
+                        self.chat_history.append({"role": "assistant", "content": response_content})
+
+                except Exception as e:
+                    # Rollback chat history on error
+                    self.chat_history = self.chat_history[:chat_history_length]
+                    logging.error(f"Custom LLM streaming error: {e}")
+                    raise
+
+            else:
+                # For OpenAI-style models, fall back to the chat method for now
+                # TODO: Implement OpenAI streaming in future iterations
+                response = self.chat(prompt, **kwargs)
+
+                if response:
+                    # Simulate streaming by yielding the response in word chunks
+                    words = str(response).split()
+                    chunk_size = max(1, len(words) // 20)
 
-                    yield chunk
+                    for i in range(0, len(words), chunk_size):
+                        chunk_words = words[i:i + chunk_size]
+                        chunk = ' '.join(chunk_words)
+
+                        if i + chunk_size < len(words):
+                            chunk += ' '
+
+                        yield chunk
+
+            # Restore original verbose mode
+            self.verbose = original_verbose
 
         except Exception as e:
+            # Restore verbose mode on any error
+            self.verbose = original_verbose
             # Graceful fallback to non-streaming if streaming fails
             logging.warning(f"Streaming failed, falling back to regular response: {e}")
             response = self.chat(prompt, **kwargs)
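The rewritten _start_stream keeps one generator interface for both paths: custom LLM instances stream real token deltas through get_response_stream, while OpenAI-style models still call chat() and re-emit the finished answer in roughly twenty word-sized chunks. A hedged consumption sketch, assuming Agent.start() yields strings when stream=True and returns a plain response otherwise:

from praisonaiagents import Agent

agent = Agent(instructions="You are a helpful assistant", stream=True)
result = agent.start("Summarize why real-time streaming matters.")

# Defensive handling: iterate if we got a generator, print directly if we got a string.
if hasattr(result, "__iter__") and not isinstance(result, str):
    for chunk in result:
        print(chunk, end="", flush=True)
    print()
else:
    print(result)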

src/praisonai-agents/praisonaiagents/llm/llm.py

Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1557,6 +1557,198 @@ def get_response(
15571557
total_time = time.time() - start_time
15581558
logging.debug(f"get_response completed in {total_time:.2f} seconds")
15591559

1560+
def get_response_stream(
1561+
self,
1562+
prompt: Union[str, List[Dict]],
1563+
system_prompt: Optional[str] = None,
1564+
chat_history: Optional[List[Dict]] = None,
1565+
temperature: float = 0.2,
1566+
tools: Optional[List[Any]] = None,
1567+
output_json: Optional[BaseModel] = None,
1568+
output_pydantic: Optional[BaseModel] = None,
1569+
verbose: bool = False, # Default to non-verbose for streaming
1570+
markdown: bool = True,
1571+
agent_name: Optional[str] = None,
1572+
agent_role: Optional[str] = None,
1573+
agent_tools: Optional[List[str]] = None,
1574+
task_name: Optional[str] = None,
1575+
task_description: Optional[str] = None,
1576+
task_id: Optional[str] = None,
1577+
execute_tool_fn: Optional[Callable] = None,
1578+
**kwargs
1579+
):
1580+
"""Generator that yields real-time response chunks from the LLM.
1581+
1582+
This method provides true streaming by yielding content chunks as they
1583+
are received from the underlying LLM, enabling real-time display of
1584+
responses without waiting for the complete response.
1585+
1586+
Args:
1587+
prompt: The prompt to send to the LLM
1588+
system_prompt: Optional system prompt
1589+
chat_history: Optional chat history
1590+
temperature: Sampling temperature
1591+
tools: Optional list of tools for function calling
1592+
output_json: Optional JSON schema for structured output
1593+
output_pydantic: Optional Pydantic model for structured output
1594+
verbose: Whether to enable verbose logging (default False for streaming)
1595+
markdown: Whether to enable markdown processing
1596+
agent_name: Optional agent name for logging
1597+
agent_role: Optional agent role for logging
1598+
agent_tools: Optional list of agent tools for logging
1599+
task_name: Optional task name for logging
1600+
task_description: Optional task description for logging
1601+
task_id: Optional task ID for logging
1602+
execute_tool_fn: Optional function for executing tools
1603+
**kwargs: Additional parameters
1604+
1605+
Yields:
1606+
str: Individual content chunks as they are received from the LLM
1607+
1608+
Raises:
1609+
Exception: If streaming fails or LLM call encounters an error
1610+
"""
1611+
try:
1612+
import litellm
1613+
1614+
# Build messages using existing logic
1615+
messages, original_prompt = self._build_messages(
1616+
prompt=prompt,
1617+
system_prompt=system_prompt,
1618+
chat_history=chat_history,
1619+
output_json=output_json,
1620+
output_pydantic=output_pydantic
1621+
)
1622+
1623+
# Format tools for litellm
1624+
formatted_tools = self._format_tools_for_litellm(tools)
1625+
1626+
# Determine if we should use streaming based on tool support
1627+
use_streaming = True
1628+
if formatted_tools and not self._supports_streaming_tools():
1629+
# Provider doesn't support streaming with tools, fall back to non-streaming
1630+
use_streaming = False
1631+
1632+
if use_streaming:
1633+
# Real-time streaming approach with tool call support
1634+
try:
1635+
tool_calls = []
1636+
response_text = ""
1637+
1638+
for chunk in litellm.completion(
1639+
**self._build_completion_params(
1640+
messages=messages,
1641+
tools=formatted_tools,
1642+
temperature=temperature,
1643+
stream=True,
1644+
output_json=output_json,
1645+
output_pydantic=output_pydantic,
1646+
**kwargs
1647+
)
1648+
):
1649+
if chunk and chunk.choices and chunk.choices[0].delta:
1650+
delta = chunk.choices[0].delta
1651+
1652+
# Process both content and tool calls using existing helper
1653+
response_text, tool_calls = self._process_stream_delta(
1654+
delta, response_text, tool_calls, formatted_tools
1655+
)
1656+
1657+
# Yield content chunks in real-time as they arrive
1658+
if delta.content:
1659+
yield delta.content
1660+
1661+
# After streaming completes, handle tool calls if present
1662+
if tool_calls and execute_tool_fn:
1663+
# Add assistant message with tool calls to conversation
1664+
if self._is_ollama_provider():
1665+
messages.append({
1666+
"role": "assistant",
1667+
"content": response_text
1668+
})
1669+
else:
1670+
serializable_tool_calls = self._serialize_tool_calls(tool_calls)
1671+
messages.append({
1672+
"role": "assistant",
1673+
"content": response_text,
1674+
"tool_calls": serializable_tool_calls
1675+
})
1676+
1677+
# Execute tool calls and add results to conversation
1678+
for tool_call in tool_calls:
1679+
is_ollama = self._is_ollama_provider()
1680+
function_name, arguments, tool_call_id = self._extract_tool_call_info(tool_call, is_ollama)
1681+
1682+
try:
1683+
# Execute the tool
1684+
tool_result = execute_tool_fn(function_name, arguments)
1685+
1686+
# Add tool result to messages
1687+
tool_message = self._create_tool_message(function_name, tool_result, tool_call_id, is_ollama)
1688+
messages.append(tool_message)
1689+
1690+
except Exception as e:
1691+
logging.error(f"Tool execution error for {function_name}: {e}")
1692+
# Add error message to conversation
1693+
error_message = self._create_tool_message(
1694+
function_name, f"Error executing tool: {e}", tool_call_id, is_ollama
1695+
)
1696+
messages.append(error_message)
1697+
1698+
# Continue conversation after tool execution - get follow-up response
1699+
try:
1700+
follow_up_response = litellm.completion(
1701+
**self._build_completion_params(
1702+
messages=messages,
1703+
tools=formatted_tools,
1704+
temperature=temperature,
1705+
stream=False,
1706+
**kwargs
1707+
)
1708+
)
1709+
1710+
if follow_up_response and follow_up_response.choices:
1711+
follow_up_content = follow_up_response.choices[0].message.content
1712+
if follow_up_content:
1713+
# Yield the follow-up response after tool execution
1714+
yield follow_up_content
1715+
except Exception as e:
1716+
logging.error(f"Follow-up response failed: {e}")
1717+
1718+
except Exception as e:
1719+
logging.error(f"Streaming failed: {e}")
1720+
# Fall back to non-streaming if streaming fails
1721+
use_streaming = False
1722+
1723+
if not use_streaming:
1724+
# Fall back to non-streaming and yield the complete response
1725+
try:
1726+
response = litellm.completion(
1727+
**self._build_completion_params(
1728+
messages=messages,
1729+
tools=formatted_tools,
1730+
temperature=temperature,
1731+
stream=False,
1732+
output_json=output_json,
1733+
output_pydantic=output_pydantic,
1734+
**kwargs
1735+
)
1736+
)
1737+
1738+
if response and response.choices:
1739+
content = response.choices[0].message.content
1740+
if content:
1741+
# Yield the complete response as a single chunk
1742+
yield content
1743+
1744+
except Exception as e:
1745+
logging.error(f"Non-streaming fallback failed: {e}")
1746+
raise
1747+
1748+
except Exception as e:
1749+
logging.error(f"Error in get_response_stream: {e}")
1750+
raise
1751+
15601752
def _is_gemini_model(self) -> bool:
15611753
"""Check if the model is a Gemini model."""
15621754
if not self.model:
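get_response_stream wraps several praisonaiagents helpers (_build_messages, _build_completion_params, the tool-call handling), but the core mechanism is litellm's OpenAI-style streaming: completion(stream=True) yields chunks whose choices[0].delta may carry partial content. A stripped-down sketch of that loop, with the model name and prompt as placeholders:

import litellm

def stream_text(model: str, prompt: str):
    """Yield partial content pieces from a streaming litellm completion."""
    response = litellm.completion(
        model=model,
        messages=[{"role": "user", "content": prompt}],
        stream=True,
    )
    for chunk in response:
        # Each chunk is an OpenAI-style delta; content may be None (e.g. tool-call deltas).
        if chunk and chunk.choices and chunk.choices[0].delta:
            content = chunk.choices[0].delta.content
            if content:
                yield content

# Example usage (requires a configured provider/API key):
# for piece in stream_text("gpt-4o-mini", "Explain streaming in one sentence."):
#     print(piece, end="", flush=True)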
