Skip to content

Worker Deployment Versioning #1679

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 22 commits into from
May 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 18 additions & 16 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions packages/client/src/workflow-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1227,6 +1227,7 @@ export class WorkflowClient extends BaseClient {
cronSchedule: options.cronSchedule,
header: { fields: headers },
priority: options.priority ? compilePriority(options.priority) : undefined,
versioningOverride: options.versioningOverride ?? undefined,
};
try {
return (await this.workflowService.signalWithStartWorkflowExecution(req)).runId;
Expand Down Expand Up @@ -1296,6 +1297,7 @@ export class WorkflowClient extends BaseClient {
cronSchedule: opts.cronSchedule,
header: { fields: headers },
priority: opts.priority ? compilePriority(opts.priority) : undefined,
versioningOverride: opts.versioningOverride ?? undefined,
};
}

Expand Down
46 changes: 42 additions & 4 deletions packages/client/src/workflow-options.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
import { CommonWorkflowOptions, SignalDefinition, WithWorkflowArgs, Workflow } from '@temporalio/common';
import {
CommonWorkflowOptions,
SignalDefinition,
WithWorkflowArgs,
Workflow,
VersioningOverride,
toCanonicalString,
} from '@temporalio/common';
import { Duration, msOptionalToTs } from '@temporalio/common/lib/time';
import { Replace } from '@temporalio/common/lib/type-helpers';
import { google } from '@temporalio/proto';
import { google, temporal } from '@temporalio/proto';

export * from '@temporalio/common/lib/workflow-options';

Expand Down Expand Up @@ -38,9 +45,15 @@ export interface WorkflowOptions extends CommonWorkflowOptions {

/**
* Amount of time to wait before starting the workflow.
*
*/
startDelay?: Duration;

/**
* Override the versioning behavior of the Workflow that is about to be started.
*
* @experimental Deployment based versioning is experimental and may change in the future.
*/
versioningOverride?: VersioningOverride;
}

export type WithCompiledWorkflowOptions<T extends WorkflowOptions> = Replace<
Expand All @@ -50,18 +63,21 @@ export type WithCompiledWorkflowOptions<T extends WorkflowOptions> = Replace<
workflowRunTimeout?: google.protobuf.IDuration;
workflowTaskTimeout?: google.protobuf.IDuration;
startDelay?: google.protobuf.IDuration;
versioningOverride?: temporal.api.workflow.v1.IVersioningOverride;
}
>;

export function compileWorkflowOptions<T extends WorkflowOptions>(options: T): WithCompiledWorkflowOptions<T> {
const { workflowExecutionTimeout, workflowRunTimeout, workflowTaskTimeout, startDelay, ...rest } = options;
const { workflowExecutionTimeout, workflowRunTimeout, workflowTaskTimeout, startDelay, versioningOverride, ...rest } =
options;

return {
...rest,
workflowExecutionTimeout: msOptionalToTs(workflowExecutionTimeout),
workflowRunTimeout: msOptionalToTs(workflowRunTimeout),
workflowTaskTimeout: msOptionalToTs(workflowTaskTimeout),
startDelay: msOptionalToTs(startDelay),
versioningOverride: versioningOverrideToProto(versioningOverride),
};
}

Expand Down Expand Up @@ -109,3 +125,25 @@ export interface WorkflowSignalWithStartOptionsWithArgs<SignalArgs extends any[]
* Options for starting a Workflow
*/
export type WorkflowStartOptions<T extends Workflow = Workflow> = WithWorkflowArgs<T, WorkflowOptions>;

function versioningOverrideToProto(
vo: VersioningOverride | undefined
): temporal.api.workflow.v1.IVersioningOverride | undefined {
if (!vo) return undefined;

// TODO: Remove deprecated field assignments when versioning is non-experimental
if (vo === 'AUTO_UPGRADE') {
return {
autoUpgrade: true,
behavior: temporal.api.enums.v1.VersioningBehavior.VERSIONING_BEHAVIOR_AUTO_UPGRADE,
};
}

return {
pinned: {
version: vo.pinnedTo,
},
behavior: temporal.api.enums.v1.VersioningBehavior.VERSIONING_BEHAVIOR_PINNED,
pinnedVersion: toCanonicalString(vo.pinnedTo),
};
}
2 changes: 2 additions & 0 deletions packages/common/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ export * from './logger';
export * from './priority';
export * from './retry-policy';
export type { Timestamp, Duration, StringValue } from './time';
export * from './worker-deployments';
export * from './workflow-definition-options';
export * from './workflow-handle';
export * from './workflow-options';
export * from './versioning-intent';
Expand Down
70 changes: 70 additions & 0 deletions packages/common/src/worker-deployments.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import type { temporal } from '@temporalio/proto';
import { makeProtoEnumConverters } from './internal-workflow';

