Commit 21e28ce

Add step-level retry policies (#294)
1 parent 68dd6e3 commit 21e28ce

11 files changed: 748 additions and 53 deletions

ARCHITECTURE.md

Lines changed: 10 additions & 22 deletions
@@ -225,10 +225,11 @@ await step.sleep("wait-one-hour", "1h");
225225
### 4.1. Step Failures & Retries
226226

227227
When a step's function throws an error, the framework records the error in the
228-
`step_attempt` and sets its status to `failed`. The error then propagates up. If
229-
the run is retryable, the entire workflow run is rescheduled with exponential
230-
backoff by setting its `availableAt` timestamp to a future time. On the next
231-
execution, the replay will reach the failed step and re-execute its function.
228+
`step_attempt` and sets its status to `failed`. The error then propagates up.
229+
Retry scheduling for that failure is driven by the failed-attempt count for that
230+
specific `stepName` in the workflow run. If retryable, the workflow run is
231+
rescheduled by setting `availableAt` to the computed backoff time. On the next
232+
execution, replay reaches the failed step and re-executes its function.
232233

233234
### 4.2. Workflow Failures & Retries
234235

@@ -241,25 +242,12 @@ status is set to `failed` permanently.
241242

242243
### 4.3. Retry Policy
243244

244-
A `RetryPolicy` controls the backoff and retry limits for a workflow run. They
245-
are defined at the workflow spec level and apply to all runs of that workflow:
245+
OpenWorkflow uses the same `RetryPolicy` shape for two separate concerns:
246246

247-
```ts
248-
const workflow = ow.defineWorkflow(
249-
{
250-
name: "charge-customer",
251-
retryPolicy: {
252-
initialInterval: "1s",
253-
backoffCoefficient: 2,
254-
maximumInterval: "100s",
255-
maximumAttempts: Infinity, // unlimited
256-
},
257-
},
258-
async ({ step }) => {
259-
// workflow implementation
260-
},
261-
);
262-
```
247+
- **Step retry policy** (`step.run({ retryPolicy })` or step defaults) for
248+
step-function failures. Budgets/backoff are tracked per step name.
249+
- **Workflow retry policy** (`workflow.spec.retryPolicy`) for workflow-level
250+
failures outside step execution.
263251

264252
### 4.4. Workflow Deadlines
265253

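The scheduling rule described in section 4.1 can be sketched roughly as follows. This is an illustration under stated assumptions, not the framework's implementation: the `RetryPolicyMs` type (milliseconds instead of duration strings), the helper names `backoffDelayMs` and `nextAvailableAt`, and the exact formula are hypothetical. The formula was chosen so that the documented defaults yield delays of 1s, 2s, 4s, 8s, and so on.

```ts
// Hypothetical policy shape that uses milliseconds instead of duration strings.
interface RetryPolicyMs {
  initialIntervalMs: number;
  backoffCoefficient: number;
  maximumIntervalMs: number;
  maximumAttempts: number; // total attempts, including the initial one
}

// Assumed formula: the delay after the k-th failure is
// initialInterval * backoffCoefficient^(k - 1), capped at maximumInterval.
function backoffDelayMs(policy: RetryPolicyMs, failedAttempts: number): number {
  const raw =
    policy.initialIntervalMs *
    Math.pow(policy.backoffCoefficient, failedAttempts - 1);
  return Math.min(raw, policy.maximumIntervalMs);
}

// Returns the time at which the run may be retried, or null when the
// attempt budget for this step name is exhausted.
function nextAvailableAt(
  policy: RetryPolicyMs,
  failedAttempts: number, // failed attempts recorded for this step name
  now: Date,
): Date | null {
  if (failedAttempts >= policy.maximumAttempts) return null;
  return new Date(now.getTime() + backoffDelayMs(policy, failedAttempts));
}
```

Because the count is keyed by step name, a step that has failed twice is scheduled independently of another step in the same run that has never failed.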
packages/docs/docs/advanced-patterns.mdx

Lines changed: 1 addition & 1 deletion
@@ -39,7 +39,7 @@ const data = await step.run({ name: "fetch-external-api" }, async () => {
3939
```
4040

4141
The default retry behavior works well for most cases. Configure retry behavior
42-
at the workflow level with `retryPolicy`, or handle errors explicitly in your
42+
per step with `step.run({ retryPolicy })`, or handle errors explicitly in your
4343
step functions for custom logic. See [Retries](/docs/retries) for details.
4444

4545
## Sleeping (Pausing) Workflows

packages/docs/docs/retries.mdx

Lines changed: 47 additions & 22 deletions
@@ -45,9 +45,13 @@ Each retry:
4545
- Returns cached results for completed steps
4646
- Re-executes the failed step
4747

48-
## Exponential Backoff
48+
## Retry Policy
49+
50+
Both steps and workflows use the same retry policy shape. A retry policy
51+
controls exponential backoff — how long to wait between retries, how fast delays
52+
grow, and when to stop retrying.
4953

50-
Failed workflows are rescheduled with increasing delays:
54+
With the defaults, retry delays look like this:
5155

5256
| Attempt | Delay |
5357
| ------- | --------- |
@@ -58,28 +62,58 @@ Failed workflows are rescheduled with increasing delays:
5862
| 5 | ~8s |
5963
| ... | ... |
6064

61-
This prevents overwhelming external services during outages.
65+
This prevents overwhelming external services during outages. Retries continue
66+
until canceled, until `deadlineAt` is reached (or the next retry would pass it),
67+
or until `maximumAttempts` is exhausted.
6268

63-
By default, retries continue until canceled or until `deadlineAt` is reached (or
64-
the next retry would pass it). If `maximumAttempts` is configured, the workflow
65-
is also marked `failed` once that limit is reached.
69+
Retry policies have the following fields:
6670

67-
## Retry Policy
71+
| Field | Default | Description |
72+
| -------------------- | ---------- | --------------------------------------------------- |
73+
| `initialInterval` | `"1s"` | Delay before the first retry after a failed attempt |
74+
| `backoffCoefficient` | `2` | Multiplier applied to each subsequent retry delay |
75+
| `maximumInterval` | `"100s"` | Upper bound for retry delay |
76+
| `maximumAttempts` | `Infinity` | Maximum attempts, including the initial one |
77+
78+
### Step Retry Policy
79+
80+
Each `step.run(...)` can define its own retry policy. If you omit `retryPolicy`,
81+
OpenWorkflow uses the defaults shown above.
82+
83+
```ts
84+
await step.run(
85+
{
86+
name: "call-api",
87+
retryPolicy: {
88+
initialInterval: "500ms",
89+
backoffCoefficient: 2,
90+
maximumInterval: "30s",
91+
maximumAttempts: 5,
92+
},
93+
},
94+
async () => {
95+
// step logic
96+
},
97+
);
98+
```
99+
100+
### Workflow Retry Policy
68101

69-
Retry behavior is configured per workflow using `retryPolicy` in the workflow
70-
spec:
102+
Workflow-level `retryPolicy` applies to non-step failures — for example, missing
103+
workflow definitions or errors thrown outside `step.run`. If you omit
104+
`retryPolicy` (or individual fields), OpenWorkflow uses the same defaults.
71105

72106
```ts
73107
import { defineWorkflow } from "openworkflow";
74108

75-
export const chargeCustomer = defineWorkflow(
109+
defineWorkflow(
76110
{
77111
name: "charge-customer",
78112
retryPolicy: {
79-
initialInterval: "1s",
113+
initialInterval: "500ms",
80114
backoffCoefficient: 2,
81-
maximumInterval: "100s",
82-
maximumAttempts: Infinity,
115+
maximumInterval: "30s",
116+
maximumAttempts: 5,
83117
},
84118
},
85119
async ({ step }) => {
@@ -88,15 +122,6 @@ export const chargeCustomer = defineWorkflow(
88122
);
89123
```
90124

91-
`retryPolicy` is optional. Any omitted fields use defaults.
92-
93-
| Field | Description |
94-
| -------------------- | ----------------------------------------------------- |
95-
| `initialInterval` | Delay before the first retry after a failed attempt |
96-
| `backoffCoefficient` | Multiplier applied to each subsequent retry delay |
97-
| `maximumInterval` | Upper bound for retry delay |
98-
| `maximumAttempts` | Maximum total attempts, including the initial attempt |
99-
100125
## What Triggers a Retry
101126

102127
Retries happen when:

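The "omitted fields use defaults" behavior documented in `retries.mdx` could be expressed as a small merge helper. The field names and default values below come from the defaults table in that file; the helper name `resolveRetryPolicy` and the constant `DEFAULT_RETRY_POLICY` are hypothetical and may not match the library's internals.

```ts
// Field names follow the documented retry policy shape.
interface RetryPolicy {
  initialInterval: string;
  backoffCoefficient: number;
  maximumInterval: string;
  maximumAttempts: number;
}

// Default values as listed in the retries documentation.
const DEFAULT_RETRY_POLICY: RetryPolicy = {
  initialInterval: "1s",
  backoffCoefficient: 2,
  maximumInterval: "100s",
  maximumAttempts: Infinity,
};

// Hypothetical helper: fills in any omitted (or undefined) field.
function resolveRetryPolicy(partial: Partial<RetryPolicy> = {}): RetryPolicy {
  return {
    initialInterval:
      partial.initialInterval ?? DEFAULT_RETRY_POLICY.initialInterval,
    backoffCoefficient:
      partial.backoffCoefficient ?? DEFAULT_RETRY_POLICY.backoffCoefficient,
    maximumInterval:
      partial.maximumInterval ?? DEFAULT_RETRY_POLICY.maximumInterval,
    maximumAttempts:
      partial.maximumAttempts ?? DEFAULT_RETRY_POLICY.maximumAttempts,
  };
}
```

Using `??` per field rather than object spread means an explicitly `undefined` field still falls back to its default.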
packages/docs/docs/steps.mdx

Lines changed: 28 additions & 3 deletions
@@ -22,9 +22,9 @@ const result = await step.run({ name: "fetch-user" }, async () => {
2222
});
2323
```
2424

25-
The first argument is a config object with the step name. The second argument is
26-
the async function to execute. The function's return value is stored and
27-
returned on subsequent replays.
25+
The first argument is a config object with the step name (and optional
26+
`retryPolicy`). The second argument is the async function to execute. The
27+
function's return value is stored and returned on subsequent replays.
2828

2929
## Why Steps Matter
3030

@@ -138,6 +138,31 @@ Pauses the workflow until a specified duration has elapsed. See
138138
await step.sleep("wait-one-hour", "1h");
139139
```
140140

141+
## Retry Policy (Optional)
142+
143+
Control backoff and retry limits for an individual step:
144+
145+
```ts
146+
await step.run(
147+
{
148+
name: "charge-card",
149+
retryPolicy: {
150+
initialInterval: "1s",
151+
backoffCoefficient: 2,
152+
maximumInterval: "30s",
153+
maximumAttempts: 5,
154+
},
155+
},
156+
async () => {
157+
await payments.charge();
158+
},
159+
);
160+
```
161+
162+
Any `retryPolicy` fields you omit fall back to defaults. Step retry policies
163+
are independent from the workflow-level retry policy. See
164+
[Retries](/docs/retries) for the full behavior and defaults.
165+
141166
## Error Handling
142167

143168
If a step throws an error, the error is recorded and the workflow fails:

packages/openworkflow/backend.testsuite.ts

Lines changed: 38 additions & 0 deletions
@@ -910,6 +910,44 @@ export function testBackend(options: TestBackendOptions): void {
910910
});
911911
});
912912

913+
describe("rescheduleWorkflowRunAfterFailedStepAttempt()", () => {
914+
test("reschedules a running workflow run with explicit availableAt", async () => {
915+
const claimed = await createClaimedWorkflowRun(backend);
916+
const availableAt = new Date(Date.now() + 5000);
917+
const error = { message: "step failed" };
918+
919+
const updated =
920+
await backend.rescheduleWorkflowRunAfterFailedStepAttempt({
921+
workflowRunId: claimed.id,
922+
workerId: claimed.workerId!, // eslint-disable-line @typescript-eslint/no-non-null-assertion
923+
error,
924+
availableAt,
925+
});
926+
927+
expect(updated.status).toBe("pending");
928+
expect(updated.availableAt?.getTime()).toBe(availableAt.getTime());
929+
expect(updated.error).toEqual(error);
930+
expect(updated.workerId).toBeNull();
931+
expect(updated.startedAt).toBeNull();
932+
expect(updated.finishedAt).toBeNull();
933+
});
934+
935+
test("fails if the worker does not own the run", async () => {
936+
const claimed = await createClaimedWorkflowRun(backend);
937+
938+
await expect(
939+
backend.rescheduleWorkflowRunAfterFailedStepAttempt({
940+
workflowRunId: claimed.id,
941+
workerId: randomUUID(),
942+
error: { message: "step failed" },
943+
availableAt: new Date(Date.now() + 1000),
944+
}),
945+
).rejects.toThrow(
946+
"Failed to reschedule workflow run after failed step attempt",
947+
);
948+
});
949+
});
950+
913951
describe("createStepAttempt()", () => {
914952
test("creates a step attempt", async () => {
915953
const workflowRun = await createClaimedWorkflowRun(backend);

packages/openworkflow/backend.ts

Lines changed: 10 additions & 0 deletions
@@ -36,6 +36,9 @@ export interface Backend {
3636
failWorkflowRun(
3737
params: Readonly<FailWorkflowRunParams>,
3838
): Promise<WorkflowRun>;
39+
rescheduleWorkflowRunAfterFailedStepAttempt(
40+
params: Readonly<RescheduleWorkflowRunAfterFailedStepAttemptParams>,
41+
): Promise<WorkflowRun>;
3942
cancelWorkflowRun(
4043
params: Readonly<CancelWorkflowRunParams>,
4144
): Promise<WorkflowRun>;
@@ -108,6 +111,13 @@ export interface FailWorkflowRunParams {
108111
retryPolicy: RetryPolicy;
109112
}
110113

114+
export interface RescheduleWorkflowRunAfterFailedStepAttemptParams {
115+
workflowRunId: string;
116+
workerId: string;
117+
error: SerializedError;
118+
availableAt: Date;
119+
}
120+
111121
export interface CancelWorkflowRunParams {
112122
workflowRunId: string;
113123
}
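
To make the contract of `rescheduleWorkflowRunAfterFailedStepAttempt` concrete, here is a minimal in-memory sketch that would satisfy the expectations in `backend.testsuite.ts`. It is not the Postgres backend from this commit: the `InMemoryRuns` class, the trimmed-down `WorkflowRun` and `SerializedError` types, and the synchronous return value (the real interface returns a `Promise`) are simplifying assumptions.

```ts
// Assumed minimal shapes; the real types carry more fields.
interface SerializedError {
  message: string;
}

interface WorkflowRun {
  id: string;
  status: "pending" | "running";
  workerId: string | null;
  availableAt: Date | null;
  error: SerializedError | null;
  startedAt: Date | null;
  finishedAt: Date | null;
}

interface RescheduleParams {
  workflowRunId: string;
  workerId: string;
  error: SerializedError;
  availableAt: Date;
}

// Hypothetical in-memory store used only for illustration.
class InMemoryRuns {
  private runs = new Map<string, WorkflowRun>();

  add(run: WorkflowRun): void {
    this.runs.set(run.id, run);
  }

  rescheduleWorkflowRunAfterFailedStepAttempt(
    params: Readonly<RescheduleParams>,
  ): WorkflowRun {
    const run = this.runs.get(params.workflowRunId);
    // Only the worker that currently owns a running run may reschedule it.
    if (!run || run.status !== "running" || run.workerId !== params.workerId) {
      throw new Error(
        "Failed to reschedule workflow run after failed step attempt",
      );
    }
    // Release the claim and make the run available again at the given time.
    const updated: WorkflowRun = {
      ...run,
      status: "pending",
      workerId: null,
      availableAt: params.availableAt,
      error: params.error,
      startedAt: null,
      finishedAt: null,
    };
    this.runs.set(run.id, updated);
    return updated;
  }
}
```

Note that the caller supplies `availableAt` explicitly, so in this sketch the backoff computation stays outside the backend.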

packages/openworkflow/execution.test.ts

Lines changed: 84 additions & 0 deletions
@@ -1,4 +1,6 @@
11
import { OpenWorkflow } from "./client.js";
2+
import type { StepAttempt } from "./core/step.js";
3+
import { createStepExecutionStateFromAttempts } from "./execution.js";
24
import { BackendPostgres } from "./postgres.js";
35
import { DEFAULT_POSTGRES_URL } from "./postgres/postgres.js";
46
import { randomUUID } from "node:crypto";
@@ -540,6 +542,61 @@ describe("executeWorkflow", () => {
540542
});
541543
});
542544

545+
describe("createStepExecutionStateFromAttempts", () => {
546+
test("builds successful cache and failed-count map from mixed history", () => {
547+
const completed = createMockStepAttempt({
548+
id: "completed-a",
549+
stepName: "step-a",
550+
status: "completed",
551+
output: "a",
552+
});
553+
const failedA1 = createMockStepAttempt({
554+
id: "failed-a-1",
555+
stepName: "step-a",
556+
status: "failed",
557+
});
558+
const failedA2 = createMockStepAttempt({
559+
id: "failed-a-2",
560+
stepName: "step-a",
561+
status: "failed",
562+
});
563+
const failedB = createMockStepAttempt({
564+
id: "failed-b",
565+
stepName: "step-b",
566+
status: "failed",
567+
});
568+
const running = createMockStepAttempt({
569+
id: "running-c",
570+
stepName: "step-c",
571+
status: "running",
572+
});
573+
574+
const state = createStepExecutionStateFromAttempts([
575+
completed,
576+
failedA1,
577+
failedA2,
578+
failedB,
579+
running,
580+
]);
581+
582+
expect(state.cache.size).toBe(1);
583+
expect(state.cache.get("step-a")).toBe(completed);
584+
expect(state.cache.has("step-b")).toBe(false);
585+
expect(state.cache.has("step-c")).toBe(false);
586+
587+
expect(state.failedCountsByStepName.get("step-a")).toBe(2);
588+
expect(state.failedCountsByStepName.get("step-b")).toBe(1);
589+
expect(state.failedCountsByStepName.has("step-c")).toBe(false);
590+
});
591+
592+
test("returns empty cache and counts for empty history", () => {
593+
const state = createStepExecutionStateFromAttempts([]);
594+
595+
expect(state.cache.size).toBe(0);
596+
expect(state.failedCountsByStepName.size).toBe(0);
597+
});
598+
});
599+
543600
async function createBackend(): Promise<BackendPostgres> {
544601
return await BackendPostgres.connect(DEFAULT_POSTGRES_URL, {
545602
namespaceId: randomUUID(),
@@ -549,3 +606,30 @@ async function createBackend(): Promise<BackendPostgres> {
549606
function sleep(ms: number): Promise<void> {
550607
return new Promise((resolve) => setTimeout(resolve, ms));
551608
}
609+
610+
function createMockStepAttempt(
611+
overrides: Partial<StepAttempt> = {},
612+
): StepAttempt {
613+
const status = overrides.status ?? "completed";
614+
615+
return {
616+
namespaceId: "default",
617+
id: "step-attempt-id",
618+
workflowRunId: "workflow-run-id",
619+
stepName: "step",
620+
kind: "function",
621+
status,
622+
config: {},
623+
context: null,
624+
output: null,
625+
error: null,
626+
childWorkflowRunNamespaceId: null,
627+
childWorkflowRunId: null,
628+
startedAt: new Date("2026-01-01T00:00:00.000Z"),
629+
finishedAt:
630+
status === "running" ? null : new Date("2026-01-01T00:00:01.000Z"),
631+
createdAt: new Date("2026-01-01T00:00:00.000Z"),
632+
updatedAt: new Date("2026-01-01T00:00:01.000Z"),
633+
...overrides,
634+
};
635+
}
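
The behavior that the `createStepExecutionStateFromAttempts` tests pin down can be sketched as a single pass over the attempt history. The real function lives in `execution.js` and is not shown in this excerpt, so the code below is a guess at its shape: `buildStepExecutionState` and `StepAttemptLike` are hypothetical names, and only the fields exercised by the tests are modeled.

```ts
// Minimal subset of the StepAttempt fields that the tests rely on.
interface StepAttemptLike {
  id: string;
  stepName: string;
  status: "running" | "completed" | "failed";
}

interface StepExecutionState<T extends StepAttemptLike> {
  cache: Map<string, T>; // completed attempt per step name
  failedCountsByStepName: Map<string, number>;
}

function buildStepExecutionState<T extends StepAttemptLike>(
  attempts: readonly T[],
): StepExecutionState<T> {
  const cache = new Map<string, T>();
  const failedCountsByStepName = new Map<string, number>();

  for (const attempt of attempts) {
    if (attempt.status === "completed") {
      // Completed attempts are replayed from the cache.
      cache.set(attempt.stepName, attempt);
    } else if (attempt.status === "failed") {
      // Failed attempts count against that step name's retry budget.
      const previous = failedCountsByStepName.get(attempt.stepName) ?? 0;
      failedCountsByStepName.set(attempt.stepName, previous + 1);
    }
    // Running attempts contribute to neither map.
  }

  return { cache, failedCountsByStepName };
}
```

Keeping the failure counts in a map keyed by step name is what lets each step track its own retry budget within one workflow run.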
