Skip to content

Expense example #201

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 18 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ jobs:
with:
python-version: ${{ matrix.python }}
- run: uv tool install poethepoet
- run: uv sync --group=dsl --group=encryption --group=trio-async
- run: uv sync --group=dsl --group=encryption --group=trio-async --group=expense
- run: poe lint
- run: mkdir junit-xml
- run: poe test -s --junit-xml=junit-xml/${{ matrix.python }}--${{ matrix.os }}.xml
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ Some examples require extra dependencies. See each sample's directory for specif
* [custom_metric](custom_metric) - Custom metric to record the workflow type in the activity schedule to start latency.
* [dsl](dsl) - DSL workflow that executes steps defined in a YAML file.
* [encryption](encryption) - Apply end-to-end encryption for all input/output.
* [expense](expense) - Human-in-the-loop processing and asynchronous activity completion.
* [gevent_async](gevent_async) - Combine gevent and Temporal.
* [langchain](langchain) - Orchestrate workflows for LangChain.
* [message_passing/introduction](message_passing/introduction/) - Introduction to queries, signals, and updates.
Expand Down
83 changes: 83 additions & 0 deletions expense/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
# Expense

This sample workflow processes an expense request. It demonstrates human-in-the loop processing using Temporal's signal mechanism.

## Overview

This sample demonstrates the following workflow:

1. **Create Expense**: The workflow executes the `create_expense_activity` to initialize a new expense report in the external system.

2. **Register for Decision**: The workflow calls `register_for_decision_activity`, which registers the workflow with the external UI system so it can receive signals when decisions are made.

3. **Wait for Signal**: The workflow uses `workflow.wait_condition()` to wait for an external signal containing the approval/rejection decision.

4. **Signal-Based Completion**: When a human approves or rejects the expense, the external UI system sends a signal to the workflow using `workflow_handle.signal()`, providing the decision result.

5. **Process Payment**: Once the workflow receives the approval decision via signal, it executes the `payment_activity` to complete the simulated expense processing.

This pattern enables human-in-the-loop workflows where workflows can wait as long as necessary for external decisions using Temporal's durable signal mechanism.

## Steps To Run Sample

