Skip to content

Commit

Permalink
first blood
Browse files Browse the repository at this point in the history
  • Loading branch information
Demetris Manikas committed May 18, 2024
1 parent 182b6f2 commit 383a962
Show file tree
Hide file tree
Showing 12 changed files with 74 additions and 92 deletions.
4 changes: 2 additions & 2 deletions src/communicator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import { WorkflowContextImpl } from "./workflow";
import { DBOSContext, DBOSContextImpl } from "./context";
import { WorkflowContextDebug } from "./debugger/debug_workflow";

/* eslint-disable @typescript-eslint/no-explicit-any */
export type Communicator<T extends any[], R> = (ctxt: CommunicatorContext, ...args: T) => Promise<R>;
/* eslint-disable-next-line @typescript-eslint/no-explicit-any */
export type Communicator<R> = (ctxt: CommunicatorContext, ...args: any[]) => Promise<R>;

export interface CommunicatorConfig {
retriesAllowed?: boolean; // Should failures be retried? (default true)
Expand Down
56 changes: 26 additions & 30 deletions src/dbos-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,17 +68,17 @@ export interface DBOSConfig {
}

interface WorkflowInfo {
workflow: Workflow<any, any>;
workflow: Workflow<unknown>;
config: WorkflowConfig;
}

interface TransactionInfo {
transaction: Transaction<any, any>;
transaction: Transaction<unknown>;
config: TransactionConfig;
}

interface CommunicatorInfo {
communicator: Communicator<any, any>;
communicator: Communicator<unknown>;
config: CommunicatorConfig;
}

Expand Down Expand Up @@ -258,15 +258,15 @@ export class DBOSExecutor {
this.registeredOperations.push(...registeredClassOperations);
for (const ro of registeredClassOperations) {
if (ro.workflowConfig) {
const wf = ro.registeredFunction as Workflow<any, any>;
const wf = ro.registeredFunction as Workflow<unknown>;
this.#registerWorkflow(wf, {...ro.workflowConfig});
this.logger.debug(`Registered workflow ${ro.name}`);
} else if (ro.txnConfig) {
const tx = ro.registeredFunction as Transaction<any, any>;
const tx = ro.registeredFunction as Transaction<unknown>;
this.#registerTransaction(tx, ro.txnConfig);
this.logger.debug(`Registered transaction ${ro.name}`);
} else if (ro.commConfig) {
const comm = ro.registeredFunction as Communicator<any, any>;
const comm = ro.registeredFunction as Communicator<unknown>;
this.#registerCommunicator(comm, ro.commConfig);
this.logger.debug(`Registered communicator ${ro.name}`);
}
Expand Down Expand Up @@ -347,7 +347,7 @@ export class DBOSExecutor {

/* WORKFLOW OPERATIONS */

#registerWorkflow<T extends any[], R>(wf: Workflow<T, R>, config: WorkflowConfig = {}) {
#registerWorkflow<R>(wf: Workflow<R>, config: WorkflowConfig = {}) {
if (wf.name === DBOSExecutor.tempWorkflowName || this.workflowInfoMap.has(wf.name)) {
throw new DBOSError(`Repeated workflow name: ${wf.name}`);
}
Expand All @@ -358,7 +358,7 @@ export class DBOSExecutor {
this.workflowInfoMap.set(wf.name, workflowInfo);
}

#registerTransaction<T extends any[], R>(txn: Transaction<T, R>, params: TransactionConfig = {}) {
#registerTransaction<R>(txn: Transaction<R>, params: TransactionConfig = {}) {
if (this.transactionInfoMap.has(txn.name)) {
throw new DBOSError(`Repeated Transaction name: ${txn.name}`);
}
Expand All @@ -369,7 +369,7 @@ export class DBOSExecutor {
this.transactionInfoMap.set(txn.name, txnInfo);
}

#registerCommunicator<T extends any[], R>(comm: Communicator<T, R>, params: CommunicatorConfig = {}) {
#registerCommunicator<R>(comm: Communicator<R>, params: CommunicatorConfig = {}) {
if (this.communicatorInfoMap.has(comm.name)) {
throw new DBOSError(`Repeated Commmunicator name: ${comm.name}`);
}
Expand All @@ -380,15 +380,15 @@ export class DBOSExecutor {
this.communicatorInfoMap.set(comm.name, commInfo);
}

async workflow<T extends any[], R>(wf: Workflow<T, R>, params: InternalWorkflowParams, ...args: T): Promise<WorkflowHandle<R>> {
async workflow<R>(wf: Workflow<R>, params: InternalWorkflowParams, ...args: unknown[]): Promise<WorkflowHandle<R>> {
if (this.debugMode) {
return this.debugWorkflow(wf, params, undefined, undefined, ...args);
}
return this.internalWorkflow(wf, params, undefined, undefined, ...args);
}

// If callerUUID and functionID are set, it means the workflow is invoked from within a workflow.
async internalWorkflow<T extends any[], R>(wf: Workflow<T, R>, params: InternalWorkflowParams, callerUUID?: string, callerFunctionID?: number, ...args: T): Promise<WorkflowHandle<R>> {
async internalWorkflow<R>(wf: Workflow<R>, params: InternalWorkflowParams, callerUUID?: string, callerFunctionID?: number, ...args: unknown[]): Promise<WorkflowHandle<R>> {
const workflowUUID: string = params.workflowUUID ? params.workflowUUID : this.#generateUUID();
const presetUUID: boolean = params.workflowUUID ? true : false;

Expand Down Expand Up @@ -494,7 +494,7 @@ export class DBOSExecutor {
/**
* DEBUG MODE workflow execution, skipping all the recording
*/
async debugWorkflow<T extends any[], R>(wf: Workflow<T, R>, params: WorkflowParams, callerUUID?: string, callerFunctionID?: number, ...args: T): Promise<WorkflowHandle<R>> {
async debugWorkflow<R>(wf: Workflow<R>, params: WorkflowParams, callerUUID?: string, callerFunctionID?: number, ...args: unknown[]): Promise<WorkflowHandle<R>> {
// In debug mode, we must have a specific workflow UUID.
if (!params.workflowUUID) {
throw new DBOSDebuggerError("Workflow UUID not found!");
Expand Down Expand Up @@ -535,18 +535,18 @@ export class DBOSExecutor {
return new InvokedHandle(this.systemDatabase, workflowPromise, workflowUUID, wf.name, callerUUID, callerFunctionID);
}

async transaction<T extends any[], R>(txn: Transaction<T, R>, params: WorkflowParams, ...args: T): Promise<R> {
async transaction<R>(txn: Transaction<R>, params: WorkflowParams, ...args: unknown[] ): Promise<R> {
// Create a workflow and call transaction.
const temp_workflow = async (ctxt: WorkflowContext, ...args: T) => {
const temp_workflow = async (ctxt: WorkflowContext, ...args: unknown[]) => {
const ctxtImpl = ctxt as WorkflowContextImpl;
return await ctxtImpl.transaction(txn, ...args);
};
return (await this.workflow(temp_workflow, { ...params, tempWfType: TempWorkflowType.transaction, tempWfName: txn.name }, ...args)).getResult();
return (await this.workflow<R>(temp_workflow, { ...params, tempWfType: TempWorkflowType.transaction, tempWfName: txn.name }, ...args)).getResult();
}

async external<T extends any[], R>(commFn: Communicator<T, R>, params: WorkflowParams, ...args: T): Promise<R> {
async external<R>(commFn: Communicator<R>, params: WorkflowParams, ...args: unknown[]): Promise<R> {
// Create a workflow and call external.
const temp_workflow = async (ctxt: WorkflowContext, ...args: T) => {
const temp_workflow = async (ctxt: WorkflowContext, ...args: unknown[]) => {
const ctxtImpl = ctxt as WorkflowContextImpl;
return await ctxtImpl.external(commFn, ...args);
};
Expand Down Expand Up @@ -620,8 +620,7 @@ export class DBOSExecutor {
const wfInfo: WorkflowInfo | undefined = this.workflowInfoMap.get(wfStatus.workflowName);

if (wfInfo) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
return this.workflow(wfInfo.workflow, { workflowUUID: workflowUUID, parentCtx: parentCtx ?? undefined }, ...inputs);
return this.workflow(wfInfo.workflow, { workflowUUID: workflowUUID, parentCtx: parentCtx ?? undefined }, ...inputs);
}

// Should be temporary workflows. Parse the name of the workflow.
Expand All @@ -631,16 +630,15 @@ export class DBOSExecutor {
throw new DBOSError(`This should never happen! Cannot find workflow info for a non-temporary workflow! UUID ${workflowUUID}, name ${wfName}`);
}

let temp_workflow: Workflow<any, any>;
let temp_workflow: Workflow<unknown>;
if (nameArr[1] === TempWorkflowType.transaction) {
const txnInfo: TransactionInfo | undefined = this.transactionInfoMap.get(nameArr[2]);
if (!txnInfo) {
this.logger.error(`Cannot find transaction info for UUID ${workflowUUID}, name ${nameArr[2]}`);
throw new DBOSNotRegisteredError(nameArr[2]);
}
temp_workflow = async (ctxt: WorkflowContext, ...args: any[]) => {
temp_workflow = async (ctxt: WorkflowContext, ...args: unknown[]) => {
const ctxtImpl = ctxt as WorkflowContextImpl;
// eslint-disable-next-line @typescript-eslint/no-unsafe-return, @typescript-eslint/no-unsafe-argument
return await ctxtImpl.transaction(txnInfo.transaction, ...args);
};
} else if (nameArr[1] === TempWorkflowType.external) {
Expand All @@ -649,21 +647,19 @@ export class DBOSExecutor {
this.logger.error(`Cannot find communicator info for UUID ${workflowUUID}, name ${nameArr[2]}`);
throw new DBOSNotRegisteredError(nameArr[2]);
}
temp_workflow = async (ctxt: WorkflowContext, ...args: any[]) => {
temp_workflow = async (ctxt: WorkflowContext, ...args: unknown[]) => {
const ctxtImpl = ctxt as WorkflowContextImpl;
// eslint-disable-next-line @typescript-eslint/no-unsafe-return, @typescript-eslint/no-unsafe-argument
return await ctxtImpl.external(commInfo.communicator, ...args);
};
} else if (nameArr[1] === TempWorkflowType.send) {
temp_workflow = async (ctxt: WorkflowContext, ...args: any[]) => {
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
return await ctxt.send<any>(args[0], args[1], args[2]);
temp_workflow = async (ctxt: WorkflowContext, ...args: unknown[]) => {
return await ctxt.send<any>(args[0] as string, args[1], args[2] as string);
};
} else {
this.logger.error(`Unrecognized temporary workflow! UUID ${workflowUUID}, name ${wfName}`)
throw new DBOSNotRegisteredError(wfName);
}
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument

return this.workflow(temp_workflow, { workflowUUID: workflowUUID, parentCtx: parentCtx ?? undefined }, ...inputs);
}

Expand Down Expand Up @@ -711,7 +707,7 @@ export class DBOSExecutor {
while (finishedCnt < totalSize) {
let sqlStmt = "INSERT INTO dbos.transaction_outputs (workflow_uuid, function_id, output, error, txn_id, txn_snapshot, created_at) VALUES ";
let paramCnt = 1;
const values: any[] = [];
const values: unknown[] = [];
const batchUUIDs: string[] = [];
for (const [workflowUUID, wfBuffer] of localBuffer) {
for (const [funcID, recorded] of wfBuffer) {
Expand All @@ -732,7 +728,7 @@ export class DBOSExecutor {
}
}
this.logger.debug(sqlStmt);
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument

await this.userDatabase.query(sqlStmt, ...values);

// Clean up after each batch succeeds
Expand Down
18 changes: 9 additions & 9 deletions src/debugger/debug_workflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,11 @@ export class WorkflowContextDebug extends DBOSContextImpl implements WorkflowCon
for (const op of ops) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
proxy[op.name] = op.txnConfig
? // eslint-disable-next-line @typescript-eslint/no-unsafe-argument
(...args: any[]) => this.transaction(op.registeredFunction as Transaction<any[], any>, ...args)
?
(...args: unknown[]) => this.transaction(op.registeredFunction as Transaction<unknown>, ...args)
: op.commConfig
? // eslint-disable-next-line @typescript-eslint/no-unsafe-argument
(...args: any[]) => this.external(op.registeredFunction as Communicator<any[], any>, ...args)
?
(...args: unknown[]) => this.external(op.registeredFunction as Communicator<unknown>, ...args)
: undefined;
}
return proxy as WFInvokeFuncs<T>;
Expand Down Expand Up @@ -99,7 +99,7 @@ export class WorkflowContextDebug extends DBOSContextImpl implements WorkflowCon
* Execute a transactional function in debug mode.
* If a debug proxy is provided, it connects to a debug proxy and everything should be read-only.
*/
async transaction<T extends any[], R>(txn: Transaction<T, R>, ...args: T): Promise<R> {
async transaction<R>(txn: Transaction<R>, ...args: unknown[]): Promise<R> {
const txnInfo = this.#dbosExec.transactionInfoMap.get(txn.name);
if (txnInfo === undefined) {
throw new DBOSDebuggerError(`Transaction ${txn.name} not registered!`);
Expand Down Expand Up @@ -168,7 +168,7 @@ export class WorkflowContextDebug extends DBOSContextImpl implements WorkflowCon
return check.output; // Always return the recorded result.
}

async external<T extends any[], R>(commFn: Communicator<T, R>, ..._args: T): Promise<R> {
async external<R>(commFn: Communicator<R>, ..._args: unknown[]): Promise<R> {
const commConfig = this.#dbosExec.communicatorInfoMap.get(commFn.name);
if (commConfig === undefined) {
throw new DBOSDebuggerError(`Communicator ${commFn.name} not registered!`);
Expand All @@ -187,18 +187,18 @@ export class WorkflowContextDebug extends DBOSContextImpl implements WorkflowCon
}

// Invoke the debugWorkflow() function instead.
async startChildWorkflow<T extends any[], R>(wf: Workflow<T, R>, ...args: T): Promise<WorkflowHandle<R>> {
async startChildWorkflow<R>(wf: Workflow<R>, ...args: unknown[]): Promise<WorkflowHandle<R>> {
const funcId = this.functionIDGetIncrement();
const childUUID: string = this.workflowUUID + "-" + funcId;
return this.#dbosExec.debugWorkflow(wf, { parentCtx: this, workflowUUID: childUUID }, this.workflowUUID, funcId, ...args);
}

async invokeChildWorkflow<T extends any[], R>(wf: Workflow<T, R>, ...args: T): Promise<R> {
async invokeChildWorkflow<R>(wf: Workflow<R>, ...args: unknown[]): Promise<R> {
return this.startChildWorkflow(wf, ...args).then((handle) => handle.getResult());
}

// Deprecated
async childWorkflow<T extends any[], R>(wf: Workflow<T, R>, ...args: T): Promise<WorkflowHandle<R>> {
async childWorkflow<R>(wf: Workflow<R>, ...args: unknown[]): Promise<WorkflowHandle<R>> {
return this.startChildWorkflow(wf, ...args);
}

Expand Down
18 changes: 7 additions & 11 deletions src/httpServer/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -125,20 +125,16 @@ export class HandlerContextImpl extends DBOSContextImpl implements HandlerContex
if (asyncWf) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
proxy[op.name] = op.txnConfig
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
? (...args: any[]) => this.#transaction(op.registeredFunction as Transaction<any[], any>, params, ...args)
? (...args: unknown[]) => this.#transaction(op.registeredFunction as Transaction<unknown>, params, ...args)
: op.workflowConfig
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
? (...args: any[]) => this.#workflow(op.registeredFunction as Workflow<any[], any>, params, ...args)
? (...args: unknown[]) => this.#workflow(op.registeredFunction as Workflow<unknown>, params, ...args)
: op.commConfig
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
? (...args: any[]) => this.#external(op.registeredFunction as Communicator<any[], any>, params, ...args)
? (...args: unknown[]) => this.#external(op.registeredFunction as Communicator<unknown>, params, ...args)
: undefined;
} else {
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
proxy[op.name] = op.workflowConfig
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
? (...args: any[]) => this.#workflow(op.registeredFunction as Workflow<any[], any>, params, ...args).then((handle) => handle.getResult())
? (...args: unknown[]) => this.#workflow(op.registeredFunction as Workflow<unknown>, params, ...args).then((handle) => handle.getResult())
: undefined;
}
}
Expand All @@ -161,15 +157,15 @@ export class HandlerContextImpl extends DBOSContextImpl implements HandlerContex
/* PRIVATE METHODS */
/////////////////////

async #workflow<T extends any[], R>(wf: Workflow<T, R>, params: WorkflowParams, ...args: T): Promise<WorkflowHandle<R>> {
async #workflow<R>(wf: Workflow<R>, params: WorkflowParams, ...args: unknown[]): Promise<WorkflowHandle<R>> {
return this.#dbosExec.workflow(wf, params, ...args);
}

async #transaction<T extends any[], R>(txn: Transaction<T, R>, params: WorkflowParams, ...args: T): Promise<R> {
async #transaction<R>(txn: Transaction<R>, params: WorkflowParams, ...args: unknown[]): Promise<R> {
return this.#dbosExec.transaction(txn, params, ...args);
}

async #external<T extends any[], R>(commFn: Communicator<T, R>, params: WorkflowParams, ...args: T): Promise<R> {
async #external<R>(commFn: Communicator<R>, params: WorkflowParams, ...args: unknown[]): Promise<R> {
return this.#dbosExec.external(commFn, params, ...args);
}
}
Expand Down
6 changes: 3 additions & 3 deletions src/httpServer/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -285,11 +285,11 @@ async checkPortAvailability(port: number, host: string): Promise<void> {
// - Otherwise, we return 500.
const wfParams = { parentCtx: oc, workflowUUID: headerWorkflowUUID };
if (ro.txnConfig) {
koaCtxt.body = await dbosExec.transaction(ro.registeredFunction as Transaction<unknown[], unknown>, wfParams, ...args);
koaCtxt.body = await dbosExec.transaction(ro.registeredFunction as Transaction<unknown>, wfParams, ...args);
} else if (ro.workflowConfig) {
koaCtxt.body = await (await dbosExec.workflow(ro.registeredFunction as Workflow<unknown[], unknown>, wfParams, ...args)).getResult();
koaCtxt.body = await (await dbosExec.workflow(ro.registeredFunction as Workflow<unknown>, wfParams, ...args)).getResult();
} else if (ro.commConfig) {
koaCtxt.body = await dbosExec.external(ro.registeredFunction as Communicator<unknown[], unknown>, wfParams, ...args);
koaCtxt.body = await dbosExec.external(ro.registeredFunction as Communicator<unknown>, wfParams, ...args);
} else {
// Directly invoke the handler code.
const retValue = await ro.invoke(undefined, [oc, ...args]);
Expand Down
4 changes: 2 additions & 2 deletions src/kafka/kafka.ts
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,10 @@ export class DBOSKafka {
// We can only guarantee exactly-once-per-message execution of transactions and workflows.
if (ro.txnConfig) {
// Execute the transaction
await this.dbosExec.transaction(ro.registeredFunction as Transaction<unknown[], unknown>, wfParams, ...args);
await this.dbosExec.transaction(ro.registeredFunction as Transaction<unknown>, wfParams, ...args);
} else if (ro.workflowConfig) {
// Safely start the workflow
await this.dbosExec.workflow(ro.registeredFunction as Workflow<unknown[], unknown>, wfParams, ...args);
await this.dbosExec.workflow(ro.registeredFunction as Workflow<unknown>, wfParams, ...args);
}
},
})
Expand Down
2 changes: 1 addition & 1 deletion src/scheduler/scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ class DetachableLoop {
// We currently only support scheduled workflows
if (this.scheduledMethod.workflowConfig) {
// Execute the workflow
await this.dbosExec.workflow(this.scheduledMethod.registeredFunction as Workflow<ScheduledArgs, unknown>, wfParams, ...args);
await this.dbosExec.workflow(this.scheduledMethod.registeredFunction as Workflow<unknown>, wfParams, ...args);
}
else {
this.dbosExec.logger.error(`Function ${this.scheduledMethod.name} is @scheduled but not a workflow`);
Expand Down
Loading

0 comments on commit 383a962

Please sign in to comment.