Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 10 additions & 22 deletions ARCHITECTURE.md
Original file line number Diff line number Diff line change
Expand Up @@ -225,10 +225,11 @@ await step.sleep("wait-one-hour", "1h");
### 4.1. Step Failures & Retries

When a step's function throws an error, the framework records the error in the
`step_attempt` and sets its status to `failed`. The error then propagates up. If
the run is retryable, the entire workflow run is rescheduled with exponential
backoff by setting its `availableAt` timestamp to a future time. On the next
execution, the replay will reach the failed step and re-execute its function.
`step_attempt` and sets its status to `failed`. The error then propagates up.
Retry scheduling for that failure is driven by the failed-attempt count for that
specific `stepName` in the workflow run. If retryable, the workflow run is
rescheduled by setting `availableAt` to the computed backoff time. On the next
execution, replay reaches the failed step and re-executes its function.

### 4.2. Workflow Failures & Retries

Expand All @@ -241,25 +242,12 @@ status is set to `failed` permanently.

### 4.3. Retry Policy

A `RetryPolicy` controls the backoff and retry limits for a workflow run. They
are defined at the workflow spec level and apply to all runs of that workflow:
OpenWorkflow uses the same `RetryPolicy` shape for two separate concerns:

```ts
const workflow = ow.defineWorkflow(
{
name: "charge-customer",
retryPolicy: {
initialInterval: "1s",
backoffCoefficient: 2,
maximumInterval: "100s",
maximumAttempts: Infinity, // unlimited
},
},
async ({ step }) => {
// workflow implementation
},
);
```
- **Step retry policy** (`step.run({ retryPolicy })` or step defaults) for
step-function failures. Budgets/backoff are tracked per step name.
- **Workflow retry policy** (`workflow.spec.retryPolicy`) for workflow-level
failures outside step execution.

### 4.4. Workflow Deadlines

