Skip to content

Commit

Permalink
minimize changes
Browse files Browse the repository at this point in the history
  • Loading branch information
Demetris Manikas committed May 22, 2024
1 parent b529a21 commit b02d6a6
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 12 deletions.
4 changes: 1 addition & 3 deletions src/dbos-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,7 @@ export class DBOSExecutor {
const wfInfo: WorkflowInfo | undefined = this.workflowInfoMap.get(wfStatus.workflowName);

if (wfInfo) {
return this.workflow(wfInfo.workflow, { workflowUUID: workflowUUID, parentCtx: parentCtx ?? undefined }, ...inputs);
return this.workflow(wfInfo.workflow, { workflowUUID: workflowUUID, parentCtx: parentCtx ?? undefined }, ...inputs);
}

// Should be temporary workflows. Parse the name of the workflow.
Expand Down Expand Up @@ -658,7 +658,6 @@ export class DBOSExecutor {
this.logger.error(`Unrecognized temporary workflow! UUID ${workflowUUID}, name ${wfName}`)
throw new DBOSNotRegisteredError(wfName);
}

return this.workflow(temp_workflow, { workflowUUID: workflowUUID, parentCtx: parentCtx ?? undefined }, ...inputs);
}

Expand Down Expand Up @@ -727,7 +726,6 @@ export class DBOSExecutor {
}
}
this.logger.debug(sqlStmt);

await this.userDatabase.query(sqlStmt, ...values);

// Clean up after each batch succeeds
Expand Down
4 changes: 2 additions & 2 deletions src/debugger/debug_workflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ export class WorkflowContextDebug extends DBOSContextImpl implements WorkflowCon
const functionID: number = this.functionIDGetIncrement();

// Original result must exist during replay.
const check: NonNullable<unknown> | null | DBOSNull = await this.#dbosExec.systemDatabase.checkOperationOutput<NonNullable<unknown> | null>(this.workflowUUID, functionID);
const check: T | null | DBOSNull = await this.#dbosExec.systemDatabase.checkOperationOutput<T | null>(this.workflowUUID, functionID);
if (check === dbosNull) {
throw new DBOSDebuggerError(`Cannot find recorded recv. Shouldn't happen in debug mode!`);
}
Expand All @@ -238,7 +238,7 @@ export class WorkflowContextDebug extends DBOSContextImpl implements WorkflowCon
const functionID: number = this.functionIDGetIncrement();

// Original result must exist during replay.
const check: NonNullable<unknown> | null | DBOSNull = await this.#dbosExec.systemDatabase.checkOperationOutput<NonNullable<unknown> | null>(this.workflowUUID, functionID);
const check: T | null | DBOSNull = await this.#dbosExec.systemDatabase.checkOperationOutput<T | null>(this.workflowUUID, functionID);
if (check === dbosNull) {
throw new DBOSDebuggerError(`Cannot find recorded getEvent. Shouldn't happen in debug mode!`);
}
Expand Down
14 changes: 7 additions & 7 deletions src/system_database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -446,14 +446,14 @@ export class PostgresSystemDatabase implements SystemDatabase {
AND notifications.created_at_epoch_ms = oldest_entry.created_at_epoch_ms
RETURNING notifications.*;`,
[workflowUUID, topic])).rows;
let message: NonNullable<unknown>| null = null;
let message: T | null = null;
if (finalRecvRows.length > 0) {
message = JSON.parse(finalRecvRows[0].message) as NonNullable<unknown>;
message = JSON.parse(finalRecvRows[0].message) as T
}
await this.recordNotificationOutput(client, workflowUUID, functionID, message);
await client.query(`COMMIT`);
client.release();
return message as T;
return message;
}

async setEvent(workflowUUID: string, functionID: number, key: string, message: NonNullable<unknown>): Promise<void> {
Expand Down Expand Up @@ -511,22 +511,22 @@ export class PostgresSystemDatabase implements SystemDatabase {
clearTimeout(timer!);

// Return the value if it's in the DB, otherwise return null.
let value: NonNullable<unknown> | null = null;
let value: T | null = null;
if (initRecvRows.length > 0) {
value = JSON.parse(initRecvRows[0].value) as NonNullable<unknown>;
value = JSON.parse(initRecvRows[0].value) as T;
} else {
// Read it again from the database.
const finalRecvRows = (await this.pool.query<workflow_events>(`SELECT value FROM ${DBOSExecutor.systemDBSchemaName}.workflow_events WHERE workflow_uuid=$1 AND key=$2;`, [workflowUUID, key])).rows;
if (finalRecvRows.length > 0) {
value = JSON.parse(finalRecvRows[0].value) as NonNullable<unknown>;
value = JSON.parse(finalRecvRows[0].value) as T;
}
}

// Record the output if it is inside a workflow.
if (callerUUID !== undefined && functionID !== undefined) {
await this.recordOperationOutput(callerUUID, functionID, value);
}
return value as T;
return value;
}

async getWorkflowStatus(workflowUUID: string, callerUUID?: string, functionID?: number): Promise<WorkflowStatus | null> {
Expand Down

0 comments on commit b02d6a6

Please sign in to comment.