Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions packages/docs/docs/postgres.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ const backend = await BackendPostgres.connect(url, {
// Namespace for multi-tenant isolation (default: "default")
namespaceId: "production",

// Database schema for OpenWorkflow tables (default: "openworkflow")
schema: "openworkflow",

// Whether to run migrations on connect (default: true)
runMigrations: true,
});
Expand All @@ -68,7 +71,7 @@ const backend = await BackendPostgres.connect(url, {
## Migrations

By default, `BackendPostgres.connect()` runs database migrations automatically.
This creates the `openworkflow` schema and required tables.
This creates the configured schema (default: `openworkflow`) and required tables.

To disable automatic migrations:

Expand All @@ -82,10 +85,10 @@ When disabled, ensure you run migrations separately before starting workers.

## Schema

OpenWorkflow creates tables in the `openworkflow` schema:
OpenWorkflow creates tables in the configured schema (default: `openworkflow`):

- `openworkflow.workflow_runs` - Stores workflow run state
- `openworkflow.step_attempts` - Stores step execution history
- `<schema>.workflow_runs` - Stores workflow run state
- `<schema>.step_attempts` - Stores step execution history

This keeps OpenWorkflow data separate from your application tables.

Expand Down Expand Up @@ -135,4 +138,4 @@ automatically and connections are reused across workflow executions.
- The connecting user needs permissions to:
- Create schemas (for migrations)
- Create tables (for migrations)
- Read/write to the `openworkflow` schema
- Read/write to the configured schema (default: `openworkflow`)
14 changes: 8 additions & 6 deletions packages/docs/docs/production.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ For most production use cases, use PostgreSQL 14 or later:

- The connecting user needs permissions to create schemas and tables (for
migrations)
- Workflow state is stored in the `openworkflow` schema
- Workflow state is stored in the configured schema (default: `openworkflow`)

For single-server deployments, SQLite works well.

Expand All @@ -63,6 +63,7 @@ export default defineConfig({
process.env.OPENWORKFLOW_POSTGRES_URL!,
{
namespaceId: process.env.OPENWORKFLOW_NAMESPACE_ID || "production",
schema: process.env.OPENWORKFLOW_SCHEMA || "openworkflow",
},
),
dirs: ["./openworkflow"],
Expand All @@ -72,16 +73,17 @@ export default defineConfig({
});
```

In this example, we use the `OPENWORKFLOW_POSTGRES_URL` and
`OPENWORKFLOW_NAMESPACE_ID` env vars to dynamically set the Postgres URL and
namespace, but you can set this up however you'd like.
In this example, we use `OPENWORKFLOW_POSTGRES_URL`,
`OPENWORKFLOW_NAMESPACE_ID`, and `OPENWORKFLOW_SCHEMA` env vars to dynamically
set the Postgres URL, namespace, and schema, but you can set this up however
you'd like.

### 3. Migrations

Migrations run automatically when the backend connects. OpenWorkflow creates:

- `openworkflow.workflow_runs` - Stores workflow run state
- `openworkflow.step_attempts` - Stores step execution history
- `<schema>.workflow_runs` - Stores workflow run state
- `<schema>.step_attempts` - Stores step execution history

### 4. Deploy Workers

Expand Down
39 changes: 23 additions & 16 deletions packages/openworkflow/client.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
import { DEFAULT_RUN_IDEMPOTENCY_PERIOD_MS } from "./backend.js";
import { OpenWorkflow } from "./client.js";
import { BackendPostgres } from "./postgres.js";
import { DEFAULT_POSTGRES_URL, Postgres } from "./postgres/postgres.js";
import {
DEFAULT_POSTGRES_URL,
DEFAULT_SCHEMA,
newPostgresMaxOne,
} from "./postgres/postgres.js";
import {
DEFAULT_WORKFLOW_RETRY_POLICY,
defineWorkflowSpec,
Expand Down Expand Up @@ -372,20 +376,21 @@ describe("OpenWorkflow", () => {

const first = await workflow.run({ value: 1 }, { idempotencyKey: key });

const internalBackend = backend as unknown as {
pg: Postgres;
namespaceId: string;
};
const staleCreatedAt = new Date(
Date.now() - DEFAULT_RUN_IDEMPOTENCY_PERIOD_MS - 60_000,
);

await internalBackend.pg`
UPDATE "openworkflow"."workflow_runs"
SET "created_at" = ${staleCreatedAt}
WHERE "namespace_id" = ${internalBackend.namespaceId}
AND "id" = ${first.workflowRun.id}
`;
const pg = newPostgresMaxOne(DEFAULT_POSTGRES_URL);
try {
const workflowRunsTable = pg`${pg(DEFAULT_SCHEMA)}.${pg("workflow_runs")}`;
await pg`
UPDATE ${workflowRunsTable}
SET "created_at" = ${staleCreatedAt}
WHERE "namespace_id" = ${first.workflowRun.namespaceId}
AND "id" = ${first.workflowRun.id}
`;
} finally {
await pg.end();
}

const second = await workflow.run({ value: 2 }, { idempotencyKey: key });
expect(second.workflowRun.id).not.toBe(first.workflowRun.id);
Expand Down Expand Up @@ -531,10 +536,12 @@ describe("OpenWorkflow", () => {
const backend = await createBackend();
const client = new OpenWorkflow({ backend });

const workflow = client.defineWorkflow(
{ name: "define-wrap-test" },
({ input }) => ({ doubled: (input as { n: number }).n * 2 }),
);
const workflow = client.defineWorkflow<
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unrelated, but I'll keep

{ n: number },
{ doubled: number }
>({ name: "define-wrap-test" }, ({ input }) => ({
doubled: input.n * 2,
}));

const handle = await workflow.run({ n: 21 });
const worker = client.newWorker();
Expand Down
69 changes: 68 additions & 1 deletion packages/openworkflow/postgres/backend.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import { testBackend } from "../backend.testsuite.js";
import { BackendPostgres } from "./backend.js";
import { DEFAULT_POSTGRES_URL } from "./postgres.js";
import {
DEFAULT_POSTGRES_URL,
dropSchema,
newPostgresMaxOne,
} from "./postgres.js";
import assert from "node:assert";
import { randomUUID } from "node:crypto";
import { describe, expect, test } from "vitest";
Expand All @@ -26,4 +30,67 @@ describe("BackendPostgres.connect errors", () => {
/Postgres backend failed to connect.*postgresql:\/\/user:pass@host:port\/db.*:/,
);
});

test("throws a clear error for invalid schema names", async () => {
await expect(
BackendPostgres.connect(DEFAULT_POSTGRES_URL, {
schema: "invalid-schema",
}),
).rejects.toThrow(/Invalid schema name/);
});

test("throws for schema names longer than 63 bytes", async () => {
await expect(
BackendPostgres.connect(DEFAULT_POSTGRES_URL, {
schema: "a".repeat(64),
}),
).rejects.toThrow(/at most 63 bytes/i);
});
});

describe("BackendPostgres schema option", () => {
test("stores workflow data in the configured schema", async () => {
const schema = `test_schema_${randomUUID().replaceAll("-", "_")}`;
const namespaceId = randomUUID();
const backend = await BackendPostgres.connect(DEFAULT_POSTGRES_URL, {
namespaceId,
schema,
});

try {
const workflowRun = await backend.createWorkflowRun({
workflowName: "schema-test",
version: null,
idempotencyKey: null,
input: null,
config: {},
context: null,
availableAt: null,
deadlineAt: null,
});

const pg = newPostgresMaxOne(DEFAULT_POSTGRES_URL);
try {
const workflowRunsTable = pg`${pg(schema)}.${pg("workflow_runs")}`;

const [record] = await pg<{ id: string }[]>`
SELECT "id"
FROM ${workflowRunsTable}
WHERE "namespace_id" = ${namespaceId}
AND "id" = ${workflowRun.id}
LIMIT 1
`;

expect(record?.id).toBe(workflowRun.id);
} finally {
await pg.end();
}
} finally {
await backend.stop();

const pg = newPostgresMaxOne(DEFAULT_POSTGRES_URL);
await dropSchema(pg, schema);
await pg.end();
}
});
});
Loading
Loading