Run black/format #3

Merged
merged 5 commits into from
May 10, 2023
38 changes: 28 additions & 10 deletions README.md
@@ -21,7 +21,7 @@ With this repository cloned, run the following at the root of the directory:
```bash
poetry install
```
That loads all required dependencies.
That loads all required dependencies.

Then run the worker and workflow.

@@ -36,27 +36,45 @@ poetry run python run_workflow.py
![](stat/../static/webui_failure.png)
![](static/failure.gif)

### Demo: Happy Path
Enter your booking information in the Flask app <http://127.0.0.1:5000>, then see the tasks in the Web UI at <http://localhost:8233/>.

### Demo: Recover forward
Select your running or completed Workflow ID.

Under **WorkflowExecutionCompleted** car, hotel and flight are booked.

Notice each is executed via an activity.

### Demo: Recover Forward (retries)

In `run_workflow.py`, change the global variable `ATTEMPTS = 1` to `ATTEMPTS = 3` so that the `book_flight` Activity is attempted up to 3 times.
Enter your booking information in the Flask app <http://127.0.0.1:5000>, then see the tasks in the Web UI at <http://localhost:8233/>.

Enter your booking information in the Flask app <http://127.0.0.1:5000>, then see the tasks in the Web UI at <http://localhost:8233/>.
Select your running or completed Workflow ID.

Under **Recent** events, select the failed Activity, `book_flight` (in compact view).
Under **ActivityTaskStarted** you'll see the Attempts (5), and the stack trace message letting you know the last failed attempt.

### Demo: Recover backwards
Under **ActivityTaskStarted** you'll see the Attempts (3), and the stack trace message letting you know the last failed attempt.

Then notice how the Workflow executes the compensations.

### Demo: Recover Backward (rollback)

In `run_workflow.py`, change the global variable `ATTEMPTS = 3` to `ATTEMPTS = 5` so that the `book_flight` Activity is attempted up to 5 times.
Re-enter your booking information in the Flask app <http://127.0.0.1:5000>, then see the tasks in the Web UI at <http://localhost:8233/>.

In `book_workflow.py`, change the global variable `ATTEMPTS_FLIGHT = 5` to `ATTEMPTS_FLIGHT = 2` so that the `book_flight` Activity is attempted at most twice.
Re-enter your booking information in the Flask app <http://127.0.0.1:5000>, then see the tasks in the Web UI at <http://localhost:8233/>.
Select your running or completed Workflow ID.

Under **Recent** events, select the failed Activity, `book_flight` (in compact view).
Under **ActivityTaskStarted** you'll see the Attempts (2), and the stack trace message letting you know the last failed attempt.
Then notice how the Workflow executes the compensations.

Under **ActivityTaskStarted** you'll see the Attempts (5), and the stack trace message letting you know the last failed attempt.

Under **ActivityTaskFailed** you'll see error `Too many retries, flight booking not possible at this time!`. You will also see that since the booking cannot be completed, rollback (undo) is performed using compensation.

## Design

The booking saga is implemented using the Temporal Workflow framework, which provides a robust and fault-tolerant platform for coordinating distributed transactions.

The saga workflow consists of three activities: `book_car()`, `book_hotel()`, and `book_flight()`, each of which is responsible for making a reservation with the corresponding service provider. If any of these activities fails, the workflow will trigger the corresponding compensating action (`undo_book_car()`, `undo_book_hotel()`, or `undo_book_flight()`) to undo any previous bookings.
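
The compensation flow can be sketched in plain `asyncio`, without the Temporal SDK. This is a minimal illustration and not the repository's workflow code: `run_saga` and `make_step` are hypothetical names, and the step functions are stand-ins for the real activities. Each undo step is registered before its action is attempted, as `book_workflow.py` does with `compensations.append(...)`. Running the undo steps in reverse order is an assumption of this sketch.

```python
import asyncio
from typing import Awaitable, Callable, List, Tuple

# A step is an (action, undo) pair of zero-argument coroutine functions.
Step = Tuple[Callable[[], Awaitable[str]], Callable[[], Awaitable[str]]]


async def run_saga(steps: List[Step]) -> List[str]:
    """Run steps in order; if one fails, run the registered undo steps in reverse."""
    log: List[str] = []
    compensations: List[Callable[[], Awaitable[str]]] = []
    try:
        for action, undo in steps:
            # Register the compensation before attempting the action.
            compensations.append(undo)
            log.append(await action())
    except Exception as err:
        log.append(f"failed: {err}")
        # Assumption: undo in reverse order of registration.
        for undo in reversed(compensations):
            log.append(await undo())
    return log


def make_step(name: str, fail: bool = False) -> Step:
    """Hypothetical stand-in for a book_*/undo_book_* activity pair."""

    async def action() -> str:
        if fail:
            raise RuntimeError(f"{name} service is down")
        return f"book_{name}"

    async def undo() -> str:
        return f"undo_book_{name}"

    return action, undo


if __name__ == "__main__":
    steps = [make_step("car"), make_step("hotel"), make_step("flight", fail=True)]
    for line in asyncio.run(run_saga(steps)):
        print(line)
```

With the failing flight step, the log reads `book_car`, `book_hotel`, `failed: flight service is down`, then `undo_book_flight`, `undo_book_hotel`, `undo_book_car`.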

The `non_retryable_error_types` parameter is used to specify a list of error types that should not be retried when a Workflow or Activity fails.
The `non_retryable_error_types` parameter is used to specify a list of error types that should not be retried when a Workflow or Activity fails.
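
To make the effect of `maximum_attempts` and `non_retryable_error_types` concrete, here is a minimal plain-Python sketch of a retry loop. It is not Temporal's implementation: `call_with_retries` and `flaky_booking` are hypothetical names, retry intervals are omitted, and matching error types by exact class name is an assumption of this sketch.

```python
import asyncio
from typing import Awaitable, Callable, Sequence


async def call_with_retries(
    fn: Callable[[int], Awaitable[str]],
    maximum_attempts: int,
    non_retryable_error_types: Sequence[str] = (),
) -> str:
    """Call fn(attempt) until it succeeds, attempts run out, or a listed error type is raised."""
    attempt = 1
    while True:
        try:
            return await fn(attempt)
        except Exception as err:
            # Assumption: error types are matched by exact class name.
            if type(err).__name__ in non_retryable_error_types:
                raise
            if attempt >= maximum_attempts:
                raise
            attempt += 1


async def flaky_booking(attempt: int) -> str:
    """Hypothetical service that fails on the first two attempts."""
    if attempt < 3:
        raise RuntimeError("Service is down")
    return f"booked on attempt {attempt}"


if __name__ == "__main__":
    print(asyncio.run(call_with_retries(flaky_booking, maximum_attempts=5)))
```

In this sketch, listing `"RuntimeError"` as non-retryable makes the first failure final, while listing `"Exception"` does not stop retries of a `RuntimeError`, because only the exact class name is compared.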
9 changes: 8 additions & 1 deletion activities.py
@@ -10,6 +10,7 @@ class BookVacationInput:
     book_car_id: str
     book_hotel_id: str
     book_flight_id: str
+    attempts: int


 @activity.defn
@@ -26,12 +27,18 @@ async def book_hotel(input: BookVacationInput) -> str:

 @activity.defn
 async def book_flight(input: BookVacationInput) -> str:
-    if activity.info().attempt < 4:
+    if activity.info().attempt < input.attempts:
         activity.heartbeat(
             f"Invoking activity, attempt number {activity.info().attempt}"
         )
         await asyncio.sleep(1)
         raise RuntimeError("Service is down")
+    elif activity.info().attempt > 3:
+        raise RuntimeError(
+            "Too many retries, flight booking not possible at this time!"
+        )
+
+    print(f"Booking flight: {input.book_flight_id}")
     return f"Booking flight: {input.book_flight_id}"


8 changes: 5 additions & 3 deletions book_workflow.py
@@ -1,3 +1,4 @@
+import asyncio
 from datetime import timedelta

 from temporalio import workflow
@@ -6,8 +7,6 @@
 with workflow.unsafe.imports_passed_through():
     from activities import BookVacationInput, book_car, book_flight, book_hotel

-ATTEMPTS_FLIGHT = 5
-

 @workflow.defn
 class BookWorkflow:
@@ -29,6 +28,9 @@ async def run(self, input: BookVacationInput):
                 start_to_close_timeout=timedelta(seconds=10),
             )

+            # Sleep to simulate flight booking taking longer, allowing for worker restart while workflow running
+            await asyncio.sleep(15)
+
             compensations.append("undo_book_flight")
             output += " " + await workflow.execute_activity(
                 book_flight,
@@ -37,7 +39,7 @@ async def run(self, input: BookVacationInput):
                 retry_policy=RetryPolicy(
                     initial_interval=timedelta(seconds=1),
                     maximum_interval=timedelta(seconds=1),
-                    maximum_attempts=ATTEMPTS_FLIGHT,
+                    maximum_attempts=input.attempts,
                     non_retryable_error_types=["Exception"],
                 ),
             )
21 changes: 12 additions & 9 deletions run_worker.py
@@ -17,27 +17,29 @@


 async def main():
-
-    if os.getenv('TEMPORAL_MTLS_TLS_CERT') and os.getenv('TEMPORAL_MTLS_TLS_KEY') is not None:
+    if (
+        os.getenv("TEMPORAL_MTLS_TLS_CERT")
+        and os.getenv("TEMPORAL_MTLS_TLS_KEY") is not None
+    ):
         server_root_ca_cert: Optional[bytes] = None
-        f = open(os.getenv('TEMPORAL_MTLS_TLS_CERT'), "rb")
-        client_cert = f.read()
+        with open(os.getenv("TEMPORAL_MTLS_TLS_CERT"), "rb") as f:
+            client_cert = f.read()

-        f = open(os.getenv('TEMPORAL_MTLS_TLS_KEY'), "rb")
-        client_key = f.read()
+        with open(os.getenv("TEMPORAL_MTLS_TLS_KEY"), "rb") as f:
+            client_key = f.read()

         # Start client
         client = await Client.connect(
-            os.getenv('TEMPORAL_HOST_URL'),
-            namespace=os.getenv('TEMPORAL_NAMESPACE'),
+            os.getenv("TEMPORAL_HOST_URL"),
+            namespace=os.getenv("TEMPORAL_NAMESPACE"),
             tls=TLSConfig(
                 server_root_ca_cert=server_root_ca_cert,
                 client_cert=client_cert,
                 client_private_key=client_key,
             ),
         )
     else:
-        client = await Client.connect("localhost:7233")
+        client = await Client.connect("localhost:7233")

     worker = Worker(
         client,
@@ -54,5 +56,6 @@ async def main():
     )
     await worker.run()

+
 if __name__ == "__main__":
     asyncio.run(main())
21 changes: 14 additions & 7 deletions run_workflow.py
@@ -12,6 +12,9 @@

 app = Flask(__name__)

+# Set to more than 1 to show activity retries due to service down
+ATTEMPTS = 1
+

 @app.route("/")
 async def display_form():
@@ -30,28 +33,32 @@ async def book_vacation():
         book_car_id=car,
         book_hotel_id=hotel,
         book_flight_id=flight,
+        attempts=ATTEMPTS,
     )

-    if os.getenv('TEMPORAL_MTLS_TLS_CERT') and os.getenv('TEMPORAL_MTLS_TLS_KEY') is not None:
+    if (
+        os.getenv("TEMPORAL_MTLS_TLS_CERT")
+        and os.getenv("TEMPORAL_MTLS_TLS_KEY") is not None
+    ):
         server_root_ca_cert: Optional[bytes] = None
-        f = open(os.getenv('TEMPORAL_MTLS_TLS_CERT'), "rb")
+        f = open(os.getenv("TEMPORAL_MTLS_TLS_CERT"), "rb")
         client_cert = f.read()

-        f = open(os.getenv('TEMPORAL_MTLS_TLS_KEY'), "rb")
+        f = open(os.getenv("TEMPORAL_MTLS_TLS_KEY"), "rb")
         client_key = f.read()

         # Start client
         client = await Client.connect(
-            os.getenv('TEMPORAL_HOST_URL'),
-            namespace=os.getenv('TEMPORAL_NAMESPACE'),
+            os.getenv("TEMPORAL_HOST_URL"),
+            namespace=os.getenv("TEMPORAL_NAMESPACE"),
             tls=TLSConfig(
                 server_root_ca_cert=server_root_ca_cert,
                 client_cert=client_cert,
                 client_private_key=client_key,
             ),
         )
     else:
-        client = await Client.connect("localhost:7233")
+        client = await Client.connect("localhost:7233")

     result = await client.execute_workflow(
         BookWorkflow.run,
@@ -81,4 +88,4 @@ async def book_vacation():


 if __name__ == "__main__":
-    app.run(host='0.0.0.0', debug=True)
+    app.run(host="0.0.0.0", debug=True)