3 changes: 2 additions & 1 deletion ROADMAP.md
@@ -8,7 +8,8 @@ Core features improve the library itself to cater to a wider range of functionalities

| Name | Description | Status | Release version |
|------|-------------|--------|-----------------|
| Full composability | Right now teams can only be combined with teams and agents with agents. We want to extend this to team + agent composability | Yet to start | 0.0.4 |
| Full composability | Right now teams can only be combined with teams and agents with agents. We want to extend this to team + agent composability | In progress | 0.0.4 |
| Error handling | Ability to handle errors autonomously | In progress | 0.0.4 |
| Output formatter | Ability to templatize the output format using Pydantic | Yet to start | 0.0.4 |
| LLM extensibility | Ability to use different LLMs across different agents and teams | Yet to start | 0.0.4 |
| Auto-Query RAG | Ability to make metadata queries within the agentic flow, which can automatically add metadata (such as timestamps) while the RAG query runs | Yet to start | TBD |
147 changes: 94 additions & 53 deletions examples/email_reply_agent.ipynb

Large diffs are not rendered by default.

82 changes: 82 additions & 0 deletions examples/tool_error_handling.py
@@ -0,0 +1,82 @@
import asyncio
from typing import List

from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from pydantic import BaseModel, Field

from flo_ai import Flo, FloSession
from flo_ai.tools.flo_tool import flotool

load_dotenv()

llm = ChatOpenAI(temperature=0, model_name='gpt-4o-mini')

session = FloSession(
    llm,
    log_level="ERROR"
)

class AdditionToolInput(BaseModel):
    numbers: List[int] = Field(..., description="List of numbers to add")

# Use flotool to define the tool functions; tools can be async or sync
@flotool(name="AdditionTool", description="Tool to add numbers", argument_contract=AdditionToolInput)
async def addition_tool(numbers: List[int]) -> str:
    result = sum(numbers)
    await asyncio.sleep(1)
    return f"The sum is {result}"

@flotool(name="MultiplicationTool", description="Tool to multiply numbers to get the product of the numbers")
def mul_tool(numbers: List[int]) -> str:
    # Multiply the numbers together (not sum them)
    result = 1
    for number in numbers:
        result *= number
    return f"The product is {result}"

session.register_tool(
    name="Adder",
    tool=addition_tool
).register_tool(
    name="Multiplier",
    tool=mul_tool
)

simple_math_agent = """
apiVersion: flo/alpha-v1
kind: FloAgent
name: math-assistant
agent:
    name: SummationHelper
    kind: agentic
    job: >
      Add or multiply numbers. Always answer based on what the tool says
    tools:
      - name: Adder
      - name: Multiplier
"""

flo = Flo.build(session, simple_math_agent, log_level="ERROR")

# flo.async_invoke is the async entry point for running the agent
async def invoke_main():
    result = await flo.async_invoke("What's the sum of 1, 3, 4, 5 and 6, and their product?")
    print(result)

asyncio.run(invoke_main())

# Streaming variant:
# async def stream_main():
#     # Use 'async for' to iterate over the asynchronous generator
#     async for s in flo.async_stream("What's the sum of 1, 3, 4, 5 and 6, and their product?"):
#         if "__end__" not in s:
#             print(s)
#             print("----")
#
# asyncio.run(stream_main())
2 changes: 1 addition & 1 deletion features/steps/flo_name_steps.py
@@ -1,4 +1,4 @@
from behave import given, when, then, step
from behave import when, then
from flo_ai.yaml.validators import raise_for_name_error

@when('use the name, {flo_name}')
5 changes: 3 additions & 2 deletions flo_ai/builders/yaml_builder.py
@@ -4,7 +4,8 @@
from flo_ai.state.flo_session import FloSession
from flo_ai.router.flo_router_factory import FloRouterFactory
from flo_ai.factory.agent_factory import AgentFactory
from flo_ai.yaml.validators import raise_for_name_error, DuplicateStringError
from flo_ai.error.flo_exception import FloException
from flo_ai.yaml.validators import raise_for_name_error
from flo_ai.common.flo_logger import builder_logger

def build_supervised_team(session: FloSession) -> ExecutableFlo:
@@ -45,5 +46,5 @@ def validate_names(name_set: set, name):
    raise_for_name_error(name)
    if name in name_set:
        builder_logger.error(f"Duplicate name found: '{name}'")
        raise DuplicateStringError(f"The name '{name}' is already in the set.")
        raise FloException(f"The name '{name}' is already in the set.")
    name_set.add(name)
13 changes: 13 additions & 0 deletions flo_ai/core.py
@@ -1,8 +1,10 @@
import asyncio
from flo_ai.yaml.config import to_supervised_team
from flo_ai.builders.yaml_builder import build_supervised_team, FloRoutedTeamConfig
from typing import Any, Iterator, Union
from flo_ai.state.flo_session import FloSession
from flo_ai.models.flo_executable import ExecutableFlo
from flo_ai.error.flo_exception import FloException
from flo_ai.common.flo_logger import common_logger, builder_logger, set_global_log_level

class Flo:
@@ -25,13 +27,24 @@ def stream(self, query, config = None) -> Iterator[Union[dict[str, Any], Any]]:
        self.logger.info(f"Streaming query for session {self.session.session_id}: {query}")
        return self.runnable.stream(query, config)

    def async_stream(self, query, config = None) -> Iterator[Union[dict[str, Any], Any]]:
        self.logger.info(f"Streaming query for session {self.session.session_id}: {query}")
        return self.runnable.astream(query, config)

    def invoke(self, query, config = None) -> Iterator[Union[dict[str, Any], Any]]:
        config = {
            'callbacks' : [self.session.langchain_logger]
        }
        self.logger.info(f"Invoking query for session {self.session.session_id}: {query}")
        return self.runnable.invoke(query, config)

    def async_invoke(self, query, config = None) -> Iterator[Union[dict[str, Any], Any]]:
        config = {
            'callbacks' : [self.session.langchain_logger]
        }
        self.logger.info(f"Invoking query for session {self.session.session_id}: {query}")
        return self.runnable.ainvoke(query, config)

    @staticmethod
    def build(session: FloSession, yaml: str, log_level: str = "INFO"):
        set_global_log_level(log_level)
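For context, the new async_invoke and async_stream entry points simply delegate to the runnable's ainvoke and astream. A minimal usage sketch, assuming a flo object already built with Flo.build as in examples/tool_error_handling.py above; the query text is illustrative:

import asyncio

# flo = Flo.build(session, yaml, log_level="ERROR")  # built as in the example file

async def run():
    # async_invoke awaits the complete answer in one call
    result = await flo.async_invoke("What's the sum of 1, 3, 4, 5 and 6?")
    print(result)

    # async_stream yields intermediate states as they are produced
    async for step in flo.async_stream("What's the sum of 1, 3, 4, 5 and 6?"):
        if "__end__" not in step:
            print(step)

asyncio.run(run())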
2 changes: 2 additions & 0 deletions flo_ai/error/flo_exception.py
@@ -0,0 +1,2 @@
class FloException(Exception):
    pass
3 changes: 2 additions & 1 deletion flo_ai/factory/agent_factory.py
@@ -43,7 +43,8 @@ def __create_agentic_agent(session: FloSession, agent: AgentConfig, tool_map) ->
        flo_agent: FloAgent = FloAgent.Builder(
            session,
            agent,
            tools
            tools,
            on_error=session.on_agent_error
        ).build()
        return flo_agent

15 changes: 6 additions & 9 deletions flo_ai/models/flo_agent.py
@@ -6,7 +6,7 @@
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from flo_ai.models.flo_executable import ExecutableFlo
from flo_ai.state.flo_session import FloSession
from typing import Union, Optional
from typing import Union, Optional, Callable
from flo_ai.yaml.config import AgentConfig
from flo_ai.models.flo_executable import ExecutableType

@@ -28,14 +28,12 @@ def __init__(self,
                     verbose: bool = True,
                     role: Optional[str] = None,
                     llm: Union[BaseLanguageModel, None] = None,
                     return_intermediate_steps: bool = False,
                     handle_parsing_errors: bool = True) -> None:
            prompt: Union[ChatPromptTemplate, str] = config.job
                     on_error: Union[str, Callable] = True) -> None:

            prompt: Union[ChatPromptTemplate, str] = config.job
            self.name: str = config.name
            self.llm = llm if llm is not None else session.llm
            self.config = config
            # TODO improve to add more context of what other agents are available
            system_prompts = [("system", "You are a {}".format(role)), ("system", prompt)] if role is not None else [("system", prompt)]
            system_prompts.append(MessagesPlaceholder(variable_name="messages"))
            system_prompts.append(MessagesPlaceholder(variable_name="agent_scratchpad"))
@@ -44,15 +42,14 @@ def __init__(self,
            ) if isinstance(prompt, str) else prompt
            self.tools: list[BaseTool] = tools
            self.verbose = verbose
            self.return_intermediate_steps = return_intermediate_steps
            self.handle_parsing_errors = handle_parsing_errors
            self.on_error = on_error


        def build(self) -> AgentExecutor:
            agent = create_tool_calling_agent(self.llm, self.tools, self.prompt)
            executor = AgentExecutor(agent=agent,
                                     tools=self.tools,
                                     verbose=self.verbose,
                                     return_intermediate_steps=self.return_intermediate_steps,
                                     handle_parsing_errors=self.handle_parsing_errors)
                                     return_intermediate_steps=True,
                                     handle_parsing_errors=self.on_error)
            return FloAgent(agent, executor, self.config)
14 changes: 14 additions & 0 deletions flo_ai/models/flo_executable.py
@@ -39,13 +39,27 @@ def stream(self, work, config = None):
            ]
        }, config)

    def astream(self, work, config = None):
        return self.runnable.astream({
            STATE_NAME_MESSAGES: [
                HumanMessage(content=work)
            ]
        }, config)

    def invoke(self, work, config = None):
        return self.runnable.invoke({
            STATE_NAME_MESSAGES: [
                HumanMessage(content=work)
            ],
        }, config)

    def ainvoke(self, work, config = None):
        return self.runnable.ainvoke({
            STATE_NAME_MESSAGES: [
                HumanMessage(content=work)
            ],
        }, config)


    def draw(self, xray=True):
        return self.runnable.get_graph().draw_mermaid_png()
11 changes: 1 addition & 10 deletions flo_ai/router/flo_llm_router.py
@@ -11,20 +11,11 @@
from flo_ai.state.flo_state import TeamFloAgentState
from flo_ai.yaml.config import TeamConfig
from langchain_core.output_parsers import JsonOutputParser
from langchain_core.pydantic_v1 import BaseModel, Field
from pydantic import BaseModel, Field

class NextAgent(BaseModel):
    next: str = Field(description="Name of the next member to be called")

class StateUpdateComponent:
    def __init__(self, name: str, session: FloSession) -> None:
        self.name = name
        self.inner_session = session

    def __call__(self, input):
        self.inner_session.append(self.name)
        return input

class FloLLMRouter(FloRouter):

    def __init__(self,
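The NextAgent model now comes from plain pydantic rather than langchain_core.pydantic_v1. Presumably it is paired with the JsonOutputParser imported above when the router asks the LLM which member should act next; a rough sketch of that pairing, shown only to illustrate the parser API (the parser variable and its usage are assumptions, not code from this diff):

from langchain_core.output_parsers import JsonOutputParser
from pydantic import BaseModel, Field

class NextAgent(BaseModel):
    next: str = Field(description="Name of the next member to be called")

# Parses the model's JSON reply into a dict such as {"next": "Adder"}
parser = JsonOutputParser(pydantic_object=NextAgent)
format_instructions = parser.get_format_instructions()  # text describing the expected JSON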
2 changes: 1 addition & 1 deletion flo_ai/router/flo_supervisor.py
@@ -4,7 +4,7 @@
from langchain_core.runnables import Runnable
from flo_ai.state.flo_session import FloSession
from flo_ai.constants.prompt_constants import FLO_FINISH
from flo_ai.router.flo_llm_router import FloLLMRouter, StateUpdateComponent
from flo_ai.router.flo_llm_router import FloLLMRouter
from flo_ai.models.flo_team import FloTeam
from flo_ai.yaml.config import TeamConfig
from langchain_core.output_parsers import JsonOutputParser
16 changes: 12 additions & 4 deletions flo_ai/state/flo_session.py
@@ -1,5 +1,4 @@
import uuid
from typing import Union
from typing import Union, Dict
from langchain_core.language_models import BaseLanguageModel
from langchain_core.tools import BaseTool
from flo_ai.common.flo_logger import session_logger, FloLogger
@@ -9,23 +8,32 @@

from typing import Optional

def _handle_agent_error(error) -> str:
    error_message = str(error)[:50]
    return f"""
    Following error happened while agent execution, please retry with the fix for the same:
    {error_message}
    """

class FloSession:

    def __init__(self,
                 llm: BaseLanguageModel,
                 loop_size: int = 2,
                 max_loop: int = 3,
                 log_level: Optional[str] = "INFO",
                 custom_langchainlog_handler: Optional[FloLangchainLogger] = None) -> None:
                 custom_langchainlog_handler: Optional[FloLangchainLogger] = None,
                 on_agent_error=_handle_agent_error) -> None:

        self.session_id = str(random_str(16))
        self.llm = llm
        self.tools = dict()
        self.tools: Dict[str, BaseTool] = dict()
        self.counter = dict()
        self.navigation: list[str] = list()
        self.pattern_series = dict()
        self.loop_size: int = loop_size
        self.max_loop: int = max_loop
        self.on_agent_error = on_agent_error

        self.init_logger(log_level)
        self.logger = session_logger
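The on_agent_error hook defaults to _handle_agent_error above and is forwarded to the agent executor (see flo_ai/models/flo_agent.py earlier in this diff), so any string or callable that LangChain's handle_parsing_errors accepts should work. A minimal sketch of passing a custom handler, assuming only the FloSession signature shown in this diff; the handler body and model choice are illustrative:

from langchain_openai import ChatOpenAI
from flo_ai import FloSession

def my_error_handler(error) -> str:
    # Keep the hint short so the retry prompt stays small
    return f"Tool or agent failed with: {str(error)[:100]}. Please fix the input and retry."

session = FloSession(
    ChatOpenAI(temperature=0, model_name='gpt-4o-mini'),
    on_agent_error=my_error_handler,
)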
28 changes: 28 additions & 0 deletions flo_ai/tools/flo_tool.py
@@ -0,0 +1,28 @@
import asyncio
from langchain.tools import tool
from functools import wraps
from typing import Optional

def flotool(
        name: str,
        description: Optional[str] = None,
        argument_contract: Optional[type] = None,
        unsafe: bool = False):

    def decorator(func):
        func.__doc__ = func.__doc__ or description

        @tool(name, args_schema=argument_contract)
        @wraps(func)
        async def wrapper(*args, **kwargs):
            try:
                if asyncio.iscoroutinefunction(func):
                    return await func(*args, **kwargs)
                return func(*args, **kwargs)
            except Exception as e:
                if unsafe:
                    raise e
                return f"An error occurred while executing the tool: {str(e)}, please retry with the corresponding fix"
        return wrapper

    return decorator
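A short sketch of how the decorator is intended to be used, based only on the signature above. The division tool, its Pydantic schema, and the failing input are illustrative assumptions; with unsafe left at False, the wrapper catches the exception and returns the retry hint instead of raising:

from typing import List
from pydantic import BaseModel, Field
from flo_ai.tools.flo_tool import flotool

class DivisionInput(BaseModel):
    numbers: List[int] = Field(..., description="Numbers to divide, left to right")

@flotool(name="DivisionTool", description="Divide numbers left to right", argument_contract=DivisionInput)
def division_tool(numbers: List[int]) -> str:
    result = numbers[0]
    for n in numbers[1:]:
        result /= n  # raises ZeroDivisionError if a 0 is passed
    return f"The result is {result}"

# With unsafe=True the same error would propagate to the caller instead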
9 changes: 2 additions & 7 deletions flo_ai/yaml/validators.py
@@ -1,13 +1,8 @@
import re
from flo_ai.error.flo_exception import FloException

name_regex = r'^[a-zA-Z0-9-_]+$'

class DuplicateStringError(Exception):
    pass

class InvalidStringError(Exception):
    pass

def raise_for_name_error(string):
    if not re.match(name_regex, string):
        raise InvalidStringError("Name must contain only alphanumeric characters and hyphens.")
        raise FloException("Name must contain only alphanumeric characters and hyphens.")
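With the ad-hoc DuplicateStringError and InvalidStringError classes gone, callers only need to catch FloException. A minimal sketch of the validator's behaviour under the name_regex above; the example names are illustrative:

from flo_ai.error.flo_exception import FloException
from flo_ai.yaml.validators import raise_for_name_error

try:
    raise_for_name_error("weather assistant")  # space is rejected by name_regex
except FloException as e:
    print(e)  # Name must contain only alphanumeric characters and hyphens.

raise_for_name_error("weather-assistant")  # valid, returns None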