Replies: 2 comments
-
To stream the response, you can set up an agent with tools, a prompt, and per-session message history like this:

```python
from typing import Any

from langchain.agents import AgentExecutor, create_tool_calling_agent
from langchain_community.chat_message_histories import ChatMessageHistory
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableWithMessageHistory
from langchain_openai import ChatOpenAI

from core.config import settings  # project settings module (not shown here)
from ai.tools.current_time import CurrentTimeTool


def get_tools() -> list:
    return [CurrentTimeTool()]


def get_prompt() -> ChatPromptTemplate:
    return ChatPromptTemplate.from_messages(
        [
            ("system", "You are a helpful assistant."),
            ("placeholder", "{chat_history}"),
            ("human", "{input}"),
            ("placeholder", "{agent_scratchpad}"),
        ]
    )


def get_llm(provider: str, model: str, temperature: float) -> Any:
    if provider == "openai":
        return ChatOpenAI(
            model=model,
            temperature=temperature,
            openai_api_key=settings.OPENAI_API_KEY,
            streaming=True,
        )
    raise ValueError(f"Unsupported provider: {provider}")


async def get_agent(session_id: int) -> RunnableWithMessageHistory:
    llm = get_llm("openai", model="gpt-3.5-turbo", temperature=0)
    tools = get_tools()
    prompt = get_prompt()
    agent = create_tool_calling_agent(llm, tools, prompt)
    agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True)
    # ChatMessageHistory takes no session_id argument; the lambda below returns
    # the same in-memory history for every session, which is fine for a demo.
    memory = ChatMessageHistory()
    agent_with_chat = RunnableWithMessageHistory(
        agent_executor,
        lambda session_id: memory,
        input_messages_key="input",
        history_messages_key="chat_history",
    )
    return agent_with_chat
```
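The `CurrentTimeTool` imported above lives in the project's own `ai.tools.current_time` module, which the thread doesn't show. A minimal sketch of what it might look like, assuming a plain `BaseTool` subclass:

```python
# Hypothetical ai/tools/current_time.py -- not part of the original answer.
from datetime import datetime, timezone

from langchain_core.tools import BaseTool


class CurrentTimeTool(BaseTool):
    name: str = "current_time"
    description: str = "Returns the current UTC time in ISO 8601 format."

    def _run(self) -> str:
        return datetime.now(timezone.utc).isoformat()
```

With that in place, a small driver script can stream the agent's response: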
```python
import asyncio

from ai.agent import get_agent


async def main():
    message = "tell me about poland"
    session_id = 123
    agent = await get_agent(session_id)
    async for chunk in agent.astream(
        {"input": message},
        config={"configurable": {"session_id": session_id}},
    ):
        print(chunk)


if __name__ == "__main__":
    asyncio.run(main())
```

This code sets up an agent with memory and a current-time tool, then streams the response chunks for the given message. Additionally, if you want to keep track of these messages automatically per session, you can wrap the executor in a `RunnableWithMessageHistory` backed by a session store:

```python
from langchain_core.chat_history import BaseChatMessageHistory, InMemoryChatMessageHistory
from langchain_core.runnables.history import RunnableWithMessageHistory
store = {}


def get_session_history(session_id: str) -> BaseChatMessageHistory:
    if session_id not in store:
        store[session_id] = InMemoryChatMessageHistory()
    return store[session_id]


agent_with_chat_history = RunnableWithMessageHistory(
    agent_executor,
    get_session_history,
    input_messages_key="input",
    history_messages_key="chat_history",
)

# Example usage. The prompt's "{input}" slot expects a plain string, and
# AgentExecutor streams dict chunks (actions, steps, output) rather than
# message objects, so print each chunk itself.
config = {"configurable": {"session_id": "abc2"}}

for r in agent_with_chat_history.stream(
    {"input": "hi! I'm bob"},
    config=config,
):
    print(r)

for r in agent_with_chat_history.stream(
    {"input": "what's my name?"},
    config=config,
):
    print(r)
```

This setup ensures that the agent can remember past interactions and take them into account when responding to follow-up questions. For asynchronous usage, you can refer to the following example from LangChain's test cases:

```python
from typing import Dict

from langchain_core.chat_history import InMemoryChatMessageHistory
from langchain_core.messages import AIMessage, HumanMessage
from langchain_core.runnables import RunnableLambda
from langchain_core.runnables.history import RunnableWithMessageHistory


async def test_input_messages_async() -> None:
    runnable = RunnableLambda(
        lambda messages: "you said: "
        + "\n".join(str(m.content) for m in messages if isinstance(m, HumanMessage))
    )
    store: Dict = {}
    # _get_get_session_history is a helper from the test suite: it returns a
    # get_session_history function that creates one InMemoryChatMessageHistory
    # per session id inside `store`.
    get_session_history = _get_get_session_history(store=store)
    with_history = RunnableWithMessageHistory(runnable, get_session_history)
    config = {"configurable": {"session_id": "1_async"}}

    output = await with_history.ainvoke([HumanMessage(content="hello")], config)  # type: ignore[arg-type]
    assert output == "you said: hello"

    output = await with_history.ainvoke([HumanMessage(content="good bye")], config)  # type: ignore[arg-type]
    assert output == "you said: hello\ngood bye"

    assert [
        c
        async for c in with_history.astream([HumanMessage(content="hi again")], config)  # type: ignore[arg-type]
    ] == ["you said: hello\ngood bye\nhi again"]

    assert store == {
        "1_async": InMemoryChatMessageHistory(
            messages=[
                HumanMessage(content="hello"),
                AIMessage(content="you said: hello"),
                HumanMessage(content="good bye"),
                AIMessage(content="you said: hello\ngood bye"),
                HumanMessage(content="hi again"),
                AIMessage(content="you said: hello\ngood bye\nhi again"),
            ]
        )
    }
```

This example demonstrates how to use `RunnableWithMessageHistory` with `ainvoke` and `astream`. Additionally, here is a test-suite example of how `AgentExecutor.astream` surfaces intermediate actions and steps:

```python
from itertools import cycle
from typing import Union, cast

from langchain.agents import AgentExecutor
from langchain_core.agents import AgentAction, AgentFinish, AgentStep
from langchain_core.language_models import GenericFakeChatModel
from langchain_core.messages import AIMessage, HumanMessage
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.tools import tool


async def test_runnable_with_multi_action_per_step() -> None:
"""Test an agent that can make multiple function calls at once."""
# Will alternate between responding with hello and goodbye
infinite_cycle = cycle(
[AIMessage(content="looking for pet..."), AIMessage(content="Found Pet")]
)
model = GenericFakeChatModel(messages=infinite_cycle)
template = ChatPromptTemplate.from_messages(
[("system", "You are Cat Agent 007"), ("human", "{question}")]
)
def fake_parse(inputs: dict) -> Union[AgentFinish, AgentAction]:
"""A parser."""
return AgentFinish(return_values={"foo": "meow"}, log="hard-coded-message")
agent = template | model | fake_parse
executor = AgentExecutor(agent=agent, tools=[]) # type: ignore[arg-type]
# astream
results = [r async for r in executor.astream({"question": "hello"})]
assert results == [
{
"actions": [
AgentAction(
tool="find_pet", tool_input={"pet": "cat"}, log="find_pet()"
)
],
"messages": [AIMessage(content="find_pet()")],
},
{
"messages": [HumanMessage(content="Spying from under the bed.")],
"steps": [
AgentStep(
action=AgentAction(
tool="find_pet", tool_input={"pet": "cat"}, log="find_pet()"
),
observation="Spying from under the bed.",
)
],
},
{"foo": "meow", "messages": [AIMessage(content="hard-coded-message")]},
] This example shows how to stream responses asynchronously using the |
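Note that `AgentExecutor.astream` yields dict chunks rather than message objects; if you only care about the final answer, you can filter on the `output` key. A minimal sketch, assuming the `agent_executor` built above:

```python
async def print_final_answer() -> None:
    # Each chunk is a dict with "actions", "steps", or "output" keys.
    async for chunk in agent_executor.astream({"input": "tell me about poland"}):
        if "output" in chunk:
            print(chunk["output"])
```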
-
This is what worked for me.

```python
import asyncio

# agent_executor_with_history is assumed to be an AgentExecutor wrapped in
# RunnableWithMessageHistory, as in the reply above.


async def main():
    async for event in agent_executor_with_history.astream_events(
        {"input": "Tell me a story about a cat. In 100 words."},
        version="v2",
        config={"configurable": {"session_id": 1}},
    ):
        # Print only the token chunks emitted by the chat model.
        if event["event"] == "on_chat_model_stream":
            print(event["data"]["chunk"].content, end="", flush=True)


asyncio.run(main())
```

This is the video I watched from LangChain. I hope this helps!
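If you also want to surface tool activity in the same stream, `astream_events` emits tool events alongside the model tokens. A sketch, assuming the same `agent_executor_with_history`:

```python
async def stream_with_tools() -> None:
    async for event in agent_executor_with_history.astream_events(
        {"input": "What time is it?"},
        version="v2",
        config={"configurable": {"session_id": 1}},
    ):
        kind = event["event"]
        if kind == "on_tool_start":
            print(f"\n[calling tool: {event['name']}]")
        elif kind == "on_chat_model_stream":
            print(event["data"]["chunk"].content, end="", flush=True)
```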