Expand Down
2 changes: 1 addition & 1 deletion packages/docs/docs/advanced-patterns.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ const data = await step.run({ name: "fetch-external-api" }, async () => {
```

The default retry behavior works well for most cases. Configure retry behavior
at the workflow level with `retryPolicy`, or handle errors explicitly in your
per step with `step.run({ retryPolicy })`, or handle errors explicitly in your
step functions for custom logic. See [Retries](/docs/retries) for details.

## Sleeping (Pausing) Workflows
Expand Down
69 changes: 47 additions & 22 deletions packages/docs/docs/retries.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,13 @@ Each retry:
- Returns cached results for completed steps
- Re-executes the failed step

## Exponential Backoff
## Retry Policy

Both steps and workflows use the same retry policy shape. A retry policy
controls exponential backoff — how long to wait between retries, how fast delays
grow, and when to stop retrying.

Failed workflows are rescheduled with increasing delays:
With the defaults, retry delays look like this:

| Attempt | Delay |
| ------- | --------- |
Expand All @@ -58,28 +62,58 @@ Failed workflows are rescheduled with increasing delays:
| 5 | ~8s |
| ... | ... |

This prevents overwhelming external services during outages.
This prevents overwhelming external services during outages. Retries continue
until canceled, until `deadlineAt` is reached (or the next retry would pass it),
or until `maximumAttempts` is exhausted.

By default, retries continue until canceled or until `deadlineAt` is reached (or
the next retry would pass it). If `maximumAttempts` is configured, the workflow
is also marked `failed` once that limit is reached.
Retry policies have the following fields:

## Retry Policy
| Field | Default | Description |
| -------------------- | ---------- | --------------------------------------------------- |
| `initialInterval` | `"1s"` | Delay before the first retry after a failed attempt |
| `backoffCoefficient` | `2` | Multiplier applied to each subsequent retry delay |
| `maximumInterval` | `"100s"` | Upper bound for retry delay |
| `maximumAttempts` | `Infinity` | Maximum attempts, including the initial one |

### Step Retry Policy

Each `step.run(...)` can define its own retry policy. If you omit `retryPolicy`,
OpenWorkflow uses the defaults shown above.

```ts
await step.run(
{
name: "call-api",
retryPolicy: {
initialInterval: "500ms",
backoffCoefficient: 2,
maximumInterval: "30s",
maximumAttempts: 5,
},
},
async () => {
// step logic
},
);
```

### Workflow Retry Policy

Retry behavior is configured per workflow using `retryPolicy` in the workflow
spec:
Workflow-level `retryPolicy` applies to non-step failures — for example, missing
workflow definitions or errors thrown outside `step.run`. If you omit
`retryPolicy` (or individual fields), OpenWorkflow uses the same defaults.

```ts
import { defineWorkflow } from "openworkflow";

export const chargeCustomer = defineWorkflow(
defineWorkflow(
{
name: "charge-customer",
retryPolicy: {
initialInterval: "1s",
initialInterval: "500ms",
backoffCoefficient: 2,
maximumInterval: "100s",
maximumAttempts: Infinity,
maximumInterval: "30s",
maximumAttempts: 5,
},
},
async ({ step }) => {
Expand All @@ -88,15 +122,6 @@ export const chargeCustomer = defineWorkflow(
);
```

`retryPolicy` is optional. Any omitted fields use defaults.

| Field | Description |
| -------------------- | ----------------------------------------------------- |
| `initialInterval` | Delay before the first retry after a failed attempt |
| `backoffCoefficient` | Multiplier applied to each subsequent retry delay |
| `maximumInterval` | Upper bound for retry delay |
| `maximumAttempts` | Maximum total attempts, including the initial attempt |

## What Triggers a Retry

Retries happen when:
Expand Down
31 changes: 28 additions & 3 deletions packages/docs/docs/steps.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ const result = await step.run({ name: "fetch-user" }, async () => {
});
```

The first argument is a config object with the step name. The second argument is
the async function to execute. The function's return value is stored and
returned on subsequent replays.
The first argument is a config object with the step name (and optional
`retryPolicy`). The second argument is the async function to execute. The
function's return value is stored and returned on subsequent replays.

## Why Steps Matter

Expand Down Expand Up @@ -138,6 +138,31 @@ Pauses the workflow until a specified duration has elapsed. See
await step.sleep("wait-one-hour", "1h");
```

## Retry Policy (Optional)

Control backoff and retry limits for an individual step:

```ts
await step.run(
{
name: "charge-card",
retryPolicy: {
initialInterval: "1s",
backoffCoefficient: 2,
maximumInterval: "30s",
maximumAttempts: 5,
},
},
async () => {
await payments.charge();
},
);
```

Any `retryPolicy` fields you omit fall back to defaults. Step retry policies
are independent from the workflow-level retry policy. See
[Retries](/docs/retries) for the full behavior and defaults.

## Error Handling

If a step throws an error, the error is recorded and the workflow fails:
Expand Down
38 changes: 38 additions & 0 deletions packages/openworkflow/backend.testsuite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -910,6 +910,44 @@ export function testBackend(options: TestBackendOptions): void {
});
});

describe("rescheduleWorkflowRunAfterFailedStepAttempt()", () => {
test("reschedules a running workflow run with explicit availableAt", async () => {
const claimed = await createClaimedWorkflowRun(backend);
const availableAt = new Date(Date.now() + 5000);
const error = { message: "step failed" };

const updated =
await backend.rescheduleWorkflowRunAfterFailedStepAttempt({
workflowRunId: claimed.id,
workerId: claimed.workerId!, // eslint-disable-line @typescript-eslint/no-non-null-assertion
error,
availableAt,
});

expect(updated.status).toBe("pending");
expect(updated.availableAt?.getTime()).toBe(availableAt.getTime());
expect(updated.error).toEqual(error);
expect(updated.workerId).toBeNull();
expect(updated.startedAt).toBeNull();
expect(updated.finishedAt).toBeNull();
});

test("fails if the worker does not own the run", async () => {
const claimed = await createClaimedWorkflowRun(backend);

await expect(
backend.rescheduleWorkflowRunAfterFailedStepAttempt({
workflowRunId: claimed.id,
workerId: randomUUID(),
error: { message: "step failed" },
availableAt: new Date(Date.now() + 1000),
}),
).rejects.toThrow(
"Failed to reschedule workflow run after failed step attempt",
);
});
});

describe("createStepAttempt()", () => {
test("creates a step attempt", async () => {
const workflowRun = await createClaimedWorkflowRun(backend);
Expand Down
10 changes: 10 additions & 0 deletions packages/openworkflow/backend.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ export interface Backend {
failWorkflowRun(
params: Readonly<FailWorkflowRunParams>,
): Promise<WorkflowRun>;
rescheduleWorkflowRunAfterFailedStepAttempt(
params: Readonly<RescheduleWorkflowRunAfterFailedStepAttemptParams>,
): Promise<WorkflowRun>;
cancelWorkflowRun(
params: Readonly<CancelWorkflowRunParams>,
): Promise<WorkflowRun>;
Expand Down Expand Up @@ -108,6 +111,13 @@ export interface FailWorkflowRunParams {
retryPolicy: RetryPolicy;
}

export interface RescheduleWorkflowRunAfterFailedStepAttemptParams {
workflowRunId: string;
workerId: string;
error: SerializedError;
availableAt: Date;
}

export interface CancelWorkflowRunParams {
workflowRunId: string;
}
Expand Down
84 changes: 84 additions & 0 deletions packages/openworkflow/execution.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import { OpenWorkflow } from "./client.js";
import type { StepAttempt } from "./core/step.js";
import { createStepExecutionStateFromAttempts } from "./execution.js";
import { BackendPostgres } from "./postgres.js";
import { DEFAULT_POSTGRES_URL } from "./postgres/postgres.js";
import { randomUUID } from "node:crypto";
Expand Down Expand Up @@ -540,6 +542,61 @@ describe("executeWorkflow", () => {
});
});

describe("createStepExecutionStateFromAttempts", () => {
test("builds successful cache and failed-count map from mixed history", () => {
const completed = createMockStepAttempt({
id: "completed-a",
stepName: "step-a",
status: "completed",
output: "a",
});
const failedA1 = createMockStepAttempt({
id: "failed-a-1",
stepName: "step-a",
status: "failed",
});
const failedA2 = createMockStepAttempt({
id: "failed-a-2",
stepName: "step-a",
status: "failed",
});
const failedB = createMockStepAttempt({
id: "failed-b",
stepName: "step-b",
status: "failed",
});
const running = createMockStepAttempt({
id: "running-c",
stepName: "step-c",
status: "running",
});

const state = createStepExecutionStateFromAttempts([
completed,
failedA1,
failedA2,
failedB,
running,
]);

expect(state.cache.size).toBe(1);
expect(state.cache.get("step-a")).toBe(completed);
expect(state.cache.has("step-b")).toBe(false);
expect(state.cache.has("step-c")).toBe(false);

expect(state.failedCountsByStepName.get("step-a")).toBe(2);
expect(state.failedCountsByStepName.get("step-b")).toBe(1);
expect(state.failedCountsByStepName.has("step-c")).toBe(false);
});

test("returns empty cache and counts for empty history", () => {
const state = createStepExecutionStateFromAttempts([]);

expect(state.cache.size).toBe(0);
expect(state.failedCountsByStepName.size).toBe(0);
});
});

async function createBackend(): Promise<BackendPostgres> {
return await BackendPostgres.connect(DEFAULT_POSTGRES_URL, {
namespaceId: randomUUID(),
Expand All @@ -549,3 +606,30 @@ async function createBackend(): Promise<BackendPostgres> {
function sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}

function createMockStepAttempt(
overrides: Partial<StepAttempt> = {},
): StepAttempt {
const status = overrides.status ?? "completed";

return {
namespaceId: "default",
id: "step-attempt-id",
workflowRunId: "workflow-run-id",
stepName: "step",
kind: "function",
status,
config: {},
context: null,
output: null,
error: null,
childWorkflowRunNamespaceId: null,
childWorkflowRunId: null,
startedAt: new Date("2026-01-01T00:00:00.000Z"),
finishedAt:
status === "running" ? null : new Date("2026-01-01T00:00:01.000Z"),
createdAt: new Date("2026-01-01T00:00:00.000Z"),
updatedAt: new Date("2026-01-01T00:00:01.000Z"),
...overrides,
};
}
Loading