Skip to content

Draft: DO NOT MERGE, IWF-688: Cody translated iwf workflow pattern from java #9

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 65 additions & 0 deletions patterns/timeout/handle_timeout_workflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
from typing import List, Dict, Any, Optional
from datetime import timedelta

from iwf.workflow_state import WorkflowState
from iwf.state_decision import StateDecision
from iwf.object_workflow import ObjectWorkflow
from iwf.workflow_context import WorkflowContext
from iwf.state_def import StateDef
from iwf.command.command_request import CommandRequest
from iwf.command.command_results import CommandResults
from iwf.command.timer_command import TimerCommand


class InitState(WorkflowState):
def get_state_options(self):
return None

async def execute(self, context: WorkflowContext, input_data: Optional[Any]) -> StateDecision:
# Create a timer command that will trigger after 10 seconds
timer_command = TimerCommand(fire_after=timedelta(seconds=10))
command_request = CommandRequest(timer_commands=[timer_command])

# Execute the command
command_results = await context.command_client.execute_command(command_request)

# Check if timer has fired
if command_results.timer_results and command_results.timer_results[0].fired:
# Timer fired, go to timeout state
return StateDecision.single_next_state("TimeoutState")
else:
# Timer not fired, go to task state
return StateDecision.single_next_state("TaskState")


class TimeoutState(WorkflowState):
def get_state_options(self):
return None

async def execute(self, context: WorkflowContext, input_data: Optional[Any]) -> StateDecision:
# Handle timeout logic here
print("Handling timeout...")

# Complete the workflow
return StateDecision.graceful_complete_workflow(None)


class TaskState(WorkflowState):
def get_state_options(self):
return None

async def execute(self, context: WorkflowContext, input_data: Optional[Any]) -> StateDecision:
# Perform the task
print("Performing the task...")

# Complete the workflow
return StateDecision.graceful_complete_workflow(None)


class HandlingTimeoutWorkflow(ObjectWorkflow):
def get_workflow_states(self) -> List[StateDef]:
return [
StateDef.starting_state(InitState()),
StateDef.non_starting_state(TimeoutState()),
StateDef.non_starting_state(TaskState())
]
5 changes: 5 additions & 0 deletions patterns/timeout/iwf-config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# IWF server configuration
IWF_SERVER_URL = "http://localhost:8801"

# Registry configuration
REGISTRY_NAMESPACE = "default"
48 changes: 48 additions & 0 deletions patterns/timeout/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import asyncio
import sys
from iwf.client import Client
from iwf.registry import Registry
from iwf.iwf_api.models import WorkflowOptions

from handle_timeout_workflow import HandlingTimeoutWorkflow
from iwf_config import IWF_SERVER_URL, REGISTRY_NAMESPACE

# Create registry
registry = Registry(REGISTRY_NAMESPACE)

# Register workflow
registry.add_workflow(HandlingTimeoutWorkflow())

# Create client
client = Client(IWF_SERVER_URL)


async def start_workflow():
# Start a workflow
workflow_id = "handling-timeout-workflow-" + str(hash(str(sys.argv)))
workflow_options = WorkflowOptions(workflow_id=workflow_id)

# Start the workflow with no input
await client.start_workflow(
workflow_type=HandlingTimeoutWorkflow.__name__,
workflow_options=workflow_options,
workflow_input=None
)
print(f"Started workflow with ID: {workflow_id}")


async def run_worker():
# Start the worker
await client.start_worker(registry)


async def main():
# Start a workflow
await start_workflow()

# Run the worker
await run_worker()


if __name__ == "__main__":
asyncio.run(main())