Have you ever wanted to run a series of functions reliably and durably, where each can have its own retry behavior, the overall workflow will survive server restarts, and you can have long-running workflows spanning months that can be canceled? Do you want to observe the status of a workflow reactively, as well as the results written from each step?
And do you want to do this with code, instead of a DSL?
Welcome to the world of Convex workflows.
- Run workflows asynchronously, and observe their status reactively via subscriptions, from one or many users simultaneously, even on page refreshes.
- Workflows can run for months, and survive server restarts. You can specify delays or custom times to run each step.
- Run steps in parallel, or in sequence.
- Output from previous steps is available to pass to subsequent steps.
- Run queries, mutations, and actions.
- Specify retry behavior on a per-step basis, along with a default policy.
- Specify how many workflows can run in parallel to manage load.
- Cancel long-running workflows.
- Clean up workflows after they're done.
import { WorkflowManager } from "@convex-dev/workflow";
import { components } from "./_generated/api";
export const workflow = new WorkflowManager(components.workflow);
export const exampleWorkflow = workflow.define({
args: {
storageId: v.id("_storage"),
},
handler: async (step, args) => {
const transcription = await step.runAction(
internal.index.computeTranscription,
{ storageId: args.storageId },
);
const embedding = await step.runAction(
internal.index.computeEmbedding,
{ transcription },
// Run this a month after the transcription is computed.
{ runAfter: 30 * 24 * 60 * 60 * 1000 },
);
console.log(embedding);
},
});
This component adds durably executed workflows to Convex. Combine Convex queries, mutations, and actions into long-lived workflows, and the system will always fully execute a workflow to completion.
Open a GitHub issue with any feedback or bugs you find.
First, add @convex-dev/workflow
to your Convex project:
npm install @convex-dev/workflow
Then, install the component within your convex/convex.config.ts
file:
// convex/convex.config.ts
import workflow from "@convex-dev/workflow/convex.config";
import { defineApp } from "convex/server";
const app = defineApp();
app.use(workflow);
export default app;
Finally, create a workflow manager within your convex/
folder, and point it
to the installed component:
// convex/index.ts
import { WorkflowManager } from "@convex-dev/workflow";
import { components } from "./_generated/api";
export const workflow = new WorkflowManager(components.workflow);
The first step is to define a workflow using workflow.define()
. This function
is designed to feel like a Convex action but with a few restrictions:
- The workflow runs in the background, so it can't return a value.
- The workflow must be deterministic, so it should implement most of its logic
by calling out to other Convex functions. We will be lifting some of these
restrictions over time by implementing
Math.random()
,Date.now()
, andfetch
within our workflow environment.
export const exampleWorkflow = workflow.define({
args: { name: v.string() },
handler: async (step, args): Promise<string> => {
const queryResult = await step.runQuery(
internal.example.exampleQuery,
args,
);
const actionResult = await step.runAction(
internal.example.exampleAction,
{ queryResult }, // pass in results from previous steps!
);
return actionResult;
},
});
export const exampleQuery = internalQuery({
args: { name: v.string() },
handler: async (ctx, args) => {
return `The query says... Hi ${args.name}!`;
},
});
export const exampleAction = internalAction({
args: { queryResult: v.string() },
handler: async (ctx, args) => {
return args.queryResult + " The action says... Hi back!";
},
});
Once you've defined a workflow, you can start it from a mutation or action
using workflow.start()
.
export const kickoffWorkflow = mutation({
handler: async (ctx) => {
const workflowId = await workflow.start(
ctx,
internal.example.exampleWorkflow,
{ name: "James" },
);
},
});
You can handle the workflow's result with onComplete
. This is useful for
cleaning up any resources used by the workflow.
Note: when you return things from a workflow, you'll need to specify the return
type of your handler
to break type cycles due to using internal.*
functions
in the body, which then inform the type of the workflow, which is included in
the internal.*
type.
You can also specify a returns
validator to do runtime validation on the
return value. If it fails, your onComplete
handler will be called with an
error instead of success. You can also do validation in the onComplete
handler
to have more control over handling that situation.
import { vWorkflowId } from "@convex-dev/workflow";
import { vResultValidator } from "@convex-dev/workpool";
export const foo = mutation({
handler: async (ctx) => {
const name = "James";
const workflowId = await workflow.start(
ctx,
internal.example.exampleWorkflow,
{ name },
{
onComplete: internal.example.handleOnComplete,
context: name, // can be anything
},
);
},
});
export const handleOnComplete = mutation({
args: {
workflowId: vWorkflowId,
result: vResultValidator,
context: v.any(), // used to pass through data from the start site.
}
handler: async (ctx, args) => {
const name = (args.context as { name: string }).name;
if (args.result.kind === "success") {
const text = args.result.returnValue;
console.log(`${name} result: ${text}`);
} else if (args.result.kind === "error") {
console.error("Workflow failed", args.result.error);
} else if (args.result.kind === "canceled") {
console.log("Workflow canceled", args.context);
}
},
});
You can run steps in parallel by calling step.runAction()
multiple times in
a Promise.all()
call.
export const exampleWorkflow = workflow.define({
args: { name: v.string() },
handler: async (step, args) => {
const [result1, result2] = await Promise.all([
step.runAction(internal.example.myAction, args),
step.runAction(internal.example.myAction, args),
]);
},
});
Note: The workflow will not proceed until all steps fired off at once have completed.
Sometimes actions fail due to transient errors, whether it was an unreliable third-party API or a server restart. You can have the workflow automatically retry actions using best practices (exponential backoff & jitter). By default there are no retries, and the workflow will fail.
You can specify default retry behavior for all workflows on the WorkflowManager, or override it on a per-workflow basis.
You can also specify a custom retry behavior per-step, to opt-out of retries for actions that may want at-most-once semantics.
Workpool options:
If you specify any of these, it will override the
DEFAULT_RETRY_BEHAVIOR
.
defaultRetryBehavior
: The default retry behavior for all workflows.maxAttempts
: The maximum number of attempts to retry an action.initialBackoffMs
: The initial backoff time in milliseconds.base
: The base multiplier for the backoff. Default is 2.
retryActionsByDefault
: Whether to retry actions, by default is false.- If you specify a retry behavior at the step level, it will always retry.
At the step level, you can also specify true
or false
to disable or use
the default policy.
const workflow = new WorkflowManager(components.workflow, {
defaultRetryBehavior: {
maxAttempts: 3,
initialBackoffMs: 100,
base: 2,
},
// If specified, this sets the defaults, overridden per-workflow or per-step.
workpoolOptions: { ... }
});
export const exampleWorkflow = workflow.define({
args: { name: v.string() },
handler: async (step, args) => {
// Uses default retry behavior & retryActionsByDefault
await step.runAction(internal.example.myAction, args);
// Retries will be attempted with the default behavior
await step.runAction(internal.example.myAction, args, { retry: true });
// No retries will be attempted
await step.runAction(internal.example.myAction, args, { retry: false });
// Custom retry behavior will be used
await step.runAction(internal.example.myAction, args, {
retry: { maxAttempts: 2, initialBackoffMs: 100, base: 2 },
});
},
// If specified, this will override the workflow manager's default
workpoolOptions: { ... },
});
You can specify how many workflows can run in parallel by setting the maxParallelism
workpool option. It has a reasonable default.
const workflow = new WorkflowManager(components.workflow, {
workpoolOptions: {
// You must only set this to one value per components.xyz!
// You can set different values if you "use" multiple different components
// in convex.config.ts.
maxParallelism: 10,
},
});
The workflow.start()
method returns a WorkflowId
, which can then be used for querying
a workflow's status.
export const kickoffWorkflow = action({
handler: async (ctx) => {
const workflowId = await workflow.start(
ctx,
internal.example.exampleWorkflow,
{ name: "James" },
);
await new Promise((resolve) => setTimeout(resolve, 1000));
const status = await workflow.status(ctx, workflowId);
console.log("Workflow status after 1s", status);
},
});
You can cancel a workflow with workflow.cancel()
, halting the workflow's execution immmediately.
In-progress calls to step.runAction()
, however, will finish executing.
export const kickoffWorkflow = action({
handler: async (ctx) => {
const workflowId = await workflow.start(
ctx,
internal.example.exampleWorkflow,
{ name: "James" },
);
await new Promise((resolve) => setTimeout(resolve, 1000));
// Cancel the workflow after 1 second.
await workflow.cancel(ctx, workflowId);
},
});
After a workflow has completed, you can clean up its storage with workflow.cleanup()
.
Completed workflows are not automatically cleaned up by the system.
export const kickoffWorkflow = action({
handler: async (ctx) => {
const workflowId = await workflow.start(
ctx,
internal.example.exampleWorkflow,
{ name: "James" },
);
try {
while (true) {
const status = await workflow.status(ctx, workflowId);
if (status.type === "inProgress") {
await new Promise((resolve) => setTimeout(resolve, 1000));
continue;
}
console.log("Workflow completed with status:", status);
break;
}
} finally {
await workflow.cleanup(ctx, workflowId);
}
},
});
You can specify a custom name for a step by passing a name
option to the step.
export const exampleWorkflow = workflow.define({
args: { name: v.string() },
handler: async (step, args) => {
await step.runAction(internal.example.myAction, args, { name: "FOO" });
},
});
This allows the events emitted to your logs to be more descriptive.
By default it uses the file/folder:function
name.
Here are a few limitations to keep in mind:
- Steps can only take in and return a total of 1 MiB of data within a single workflow execution. If you run into journal size limits, you can work around this by storing results in the DB from your step functions and passing IDs around within the the workflow.
console.log()
isn't currently captured, so you may see duplicate log lines within your Convex dashboard if you log within the workflow definition.- We currently do not collect backtraces from within function calls from workflows.
- If you need to use side effects like
fetch
,Math.random()
, orDate.now()
, you'll need to do that in a step, not in the workflow definition. - If the implementation of the workflow meaningfully changes (steps added, removed, or reordered) then it will fail with a determinism violation. The implementation should stay stable for the lifetime of active workflows. See this issue for ideas on how to make this better.