Skip to content

Commit 29e6c12

Browse files
committed
WorkflowRunOperation
1 parent d29dee1 commit 29e6c12

File tree

10 files changed

+368
-9
lines changed

10 files changed

+368
-9
lines changed

packages/client/src/internal.ts

+26
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
import { temporal } from '@temporalio/proto';
2+
3+
// A key used internally to pass "hidden options to the WorkflowClient.start() call.
4+
export const InternalWorkflowStartOptionsKey = Symbol.for('__temporal_client_internal_workflow_start_options');
5+
export interface InternalWorkflowStartOptions {
6+
requestId?: string;
7+
/**
8+
* Callbacks to be called by the server when this workflow reaches a terminal state.
9+
* If the workflow continues-as-new, these callbacks will be carried over to the new execution.
10+
* Callback addresses must be whitelisted in the server's dynamic configuration.
11+
*/
12+
completionCallbacks?: temporal.api.common.v1.ICallback[];
13+
/** Links to be associated with the workflow. */
14+
links?: temporal.api.common.v1.ILink[];
15+
/**
16+
* Backlink copied by the client from the StartWorkflowExecutionResponse. Only populated in servers newer than 1.27.
17+
*/
18+
backLink?: temporal.api.common.v1.ILink;
19+
20+
/**
21+
* Conflict options for when USE_EXISTING is specified.
22+
*
23+
* Used by the nexus WorkflowRunOperations to attach to a callback to a running workflow.
24+
*/
25+
onConflictOptions?: temporal.api.workflow.v1.IOnConflictOptions;
26+
}

packages/client/src/workflow-client.ts

+11-2
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ import {
9191
} from './base-client';
9292
import { mapAsyncIterable } from './iterators-utils';
9393
import { WorkflowUpdateStage, encodeWorkflowUpdateStage } from './workflow-update-stage';
94+
import { InternalWorkflowStartOptions, InternalWorkflowStartOptionsKey } from './internal';
9495

9596
const UpdateWorkflowExecutionLifecycleStage = temporal.api.enums.v1.UpdateWorkflowExecutionLifecycleStage;
9697