* You need a Temporal service running. See the main [README.md](../README.md) for more details.
* Start the sample expense system UI:
```bash
uv run -m expense.ui
```
* Start workflow and activity workers:
```bash
uv run -m expense.worker
```
* Start expense workflow execution:
```bash
# Start workflow and return immediately (default)
uv run -m expense.starter

# Start workflow and wait for completion
uv run -m expense.starter --wait

# Start workflow with custom expense ID
uv run -m expense.starter --expense-id "my-expense-123"

# Start workflow with custom ID and wait for completion
uv run -m expense.starter --wait --expense-id "my-expense-123"
```
* When you see the console print out that the expense is created, go to [localhost:8099/list](http://localhost:8099/list) to approve the expense.
* You should see the workflow complete after you approve the expense. You can also reject the expense.

## Running Tests

```bash
# Run all expense tests
uv run -m pytest tests/expense/ -v

# Run specific test categories
uv run -m pytest tests/expense/test_expense_workflow.py -v # Workflow tests
uv run -m pytest tests/expense/test_expense_activities.py -v # Activity tests
uv run -m pytest tests/expense/test_expense_integration.py -v # Integration tests
uv run -m pytest tests/expense/test_ui.py -v # UI tests

# Run a specific test
uv run -m pytest tests/expense/test_expense_workflow.py::TestWorkflowPaths::test_workflow_approved_complete_flow -v
```

## Key Concepts Demonstrated

* **Human-in-the-Loop Workflows**: Long-running workflows that wait for human interaction
* **Workflow Signals**: Using `workflow.signal()` and `workflow.wait_condition()` for external communication
* **Signal-Based Completion**: External systems sending signals to workflows for asynchronous decision-making
* **External System Integration**: Communication between workflows and external systems via web services and signals
* **HTTP Client Lifecycle Management**: Proper resource management with worker-scoped HTTP clients

## Troubleshooting

If you see the workflow failed, the cause may be a port conflict. You can try to change to a different port number in `__init__.py`. Then rerun everything.

## Files

* `workflow.py` - The main expense processing workflow with signal handling
* `activities.py` - Three activities: create expense, register for decision, process payment
* `ui.py` - A demonstration expense approval system web UI with signal sending
* `worker.py` - Worker to run workflows and activities with HTTP client lifecycle management
* `starter.py` - Client to start workflow executions with optional completion waiting
3 changes: 3 additions & 0 deletions expense/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
EXPENSE_SERVER_HOST = "localhost"
EXPENSE_SERVER_PORT = 8099
EXPENSE_SERVER_HOST_PORT = f"http://{EXPENSE_SERVER_HOST}:{EXPENSE_SERVER_PORT}"
122 changes: 122 additions & 0 deletions expense/activities.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
from typing import Optional

import httpx
from temporalio import activity
from temporalio.exceptions import ApplicationError

from expense import EXPENSE_SERVER_HOST_PORT

# Module-level HTTP client, managed by worker lifecycle
_http_client: Optional[httpx.AsyncClient] = None


async def initialize_http_client() -> None:
"""Initialize the global HTTP client. Called by worker setup."""
global _http_client
if _http_client is None:
_http_client = httpx.AsyncClient()


async def cleanup_http_client() -> None:
"""Cleanup the global HTTP client. Called by worker shutdown."""
global _http_client
if _http_client is not None:
await _http_client.aclose()
_http_client = None


def get_http_client() -> httpx.AsyncClient:
"""Get the global HTTP client."""
if _http_client is None:
raise RuntimeError(
"HTTP client not initialized. Call initialize_http_client() first."
)
return _http_client


@activity.defn
async def create_expense_activity(expense_id: str) -> None:
if not expense_id:
raise ValueError("expense id is empty")

client = get_http_client()
try:
response = await client.get(
f"{EXPENSE_SERVER_HOST_PORT}/create",
params={"is_api_call": "true", "id": expense_id},
)
response.raise_for_status()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically 4xx is probably a ApplicationError with non_retryable=True, but that's a bit pedantic. But with this setup, an activity that, say, has invalid auth will retry forever

except httpx.HTTPStatusError as e:
if 400 <= e.response.status_code < 500:
raise ApplicationError(
f"Client error: {e.response.status_code} {e.response.text}",
non_retryable=True,
) from e
raise

body = response.text

if body == "SUCCEED":
activity.logger.info(f"Expense created. ExpenseID: {expense_id}")
return

raise Exception(body)


@activity.defn
async def register_for_decision_activity(expense_id: str) -> None:
"""
Register the expense for decision. This activity registers the workflow
with the external system so it can receive signals when decisions are made.
"""
if not expense_id:
raise ValueError("expense id is empty")

logger = activity.logger
http_client = get_http_client()

# Get workflow info to register with the UI system
activity_info = activity.info()
workflow_id = activity_info.workflow_id

# Register the workflow ID with the UI system so it can send signals
try:
response = await http_client.post(
f"{EXPENSE_SERVER_HOST_PORT}/registerWorkflow",
params={"id": expense_id},
data={"workflow_id": workflow_id},
)
response.raise_for_status()
logger.info(f"Registered expense for decision. ExpenseID: {expense_id}")
except Exception as e:
logger.error(f"Failed to register workflow with UI system: {e}")
raise


@activity.defn
async def payment_activity(expense_id: str) -> None:
if not expense_id:
raise ValueError("expense id is empty")

client = get_http_client()
try:
response = await client.post(
f"{EXPENSE_SERVER_HOST_PORT}/action",
data={"is_api_call": "true", "type": "payment", "id": expense_id},
)
response.raise_for_status()
except httpx.HTTPStatusError as e:
if 400 <= e.response.status_code < 500:
raise ApplicationError(
f"Client error: {e.response.status_code} {e.response.text}",
non_retryable=True,
) from e
raise

body = response.text

if body == "SUCCEED":
activity.logger.info(f"payment_activity succeed ExpenseID: {expense_id}")
return

raise Exception(body)
52 changes: 52 additions & 0 deletions expense/starter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import argparse
import asyncio
import uuid

from temporalio.client import Client

from .workflow import SampleExpenseWorkflow


async def main():
parser = argparse.ArgumentParser(description="Start an expense workflow")
parser.add_argument(
"--wait",
action="store_true",
help="Wait for workflow completion (default: start and return immediately)",
)
parser.add_argument(
"--expense-id",
type=str,
help="Expense ID to use (default: generate random UUID)",
)
args = parser.parse_args()

# The client is a heavyweight object that should be created once per process.
client = await Client.connect("localhost:7233")

expense_id = args.expense_id or str(uuid.uuid4())
workflow_id = f"expense_{expense_id}"

# Start the workflow
handle = await client.start_workflow(
SampleExpenseWorkflow.run,
expense_id,
id=workflow_id,
task_queue="expense",
)

print(f"Started workflow WorkflowID {handle.id} RunID {handle.result_run_id}")
print(f"Workflow will register itself with UI system for expense {expense_id}")

if args.wait:
print("Waiting for workflow to complete...")
result = await handle.result()
print(f"Workflow completed with result: {result}")
return result
else:
print("Workflow started. Use --wait flag to wait for completion.")
return None


if __name__ == "__main__":
asyncio.run(main())
Loading
Loading