Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(agents-api): Fix bug in task-execution workflow and uuid-int-list-to-str fn #476

Merged
merged 1 commit into from
Aug 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion agents-api/agents_api/common/utils/cozo.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from types import SimpleNamespace
from uuid import UUID

from beartype import beartype
from pycozo import Client

# Define a mock client for testing purposes, simulating Cozo API client behavior.
Expand All @@ -20,5 +21,6 @@
)


def uuid_int_list_to_uuid4(data) -> UUID:
@beartype
def uuid_int_list_to_uuid4(data: list[int]) -> UUID:
return UUID(bytes=b"".join([i.to_bytes(1, "big") for i in data]))
1 change: 1 addition & 0 deletions agents-api/agents_api/models/execution/get_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

@rewrap_exceptions(
{
AssertionError: partialclass(HTTPException, status_code=404),
QueryException: partialclass(HTTPException, status_code=400),
ValidationError: partialclass(HTTPException, status_code=400),
TypeError: partialclass(HTTPException, status_code=400),
Expand Down
6 changes: 5 additions & 1 deletion agents-api/agents_api/models/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,11 @@ def fix_uuid(

fixed = {
**item,
**{attr: uuid_int_list_to_uuid4(item[attr]) for attr in id_attrs},
**{
attr: uuid_int_list_to_uuid4(item[attr])
for attr in id_attrs
if isinstance(item[attr], list)
},
}

return fixed
Expand Down
12 changes: 1 addition & 11 deletions agents-api/agents_api/routers/tasks/get_execution_details.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from fastapi import HTTPException, status
from pydantic import UUID4

from agents_api.autogen.openapi_model import (
Expand All @@ -13,13 +12,4 @@

@router.get("/executions/{execution_id}", tags=["executions"])
async def get_execution_details(execution_id: UUID4) -> Execution:
try:
return get_execution_query(execution_id=execution_id)
except AssertionError as e:
print("-" * 100)
print(e)
print("-" * 100)

raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Execution not found"
) from e
return get_execution_query(execution_id=execution_id)
102 changes: 57 additions & 45 deletions agents-api/agents_api/workflows/task_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
StepContext,
StepOutcome,
)
from ..env import testing
from ..env import debug, testing


STEP_TO_ACTIVITY = {
Expand Down Expand Up @@ -71,6 +71,26 @@
GenericStep = RootModel[WorkflowStep]


# TODO: find a way to transition to error if workflow or activity times out.


async def transition(state, context, **kwargs) -> None:
# NOTE: The state variable is closured from the outer scope
transition_request = CreateTransitionRequest(
current=context.cursor,
**{
**state.model_dump(exclude_unset=True),
**kwargs, # Override with any additional kwargs
},
)

await workflow.execute_activity(
task_steps.transition_step,
args=[context, transition_request],
schedule_to_close_timeout=timedelta(seconds=2),
)


@workflow.defn
class TaskExecutionWorkflow:
@workflow.run
Expand All @@ -93,7 +113,7 @@ async def run(

# ---

# 1a. Set global state
# 1. Set global state
# (By default, exit if last otherwise transition 'step' to the next step)
state = PendingTransition(
type="finish" if context.is_last_step else "step",
Expand All @@ -103,23 +123,6 @@ async def run(
metadata={"__meta__": {"step_type": step_type.__name__}},
)

# 1b. Prep a transition request
async def transition(**kwargs) -> None:
# NOTE: The state variable is closured from the outer scope
transition_request = CreateTransitionRequest(
current=context.cursor,
**{
**state.model_dump(exclude_unset=True),
**kwargs, # Override with any additional kwargs
},
)

await workflow.execute_activity(
task_steps.transition_step,
args=[context, transition_request],
schedule_to_close_timeout=timedelta(seconds=600),
)

# ---

# 2. Transition to starting if not done yet
Expand All @@ -142,48 +145,53 @@ async def transition(**kwargs) -> None:

# 3. Execute the current step's activity if applicable

try:
if activity := STEP_TO_ACTIVITY.get(step_type):
execute_activity = workflow.execute_activity
elif activity := STEP_TO_LOCAL_ACTIVITY.get(step_type):
execute_activity = workflow.execute_local_activity
else:
execute_activity = None
if activity := STEP_TO_ACTIVITY.get(step_type):
execute_activity = workflow.execute_activity
elif activity := STEP_TO_LOCAL_ACTIVITY.get(step_type):
execute_activity = workflow.execute_local_activity
else:
execute_activity = None

outcome = None
if execute_activity:
outcome = None

if execute_activity:
try:
outcome = await execute_activity(
activity,
context,
#
# TODO: This should be a configurable timeout everywhere based on the task
schedule_to_close_timeout=timedelta(seconds=3 if testing else 600),
schedule_to_close_timeout=timedelta(
seconds=3 if debug or testing else 600
),
)

except Exception as e:
await transition(type="error", output=dict(error=e))
raise ApplicationError(f"Activity {activity} threw error: {e}") from e
except Exception as e:
await transition(state, context, type="error", output=dict(error=e))
raise ApplicationError(f"Activity {activity} threw error: {e}") from e

# ---

# 4. Then, based on the outcome and step type, decide what to do next
match context.current_step, outcome:
# Handle errors (activity returns None)
case step, StepOutcome(error=error) if error is not None:
await transition(type="error", output=dict(error=error))
await transition(state, context, type="error", output=dict(error=error))
raise ApplicationError(
f"step {type(step).__name__} threw error: {error}"
)

case LogStep(), StepOutcome(output=output):
# Add the logged message to transition history
await transition(output=dict(logged=output))
await transition(state, context, output=dict(logged=output))

# Set the output to the current input
state.output = context.current_input

case ReturnStep(), StepOutcome(output=output):
await transition(output=output, type="finish", next=None)
await transition(
state, context, output=output, type="finish", next=None
)
return output # <--- Byeeee!

case SwitchStep(switch=switch), StepOutcome(output=index) if index >= 0:
Expand Down Expand Up @@ -359,15 +367,18 @@ async def transition(**kwargs) -> None:
case ErrorWorkflowStep(error=error), _:
state.output = dict(error=error)
state.type = "error"
await transition()
await transition(state, context)

raise ApplicationError(f"Error raised by ErrorWorkflowStep: {error}")

case YieldStep(), StepOutcome(
output=output, transition_to=(yield_transition_type, yield_next_target)
):
await transition(
output=output, type=yield_transition_type, next=yield_next_target
state,
output=output,
type=yield_transition_type,
next=yield_next_target,
)

state.output = await workflow.execute_child_workflow(
Expand All @@ -376,7 +387,7 @@ async def transition(**kwargs) -> None:
)

case WaitForInputStep(), StepOutcome(output=output):
await transition(output=output, type="wait", next=None)
await transition(state, context, output=output, type="wait", next=None)

state.type = "resume"
state.output = await execute_activity(
Expand All @@ -391,7 +402,7 @@ async def transition(**kwargs) -> None:
raise ApplicationError("Not implemented")

# 5. Create transition for completed step
await transition()
await transition(state, context)

# ---

Expand All @@ -400,9 +411,10 @@ async def transition(**kwargs) -> None:
if state.type in ("finish", "cancelled"):
return state.output

# Otherwise, recurse to the next step
# TODO: Should use a continue_as_new workflow ONLY if the next step is a conditional or loop
# Otherwise, we should just call the next step as a child workflow
workflow.continue_as_new(
args=[execution_input, state.next, previous_inputs + [state.output]]
)
else:
# Otherwise, recurse to the next step
# TODO: Should use a continue_as_new workflow ONLY if the next step is a conditional or loop
# Otherwise, we should just call the next step as a child workflow
return workflow.continue_as_new(
args=[execution_input, state.next, previous_inputs + [state.output]]
)
Loading