Skip to content

Commit

Permalink
Added "RetryPolicy" for sql gen and sql exec. Defaults to 3 attempts …
Browse files Browse the repository at this point in the history
…on exception. Can simulate it by having simulate_error_sql_exec/simulate_error_sql_gen during State() initialization.
  • Loading branch information
rbalamohan committed Sep 26, 2024
1 parent b3113f7 commit 9cae755
Showing 1 changed file with 17 additions and 40 deletions.
57 changes: 17 additions & 40 deletions conversational-analytics.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import pandas as pd
from langgraph.graph import StateGraph
from langgraph.types import Send
from langgraph.types import RetryPolicy
from pydantic import BaseModel
from waii_sdk_py import WAII
from waii_sdk_py.query import QueryGenerationRequest, RunQueryRequest
Expand All @@ -20,8 +20,9 @@ class State(BaseModel):
insight: str = ''
response: str = ''
error: Optional[str] = None
attempts: int = 0
path_decision: str = ""
simulate_error_sql_gen: bool = False
simulate_error_sql_exec: bool = False


class LanggraphWorkflowManager:
Expand All @@ -45,10 +46,14 @@ def init_waii(self):
def create_workflow(self) -> StateGraph:
workflow = StateGraph(State)

# Define retry policies
sql_retry_policy = RetryPolicy(max_attempts=3, backoff_factor=2.0, initial_interval=0.5, retry_on=Exception)
execution_retry_policy = RetryPolicy(max_attempts=3, backoff_factor=2.0, initial_interval=0.5, retry_on=Exception)

# Add nodes to the graph
workflow.add_node("Intent Classifier", self.intent_classifier)
workflow.add_node("SQL Generator", self.sql_generator)
workflow.add_node("SQL Executor", self.sql_executor)
workflow.add_node("SQL Generator", self.sql_generator, retry=sql_retry_policy)
workflow.add_node("SQL Executor", self.sql_executor, retry=execution_retry_policy)
workflow.add_node("Chart Generator", self.chart_gen)
workflow.add_node("Insight Generator", self.insight_generator)
workflow.add_node("Result Synthesizer", self.result_synthesizer)
Expand Down Expand Up @@ -101,25 +106,25 @@ def unknown_handler(self, state: State) -> State:

def sql_generator(self, state: State) -> State:
print(f"Generating SQL for query: {state.query}")
if state.error:
return state
if state.simulate_error_sql_gen:
raise Exception("Error in SQL generation")
try:
sql = self.waii_sql_generator(question=state.query)
return state.model_copy(update={"sql": sql})
except Exception as e:
return state.model_copy(update={"error": str(e)})
raise Exception(f"Error in SQL generation: {str(e)}")

def sql_executor(self, state: State) -> State:
print(f"Executing query: {state.sql}")
if state.error:
return state
print(f"Executing query: {state.query}")
if state.simulate_error_sql_exec:
raise Exception("Error in SQL execution")
try:
data = self.waii_sql_executor(query=state.sql)
updated_state = state.model_copy(update={"data": data}, deep=True)
print(f"State after exec: {updated_state}")
return updated_state
except Exception as e:
return state.model_copy(update={"error": str(e)})
raise Exception(f"Error in SQL generation: {str(e)}")

def chart_gen(self, state: State) -> State:
print(f"Generating chart for data: {state.data}")
Expand Down Expand Up @@ -152,35 +157,6 @@ def result_synthesizer(self, state: State) -> State:
print(f"Response: {response}")
return state.model_copy(update={"response": response}, deep=True)

def decision_step(self, state: State) -> dict:
print("Deciding the next step based on the query result...")
# Example decision logic: If 'data' has more than one row, generate a chart.
if len(state.data) > 1:
print("Decision: Generating chart")
return {"path_decision": "chart_generator"}
else:
print("Decision: Generating insight")
return {"path_decision": "insight_generator"}

def decision_step_conditional_branch(self, state: State):
print(f"Routing based on path_decision: {state.path_decision}")
if state.path_decision == "chart_generator":
return [Send("Chart Generator", state)]
elif state.path_decision == "Insight Generator":
return [Send("Insight Generator", state)]
else:
raise ValueError(f"Unknown path_decision: {state.path_decision}")

def should_retry_generic(self, state: State, success_next: str, retry_next: str = "SQL Generator"):
print(f"Checking if we should retry in {success_next}: error: {state.error}")
if state.error and state.attempts < 3:
return [Send(retry_next, State(**state.dict(), attempts=state.attempts + 1, error=None))]
print(f"Moving on to {success_next}")
return [Send(success_next, State(**state.dict(), error=None, attempts=0))]

def create_retry_func(self, success_next: str):
return lambda state: self.should_retry_generic(state, success_next)

def waii_intent_classification(self, query: str) -> str:
system_message = """You are an expert in classifying questions into 'sql', 'data_visualization', 'insight', or 'others'."""
question = f"Can you classify the following question into one of these categories? Question: '{query}'. " \
Expand Down Expand Up @@ -229,6 +205,7 @@ def waii_insight_generator(param: List[str]) -> str:
def run_workflow(self):
while True:
try:
# Use simulate_error_sql_exec=True or simulate_error_sql_gen=True to simulate errors
initial_state = State()
app_response = self.app.invoke(initial_state)
print(f"{app_response['response']}")
Expand Down

0 comments on commit 9cae755

Please sign in to comment.