diff --git a/src/communicator.ts b/src/communicator.ts index 3961616ed..d594ab5b5 100644 --- a/src/communicator.ts +++ b/src/communicator.ts @@ -4,8 +4,8 @@ import { WorkflowContextImpl } from "./workflow"; import { DBOSContext, DBOSContextImpl } from "./context"; import { WorkflowContextDebug } from "./debugger/debug_workflow"; -/* eslint-disable @typescript-eslint/no-explicit-any */ -export type Communicator = (ctxt: CommunicatorContext, ...args: T) => Promise; +/* eslint-disable-next-line @typescript-eslint/no-explicit-any */ +export type Communicator = (ctxt: CommunicatorContext, ...args: any[]) => Promise; export interface CommunicatorConfig { retriesAllowed?: boolean; // Should failures be retried? (default true) diff --git a/src/dbos-executor.ts b/src/dbos-executor.ts index 8f844ac0f..d1159fce2 100644 --- a/src/dbos-executor.ts +++ b/src/dbos-executor.ts @@ -1,5 +1,4 @@ -/* eslint-disable @typescript-eslint/no-explicit-any */ -import { + import { DBOSError, DBOSInitializationError, DBOSWorkflowConflictUUIDError, @@ -68,17 +67,17 @@ export interface DBOSConfig { } interface WorkflowInfo { - workflow: Workflow; + workflow: Workflow; config: WorkflowConfig; } interface TransactionInfo { - transaction: Transaction; + transaction: Transaction; config: TransactionConfig; } interface CommunicatorInfo { - communicator: Communicator; + communicator: Communicator; config: CommunicatorConfig; } @@ -258,15 +257,15 @@ export class DBOSExecutor { this.registeredOperations.push(...registeredClassOperations); for (const ro of registeredClassOperations) { if (ro.workflowConfig) { - const wf = ro.registeredFunction as Workflow; + const wf = ro.registeredFunction as Workflow; this.#registerWorkflow(wf, {...ro.workflowConfig}); this.logger.debug(`Registered workflow ${ro.name}`); } else if (ro.txnConfig) { - const tx = ro.registeredFunction as Transaction; + const tx = ro.registeredFunction as Transaction; this.#registerTransaction(tx, ro.txnConfig); this.logger.debug(`Registered transaction ${ro.name}`); } else if (ro.commConfig) { - const comm = ro.registeredFunction as Communicator; + const comm = ro.registeredFunction as Communicator; this.#registerCommunicator(comm, ro.commConfig); this.logger.debug(`Registered communicator ${ro.name}`); } @@ -347,7 +346,7 @@ export class DBOSExecutor { /* WORKFLOW OPERATIONS */ - #registerWorkflow(wf: Workflow, config: WorkflowConfig = {}) { + #registerWorkflow(wf: Workflow, config: WorkflowConfig = {}) { if (wf.name === DBOSExecutor.tempWorkflowName || this.workflowInfoMap.has(wf.name)) { throw new DBOSError(`Repeated workflow name: ${wf.name}`); } @@ -358,7 +357,7 @@ export class DBOSExecutor { this.workflowInfoMap.set(wf.name, workflowInfo); } - #registerTransaction(txn: Transaction, params: TransactionConfig = {}) { + #registerTransaction(txn: Transaction, params: TransactionConfig = {}) { if (this.transactionInfoMap.has(txn.name)) { throw new DBOSError(`Repeated Transaction name: ${txn.name}`); } @@ -369,7 +368,7 @@ export class DBOSExecutor { this.transactionInfoMap.set(txn.name, txnInfo); } - #registerCommunicator(comm: Communicator, params: CommunicatorConfig = {}) { + #registerCommunicator(comm: Communicator, params: CommunicatorConfig = {}) { if (this.communicatorInfoMap.has(comm.name)) { throw new DBOSError(`Repeated Commmunicator name: ${comm.name}`); } @@ -380,7 +379,7 @@ export class DBOSExecutor { this.communicatorInfoMap.set(comm.name, commInfo); } - async workflow(wf: Workflow, params: InternalWorkflowParams, ...args: T): Promise> { + async workflow(wf: Workflow, params: InternalWorkflowParams, ...args: unknown[]): Promise> { if (this.debugMode) { return this.debugWorkflow(wf, params, undefined, undefined, ...args); } @@ -388,7 +387,7 @@ export class DBOSExecutor { } // If callerUUID and functionID are set, it means the workflow is invoked from within a workflow. - async internalWorkflow(wf: Workflow, params: InternalWorkflowParams, callerUUID?: string, callerFunctionID?: number, ...args: T): Promise> { + async internalWorkflow(wf: Workflow, params: InternalWorkflowParams, callerUUID?: string, callerFunctionID?: number, ...args: unknown[]): Promise> { const workflowUUID: string = params.workflowUUID ? params.workflowUUID : this.#generateUUID(); const presetUUID: boolean = params.workflowUUID ? true : false; @@ -494,7 +493,7 @@ export class DBOSExecutor { /** * DEBUG MODE workflow execution, skipping all the recording */ - async debugWorkflow(wf: Workflow, params: WorkflowParams, callerUUID?: string, callerFunctionID?: number, ...args: T): Promise> { + async debugWorkflow(wf: Workflow, params: WorkflowParams, callerUUID?: string, callerFunctionID?: number, ...args: unknown[]): Promise> { // In debug mode, we must have a specific workflow UUID. if (!params.workflowUUID) { throw new DBOSDebuggerError("Workflow UUID not found!"); @@ -535,28 +534,28 @@ export class DBOSExecutor { return new InvokedHandle(this.systemDatabase, workflowPromise, workflowUUID, wf.name, callerUUID, callerFunctionID); } - async transaction(txn: Transaction, params: WorkflowParams, ...args: T): Promise { + async transaction(txn: Transaction, params: WorkflowParams, ...args: unknown[] ): Promise { // Create a workflow and call transaction. - const temp_workflow = async (ctxt: WorkflowContext, ...args: T) => { + const temp_workflow = async (ctxt: WorkflowContext, ...args: unknown[]) => { const ctxtImpl = ctxt as WorkflowContextImpl; return await ctxtImpl.transaction(txn, ...args); }; - return (await this.workflow(temp_workflow, { ...params, tempWfType: TempWorkflowType.transaction, tempWfName: txn.name }, ...args)).getResult(); + return (await this.workflow(temp_workflow, { ...params, tempWfType: TempWorkflowType.transaction, tempWfName: txn.name }, ...args)).getResult(); } - async external(commFn: Communicator, params: WorkflowParams, ...args: T): Promise { + async external(commFn: Communicator, params: WorkflowParams, ...args: unknown[]): Promise { // Create a workflow and call external. - const temp_workflow = async (ctxt: WorkflowContext, ...args: T) => { + const temp_workflow = async (ctxt: WorkflowContext, ...args: unknown[]) => { const ctxtImpl = ctxt as WorkflowContextImpl; return await ctxtImpl.external(commFn, ...args); }; return (await this.workflow(temp_workflow, { ...params, tempWfType: TempWorkflowType.external, tempWfName: commFn.name }, ...args)).getResult(); } - async send>(destinationUUID: string, message: T, topic?: string, idempotencyKey?: string): Promise { + async send(destinationUUID: string, message: NonNullable, topic?: string, idempotencyKey?: string): Promise { // Create a workflow and call send. - const temp_workflow = async (ctxt: WorkflowContext, destinationUUID: string, message: T, topic?: string) => { - return await ctxt.send(destinationUUID, message, topic); + const temp_workflow = async (ctxt: WorkflowContext, destinationUUID: string, message: NonNullable, topic?: string) => { + return await ctxt.send(destinationUUID, message, topic); }; const workflowUUID = idempotencyKey ? destinationUUID + idempotencyKey : undefined; return (await this.workflow(temp_workflow, { workflowUUID: workflowUUID, tempWfType: TempWorkflowType.send }, destinationUUID, message, topic)).getResult(); @@ -565,7 +564,7 @@ export class DBOSExecutor { /** * Wait for a workflow to emit an event, then return its value. */ - async getEvent>(workflowUUID: string, key: string, timeoutSeconds: number = DBOSExecutor.defaultNotificationTimeoutSec): Promise { + async getEvent>(workflowUUID: string, key: string, timeoutSeconds: number = DBOSExecutor.defaultNotificationTimeoutSec): Promise { return this.systemDatabase.getEvent(workflowUUID, key, timeoutSeconds); } @@ -585,7 +584,7 @@ export class DBOSExecutor { * A recovery process that by default runs during executor init time. * It runs to completion all pending workflows that were executing when the previous executor failed. */ - async recoverPendingWorkflows(executorIDs: string[] = ["local"]): Promise[]> { + async recoverPendingWorkflows(executorIDs: string[] = ["local"]): Promise[]> { const pendingWorkflows: string[] = []; for (const execID of executorIDs) { if (execID == "local" && process.env.DBOS__VMID) { @@ -597,7 +596,7 @@ export class DBOSExecutor { pendingWorkflows.push(...wIDs); } - const handlerArray: WorkflowHandle[] = []; + const handlerArray: WorkflowHandle[] = []; for (const workflowUUID of pendingWorkflows) { try { handlerArray.push(await this.executeWorkflowUUID(workflowUUID)); @@ -620,7 +619,6 @@ export class DBOSExecutor { const wfInfo: WorkflowInfo | undefined = this.workflowInfoMap.get(wfStatus.workflowName); if (wfInfo) { - // eslint-disable-next-line @typescript-eslint/no-unsafe-argument return this.workflow(wfInfo.workflow, { workflowUUID: workflowUUID, parentCtx: parentCtx ?? undefined }, ...inputs); } @@ -631,16 +629,15 @@ export class DBOSExecutor { throw new DBOSError(`This should never happen! Cannot find workflow info for a non-temporary workflow! UUID ${workflowUUID}, name ${wfName}`); } - let temp_workflow: Workflow; + let temp_workflow: Workflow; if (nameArr[1] === TempWorkflowType.transaction) { const txnInfo: TransactionInfo | undefined = this.transactionInfoMap.get(nameArr[2]); if (!txnInfo) { this.logger.error(`Cannot find transaction info for UUID ${workflowUUID}, name ${nameArr[2]}`); throw new DBOSNotRegisteredError(nameArr[2]); } - temp_workflow = async (ctxt: WorkflowContext, ...args: any[]) => { + temp_workflow = async (ctxt: WorkflowContext, ...args: unknown[]) => { const ctxtImpl = ctxt as WorkflowContextImpl; - // eslint-disable-next-line @typescript-eslint/no-unsafe-return, @typescript-eslint/no-unsafe-argument return await ctxtImpl.transaction(txnInfo.transaction, ...args); }; } else if (nameArr[1] === TempWorkflowType.external) { @@ -649,21 +646,18 @@ export class DBOSExecutor { this.logger.error(`Cannot find communicator info for UUID ${workflowUUID}, name ${nameArr[2]}`); throw new DBOSNotRegisteredError(nameArr[2]); } - temp_workflow = async (ctxt: WorkflowContext, ...args: any[]) => { + temp_workflow = async (ctxt: WorkflowContext, ...args: unknown[]) => { const ctxtImpl = ctxt as WorkflowContextImpl; - // eslint-disable-next-line @typescript-eslint/no-unsafe-return, @typescript-eslint/no-unsafe-argument return await ctxtImpl.external(commInfo.communicator, ...args); }; } else if (nameArr[1] === TempWorkflowType.send) { - temp_workflow = async (ctxt: WorkflowContext, ...args: any[]) => { - // eslint-disable-next-line @typescript-eslint/no-unsafe-argument - return await ctxt.send(args[0], args[1], args[2]); + temp_workflow = async (ctxt: WorkflowContext, ...args: unknown[]) => { + return await ctxt.send(args[0] as string, args[1] as string, args[2] as string); }; } else { this.logger.error(`Unrecognized temporary workflow! UUID ${workflowUUID}, name ${wfName}`) throw new DBOSNotRegisteredError(wfName); } - // eslint-disable-next-line @typescript-eslint/no-unsafe-argument return this.workflow(temp_workflow, { workflowUUID: workflowUUID, parentCtx: parentCtx ?? undefined }, ...inputs); } @@ -711,7 +705,7 @@ export class DBOSExecutor { while (finishedCnt < totalSize) { let sqlStmt = "INSERT INTO dbos.transaction_outputs (workflow_uuid, function_id, output, error, txn_id, txn_snapshot, created_at) VALUES "; let paramCnt = 1; - const values: any[] = []; + const values: unknown[] = []; const batchUUIDs: string[] = []; for (const [workflowUUID, wfBuffer] of localBuffer) { for (const [funcID, recorded] of wfBuffer) { @@ -732,7 +726,6 @@ export class DBOSExecutor { } } this.logger.debug(sqlStmt); - // eslint-disable-next-line @typescript-eslint/no-unsafe-argument await this.userDatabase.query(sqlStmt, ...values); // Clean up after each batch succeeds diff --git a/src/dbos-runtime/runtime.ts b/src/dbos-runtime/runtime.ts index 74364315c..90c827609 100644 --- a/src/dbos-runtime/runtime.ts +++ b/src/dbos-runtime/runtime.ts @@ -10,8 +10,7 @@ import { DBOSKafka } from '../kafka/kafka'; import { DBOSScheduler } from '../scheduler/scheduler'; interface ModuleExports { - // eslint-disable-next-line @typescript-eslint/no-explicit-any - [key: string]: any; + [key: string]: unknown; } export interface DBOSRuntimeConfig { @@ -74,7 +73,7 @@ export class DBOSRuntime { let exports: ModuleExports; if (fs.existsSync(operations)) { const operationsURL = pathToFileURL(operations).href; - exports = (await import(operationsURL)) as Promise; + exports = (await import(operationsURL)) as ModuleExports; } else { throw new DBOSFailLoadOperationsError(`Failed to load operations from the entrypoint ${entrypoint}`); } diff --git a/src/debugger/debug_workflow.ts b/src/debugger/debug_workflow.ts index dd0688c19..16f3b6c20 100644 --- a/src/debugger/debug_workflow.ts +++ b/src/debugger/debug_workflow.ts @@ -1,5 +1,4 @@ -/* eslint-disable @typescript-eslint/no-explicit-any */ -import { DBOSExecutor, DBOSNull, OperationType, dbosNull } from "../dbos-executor"; + import { DBOSExecutor, DBOSNull, OperationType, dbosNull } from "../dbos-executor"; import { transaction_outputs } from "../../schemas/user_db_schema"; import { Transaction, TransactionContextImpl } from "../transaction"; import { Communicator } from "../communicator"; @@ -51,16 +50,15 @@ export class WorkflowContextDebug extends DBOSContextImpl implements WorkflowCon invoke(object: T): WFInvokeFuncs { const ops = getRegisteredOperations(object); + const proxy: Record = {}; - const proxy: any = {}; for (const op of ops) { - // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access proxy[op.name] = op.txnConfig - ? // eslint-disable-next-line @typescript-eslint/no-unsafe-argument - (...args: any[]) => this.transaction(op.registeredFunction as Transaction, ...args) + ? + (...args: unknown[]) => this.transaction(op.registeredFunction as Transaction, ...args) : op.commConfig - ? // eslint-disable-next-line @typescript-eslint/no-unsafe-argument - (...args: any[]) => this.external(op.registeredFunction as Communicator, ...args) + ? + (...args: unknown[]) => this.external(op.registeredFunction as Communicator, ...args) : undefined; } return proxy as WFInvokeFuncs; @@ -99,7 +97,7 @@ export class WorkflowContextDebug extends DBOSContextImpl implements WorkflowCon * Execute a transactional function in debug mode. * If a debug proxy is provided, it connects to a debug proxy and everything should be read-only. */ - async transaction(txn: Transaction, ...args: T): Promise { + async transaction(txn: Transaction, ...args: unknown[]): Promise { const txnInfo = this.#dbosExec.transactionInfoMap.get(txn.name); if (txnInfo === undefined) { throw new DBOSDebuggerError(`Transaction ${txn.name} not registered!`); @@ -168,7 +166,7 @@ export class WorkflowContextDebug extends DBOSContextImpl implements WorkflowCon return check.output; // Always return the recorded result. } - async external(commFn: Communicator, ..._args: T): Promise { + async external(commFn: Communicator, ..._args: unknown[]): Promise { const commConfig = this.#dbosExec.communicatorInfoMap.get(commFn.name); if (commConfig === undefined) { throw new DBOSDebuggerError(`Communicator ${commFn.name} not registered!`); @@ -187,22 +185,22 @@ export class WorkflowContextDebug extends DBOSContextImpl implements WorkflowCon } // Invoke the debugWorkflow() function instead. - async startChildWorkflow(wf: Workflow, ...args: T): Promise> { + async startChildWorkflow(wf: Workflow, ...args: unknown[]): Promise> { const funcId = this.functionIDGetIncrement(); const childUUID: string = this.workflowUUID + "-" + funcId; return this.#dbosExec.debugWorkflow(wf, { parentCtx: this, workflowUUID: childUUID }, this.workflowUUID, funcId, ...args); } - async invokeChildWorkflow(wf: Workflow, ...args: T): Promise { + async invokeChildWorkflow(wf: Workflow, ...args: unknown[]): Promise { return this.startChildWorkflow(wf, ...args).then((handle) => handle.getResult()); } // Deprecated - async childWorkflow(wf: Workflow, ...args: T): Promise> { + async childWorkflow(wf: Workflow, ...args: unknown[]): Promise> { return this.startChildWorkflow(wf, ...args); } - async send>(_destinationUUID: string, _message: T, _topic?: string | undefined): Promise { + async send(_destinationUUID: string, _message: NonNullable, _topic?: string | undefined): Promise { const functionID: number = this.functionIDGetIncrement(); // Original result must exist during replay. @@ -214,7 +212,7 @@ export class WorkflowContextDebug extends DBOSContextImpl implements WorkflowCon return; } - async recv>(_topic?: string | undefined, _timeoutSeconds?: number | undefined): Promise { + async recv>(_topic?: string | undefined, _timeoutSeconds?: number | undefined): Promise { const functionID: number = this.functionIDGetIncrement(); // Original result must exist during replay. @@ -226,7 +224,7 @@ export class WorkflowContextDebug extends DBOSContextImpl implements WorkflowCon return check as T | null; } - async setEvent>(_key: string, _value: T): Promise { + async setEvent(_key: string, _value: NonNullable): Promise { const functionID: number = this.functionIDGetIncrement(); // Original result must exist during replay. const check: undefined | DBOSNull = await this.#dbosExec.systemDatabase.checkOperationOutput(this.workflowUUID, functionID); @@ -236,7 +234,7 @@ export class WorkflowContextDebug extends DBOSContextImpl implements WorkflowCon this.logger.debug("Use recorded setEvent output."); } - async getEvent>(_workflowUUID: string, _key: string, _timeoutSeconds?: number | undefined): Promise { + async getEvent>(_workflowUUID: string, _key: string, _timeoutSeconds?: number | undefined): Promise { const functionID: number = this.functionIDGetIncrement(); // Original result must exist during replay. diff --git a/src/httpServer/handler.ts b/src/httpServer/handler.ts index b94edb547..dbefff24b 100644 --- a/src/httpServer/handler.ts +++ b/src/httpServer/handler.ts @@ -1,4 +1,3 @@ -/* eslint-disable @typescript-eslint/no-explicit-any */ import { MethodRegistration, MethodParameter, registerAndWrapFunction, getOrCreateMethodArgsRegistration, MethodRegistrationBase, getRegisteredOperations } from "../decorators"; import { DBOSExecutor, OperationType } from "../dbos-executor"; import { DBOSContext, DBOSContextImpl } from "../context"; @@ -13,6 +12,7 @@ import { Communicator } from "../communicator"; import { APITypes, ArgSources } from "./handlerTypes"; // local type declarations for workflow functions +/* eslint-disable @typescript-eslint/no-explicit-any */ type WFFunc = (ctxt: WorkflowContext, ...args: any[]) => Promise; export type InvokeFuncs = WFInvokeFuncs & AsyncHandlerWfFuncs; @@ -30,8 +30,8 @@ export interface HandlerContext extends DBOSContext { invokeWorkflow(targetClass: T, workflowUUID?: string): SyncHandlerWfFuncs; startWorkflow(targetClass: T, workflowUUID?: string): AsyncHandlerWfFuncs; retrieveWorkflow(workflowUUID: string): WorkflowHandle; - send>(destinationUUID: string, message: T, topic?: string, idempotencyKey?: string): Promise; - getEvent>(workflowUUID: string, key: string, timeoutSeconds?: number): Promise; + send(destinationUUID: string, message: NonNullable, topic?: string, idempotencyKey?: string): Promise; + getEvent>(workflowUUID: string, key: string, timeoutSeconds?: number): Promise; } export const RequestIDHeader = "x-request-id"; @@ -101,11 +101,11 @@ export class HandlerContextImpl extends DBOSContextImpl implements HandlerContex /* PUBLIC INTERFACE */ /////////////////////// - async send>(destinationUUID: string, message: T, topic?: string, idempotencyKey?: string): Promise { + async send(destinationUUID: string, message: NonNullable, topic?: string, idempotencyKey?: string): Promise { return this.#dbosExec.send(destinationUUID, message, topic, idempotencyKey); } - async getEvent>(workflowUUID: string, key: string, timeoutSeconds: number = DBOSExecutor.defaultNotificationTimeoutSec): Promise { + async getEvent>(workflowUUID: string, key: string, timeoutSeconds: number = DBOSExecutor.defaultNotificationTimeoutSec): Promise { return this.#dbosExec.getEvent(workflowUUID, key, timeoutSeconds); } @@ -119,26 +119,21 @@ export class HandlerContextImpl extends DBOSContextImpl implements HandlerContex */ mainInvoke(object: T, workflowUUID: string | undefined, asyncWf: boolean): InvokeFuncs { const ops = getRegisteredOperations(object); - const proxy: any = {}; + const proxy: Record = {}; const params = { workflowUUID: workflowUUID, parentCtx: this }; + for (const op of ops) { if (asyncWf) { - // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access proxy[op.name] = op.txnConfig - // eslint-disable-next-line @typescript-eslint/no-unsafe-argument - ? (...args: any[]) => this.#transaction(op.registeredFunction as Transaction, params, ...args) + ? (...args: unknown[]) => this.#transaction(op.registeredFunction as Transaction, params, ...args) : op.workflowConfig - // eslint-disable-next-line @typescript-eslint/no-unsafe-argument - ? (...args: any[]) => this.#workflow(op.registeredFunction as Workflow, params, ...args) + ? (...args: unknown[]) => this.#workflow(op.registeredFunction as Workflow, params, ...args) : op.commConfig - // eslint-disable-next-line @typescript-eslint/no-unsafe-argument - ? (...args: any[]) => this.#external(op.registeredFunction as Communicator, params, ...args) + ? (...args: unknown[]) => this.#external(op.registeredFunction as Communicator, params, ...args) : undefined; } else { - // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access proxy[op.name] = op.workflowConfig - // eslint-disable-next-line @typescript-eslint/no-unsafe-argument - ? (...args: any[]) => this.#workflow(op.registeredFunction as Workflow, params, ...args).then((handle) => handle.getResult()) + ? (...args: unknown[]) => this.#workflow(op.registeredFunction as Workflow, params, ...args).then((handle) => handle.getResult()) : undefined; } } @@ -161,15 +156,15 @@ export class HandlerContextImpl extends DBOSContextImpl implements HandlerContex /* PRIVATE METHODS */ ///////////////////// - async #workflow(wf: Workflow, params: WorkflowParams, ...args: T): Promise> { + async #workflow(wf: Workflow, params: WorkflowParams, ...args: unknown[]): Promise> { return this.#dbosExec.workflow(wf, params, ...args); } - async #transaction(txn: Transaction, params: WorkflowParams, ...args: T): Promise { + async #transaction(txn: Transaction, params: WorkflowParams, ...args: unknown[]): Promise { return this.#dbosExec.transaction(txn, params, ...args); } - async #external(commFn: Communicator, params: WorkflowParams, ...args: T): Promise { + async #external(commFn: Communicator, params: WorkflowParams, ...args: unknown[]): Promise { return this.#dbosExec.external(commFn, params, ...args); } } diff --git a/src/httpServer/server.ts b/src/httpServer/server.ts index 1d8906c64..319b009d7 100644 --- a/src/httpServer/server.ts +++ b/src/httpServer/server.ts @@ -285,11 +285,11 @@ async checkPortAvailability(port: number, host: string): Promise { // - Otherwise, we return 500. const wfParams = { parentCtx: oc, workflowUUID: headerWorkflowUUID }; if (ro.txnConfig) { - koaCtxt.body = await dbosExec.transaction(ro.registeredFunction as Transaction, wfParams, ...args); + koaCtxt.body = await dbosExec.transaction(ro.registeredFunction as Transaction, wfParams, ...args); } else if (ro.workflowConfig) { - koaCtxt.body = await (await dbosExec.workflow(ro.registeredFunction as Workflow, wfParams, ...args)).getResult(); + koaCtxt.body = await (await dbosExec.workflow(ro.registeredFunction as Workflow, wfParams, ...args)).getResult(); } else if (ro.commConfig) { - koaCtxt.body = await dbosExec.external(ro.registeredFunction as Communicator, wfParams, ...args); + koaCtxt.body = await dbosExec.external(ro.registeredFunction as Communicator, wfParams, ...args); } else { // Directly invoke the handler code. const retValue = await ro.invoke(undefined, [oc, ...args]); diff --git a/src/kafka/kafka.ts b/src/kafka/kafka.ts index 87e9db537..035990755 100644 --- a/src/kafka/kafka.ts +++ b/src/kafka/kafka.ts @@ -124,10 +124,10 @@ export class DBOSKafka { // We can only guarantee exactly-once-per-message execution of transactions and workflows. if (ro.txnConfig) { // Execute the transaction - await this.dbosExec.transaction(ro.registeredFunction as Transaction, wfParams, ...args); + await this.dbosExec.transaction(ro.registeredFunction as Transaction, wfParams, ...args); } else if (ro.workflowConfig) { // Safely start the workflow - await this.dbosExec.workflow(ro.registeredFunction as Workflow, wfParams, ...args); + await this.dbosExec.workflow(ro.registeredFunction as Workflow, wfParams, ...args); } }, }) diff --git a/src/scheduler/scheduler.ts b/src/scheduler/scheduler.ts index 6fd72a86e..3c8d16eed 100644 --- a/src/scheduler/scheduler.ts +++ b/src/scheduler/scheduler.ts @@ -168,7 +168,7 @@ class DetachableLoop { // We currently only support scheduled workflows if (this.scheduledMethod.workflowConfig) { // Execute the workflow - await this.dbosExec.workflow(this.scheduledMethod.registeredFunction as Workflow, wfParams, ...args); + await this.dbosExec.workflow(this.scheduledMethod.registeredFunction as Workflow, wfParams, ...args); } else { this.dbosExec.logger.error(`Function ${this.scheduledMethod.name} is @scheduled but not a workflow`); diff --git a/src/system_database.ts b/src/system_database.ts index 01e3d0311..9a81207c3 100644 --- a/src/system_database.ts +++ b/src/system_database.ts @@ -1,5 +1,3 @@ -/* eslint-disable @typescript-eslint/no-explicit-any */ - import { deserializeError, serializeError } from "serialize-error"; import { DBOSExecutor, dbosNull, DBOSNull } from "./dbos-executor"; import { DatabaseError, Pool, PoolClient, Notification, PoolConfig, Client } from "pg"; @@ -17,14 +15,14 @@ export interface SystemDatabase { destroy(): Promise; checkWorkflowOutput(workflowUUID: string): Promise; - initWorkflowStatus(bufferedStatus: WorkflowStatusInternal, args: T): Promise; + initWorkflowStatus(bufferedStatus: WorkflowStatusInternal, args: T): Promise; bufferWorkflowOutput(workflowUUID: string, status: WorkflowStatusInternal): void; flushWorkflowSystemBuffers(): Promise; recordWorkflowError(workflowUUID: string, status: WorkflowStatusInternal): Promise; getPendingWorkflows(executorID: string): Promise>; - bufferWorkflowInputs(workflowUUID: string, args: T) : void; - getWorkflowInputs(workflowUUID: string): Promise; + bufferWorkflowInputs(workflowUUID: string, args: T) : void; + getWorkflowInputs(workflowUUID: string): Promise; checkOperationOutput(workflowUUID: string, functionID: number): Promise; recordOperationOutput(workflowUUID: string, functionID: number, output: R): Promise; @@ -35,11 +33,11 @@ export interface SystemDatabase { sleepms(workflowUUID: string, functionID: number, duration: number): Promise; - send>(workflowUUID: string, functionID: number, destinationUUID: string, message: T, topic?: string): Promise; - recv>(workflowUUID: string, functionID: number, topic?: string, timeoutSeconds?: number): Promise; + send(workflowUUID: string, functionID: number, destinationUUID: string, message: NonNullable, topic?: string): Promise; + recv>(workflowUUID: string, functionID: number, topic?: string, timeoutSeconds?: number): Promise; - setEvent>(workflowUUID: string, functionID: number, key: string, value: T): Promise; - getEvent>(workflowUUID: string, key: string, timeoutSeconds: number, callerUUID?: string, functionID?: number): Promise; + setEvent(workflowUUID: string, functionID: number, key: string, value: NonNullable): Promise; + getEvent>(workflowUUID: string, key: string, timeoutSeconds: number, callerUUID?: string, functionID?: number): Promise; // Scheduler queries // These two maintain exactly once - make sure we kick off the workflow at least once, and wf unique ID does the rest @@ -90,7 +88,7 @@ export class PostgresSystemDatabase implements SystemDatabase { readonly workflowEventsMap: Record void> = {}; readonly workflowStatusBuffer: Map = new Map(); - readonly workflowInputsBuffer: Map = new Map(); + readonly workflowInputsBuffer: Map = new Map(); readonly flushBatchSize = 100; static readonly connectionTimeoutMillis = 10000; // 10 second timeout @@ -134,7 +132,7 @@ export class PostgresSystemDatabase implements SystemDatabase { } } - async initWorkflowStatus(initStatus: WorkflowStatusInternal, args: T): Promise { + async initWorkflowStatus(initStatus: WorkflowStatusInternal, args: T): Promise { await this.pool.query( `INSERT INTO ${DBOSExecutor.systemDBSchemaName}.workflow_status (workflow_uuid, status, name, authenticated_user, assumed_role, authenticated_roles, request, output, executor_id, created_at) VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) ON CONFLICT (workflow_uuid) DO NOTHING`, [initStatus.workflowUUID, initStatus.status, initStatus.name, initStatus.authenticatedUser, initStatus.assumedRole, JSON.stringify(initStatus.authenticatedRoles), JSON.stringify(initStatus.request), null, initStatus.executorID, initStatus.createdAt] @@ -168,7 +166,7 @@ export class PostgresSystemDatabase implements SystemDatabase { while (finishedCnt < totalSize) { let sqlStmt = `INSERT INTO ${DBOSExecutor.systemDBSchemaName}.workflow_status (workflow_uuid, status, name, authenticated_user, assumed_role, authenticated_roles, request, output, executor_id, created_at, updated_at) VALUES `; let paramCnt = 1; - const values: any[] = []; + const values: unknown[] = []; const batchUUIDs: string[] = []; for (const [workflowUUID, status] of localBuffer) { if (paramCnt > 1) { @@ -221,7 +219,7 @@ export class PostgresSystemDatabase implements SystemDatabase { return rows.map(i => i.workflow_uuid); } - bufferWorkflowInputs(workflowUUID: string, args: T): void { + bufferWorkflowInputs(workflowUUID: string, args: T): void { this.workflowInputsBuffer.set(workflowUUID, args); } @@ -234,7 +232,7 @@ export class PostgresSystemDatabase implements SystemDatabase { while (finishedCnt < totalSize) { let sqlStmt = `INSERT INTO ${DBOSExecutor.systemDBSchemaName}.workflow_inputs (workflow_uuid, inputs) VALUES `; let paramCnt = 1; - const values: any[] = []; + const values: unknown[] = []; const batchUUIDs: string[] = []; for (const [workflowUUID, args] of localBuffer) { finishedCnt++; @@ -277,7 +275,7 @@ export class PostgresSystemDatabase implements SystemDatabase { return; } - async getWorkflowInputs(workflowUUID: string): Promise { + async getWorkflowInputs(workflowUUID: string): Promise { const { rows } = await this.pool.query( `SELECT inputs FROM ${DBOSExecutor.systemDBSchemaName}.workflow_inputs WHERE workflow_uuid=$1`, [workflowUUID] @@ -364,7 +362,7 @@ export class PostgresSystemDatabase implements SystemDatabase { readonly nullTopic = "__null__topic__"; - async send>(workflowUUID: string, functionID: number, destinationUUID: string, message: T, topic?: string): Promise { + async send(workflowUUID: string, functionID: number, destinationUUID: string, message: NonNullable, topic?: string): Promise { topic = topic ?? this.nullTopic; const client: PoolClient = await this.pool.connect(); @@ -398,7 +396,7 @@ export class PostgresSystemDatabase implements SystemDatabase { client.release(); } - async recv>(workflowUUID: string, functionID: number, topic?: string, timeoutSeconds: number = DBOSExecutor.defaultNotificationTimeoutSec): Promise { + async recv>(workflowUUID: string, functionID: number, topic?: string, timeoutSeconds: number = DBOSExecutor.defaultNotificationTimeoutSec): Promise { topic = topic ?? this.nullTopic; // First, check for previous executions. const checkRows = (await this.pool.query(`SELECT output FROM ${DBOSExecutor.systemDBSchemaName}.operation_outputs WHERE workflow_uuid=$1 AND function_id=$2`, [workflowUUID, functionID])).rows; @@ -450,7 +448,7 @@ export class PostgresSystemDatabase implements SystemDatabase { [workflowUUID, topic])).rows; let message: T | null = null; if (finalRecvRows.length > 0) { - message = JSON.parse(finalRecvRows[0].message) as T; + message = JSON.parse(finalRecvRows[0].message) as T } await this.recordNotificationOutput(client, workflowUUID, functionID, message); await client.query(`COMMIT`); @@ -458,7 +456,7 @@ export class PostgresSystemDatabase implements SystemDatabase { return message; } - async setEvent>(workflowUUID: string, functionID: number, key: string, message: T): Promise { + async setEvent(workflowUUID: string, functionID: number, key: string, message: NonNullable): Promise { const client: PoolClient = await this.pool.connect(); await client.query("BEGIN ISOLATION LEVEL READ COMMITTED"); @@ -482,7 +480,7 @@ export class PostgresSystemDatabase implements SystemDatabase { client.release(); } - async getEvent>(workflowUUID: string, key: string, timeoutSeconds: number, callerUUID?: string, functionID?: number): Promise { + async getEvent>(workflowUUID: string, key: string, timeoutSeconds: number, callerUUID?: string, functionID?: number): Promise { // Check if the operation has been done before for OAOO (only do this inside a workflow). if (callerUUID !== undefined && functionID !== undefined) { const { rows } = await this.pool.query(`SELECT output FROM ${DBOSExecutor.systemDBSchemaName}.operation_outputs WHERE workflow_uuid=$1 AND function_id=$2`, [callerUUID, functionID]); diff --git a/src/testing/testing_runtime.ts b/src/testing/testing_runtime.ts index 96377e93b..a7a6d2dae 100644 --- a/src/testing/testing_runtime.ts +++ b/src/testing/testing_runtime.ts @@ -1,5 +1,4 @@ -/* eslint-disable @typescript-eslint/no-explicit-any */ -import { IncomingMessage } from "http"; + import { IncomingMessage } from "http"; import { Communicator } from "../communicator"; import { HTTPRequest, DBOSContextImpl } from "../context"; import { getRegisteredOperations } from "../decorators"; @@ -53,8 +52,8 @@ export interface TestingRuntime { invokeWorkflow(targetClass: T, workflowUUID?: string, params?: WorkflowInvokeParams): SyncHandlerWfFuncs; startWorkflow(targetClass: T, workflowUUID?: string, params?: WorkflowInvokeParams): AsyncHandlerWfFuncs; retrieveWorkflow(workflowUUID: string): WorkflowHandle; - send>(destinationUUID: string, message: T, topic?: string, idempotencyKey?: string): Promise; - getEvent>(workflowUUID: string, key: string, timeoutSeconds?: number): Promise; + send(destinationUUID: string, message: NonNullable, topic?: string, idempotencyKey?: string): Promise; + getEvent>(workflowUUID: string, key: string, timeoutSeconds?: number): Promise; getHandlersCallback(): (req: IncomingMessage | Http2ServerRequest, res: ServerResponse | Http2ServerResponse) => Promise; getAdminCallback(): (req: IncomingMessage | Http2ServerRequest, res: ServerResponse | Http2ServerResponse) => Promise; @@ -64,7 +63,7 @@ export interface TestingRuntime { setConfig(key: string, newValue: T): void; // User database operations. - queryUserDB(sql: string, ...params: any[]): Promise; // Execute a raw SQL query on the user database. + queryUserDB(sql: string, ...params: unknown[]): Promise; // Execute a raw SQL query on the user database. createUserSchema(): Promise; // Only valid if using TypeORM. Create tables based on the provided schema. dropUserSchema(): Promise; // Only valid if using TypeORM. Drop all tables created by createUserSchema(). @@ -144,8 +143,7 @@ export class TestingRuntimeImpl implements TestingRuntime { mainInvoke(object: T, workflowUUID: string | undefined, params: WorkflowInvokeParams | undefined, asyncWf: boolean): InvokeFuncs { const dbosExec = this.getDBOSExec(); const ops = getRegisteredOperations(object); - - const proxy: any = {}; + const proxy: Record = {}; // Creates a context to pass in necessary info. const span = dbosExec.tracer.startSpan("test"); @@ -157,22 +155,16 @@ export class TestingRuntimeImpl implements TestingRuntime { const wfParams: WorkflowParams = { workflowUUID: workflowUUID, parentCtx: oc }; for (const op of ops) { if (asyncWf) { - // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access proxy[op.name] = op.txnConfig - // eslint-disable-next-line @typescript-eslint/no-unsafe-argument - ? (...args: any[]) => dbosExec.transaction(op.registeredFunction as Transaction, wfParams, ...args) + ? (...args: unknown[]) => dbosExec.transaction(op.registeredFunction as Transaction, wfParams, ...args) : op.workflowConfig - // eslint-disable-next-line @typescript-eslint/no-unsafe-argument - ? (...args: any[]) => dbosExec.workflow(op.registeredFunction as Workflow, wfParams, ...args) + ? (...args: unknown[]) => dbosExec.workflow(op.registeredFunction as Workflow, wfParams, ...args) : op.commConfig - // eslint-disable-next-line @typescript-eslint/no-unsafe-argument - ? (...args: any[]) => dbosExec.external(op.registeredFunction as Communicator, wfParams, ...args) + ? (...args: unknown[]) => dbosExec.external(op.registeredFunction as Communicator, wfParams, ...args) : undefined; } else { - // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access proxy[op.name] = op.workflowConfig - // eslint-disable-next-line @typescript-eslint/no-unsafe-argument - ? (...args: any[]) => dbosExec.workflow(op.registeredFunction as Workflow, wfParams, ...args).then((handle) => handle.getResult()) + ? (...args: unknown[]) => dbosExec.workflow(op.registeredFunction as Workflow, wfParams, ...args).then((handle) => handle.getResult()) : undefined; } } @@ -208,11 +200,11 @@ export class TestingRuntimeImpl implements TestingRuntime { return this.#server.adminApp.callback(); } - async send>(destinationUUID: string, message: T, topic?: string, idempotencyKey?: string): Promise { + async send(destinationUUID: string, message: NonNullable, topic?: string, idempotencyKey?: string): Promise { return this.getDBOSExec().send(destinationUUID, message, topic, idempotencyKey); } - async getEvent>(workflowUUID: string, key: string, timeoutSeconds: number = DBOSExecutor.defaultNotificationTimeoutSec): Promise { + async getEvent>(workflowUUID: string, key: string, timeoutSeconds: number = DBOSExecutor.defaultNotificationTimeoutSec): Promise { return this.getDBOSExec().getEvent(workflowUUID, key, timeoutSeconds); } @@ -220,8 +212,7 @@ export class TestingRuntimeImpl implements TestingRuntime { return this.getDBOSExec().retrieveWorkflow(workflowUUID); } - async queryUserDB(sql: string, ...params: any[]): Promise { - // eslint-disable-next-line @typescript-eslint/no-unsafe-argument + async queryUserDB(sql: string, ...params: unknown[]): Promise { return this.getDBOSExec().userDatabase.query(sql, ...params); } diff --git a/src/transaction.ts b/src/transaction.ts index b864c0c05..63ff07872 100644 --- a/src/transaction.ts +++ b/src/transaction.ts @@ -1,4 +1,3 @@ -/* eslint-disable @typescript-eslint/no-explicit-any */ import { UserDatabaseName, UserDatabaseClient } from "./user_database"; import { WorkflowContextImpl } from "./workflow"; import { Span } from "@opentelemetry/sdk-trace-base"; @@ -8,7 +7,7 @@ import { GlobalLogger as Logger } from "./telemetry/logs"; import { WorkflowContextDebug } from "./debugger/debug_workflow"; // Can we call it TransactionFunction -export type Transaction = (ctxt: TransactionContext, ...args: T) => Promise; +export type Transaction = (ctxt: TransactionContext, ...args: unknown[]) => Promise; export interface TransactionConfig { isolationLevel?: IsolationLevel; diff --git a/src/user_database.ts b/src/user_database.ts index e645edb33..1f9c533e4 100644 --- a/src/user_database.ts +++ b/src/user_database.ts @@ -1,4 +1,3 @@ -/* eslint-disable @typescript-eslint/no-explicit-any */ import { Pool, PoolConfig, PoolClient, DatabaseError as PGDatabaseError, QueryResultRow } from "pg"; import { createUserDBSchema, userDBIndex, userDBSchema } from "../schemas/user_db_schema"; import { IsolationLevel, TransactionConfig } from "./transaction"; @@ -409,9 +408,7 @@ export class KnexUserDatabase implements UserDatabase { async queryWithClient(client: Knex, sql: string, ...uparams: T): Promise { const knexSql = sql.replace(/\$\d+/g, '?'); // Replace $1, $2... with ? - let params = uparams as any[]; - // eslint-disable-next-line @typescript-eslint/no-unsafe-return - params = params.map(i => i === undefined ? null : i); // Set undefined parameters to null. + const params = uparams.map(i => i === undefined ? null : i); // Set undefined parameters to null. const rows = await client.raw(knexSql, params) as { rows: R[] }; return rows.rows; } diff --git a/src/workflow.ts b/src/workflow.ts index db3d6299b..0b7aa0093 100644 --- a/src/workflow.ts +++ b/src/workflow.ts @@ -1,4 +1,3 @@ -/* eslint-disable @typescript-eslint/no-explicit-any */ import { DBOSExecutor, DBOSNull, OperationType, dbosNull } from "./dbos-executor"; import { transaction_outputs } from "../schemas/user_db_schema"; import { IsolationLevel, Transaction, TransactionContext, TransactionContextImpl } from "./transaction"; @@ -13,14 +12,18 @@ import { Span } from "@opentelemetry/sdk-trace-base"; import { HTTPRequest, DBOSContext, DBOSContextImpl } from './context'; import { getRegisteredOperations } from "./decorators"; -export type Workflow = (ctxt: WorkflowContext, ...args: T) => Promise; +/* eslint-disable-next-line @typescript-eslint/no-explicit-any */ +export type Workflow = (ctxt: WorkflowContext, ...args: any[]) => Promise; // Utility type that removes the initial parameter of a function +/* eslint-disable-next-line @typescript-eslint/no-explicit-any */ export type TailParameters any> = T extends (arg: any, ...args: infer P) => any ? P : never; // local type declarations for transaction and communicator functions -type TxFunc = (ctxt: TransactionContext, ...args: any[]) => Promise; -type CommFunc = (ctxt: CommunicatorContext, ...args: any[]) => Promise; +/* eslint-disable-next-line @typescript-eslint/no-explicit-any */ +type TxFunc = (ctxt: TransactionContext, ...args: any[]) => Promise; +/* eslint-disable-next-line @typescript-eslint/no-explicit-any */ +type CommFunc = (ctxt: CommunicatorContext, ...args: any[]) => Promise; // Utility type that only includes transaction/communicator functions + converts the method signature to exclude the context parameter export type WFInvokeFuncs = { @@ -62,15 +65,15 @@ export const StatusString = { export interface WorkflowContext extends DBOSContext { invoke(targetClass: T): WFInvokeFuncs; - startChildWorkflow(wf: Workflow, ...args: T): Promise>; - invokeChildWorkflow(wf: Workflow, ...args: T): Promise; - childWorkflow(wf: Workflow, ...args: T): Promise>; // Deprecated, calls startChildWorkflow + startChildWorkflow(wf: Workflow, ...args: unknown[]): Promise>; + invokeChildWorkflow(wf: Workflow, ...args: unknown[]): Promise; + childWorkflow(wf: Workflow, ...args: unknown[]): Promise>; // Deprecated, calls startChildWorkflow - send>(destinationUUID: string, message: T, topic?: string): Promise; - recv>(topic?: string, timeoutSeconds?: number): Promise; - setEvent>(key: string, value: T): Promise; + send(destinationUUID: string, message: NonNullable, topic?: string): Promise; + recv>(topic?: string, timeoutSeconds?: number): Promise; + setEvent(key: string, value: NonNullable): Promise; - getEvent>(workflowUUID: string, key: string, timeoutSeconds?: number): Promise; + getEvent>(workflowUUID: string, key: string, timeoutSeconds?: number): Promise; retrieveWorkflow(workflowUUID: string): WorkflowHandle; sleepms(durationMS: number): Promise; @@ -175,7 +178,7 @@ export class WorkflowContextImpl extends DBOSContextImpl implements WorkflowCont try { let sqlStmt = "INSERT INTO dbos.transaction_outputs (workflow_uuid, function_id, output, error, txn_id, txn_snapshot, created_at) VALUES "; let paramCnt = 1; - const values: any[] = []; + const values: unknown[] = []; for (const funcID of funcIDs) { // Capture output and also transaction snapshot information. // Initially, no txn_id because no queries executed. @@ -190,7 +193,6 @@ export class WorkflowContextImpl extends DBOSContextImpl implements WorkflowCont values.push(this.workflowUUID, funcID, JSON.stringify(output), JSON.stringify(null), txnSnapshot, createdAt); } this.logger.debug(sqlStmt); - // eslint-disable-next-line @typescript-eslint/no-unsafe-argument await this.#dbosExec.userDatabase.queryWithClient(client, sqlStmt, ...values); } catch (error) { if (this.#dbosExec.userDatabase.isKeyConflictError(error)) { @@ -243,19 +245,19 @@ export class WorkflowContextImpl extends DBOSContextImpl implements WorkflowCont * We pass in itself as a parent context and assign the child workflow with a deterministic UUID "this.workflowUUID-functionID". * We also pass in its own workflowUUID and function ID so the invoked handle is deterministic. */ - async startChildWorkflow(wf: Workflow, ...args: T): Promise> { + async startChildWorkflow(wf: Workflow, ...args: unknown[]): Promise> { // Note: cannot use invoke for childWorkflow because of potential recursive types on the workflow itself. const funcId = this.functionIDGetIncrement(); const childUUID: string = this.workflowUUID + "-" + funcId; return this.#dbosExec.internalWorkflow(wf, { parentCtx: this, workflowUUID: childUUID }, this.workflowUUID, funcId, ...args); } - async invokeChildWorkflow(wf: Workflow, ...args: T): Promise { + async invokeChildWorkflow(wf: Workflow, ...args: unknown[]): Promise { return this.startChildWorkflow(wf, ...args).then((handle) => handle.getResult()); } // Deprecated - async childWorkflow(wf: Workflow, ...args: T): Promise> { + async childWorkflow(wf: Workflow, ...args: unknown[]): Promise> { return this.startChildWorkflow(wf, ...args); } @@ -265,7 +267,7 @@ export class WorkflowContextImpl extends DBOSContextImpl implements WorkflowCont * If the transaction encounters a Postgres serialization error, retry it. * If it encounters any other error, throw it. */ - async transaction(txn: Transaction, ...args: T): Promise { + async transaction(txn: Transaction, ...args: unknown[]): Promise { const txnInfo = this.#dbosExec.transactionInfoMap.get(txn.name); if (txnInfo === undefined) { throw new DBOSNotRegisteredError(txn.name); @@ -376,7 +378,7 @@ export class WorkflowContextImpl extends DBOSContextImpl implements WorkflowCont * If it encounters any error, retry according to its configured retry policy until the maximum number of attempts is reached, then throw an DBOSError. * The communicator may execute many times, but once it is complete, it will not re-execute. */ - async external(commFn: Communicator, ...args: T): Promise { + async external(commFn: Communicator, ...args: unknown[]): Promise { const commInfo = this.#dbosExec.communicatorInfoMap.get(commFn.name); if (commInfo === undefined) { throw new DBOSNotRegisteredError(commFn.name); @@ -471,7 +473,7 @@ export class WorkflowContextImpl extends DBOSContextImpl implements WorkflowCont * Send a message to a workflow identified by a UUID. * The message can optionally be tagged with a topic. */ - async send>(destinationUUID: string, message: T, topic?: string): Promise { + async send(destinationUUID: string, message: NonNullable, topic?: string): Promise { const functionID: number = this.functionIDGetIncrement(); await this.#dbosExec.userDatabase.transaction(async (client: UserDatabaseClient) => { @@ -487,7 +489,7 @@ export class WorkflowContextImpl extends DBOSContextImpl implements WorkflowCont * If a topic is specified, retrieve the oldest message tagged with that topic. * Otherwise, retrieve the oldest message with no topic. */ - async recv>(topic?: string, timeoutSeconds: number = DBOSExecutor.defaultNotificationTimeoutSec): Promise { + async recv>(topic?: string, timeoutSeconds: number = DBOSExecutor.defaultNotificationTimeoutSec): Promise { const functionID: number = this.functionIDGetIncrement(); await this.#dbosExec.userDatabase.transaction(async (client: UserDatabaseClient) => { @@ -502,7 +504,7 @@ export class WorkflowContextImpl extends DBOSContextImpl implements WorkflowCont * Emit a workflow event, represented as a key-value pair. * Events are immutable once set. */ - async setEvent>(key: string, value: T) { + async setEvent(key: string, value: NonNullable) { const functionID: number = this.functionIDGetIncrement(); await this.#dbosExec.userDatabase.transaction(async (client: UserDatabaseClient) => { @@ -519,16 +521,13 @@ export class WorkflowContextImpl extends DBOSContextImpl implements WorkflowCont */ invoke(object: T): WFInvokeFuncs { const ops = getRegisteredOperations(object); + const proxy: Record = {}; - const proxy: any = {}; for (const op of ops) { - // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access proxy[op.name] = op.txnConfig - // eslint-disable-next-line @typescript-eslint/no-unsafe-argument - ? (...args: any[]) => this.transaction(op.registeredFunction as Transaction, ...args) + ? (...args: unknown[]) => this.transaction(op.registeredFunction as Transaction, ...args) : op.commConfig - // eslint-disable-next-line @typescript-eslint/no-unsafe-argument - ? (...args: any[]) => this.external(op.registeredFunction as Communicator, ...args) + ? (...args: unknown[]) => this.external(op.registeredFunction as Communicator, ...args) : undefined; } return proxy as WFInvokeFuncs; @@ -537,7 +536,7 @@ export class WorkflowContextImpl extends DBOSContextImpl implements WorkflowCont /** * Wait for a workflow to emit an event, then return its value. */ - getEvent>(targetUUID: string, key: string, timeoutSeconds: number = DBOSExecutor.defaultNotificationTimeoutSec): Promise { + getEvent>(targetUUID: string, key: string, timeoutSeconds: number = DBOSExecutor.defaultNotificationTimeoutSec): Promise { const functionID: number = this.functionIDGetIncrement(); return this.#dbosExec.systemDatabase.getEvent(targetUUID, key, timeoutSeconds, this.workflowUUID, functionID); } diff --git a/tests/dbos.test.ts b/tests/dbos.test.ts index db5e0c627..5f6eb5dc6 100644 --- a/tests/dbos.test.ts +++ b/tests/dbos.test.ts @@ -315,7 +315,6 @@ class DBOSTestClass { await ctxt.send(destinationUUID, "message2"); } - @Workflow() static async setEventWorkflow(ctxt: WorkflowContext) { await ctxt.setEvent("key1", "value1");