Summary
Optimize SqlAlchemySagaStorage (while keeping the option open for other persistent storages) by using one session per saga and committing at checkpoints, instead of opening a new session and committing on every storage call. The session stays internal to the storage; callers and step handlers never receive or share it.
Problem
With the current design, each storage method that mutates state opens its own session and commits at the end:
- Sessions: a new session per mutating call (create_saga, update_status, update_context, log_step). Because every such call also commits, a saga opens as many sessions as it makes commits, so for a saga with several steps this is on the order of dozens of sessions.
- Commits: in the happy path we get 4 + 3·N commits (create + RUNNING, then for each step: log STARTED, update_context, log COMPLETED, plus final context + COMPLETED).
This causes noticeable overhead (round-trips, connection churn) when sagas have multiple steps.
Goals
- Performance: Speed up saga execution, reduce overhead, use as few commits and sessions as possible (one session per saga, one commit per logical checkpoint).
- Backward compatibility: Do not break the existing library API; existing code that does not use the new path keeps working.
- Reliability: Preserve the same guarantees: recovery after crash/deploy, eventual consistency, optimistic locking, strict backward recovery.
Proposed solution
Introduce a scoped run abstraction: one session per saga, created and owned by the storage implementation, never exposed to callers. The orchestrator uses this run and commits only at checkpoints.
1. New abstraction: storage run (session hidden)
- Add a way to create a run from storage, e.g. create_run() -> AsyncContextManager[ISagaStorageRun]. ISagaStorageRun (or equivalent) exposes the same logical operations as today (e.g. create_saga, update_context, update_status, log_step, load_saga_state, get_step_history) but never commits inside these methods; it also exposes commit() and rollback().
- The session is created inside the storage when a run is created and lives only inside the run; it is never returned or shared. Only the orchestrator (SagaTransaction) uses the run and calls commit() at checkpoints.
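A minimal sketch of the two interfaces, assuming typing.Protocol is used to express the optional capability. The method names come from the list above; every parameter list, and the name SupportsRuns, is a hypothetical placeholder rather than the library's actual API.

```python
from __future__ import annotations

from typing import Any, AsyncContextManager, Protocol, runtime_checkable


@runtime_checkable
class ISagaStorageRun(Protocol):
    """Hypothetical run interface: same logical operations as ISagaStorage,
    but none of them commits. The session stays private to the implementation."""

    async def create_saga(self, saga_id: str, name: str, context: dict[str, Any]) -> None: ...
    async def update_context(
        self, saga_id: str, context: dict[str, Any], current_version: int | None = None
    ) -> None: ...
    async def update_status(self, saga_id: str, status: str) -> None: ...
    async def log_step(self, saga_id: str, step_name: str, action: str, status: str) -> None: ...
    async def load_saga_state(self, saga_id: str) -> Any: ...
    async def get_step_history(self, saga_id: str) -> list[Any]: ...
    # Only the orchestrator calls these, at checkpoints.
    async def commit(self) -> None: ...
    async def rollback(self) -> None: ...


@runtime_checkable
class SupportsRuns(Protocol):
    """Hypothetical opt-in capability: storages that implement create_run()."""

    def create_run(self) -> AsyncContextManager[ISagaStorageRun]: ...
```

Keeping create_run() in a separate protocol, rather than adding an abstract method to ISagaStorage, means existing storages stay valid without any change.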
2. Checkpoints (orchestrator commits)
- After creating the saga and setting status to RUNNING → one commit.
- After each successfully completed step (all step writes in one transaction: e.g. log STARTED, update context, log COMPLETED) → one commit per step.
- At completion: final context + status COMPLETED → one commit.
- During compensation: after switching to COMPENSATING and after each compensated step → one commit per step (and one for final FAILED if needed).
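So for N steps the happy path goes from 4 + 3·N commits to 2 + N (one checkpoint after create + RUNNING, one per step, one at completion; compensation adds its own checkpoints), and from many sessions to one session per saga.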
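The happy-path checkpoint schedule can be illustrated with a recording fake. RecordingRun and run_saga are hypothetical stand-ins (not the real SagaTransaction), and compensation is omitted; it would follow the same pattern of one commit per compensated step. The recorded writes also show where 4 + 3·N comes from: the saga performs exactly that many writes, and the current design commits each one separately.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass, field
from typing import Awaitable, Callable

Step = Callable[[dict], Awaitable[None]]


@dataclass
class RecordingRun:
    """Hypothetical stand-in for a run: records writes and counts commits."""

    writes: list[tuple[str, ...]] = field(default_factory=list)
    commits: int = 0

    async def create_saga(self, saga_id: str) -> None:
        self.writes.append(("create", saga_id))

    async def update_status(self, saga_id: str, status: str) -> None:
        self.writes.append(("status", status))

    async def update_context(self, saga_id: str, context: dict) -> None:
        self.writes.append(("context",))

    async def log_step(self, saga_id: str, step: str, status: str) -> None:
        self.writes.append(("log", step, status))

    async def commit(self) -> None:
        self.commits += 1


async def run_saga(run: RecordingRun, saga_id: str, steps: list[tuple[str, Step]], context: dict) -> None:
    await run.create_saga(saga_id)
    await run.update_status(saga_id, "RUNNING")
    await run.commit()  # checkpoint: saga exists and is RUNNING
    for name, action in steps:
        await run.log_step(saga_id, name, "STARTED")
        await action(context)
        await run.update_context(saga_id, context)
        await run.log_step(saga_id, name, "COMPLETED")
        await run.commit()  # checkpoint: all writes of this step land together
    await run.update_context(saga_id, context)
    await run.update_status(saga_id, "COMPLETED")
    await run.commit()  # final checkpoint


def legacy_commits(n: int) -> int:
    return 4 + 3 * n  # one commit per write


def run_commits(n: int) -> int:
    return 2 + n  # one commit per checkpoint


async def bump(ctx: dict) -> None:
    # Hypothetical step action that mutates the saga context.
    ctx["calls"] = ctx.get("calls", 0) + 1


demo_run = RecordingRun()
asyncio.run(run_saga(demo_run, "s1", [("a", bump), ("b", bump), ("c", bump)], {}))
print(demo_run.commits, len(demo_run.writes))  # 5 13
```

For three steps the run path commits 5 times, while the same 13 writes would cost 13 commits (and 13 sessions) under the per-call design.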
3. Backward compatibility
- Existing ISagaStorage methods remain unchanged (same signatures and behavior when used without a run).
- New behavior is opt-in: storage may implement create_run(). If the orchestrator detects that storage supports runs, it uses the run path; otherwise it keeps the current “one session per call” path.
- MemorySagaStorage can either omit create_run() (the orchestrator then falls back to the current behavior) or implement it as a no-op / in-memory run with the same checkpoint semantics.
- No change is required for existing consumers that only use the current API.
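A sketch of the opt-in detection, assuming the orchestrator checks a runtime-checkable SupportsRuns protocol (a plain hasattr(storage, "create_run") check would work equally well). LegacyStorage, RunCapableStorage, _Run and start_saga are hypothetical stand-ins that only count commits.

```python
from __future__ import annotations

import asyncio
import contextlib
from typing import Any, AsyncContextManager, AsyncIterator, Protocol, runtime_checkable


@runtime_checkable
class SupportsRuns(Protocol):
    """Hypothetical opt-in capability: storages that can open a run."""

    def create_run(self) -> AsyncContextManager[Any]: ...


class LegacyStorage:
    """Stand-in for a storage without create_run(): every call commits on its own."""

    def __init__(self) -> None:
        self.commits = 0

    async def update_status(self, saga_id: str, status: str) -> None:
        self.commits += 1  # models "open session, write, commit" per call


class _Run:
    """Stand-in run: writes are staged, only commit() is counted."""

    def __init__(self, storage: RunCapableStorage) -> None:
        self._storage = storage

    async def update_status(self, saga_id: str, status: str) -> None:
        pass  # would write through the run's private session without committing

    async def commit(self) -> None:
        self._storage.commits += 1


class RunCapableStorage(LegacyStorage):
    @contextlib.asynccontextmanager
    async def create_run(self) -> AsyncIterator[_Run]:
        yield _Run(self)  # a real implementation would open/close its session here


async def start_saga(storage: Any, saga_id: str) -> str:
    """Orchestrator side: prefer the run path, fall back to per-call commits."""
    if isinstance(storage, SupportsRuns):
        async with storage.create_run() as run:
            await run.update_status(saga_id, "RUNNING")
            await run.commit()  # checkpoint
        return "run"
    await storage.update_status(saga_id, "RUNNING")  # commits internally, as today
    return "legacy"
```

runtime_checkable only verifies that create_run exists, not its signature, so the protocol acts as a cheap capability flag rather than a full contract check.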
4. Reliability and eventual consistency
- Checkpoints ensure that after each commit we have a consistent persisted state (status, context, step history). Recovery can resume from the last completed step or continue compensation.
- Per-step atomicity: within one run, all writes for a step (e.g. log + context update) go into a single transaction, followed by one commit(). No half-written step state is persisted.
- Recovery: get_sagas_for_recovery, load_saga_state and get_step_history continue to work (either outside a run or via short-lived sessions). Persisted data after checkpoints is the same as today; only the frequency of commits changes.
- Optimistic locking: update_context(..., current_version=...) still runs in the same run transaction and commits with the rest of the step, so version and context are never split across commits.
- Strict backward recovery: unchanged; the transition to COMPENSATING and each compensation step are checkpointed as above.
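To illustrate per-step atomicity together with optimistic locking, the sketch below uses the standard-library sqlite3 module as a stand-in for the SQLAlchemy session held by a run. The two-table schema, write_step and StaleVersionError are hypothetical. A stale current_version raises before the step is complete, and rolling back drops that step's STARTED row, so only fully written steps reach the database.

```python
import json
import sqlite3


class StaleVersionError(Exception):
    """Hypothetical error: another writer bumped the saga version first."""


def create_schema(conn: sqlite3.Connection) -> None:
    # Hypothetical, simplified schema standing in for the real saga tables.
    conn.execute("CREATE TABLE saga (id TEXT PRIMARY KEY, context TEXT, version INTEGER)")
    conn.execute("CREATE TABLE step_log (saga_id TEXT, step TEXT, status TEXT)")
    conn.execute("INSERT INTO saga VALUES ('s1', '{}', 1)")
    conn.commit()


def write_step(conn: sqlite3.Connection, saga_id: str, step: str,
               context: dict, current_version: int) -> None:
    """Stage every write of one step in the open transaction; never commits."""
    conn.execute("INSERT INTO step_log VALUES (?, ?, 'STARTED')", (saga_id, step))
    cur = conn.execute(
        "UPDATE saga SET context = ?, version = version + 1 WHERE id = ? AND version = ?",
        (json.dumps(context), saga_id, current_version),
    )
    if cur.rowcount != 1:  # optimistic lock: no row matched the expected version
        raise StaleVersionError(f"{saga_id}: expected version {current_version}")
    conn.execute("INSERT INTO step_log VALUES (?, ?, 'COMPLETED')", (saga_id, step))


conn = sqlite3.connect(":memory:")
create_schema(conn)

write_step(conn, "s1", "reserve", {"reserved": True}, current_version=1)
conn.commit()  # checkpoint: both log rows and the new context/version land together

try:
    write_step(conn, "s1", "charge", {"charged": True}, current_version=1)  # stale
except StaleVersionError:
    conn.rollback()  # discards the half-written step (its STARTED row)
```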
Scope of work (for implementation)
- Define ISagaStorageRun (or equivalent) with create_saga, update_context, update_status, log_step, load_saga_state, get_step_history, commit(), rollback(); the session is not exposed.
- Add create_run() (e.g. on ISagaStorage or as an optional protocol) returning an async context manager that yields the run and closes the session on exit.
- Implement run support in SqlAlchemySagaStorage: create the session inside create_run(); run methods use that session and do not commit; the run exposes commit()/rollback().
- Update the saga orchestrator (SagaTransaction): when storage supports runs, use create_run(), perform all step writes within the run, and call commit() only at the defined checkpoints; otherwise keep the current behavior.
- Optionally implement create_run() for MemorySagaStorage (no-op or in-memory run) so tests can exercise the same code path.
- Tests: existing saga tests still pass; add or adjust tests for the run path and checkpoint behavior; ensure recovery and compensation still work with the new path.
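One possible in-memory run for MemorySagaStorage, sketched under the assumption that each saga is kept as a plain dict with status, context and history. Writes go to a private copy of each touched saga (so the run can read its own writes, like a database session) and are published only by commit(); leaving the context manager discards anything uncommitted. The record layout, the initial "PENDING" status and the method signatures are hypothetical and simpler than the real ones (e.g. no current_version).

```python
from __future__ import annotations

import asyncio
import contextlib
import copy
from typing import Any, AsyncIterator


class MemorySagaStorage:
    """Minimal stand-in; the real MemorySagaStorage's fields may differ."""

    def __init__(self) -> None:
        # saga_id -> {"status": ..., "context": ..., "history": [(step, status), ...]}
        self.sagas: dict[str, dict[str, Any]] = {}

    @contextlib.asynccontextmanager
    async def create_run(self) -> AsyncIterator[MemorySagaRun]:
        run = MemorySagaRun(self)
        try:
            yield run
        finally:
            await run.rollback()  # anything not committed is discarded on exit


class MemorySagaRun:
    """In-memory run: edits go to private per-saga copies until commit()."""

    def __init__(self, storage: MemorySagaStorage) -> None:
        self._storage = storage
        self._work: dict[str, dict[str, Any]] = {}

    def _record(self, saga_id: str) -> dict[str, Any]:
        # Copy on first touch so uncommitted edits never leak into shared state.
        if saga_id not in self._work:
            self._work[saga_id] = copy.deepcopy(self._storage.sagas[saga_id])
        return self._work[saga_id]

    async def create_saga(self, saga_id: str, context: dict[str, Any]) -> None:
        self._work[saga_id] = {"status": "PENDING", "context": copy.deepcopy(context), "history": []}

    async def update_status(self, saga_id: str, status: str) -> None:
        self._record(saga_id)["status"] = status

    async def update_context(self, saga_id: str, context: dict[str, Any]) -> None:
        self._record(saga_id)["context"] = copy.deepcopy(context)

    async def log_step(self, saga_id: str, step: str, status: str) -> None:
        self._record(saga_id)["history"].append((step, status))

    async def load_saga_state(self, saga_id: str) -> dict[str, Any]:
        return copy.deepcopy(self._record(saga_id))  # sees this run's own writes

    async def get_step_history(self, saga_id: str) -> list[tuple[str, str]]:
        return list(self._record(saga_id)["history"])

    async def commit(self) -> None:
        for saga_id, record in self._work.items():
            self._storage.sagas[saga_id] = copy.deepcopy(record)
        self._work.clear()

    async def rollback(self) -> None:
        self._work.clear()


async def crash_mid_step(storage: MemorySagaStorage) -> None:
    async with storage.create_run() as run:
        await run.create_saga("s1", {"n": 0})
        await run.update_status("s1", "RUNNING")
        await run.commit()  # checkpoint
        await run.log_step("s1", "reserve", "STARTED")
        raise RuntimeError("simulated crash before the step checkpoint")


storage = MemorySagaStorage()
with contextlib.suppress(RuntimeError):
    asyncio.run(crash_mid_step(storage))
# storage.sagas["s1"] is RUNNING with an empty history: the uncommitted STARTED entry is gone
```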
Non-goals / out of scope
- Exposing the session or run to application code or step handlers.
- Changing the public saga API (Saga.transaction(..., storage=...)); only the internal use of storage may change when runs are used.
- Reducing commits below “one per step” (that would weaken recovery guarantees).