@@ -1251,7 +1252,12 @@ export class WorkflowClient extends BaseClient {
12511252
const req = await this.createStartWorkflowRequest(input);
12521253
const { options: opts, workflowType } = input;
12531254
try {
1254-
return (await this.workflowService.startWorkflowExecution(req)).runId;
1255+
const response = await this.workflowService.startWorkflowExecution(req);
1256+
const internalOptions = (opts as any)[InternalWorkflowStartOptionsKey] as InternalWorkflowStartOptions | undefined;
1257+
if (internalOptions != null) {
1258+
internalOptions.backLink = response.link ?? undefined;
1259+
}
1260+
return response.runId;
12551261
} catch (err: any) {
12561262
if (err.code === grpcStatus.ALREADY_EXISTS) {
12571263
throw new WorkflowExecutionAlreadyStartedError(
@@ -1267,11 +1273,13 @@ export class WorkflowClient extends BaseClient {
12671273
protected async createStartWorkflowRequest(input: WorkflowStartInput): Promise<StartWorkflowExecutionRequest> {
12681274
const { options: opts, workflowType, headers } = input;
12691275
const { identity, namespace } = this.options;
1276+
const internalOptions = (opts as any)[InternalWorkflowStartOptionsKey] as InternalWorkflowStartOptions | undefined;
12701277

1278+
console.log(internalOptions);
12711279
return {
12721280
namespace,
12731281
identity,
1274-
requestId: uuid4(),
1282+
requestId: internalOptions?.requestId ?? uuid4(),
12751283
workflowId: opts.workflowId,
12761284
workflowIdReusePolicy: encodeWorkflowIdReusePolicy(opts.workflowIdReusePolicy),
12771285
workflowIdConflictPolicy: encodeWorkflowIdConflictPolicy(opts.workflowIdConflictPolicy),
@@ -1296,6 +1304,7 @@ export class WorkflowClient extends BaseClient {
12961304
cronSchedule: opts.cronSchedule,
12971305
header: { fields: headers },
12981306
priority: opts.priority ? compilePriority(opts.priority) : undefined,
1307+
...internalOptions,
12991308
};
13001309
}
13011310

packages/nexus/src/context.ts

+82-4
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,16 @@
1-
import { HandlerContext as BaseHandlerContext, getHandlerContext } from 'nexus-rpc/lib/handler';
2-
import { Logger, LogLevel, LogMetadata } from '@temporalio/common';
3-
import { Client } from '@temporalio/client';
1+
import * as nexus from 'nexus-rpc';
2+
import { HandlerContext as BaseHandlerContext, getHandlerContext, handlerLinks } from 'nexus-rpc/lib/handler';
3+
import { Logger, LogLevel, LogMetadata, Workflow } from '@temporalio/common';
4+
import { Client, WorkflowStartOptions } from '@temporalio/client';
5+
import { temporal } from '@temporalio/proto';
6+
import { InternalWorkflowStartOptionsKey, InternalWorkflowStartOptions } from '@temporalio/client/lib/internal';
7+
import { generateWorkflowRunOperationToken, loadWorkflowRunOperationToken } from './token';
8+
import { convertNexusLinkToWorkflowEventLink, convertWorkflowEventLinkToNexusLink } from './link-converter';
49

510
export interface HandlerContext extends BaseHandlerContext {
611
log: Logger;
712
client: Client;
13+
namespace: string;
814
}
915

1016
function getLogger() {
@@ -35,6 +41,8 @@ export const log: Logger = {
3541
},
3642
};
3743

44+
// TODO: also support getting a metrics handler.
45+
3846
/**
3947
* Returns a client to be used in a Nexus Operation's context, this Client is powered by the same Connection that the
4048
* worker was created with.
@@ -43,4 +51,74 @@ export function getClient(): Client {
4351
return getHandlerContext<HandlerContext>().client;
4452
}
4553

46-
// TODO: also support getting a metrics handler.
54+
export interface WorkflowHandle<_T> {
55+
readonly workflowId: string;
56+
readonly runId: string;
57+
}
58+
59+
export async function startWorkflow<T extends Workflow>(workflowTypeOrFunc: string | T, workflowOptions: WorkflowStartOptions<T>, nexusOptions: nexus.StartOperationOptions): Promise<WorkflowHandle<T>> {
60+
const links = Array<temporal.api.common.v1.ILink>();
61+
if (nexusOptions.links?.length > 0) {
62+
for (const l of nexusOptions.links) {
63+
try {
64+
links.push({
65+
workflowEvent: convertNexusLinkToWorkflowEventLink(l),
66+
});
67+
} catch (error) {
68+
log.warn('failed to convert Nexus link to Workflow event link', { error });
69+
}
70+
}
71+
}
72+
const internalOptions: InternalWorkflowStartOptions = { links, requestId: nexusOptions.requestId };
73+
74+
if (workflowOptions.workflowIdConflictPolicy === 'USE_EXISTING') {
75+
internalOptions.onConflictOptions = {
76+
attachLinks: true,
77+
attachCompletionCallbacks: true,
78+
attachRequestId: true,
79+
};
80+
}
81+
82+
if (nexusOptions.callbackURL) {
83+
internalOptions.completionCallbacks = [
84+
{
85+
nexus: {url: nexusOptions.callbackURL, header: nexusOptions.callbackHeaders},
86+
links, // pass in links here as well, the server dedupes them.
87+
},
88+
];
89+
}
90+
(workflowOptions as any)[InternalWorkflowStartOptionsKey] = internalOptions;
91+
const handle = await getClient().workflow.start<T>(workflowTypeOrFunc, workflowOptions);
92+
if (internalOptions.backLink?.workflowEvent != null) {
93+
try {
94+
handlerLinks().push(convertWorkflowEventLinkToNexusLink(internalOptions.backLink.workflowEvent));
95+
} catch (error) {
96+
log.warn('failed to convert Workflow event link to Nexus link', { error });
97+
}
98+
}
99+
return { workflowId: handle.workflowId, runId: handle.firstExecutionRunId };
100+
}
101+
102+
export type WorkflowRunOperationHandler<I, O> = (input: I, options: nexus.StartOperationOptions) => Promise<WorkflowHandle<O>>;
103+
104+
export class WorkflowRunOperation<I, O> implements nexus.OperationHandler<I, O> {
105+
constructor(readonly handler: WorkflowRunOperationHandler<I, O>) { }
106+
107+
async start(input: I, options: nexus.StartOperationOptions): Promise<nexus.HandlerStartOperationResult<O>> {
108+
const { namespace } = getHandlerContext<HandlerContext>();
109+
const handle = await this.handler(input, options);
110+
return { token: generateWorkflowRunOperationToken(namespace, handle.workflowId) };
111+
}
112+
getResult(_token: string, _options: nexus.GetOperationResultOptions): Promise<O> {
113+
// Not implemented in Temporal yet.
114+
throw new nexus.HandlerError({ type: 'NOT_IMPLEMENTED', message: 'Method not implemented' });
115+
}
116+
getInfo(_token: string, _options: nexus.GetOperationInfoOptions): Promise<nexus.OperationInfo> {
117+
// Not implemented in Temporal yet.
118+
throw new nexus.HandlerError({ type: 'NOT_IMPLEMENTED', message: 'Method not implemented' });
119+
}
120+
async cancel(token: string, _options: nexus.CancelOperationOptions): Promise<void> {
121+
const decoded = loadWorkflowRunOperationToken(token);
122+
await getClient().workflow.getHandle(decoded.wid).cancel();
123+
}
124+
}

packages/nexus/src/index.ts

+8-1
Original file line numberDiff line numberDiff line change
@@ -1 +1,8 @@
1-
export { log, getClient } from './context';
1+
export {
2+
log,
3+
getClient,
4+
startWorkflow,
5+
WorkflowHandle,
6+
WorkflowRunOperation,
7+
WorkflowRunOperationHandler,
8+
} from './context';

packages/nexus/src/token.ts

+78
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
const OPERATION_TOKEN_TYPE_WORKFLOW_RUN = 1;
2+
3+
// OperationTokenType is used to identify the type of operation token.
4+
// Currently, we only have one type of operation token: WorkflowRun.
5+
type OperationTokenType = typeof OPERATION_TOKEN_TYPE_WORKFLOW_RUN;
6+
7+
interface WorkflowRunOperationToken {
8+
// Version of the token, by default we assume we're on version 1, this field is not emitted as part of the output,
9+
// it's only used to reject newer token versions on load.
10+
v?: number;
11+
// Type of the operation. Must be OPERATION_TOKEN_TYPE_WORKFLOW_RUN.
12+
t: OperationTokenType;
13+
ns: string;
14+
wid: string;
15+
}
16+
17+
/**
18+
* Generate a workflow run operation token.
19+
*/
20+
export function generateWorkflowRunOperationToken(namespace: string, workflowID: string): string {
21+
const token: WorkflowRunOperationToken = {
22+
t: OPERATION_TOKEN_TYPE_WORKFLOW_RUN,
23+
ns: namespace,
24+
wid: workflowID,
25+
};
26+
return base64URLEncodeNoPadding(JSON.stringify(token));
27+
}
28+
29+
/**
30+
* Load and validate a workflow run operation token.
31+
*/
32+
export function loadWorkflowRunOperationToken(data: string): WorkflowRunOperationToken {
33+
if (!data) {
34+
throw new TypeError('invalid workflow run token: token is empty');
35+
}
36+
let decoded: string;
37+
try {
38+
decoded = base64URLDecodeNoPadding(data);
39+
} catch (err) {
40+
throw new TypeError('failed to decode token', {cause: err});
41+
}
42+
let token: WorkflowRunOperationToken;
43+
try {
44+
token = JSON.parse(decoded);
45+
} catch (err) {
46+
throw new TypeError('failed to unmarshal workflow run operation token', {cause: err});
47+
}
48+
if (token.t !== OPERATION_TOKEN_TYPE_WORKFLOW_RUN) {
49+
throw new TypeError(`invalid workflow token type: ${token.t}, expected: ${OPERATION_TOKEN_TYPE_WORKFLOW_RUN}`);
50+
}
51+
if (token.v !== undefined && token.v !== 0) {
52+
throw new TypeError('invalid workflow run token: "v" field should not be present');
53+
}
54+
if (!token.wid) {
55+
throw new TypeError('invalid workflow run token: missing workflow ID (wid)');
56+
}
57+
return token;
58+
}
59+
60+
// Exported for use in tests.
61+
export function base64URLEncodeNoPadding(str: string): string {
62+
const base64 = Buffer.from(str).toString('base64url');
63+
return base64.replace(/=+$/, '');
64+
}
65+
66+
function base64URLDecodeNoPadding(str: string): string {
67+
// Validate the string contains only valid base64URL characters
68+
if (!/^[-A-Za-z0-9_]*$/.test(str)) {
69+
throw new TypeError('invalid base64URL encoded string: contains invalid characters');
70+
}
71+
72+
const paddingLength = str.length % 4;
73+
if (paddingLength > 0) {
74+
str += '='.repeat(4 - paddingLength);
75+
}
76+
77+
return Buffer.from(str, 'base64url').toString('utf-8');
78+
}

0 commit comments

Comments
 (0)