/**
* Represents the version of a specific worker deployment.
*
* @experimental Deployment based versioning is experimental and may change in the future.
*/
export interface WorkerDeploymentVersion {
readonly buildId: string;
readonly deploymentName: string;
}

/**
* @returns The canonical representation of a deployment version, which is a string in the format
* `deploymentName.buildId`.
*/
export function toCanonicalString(version: WorkerDeploymentVersion): string {
return `${version.deploymentName}.${version.buildId}`;
}

/**
* Specifies when a workflow might move from a worker of one Build Id to another.
*
* * 'PINNED' - The workflow will be pinned to the current Build ID unless manually moved.
* * 'AUTO_UPGRADE' - The workflow will automatically move to the latest version (default Build ID
* of the task queue) when the next task is dispatched.
*
* @experimental Deployment based versioning is experimental and may change in the future.
*/
export const VersioningBehavior = {
PINNED: 'PINNED',
AUTO_UPGRADE: 'AUTO_UPGRADE',
} as const;
export type VersioningBehavior = (typeof VersioningBehavior)[keyof typeof VersioningBehavior];

export const [encodeVersioningBehavior, decodeVersioningBehavior] = makeProtoEnumConverters<
temporal.api.enums.v1.VersioningBehavior,
typeof temporal.api.enums.v1.VersioningBehavior,
keyof typeof temporal.api.enums.v1.VersioningBehavior,
typeof VersioningBehavior,
'VERSIONING_BEHAVIOR_'
>(
{
[VersioningBehavior.PINNED]: 1,
[VersioningBehavior.AUTO_UPGRADE]: 2,
UNSPECIFIED: 0,
} as const,
'VERSIONING_BEHAVIOR_'
);

/**
* Represents versioning overrides. For example, when starting workflows.
*/
export type VersioningOverride = PinnedVersioningOverride | 'AUTO_UPGRADE';

/**
* Workflow will be pinned to a specific deployment version.
*/
export interface PinnedVersioningOverride {
/**
* The worker deployment version to pin the workflow to.
*/
pinnedTo: WorkerDeploymentVersion;
}

/**
* The workflow will auto-upgrade to the current deployment version on the next workflow task.
*/
export type AutoUpgradeVersioningOverride = 'AUTO_UPGRADE';
20 changes: 20 additions & 0 deletions packages/common/src/workflow-definition-options.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import { VersioningBehavior } from './worker-deployments';

/**
* Options that can be used when defining a workflow via {@link setWorkflowOptions}.
*/
export interface WorkflowDefinitionOptions {
versioningBehavior?: VersioningBehavior;
}

type AsyncFunction<Args extends any[], ReturnType> = (...args: Args) => Promise<ReturnType>;
export type WorkflowDefinitionOptionsOrGetter = WorkflowDefinitionOptions | (() => WorkflowDefinitionOptions);

/**
* @internal
* @hidden
* A workflow function that has been defined with options from {@link WorkflowDefinitionOptions}.
*/
export interface WorkflowFunctionWithOptions<Args extends any[], ReturnType> extends AsyncFunction<Args, ReturnType> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this have any value being exported publicly? If yes, then AsyncFunction should also be exported. If not, then please add @internal and @hidden (I will have to experiment a bit more to figure out how to reduce this to a single annotation, but in the mean time, let's use both).

workflowDefinitionOptions: WorkflowDefinitionOptionsOrGetter;
}
5 changes: 4 additions & 1 deletion packages/common/src/workflow-options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { Duration } from './time';
import { makeProtoEnumConverters } from './internal-workflow';
import { SearchAttributePair, SearchAttributes, TypedSearchAttributes } from './search-attributes';
import { Priority } from './priority';
import { WorkflowFunctionWithOptions } from './workflow-definition-options';

/**
* Defines what happens when trying to start a Workflow with the same ID as a *Closed* Workflow.
Expand Down Expand Up @@ -243,7 +244,9 @@ export interface WorkflowDurationOptions {

export type CommonWorkflowOptions = BaseWorkflowOptions & WorkflowDurationOptions;

export function extractWorkflowType<T extends Workflow>(workflowTypeOrFunc: string | T): string {
export function extractWorkflowType<T extends Workflow>(
workflowTypeOrFunc: string | T | WorkflowFunctionWithOptions<any[], any>
): string {
if (typeof workflowTypeOrFunc === 'string') return workflowTypeOrFunc as string;
if (typeof workflowTypeOrFunc === 'function') {
if (workflowTypeOrFunc?.name) return workflowTypeOrFunc.name;
Expand Down
Loading
Loading