Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WFQ Tests & Utilities #641

Merged
merged 6 commits into from
Oct 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/dbos-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import { DBOSEventReceiver, DBOSExecutorContext} from ".";

import { get } from "lodash";
import { wfQueueRunner, WorkflowQueue } from "./wfqueue";
import { debugTriggerPoint, DEBUG_TRIGGER_WORKFLOW_ENQUEUE } from "./debugpoint";

// eslint-disable-next-line @typescript-eslint/no-empty-object-type
export interface DBOSNull { }
Expand Down Expand Up @@ -676,6 +677,7 @@ export class DBOSExecutor implements DBOSExecutorContext {
) {
// TODO: Make this transactional (and with the queue step below)
args = await this.systemDatabase.initWorkflowStatus(internalStatus, args);
await debugTriggerPoint(DEBUG_TRIGGER_WORKFLOW_ENQUEUE);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if this will add overhead to non-debug production workloads? If so, can we disable this in prod?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be extremely surprised if the overhead is measurable.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case, where it is against several database updates, I agree. However, disabling in prod is not a bad idea.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could get it down to a boolean check pretty easily. But we can do it later.

}

const runWorkflow = async () => {
Expand Down
10 changes: 8 additions & 2 deletions src/dbos-runtime/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,14 @@ export class DBOSRuntime {
*/
async destroy() {
await this.scheduler?.destroyScheduler();
wfQueueRunner.stop();
await this.wfQueueRunner;
try {
wfQueueRunner.stop();
await this.wfQueueRunner;
}
catch (err) {
const e = err as Error;
this.dbosExec?.logger.warn(`Error destroying workflow queue runner: ${e.message}`);
}
for (const evtRcvr of this.dbosExec?.eventReceivers || []) {
await evtRcvr.destroy();
}
Expand Down
76 changes: 76 additions & 0 deletions src/debugpoint.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import { sleepms } from "./utils";

export function getCallSiteInfo(): { fileName: string; lineNumber: number } {
const err = new Error();
const stack = err.stack?.split("\n");

if (stack && stack.length > 2) {
// The third line usually contains the callsite information.
// Different environments (Node, browser) format the stack trace differently.
// Adjust the regex to your environment as needed.
const match = stack[2].match(/at\s+(.*):(\d+):(\d+)/);
if (match) {
const fileName = match[1];
const lineNumber = parseInt(match[2], 10);
return { fileName, lineNumber };
}
}
return { fileName: "unknown", lineNumber: -1 };
}

export interface DebugPoint
{
name: string;
fileName: string;
lineNumber: number;
hitCount: number;
}

export class DebugAction
{
sleepms?: number; // Sleep at point
awaitEvent?: Promise<void>; // Wait at point
callback?: () => void;
asyncCallback?: () => Promise<void>;
}

export const pointTriggers: Map<string, DebugAction> = new Map()
export const pointLocations: Map<string, DebugPoint> = new Map()

export async function debugTriggerPoint(name: string): Promise<void> {
const cpi = getCallSiteInfo()
if (!pointLocations.has(name)) {
pointLocations.set(name, {name, ...cpi, hitCount: 0})
}
const p = pointLocations.get(name)!
if (p.fileName !== cpi.fileName || cpi.lineNumber !== p.lineNumber) {
throw new Error(`Duplicate debug point name: ${name}`);
}

if (pointTriggers.has(name)) {
const pt = pointTriggers.get(name)!;
if (pt.sleepms) {
await sleepms(pt.sleepms);
}
if (pt.asyncCallback) {
await pt.asyncCallback();
}
if (pt.callback) {
pt.callback();
}
if (pt.awaitEvent) {
await pt.awaitEvent;
}
}
}

export function setDebugTrigger(name: string, action: DebugAction) {
pointTriggers.set(name, action);
}

export function clearDebugTriggers() {
pointTriggers.clear();
}

export const DEBUG_TRIGGER_WORKFLOW_QUEUE_START = "DEBUG_TRIGGER_WORKFLOW_QUEUE_START";
export const DEBUG_TRIGGER_WORKFLOW_ENQUEUE = "DEBUG_TRIGGER_WORKFLOW_ENQUEUE";
6 changes: 5 additions & 1 deletion src/foundationdb/fdb_system_database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import { deserializeError, serializeError } from "serialize-error";
import { DBOSExecutor, DBOSNull, dbosNull } from "../dbos-executor";
import { WorkflowStatusInternal, SystemDatabase } from "../system_database";
import { GetWorkflowsInput, GetWorkflowsOutput, StatusString, WorkflowStatus } from "../workflow";
import { GetWorkflowQueueInput, GetWorkflowQueueOutput, GetWorkflowsInput, GetWorkflowsOutput, StatusString, WorkflowStatus } from "../workflow";
import * as fdb from "foundationdb";
import { DBOSWorkflowConflictUUIDError } from "../error";
import { NativeValue } from "foundationdb/dist/lib/native";
Expand Down Expand Up @@ -412,6 +412,10 @@ export class FoundationDBSystemDatabase implements SystemDatabase {
throw new Error("Method not implemented.");
}

getWorkflowQueue(_input: GetWorkflowQueueInput): Promise<GetWorkflowQueueOutput> {
throw new Error("Method not implemented.");
}

enqueueWorkflow(_workflowId: string, _queue: WorkflowQueue): Promise<void> {
throw new Error("Method not implemented.");
}
Expand Down
30 changes: 28 additions & 2 deletions src/system_database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { deserializeError, serializeError } from "serialize-error";
import { DBOSExecutor, dbosNull, DBOSNull } from "./dbos-executor";
import { DatabaseError, Pool, PoolClient, Notification, PoolConfig, Client } from "pg";
import { DBOSWorkflowConflictUUIDError, DBOSNonExistentWorkflowError, DBOSDeadLetterQueueError } from "./error";
import { GetWorkflowsInput, GetWorkflowsOutput, StatusString, WorkflowStatus } from "./workflow";
import { GetWorkflowQueueInput, GetWorkflowQueueOutput, GetWorkflowsInput, GetWorkflowsOutput, StatusString, WorkflowStatus } from "./workflow";
import { notifications, operation_outputs, workflow_status, workflow_events, workflow_inputs, scheduler_state, workflow_queue } from "../schemas/system_db_schema";
import { sleepms, findPackageRoot, DBOSJSON } from "./utils";
import { HTTPRequest } from "./context";
Expand Down Expand Up @@ -61,7 +61,8 @@ export interface SystemDatabase {
setLastScheduledTime(wfn: string, invtime: number): Promise<number | null>; // We are now sure we invoked another

// Workflow management
getWorkflows(input: GetWorkflowsInput): Promise<GetWorkflowsOutput>
getWorkflows(input: GetWorkflowsInput): Promise<GetWorkflowsOutput>;
getWorkflowQueue(input: GetWorkflowQueueInput): Promise<GetWorkflowQueueOutput>;
}

// For internal use, not serialized status.
Expand Down Expand Up @@ -853,6 +854,31 @@ export class PostgresSystemDatabase implements SystemDatabase {
};
}

async getWorkflowQueue(input: GetWorkflowQueueInput): Promise<GetWorkflowQueueOutput> {
let query = this.knexDB<workflow_queue>(`${DBOSExecutor.systemDBSchemaName}.workflow_queue`).orderBy('created_at_epoch_ms', 'desc');
if (input.queueName) {
query = query.where('queue_name', input.queueName);
}
if (input.startTime) {
query = query.where('created_at_epoch_ms', '>=', new Date(input.startTime).getTime());
}
if (input.endTime) {
query = query.where('created_at_at_epoch_ms', '<=', new Date(input.endTime).getTime());
}
if (input.limit) {
query = query.limit(input.limit);
}
const rows = await query.select();
const workflows = rows.map((row) => { return {
workflowID: row.workflow_uuid,
queueName: row.queue_name,
createdAt: row.created_at_epoch_ms,
startedAt: row.started_at_epoch_ms,
completedAt: row.completed_at_epoch_ms,
}});
return { workflows };
}

async enqueueWorkflow(workflowId: string, queue: WorkflowQueue): Promise<void> {
const _res = await this.pool.query<scheduler_state>(`
INSERT INTO ${DBOSExecutor.systemDBSchemaName}.workflow_queue (workflow_uuid, queue_name)
Expand Down
10 changes: 8 additions & 2 deletions src/testing/testing_runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,14 @@ export class TestingRuntimeImpl implements TestingRuntime {
async destroy() {
// Only release once.
if (this.#isInitialized) {
wfQueueRunner.stop();
await this.#wfQueueRunner;
try {
wfQueueRunner.stop();
await this.#wfQueueRunner;
}
catch (err) {
const e = err as Error;
this.#server?.dbosExec?.logger.warn(`Error destroying workflow queue runner: ${e.message}`);
}
await this.#scheduler?.destroyScheduler();
for (const evtRcvr of this.#server?.dbosExec?.eventReceivers || []) {
await evtRcvr.destroy();
Expand Down
6 changes: 6 additions & 0 deletions src/wfqueue.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { DBOSExecutor } from "./dbos-executor";
import { DEBUG_TRIGGER_WORKFLOW_QUEUE_START, debugTriggerPoint } from "./debugpoint";
import { DBOSInitializationError } from "./error";

/**
Expand Down Expand Up @@ -61,6 +62,11 @@ class WFQueueRunner
// Check queues
for (const [_qn, q] of this.wfQueuesByName) {
const wfids = await exec.systemDatabase.findAndMarkStartableWorkflows(q);

if (wfids.length > 0) {
await debugTriggerPoint(DEBUG_TRIGGER_WORKFLOW_QUEUE_START);
}

for (const wfid of wfids) {
const _wfh = await exec.executeWorkflowUUID(wfid);
}
Expand Down
17 changes: 17 additions & 0 deletions src/workflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,23 @@ export interface GetWorkflowsOutput {
workflowUUIDs: string[];
}

export interface GetWorkflowQueueInput {
queueName?: string; // The name of the workflow function
startTime?: string; // Timestamp in ISO 8601 format
endTime?: string; // Timestamp in ISO 8601 format
limit?: number; // Return up to this many workflows IDs. IDs are ordered by workflow creation time.
}

export interface GetWorkflowQueueOutput {
workflows: {
workflowID: string;
queueName: string;
createdAt: number;
startedAt?: number;
completedAt?: number;
}[];
}

export interface PgTransactionId {
txid: string;
}
Expand Down
94 changes: 84 additions & 10 deletions tests/wfqueue.test.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
import { Step, StepContext, StatusString, TestingRuntime, Workflow, WorkflowContext, WorkflowHandle } from "../src";
import { DBOSConfig, DBOSExecutor } from "../src/dbos-executor";
import { StatusString, Step, StepContext, TestingRuntime, Workflow, WorkflowContext, WorkflowHandle } from "../src";
import { DBOSConfig } from "../src/dbos-executor";
import { createInternalTestRuntime, TestingRuntimeImpl } from "../src/testing/testing_runtime";
import { generateDBOSTestConfig, setUpDBOSTestDb } from "./helpers";
import { WorkflowQueue } from "../src";
import { v4 as uuidv4 } from "uuid";
import { sleepms } from "../src/utils";
import { PostgresSystemDatabase } from "../src/system_database";
import { workflow_queue } from "../schemas/system_db_schema";

import {
clearDebugTriggers,
DEBUG_TRIGGER_WORKFLOW_QUEUE_START,
// DEBUG_TRIGGER_WORKFLOW_ENQUEUE,
setDebugTrigger
} from "../src/debugpoint";


const queue = new WorkflowQueue("testQ");
Expand All @@ -22,11 +27,8 @@ async function queueEntriesAreCleanedUp(dbos: TestingRuntimeImpl) {
let maxTries = 10;
let success = false;
while (maxTries > 0) {
const r = await (dbos.getDBOSExec().systemDatabase as PostgresSystemDatabase)
.knexDB<workflow_queue>(`${DBOSExecutor.systemDBSchemaName}.workflow_queue`)
.count()
.first();
if (`${r!.count}` === '0') {
const r = await dbos.getDBOSExec().systemDatabase.getWorkflowQueue({});
if (r.workflows.length === 0) {
success = true;
break;
}
Expand Down Expand Up @@ -190,6 +192,73 @@ describe("queued-wf-tests-simple", () => {
// Verify all queue entries eventually get cleaned up.
expect(await queueEntriesAreCleanedUp(testRuntime as TestingRuntimeImpl)).toBe(true);
}, 10000);

test("test_one_at_a_time_with_crash", async() => {
let wfqRes: () => void = () => { };
const wfqPromise = new Promise<void>((resolve, _rj) => { wfqRes = resolve; });

setDebugTrigger(DEBUG_TRIGGER_WORKFLOW_QUEUE_START, {
callback: () => {
wfqRes();
throw new Error("Interrupt scheduler here");
}
});

const wfh1 = await testRuntime.startWorkflow(TestWFs, undefined, undefined, serialqueue).testWorkflowSimple('a','b');
await wfqPromise;
await testRuntime.destroy();
clearDebugTriggers();
testRuntime = await createInternalTestRuntime(undefined, config);
const wfh2 = await testRuntime.startWorkflow(TestWFs, undefined, undefined, serialqueue).testWorkflowSimple('c','d');

const wfh1b = testRuntime.retrieveWorkflow(wfh1.getWorkflowUUID());
const wfh2b = testRuntime.retrieveWorkflow(wfh2.getWorkflowUUID());
expect (await wfh1b.getResult()).toBe('ab');
expect (await wfh2b.getResult()).toBe('cd');
}, 10000);

/*
// Current result: WF1 does get created in system DB, but never starts running.
// WF2 does run.
test("test_one_at_a_time_with_crash2", async() => {
let wfqRes: () => void = () => { };
const _wfqPromise = new Promise<void>((resolve, _rj) => { wfqRes = resolve; });

setDebugTrigger(DEBUG_TRIGGER_WORKFLOW_ENQUEUE, {
callback: () => {
wfqRes();
throw new Error("Interrupt start workflow here");
}
});

const wfid1 = 'thisworkflowgetshit';
console.log("Start WF1");
try {
const _wfh1 = await testRuntime.startWorkflow(TestWFs, wfid1, undefined, serialqueue).testWorkflowSimple('a','b');
}
catch(e) {
// Expected
const err = e as Error;
expect(err.message.includes('Interrupt')).toBeTruthy();
console.log("Expected error caught");
}
console.log("Destroy runtime");
await testRuntime.destroy();
clearDebugTriggers();
console.log("New runtime");
testRuntime = await createInternalTestRuntime(undefined, config);
console.log("Start WF2");
const wfh2 = await testRuntime.startWorkflow(TestWFs, undefined, undefined, serialqueue).testWorkflowSimple('c','d');

const wfh1b = testRuntime.retrieveWorkflow(wfid1);
const wfh2b = testRuntime.retrieveWorkflow(wfh2.getWorkflowUUID());
console.log("Wait");
expect (await wfh2b.getResult()).toBe('cd');
// Current behavior (undesired) WF1 got created but will stay ENQUEUED and not get run.
expect((await wfh1b.getStatus())?.status).toBe('SUCCESS');
expect (await wfh1b.getResult()).toBe('ab');
}, 10000);
*/
});

class TestWFs
Expand All @@ -211,6 +280,12 @@ class TestWFs
return Promise.resolve(var1 + var2);
}

@Workflow()
static async testWorkflowSimple(_ctx: WorkflowContext, var1: string, var2: string) {
++TestWFs.wfCounter;
return Promise.resolve(var1 + var2);
}

@Step()
static async testStep(_ctx: StepContext, str: string) {
++TestWFs.stepCounter;
Expand Down Expand Up @@ -299,4 +374,3 @@ async function runOneAtATime(testRuntime: TestingRuntime, queue: WorkflowQueue)
expect(TestWFs2.wfCounter).toBe(1);
expect(await queueEntriesAreCleanedUp(testRuntime as TestingRuntimeImpl)).toBe(true);
}