Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions packages/docs/docs/postgres.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ const backend = await BackendPostgres.connect(url, {
// Namespace for multi-tenant isolation (default: "default")
namespaceId: "production",

// Database schema for OpenWorkflow tables (default: "openworkflow")
schema: "openworkflow",

// Whether to run migrations on connect (default: true)
runMigrations: true,
});
Expand All @@ -68,7 +71,8 @@ const backend = await BackendPostgres.connect(url, {
## Migrations

By default, `BackendPostgres.connect()` runs database migrations automatically.
This creates the `openworkflow` schema and required tables.
This creates the configured schema (default: `openworkflow`) and required
tables.

To disable automatic migrations:

Expand All @@ -82,10 +86,11 @@ When disabled, ensure you run migrations separately before starting workers.

## Schema

OpenWorkflow creates tables in the `openworkflow` schema:
OpenWorkflow creates tables in the configured schema (default:
`openworkflow`):

- `openworkflow.workflow_runs` - Stores workflow run state
- `openworkflow.step_attempts` - Stores step execution history
- `<schema>.workflow_runs` - Stores workflow run state
- `<schema>.step_attempts` - Stores step execution history

This keeps OpenWorkflow data separate from your application tables.

Expand Down Expand Up @@ -135,4 +140,4 @@ automatically and connections are reused across workflow executions.
- The connecting user needs permissions to:
- Create schemas (for migrations)
- Create tables (for migrations)
- Read/write to the `openworkflow` schema
- Read/write to the configured schema (default: `openworkflow`)
14 changes: 8 additions & 6 deletions packages/docs/docs/production.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ For most production use cases, use PostgreSQL 14 or later:

- The connecting user needs permissions to create schemas and tables (for
migrations)
- Workflow state is stored in the `openworkflow` schema
- Workflow state is stored in the configured schema (default: `openworkflow`)

For single-server deployments, SQLite works well.

Expand All @@ -63,6 +63,7 @@ export default defineConfig({
process.env.OPENWORKFLOW_POSTGRES_URL!,
{
namespaceId: process.env.OPENWORKFLOW_NAMESPACE_ID || "production",
schema: process.env.OPENWORKFLOW_SCHEMA || "openworkflow",
},
),
dirs: ["./openworkflow"],
Expand All @@ -72,16 +73,17 @@ export default defineConfig({
});
```

In this example, we use the `OPENWORKFLOW_POSTGRES_URL` and
`OPENWORKFLOW_NAMESPACE_ID` env vars to dynamically set the Postgres URL and
namespace, but you can set this up however you'd like.
In this example, we use `OPENWORKFLOW_POSTGRES_URL`,
`OPENWORKFLOW_NAMESPACE_ID`, and `OPENWORKFLOW_SCHEMA` env vars to dynamically
set the Postgres URL, namespace, and schema, but you can set this up however
you'd like.

### 3. Migrations

Migrations run automatically when the backend connects. OpenWorkflow creates:

- `openworkflow.workflow_runs` - Stores workflow run state
- `openworkflow.step_attempts` - Stores step execution history
- `<schema>.workflow_runs` - Stores workflow run state
- `<schema>.step_attempts` - Stores step execution history

### 4. Deploy Workers

Expand Down
4 changes: 3 additions & 1 deletion packages/openworkflow/client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -375,13 +375,15 @@ describe("OpenWorkflow", () => {
const internalBackend = backend as unknown as {
pg: Postgres;
namespaceId: string;
schema: string;
};
const staleCreatedAt = new Date(
Date.now() - DEFAULT_RUN_IDEMPOTENCY_PERIOD_MS - 60_000,
);
const workflowRunsTable = internalBackend.pg`${internalBackend.pg(internalBackend.schema)}.${internalBackend.pg("workflow_runs")}`;

await internalBackend.pg`
UPDATE "openworkflow"."workflow_runs"
UPDATE ${workflowRunsTable}
SET "created_at" = ${staleCreatedAt}
WHERE "namespace_id" = ${internalBackend.namespaceId}
AND "id" = ${first.workflowRun.id}
Expand Down
61 changes: 60 additions & 1 deletion packages/openworkflow/postgres/backend.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
import { testBackend } from "../backend.testsuite.js";
import { BackendPostgres } from "./backend.js";
import { DEFAULT_POSTGRES_URL } from "./postgres.js";
import {
DEFAULT_POSTGRES_URL,
Postgres,
dropSchema,
newPostgresMaxOne,
} from "./postgres.js";
import assert from "node:assert";
import { randomUUID } from "node:crypto";
import { describe, expect, test } from "vitest";
Expand All @@ -26,4 +31,58 @@ describe("BackendPostgres.connect errors", () => {
/Postgres backend failed to connect.*postgresql:\/\/user:pass@host:port\/db.*:/,
);
});

test("throws a clear error for invalid schema names", async () => {
await expect(
BackendPostgres.connect(DEFAULT_POSTGRES_URL, {
schema: "invalid-schema",
}),
).rejects.toThrow(/Invalid schema name/);
});
});

describe("BackendPostgres schema option", () => {
test("stores workflow data in the configured schema", async () => {
const schema = `test_schema_${randomUUID().replaceAll("-", "_")}`;
const namespaceId = randomUUID();
const backend = await BackendPostgres.connect(DEFAULT_POSTGRES_URL, {
namespaceId,
schema,
});

try {
const workflowRun = await backend.createWorkflowRun({
workflowName: "schema-test",
version: null,
idempotencyKey: null,
input: null,
config: {},
context: null,
availableAt: null,
deadlineAt: null,
});

const internalBackend = backend as unknown as {
pg: Postgres;
schema: string;
};
const workflowRunsTable = internalBackend.pg`${internalBackend.pg(internalBackend.schema)}.${internalBackend.pg("workflow_runs")}`;

const [record] = await internalBackend.pg<{ id: string }[]>`
SELECT "id"
FROM ${workflowRunsTable}
WHERE "namespace_id" = ${namespaceId}
AND "id" = ${workflowRun.id}
LIMIT 1
`;

expect(record?.id).toBe(workflowRun.id);
} finally {
await backend.stop();

const pg = newPostgresMaxOne(DEFAULT_POSTGRES_URL);
await dropSchema(pg, schema);
await pg.end();
}
});
});
Loading