Skip to content

Commit 61b9b92

Browse files
committed
feat: Introduce worflow level concurrency
1 parent 40ee9cb commit 61b9b92

File tree

20 files changed

+1523
-36
lines changed

20 files changed

+1523
-36
lines changed

ARCHITECTURE.md

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -297,15 +297,52 @@ When the worker encounters this, it executes all steps within the `Promise.all`
297297
concurrently. It waits for all of them to complete before proceeding. Each step
298298
attempt is persisted individually as a `step_attempt`.
299299

300-
### 5.2. Workflow Concurrency
300+
### 5.2. Worker Concurrency
301301

302302
Workers are configured with a concurrency limit (e.g., 10). A worker will
303303
maintain up to 10 in-flight workflow runs simultaneously. It polls for new work
304304
only when it has available capacity. The Backend's atomic `dequeue` operation
305305
(`FOR UPDATE SKIP LOCKED`) ensures that multiple workers can poll the same table
306306
without race conditions or processing the same run twice.
307307

308-
### 5.3. Handling Crashes During Parallel Execution
308+
### 5.3. Workflow-Run Concurrency
309+
310+
In addition to worker-slot concurrency, workflows can define a per-run
311+
concurrency policy in the workflow spec:
312+
313+
```ts
314+
defineWorkflow(
315+
{
316+
name: "process-order",
317+
concurrency: {
318+
key: ({ input }) => `tenant:${input.tenantId}`,
319+
limit: ({ input }) => input.maxConcurrentOrders,
320+
},
321+
},
322+
async ({ step }) => {
323+
// ...
324+
},
325+
);
326+
```
327+
328+
`key` and `limit` can each be either static values (`string`/`number`) or
329+
functions of the validated workflow input. They are resolved once when the run
330+
is created and persisted on the `workflow_run`.
331+
Resolved keys are stored verbatim; only empty/all-whitespace keys are rejected.
332+
333+
During claim/dequeue, a run is claimable only when the number of active leased
334+
`running` runs in the same bucket is below the run's `limit`. The bucket scope
335+
is:
336+
337+
- `namespace_id`
338+
- `workflow_name`
339+
- `version` (version-aware buckets)
340+
- `concurrency_key`
341+
342+
`pending`, `sleeping`, and expired-lease `running` runs do not consume
343+
concurrency slots.
344+
345+
### 5.4. Handling Crashes During Parallel Execution
309346

310347
The `availableAt` heartbeat mechanism provides robust recovery. If a worker
311348
crashes while executing parallel steps, its heartbeat stops. The `availableAt`

packages/docs/docs/workers.mdx

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ When a worker picks up a workflow:
4949
5. **Execute**: New steps are executed and their results are stored
5050
6. **Complete**: The workflow status is updated to `completed` or `failed`
5151

52-
## Concurrency
52+
## Worker Concurrency
5353

5454
Workers can process multiple workflow runs simultaneously. Configure concurrency
5555
in your `openworkflow.config.ts`:
@@ -88,6 +88,38 @@ bunx @openworkflow/cli worker start --concurrency 10
8888
capacity.
8989
</Note>
9090

91+
## Workflow Concurrency
92+
93+
Workflow specs can also define concurrency buckets that are enforced at claim
94+
time:
95+
96+
```ts
97+
defineWorkflow(
98+
{
99+
name: "process-order",
100+
concurrency: {
101+
key: ({ input }) => `tenant:${input.tenantId}`, // or: "tenant:acme"
102+
limit: ({ input }) => input.maxConcurrentOrders, // or: 5
103+
},
104+
},
105+
async ({ step }) => {
106+
// ...
107+
},
108+
);
109+
```
110+
111+
Workers will only claim a run when the bucket has capacity. Bucket scope is:
112+
113+
- namespace
114+
- workflow name
115+
- workflow version
116+
- resolved concurrency key
117+
118+
Only active leased `running` runs consume workflow-concurrency slots.
119+
Resolved keys are stored verbatim; only empty/all-whitespace keys are rejected.
120+
Sleeping runs are non-consuming until they are claimed again as actively leased
121+
`running` runs.
122+
91123
## Heartbeats and Crash Recovery
92124

93125
Workers maintain their claim on workflow runs through a heartbeat mechanism:

packages/docs/docs/workflows.mdx

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,43 @@ defineWorkflow(
189189
Any `retryPolicy` fields you omit fall back to defaults. See
190190
[Retries](/docs/retries) for the full behavior and defaults.
191191

192+
### Concurrency (Optional)
193+
194+
Control how many active leased `running` runs are allowed for a workflow bucket:
195+
196+
```ts
197+
defineWorkflow(
198+
{
199+
name: "process-order",
200+
concurrency: {
201+
key: ({ input }) => `tenant:${input.tenantId}`, // or: "tenant:acme"
202+
limit: ({ input }) => input.maxConcurrentOrders, // or: 5
203+
},
204+
},
205+
async ({ input, step }) => {
206+
// ...
207+
},
208+
);
209+
```
210+
211+
- `key` can be a string or a function `({ input }) => string`
212+
- `limit` can be a number or a function `({ input }) => number`
213+
- key must resolve to a non-empty string
214+
- limit must resolve to a positive integer
215+
- resolved keys are stored verbatim; only empty/all-whitespace keys are rejected
216+
217+
When concurrency is configured, runs in the same bucket are constrained by:
218+
219+
- namespace
220+
- workflow name
221+
- workflow version
222+
- resolved `key`
223+
224+
Only actively leased `running` runs consume slots. `pending`, `sleeping`, and
225+
expired-lease runs do not.
226+
Sleeping runs become slot-consuming only after they are claimed again as
227+
actively leased `running` runs.
228+
192229
### Idempotency Key (Optional)
193230

194231
You can prevent duplicate run creation by providing an idempotency key, though

packages/openworkflow/README.md

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,11 +67,38 @@ For more details, check out our [docs](https://openworkflow.dev/docs).
6767
-**Long pauses** - Sleep for seconds or months
6868
-**Scheduled runs** - Start workflows at a specific time
6969
-**Parallel execution** - Run steps concurrently
70+
-**Workflow concurrency** - Limit active runs by key (static or input-based)
7071
-**Idempotency keys** - Deduplicate repeated run requests (24h window)
7172
-**No extra servers** - Uses your existing database
7273
-**Dashboard included** - Monitor and debug workflows
7374
-**Production ready** - PostgreSQL and SQLite support
7475

76+
## Workflow Concurrency
77+
78+
You can limit active leased `running` runs per workflow bucket:
79+
80+
```ts
81+
const workflow = defineWorkflow(
82+
{
83+
name: "process-order",
84+
concurrency: {
85+
key: ({ input }) => `tenant:${input.tenantId}`, // or: "tenant:acme"
86+
limit: ({ input }) => input.maxConcurrentOrders, // or: 5
87+
},
88+
},
89+
async ({ step }) => {
90+
// ...
91+
},
92+
);
93+
```
94+
95+
`key` must resolve to a non-empty string and `limit` must resolve to a positive
96+
integer. Invalid values fail run creation.
97+
Keys are stored verbatim (for example, `" foo "` and `"foo"` are different
98+
concurrency keys); only empty or all-whitespace keys are rejected.
99+
Sleeping runs do not consume workflow-concurrency slots until they are claimed
100+
again as actively leased `running` runs.
101+
75102
## Documentation
76103

77104
- [Documentation](https://openworkflow.dev/docs)

0 commit comments

Comments
 (0)