Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,8 @@ jobs:
--dynamic-config-value matching.useNewMatcher=true \
--dynamic-config-value system.refreshNexusEndpointsMinWait=1000 \
--dynamic-config-value component.callbacks.allowedAddresses='[{"Pattern":"*","AllowInsecure":true}]' \
--dynamic-config-value frontend.workerVersioningWorkflowAPIs=true &
--dynamic-config-value frontend.workerVersioningWorkflowAPIs=true \
--dynamic-config-value system.enableDeploymentVersions=true &
sleep 10s

- name: Run unit tests
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.common;

import io.temporal.worker.WorkerDeploymentOptions;

/** Specifies when a workflow might move from a worker of one Build Id to another. */
@Experimental
public enum VersioningBehavior {
/**
* An unspecified versioning behavior. By default, workers opting into worker versioning will be
* required to specify a behavior. See {@link
* io.temporal.worker.WorkerOptions.Builder#setDeploymentOptions(WorkerDeploymentOptions)}.
*/
UNSPECIFIED,
/** The workflow will be pinned to the current Build ID unless manually moved. */
PINNED,
/**
* The workflow will automatically move to the latest version (default Build ID of the task queue)
* when the next task is dispatched.
*/
AUTO_UPGRADE
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there an issue tracking the client-side changes? Specifically, version info for list, describe, and scheduling, and the new versioning calls on the client. (asking at top of this file because this stuff is easier in a thread than PR comment and there's no better place)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.common;

import java.util.Objects;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

/** Represents the version of a specific worker deployment. */
@Experimental
public class WorkerDeploymentVersion {
private final String deploymentName;
private final String buildId;

/** Build a worker deployment version from an explicit deployment name and build ID. */
public WorkerDeploymentVersion(@Nonnull String deploymentName, @Nonnull String buildId) {
this.deploymentName = deploymentName;
this.buildId = buildId;
}

/**
* Build a worker deployment version from a canonical string representation.
*
* @param canonicalString The canonical string representation of the worker deployment version,
* formatted as "deploymentName.buildId". Deployment name must not have a "." in it.
* @return A new instance of {@link WorkerDeploymentVersion}.
* @throws IllegalArgumentException if the input string is not in the expected format.
*/
public static WorkerDeploymentVersion fromCanonicalString(String canonicalString) {
String[] parts = canonicalString.split("\\.", 2);
if (parts.length != 2) {
throw new IllegalArgumentException(
"Invalid canonical string format. Expected 'deploymentName.buildId'");
}
return new WorkerDeploymentVersion(parts[0], parts[1]);
}

/**
* @return The canonical string representation of this worker deployment version.
*/
public String toCanonicalString() {
return deploymentName + "." + buildId;
}
Comment on lines +56 to +61
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For this specific class, I think just having this be the toString instead of the generated one below would be fine

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer the current approach since the canonical format is different then the standard toString implementation

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Meh, not sure there is such thing as a "standard" toString implementation, especially for a basic dot-delimited tuple like this. Same as there isn't a standard builder expectation for such a simple class. Having said that, I don't mind if we keep them separate.

Copy link
Member Author

@Sushisource Sushisource Apr 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I looked into this when I wrote it and it seems the consensus is that toString is more for debugging than canonical representations


/**
* @return The name of the deployment.
*/
@Nullable // Marked nullable for future compatibility with custom strings
public String getDeploymentName() {
return deploymentName;
}

/**
* @return The Build ID of this version.
*/
@Nullable // Marked nullable for future compatibility with custom strings
public String getBuildId() {
return buildId;
}

@Override
public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) return false;
WorkerDeploymentVersion that = (WorkerDeploymentVersion) o;
return Objects.equals(deploymentName, that.deploymentName)
&& Objects.equals(buildId, that.buildId);
}

@Override
public int hashCode() {
return Objects.hash(deploymentName, buildId);
}

@Override
public String toString() {
return "WorkerDeploymentVersion{"
+ "deploymentName='"
+ deploymentName
+ '\''
+ ", buildId='"
+ buildId
+ '\''
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,12 @@

import com.google.common.collect.ImmutableList;
import io.temporal.common.Experimental;
import io.temporal.common.VersioningBehavior;
import io.temporal.internal.common.InternalUtils;
import io.temporal.internal.common.env.ReflectionUtils;
import io.temporal.workflow.WorkflowVersioningBehavior;
import java.lang.reflect.Constructor;
import java.lang.reflect.Method;
import java.util.*;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -68,6 +71,7 @@ public int hashCode() {
}
}

private final Class<?> implementationClass;
private final List<POJOWorkflowInterfaceMetadata> workflowInterfaces;
private final List<POJOWorkflowMethodMetadata> workflowMethods;
private final List<POJOWorkflowMethodMetadata> signalMethods;
Expand Down Expand Up @@ -111,6 +115,7 @@ private POJOWorkflowImplMetadata(
throw new IllegalArgumentException("concrete class expected: " + implClass);
}

implementationClass = implClass;
List<POJOWorkflowInterfaceMetadata> workflowInterfaces = new ArrayList<>();
Map<String, POJOWorkflowMethodMetadata> workflowMethods = new HashMap<>();
Map<String, POJOWorkflowMethodMetadata> queryMethods = new HashMap<>();
Expand Down Expand Up @@ -238,4 +243,35 @@ public List<POJOWorkflowMethodMetadata> getUpdateValidatorMethods() {
public @Nullable Constructor<?> getWorkflowInit() {
return workflowInit;
}

/**
* @return The {@link VersioningBehavior} for the workflow method on the implementation class. If
* the method is annotated with {@link WorkflowVersioningBehavior}.
* @throws RuntimeException if the method is not found on the implementation class or is not a
* workflow method.
*/
@Experimental
@Nullable
public static VersioningBehavior getVersioningBehaviorForMethod(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may want to make this a bit more generic if we plan to add other annotations to workflow impl. methods. Since this is experimental I am fine leaving it as is.

Class<?> implementationClass, POJOWorkflowMethodMetadata workflowMethod) {
Method method = workflowMethod.getWorkflowMethod();
// Find the same method on the implementation class
Method implMethod;
try {
implMethod = implementationClass.getMethod(method.getName(), method.getParameterTypes());
} catch (NoSuchMethodException e) {
throw new RuntimeException(
"Unable to find workflow method "
+ workflowMethod.getName()
+ " in implementation class "
+ implementationClass.getName(),
e);
}
if (implMethod.isAnnotationPresent(WorkflowVersioningBehavior.class)) {
WorkflowVersioningBehavior vb = implMethod.getAnnotation(WorkflowVersioningBehavior.class);
return vb.value();
} else {
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package io.temporal.common.metadata;

import io.temporal.workflow.*;
import io.temporal.workflow.WorkflowVersioningBehavior;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.*;
Expand Down Expand Up @@ -426,6 +427,14 @@ private static boolean validateAndQualifiedForWorkflowMethod(POJOWorkflowMethod
}
}

if (method.getAnnotation(WorkflowVersioningBehavior.class) != null) {
// This annotation is only allowed in implementation classes, not interfaces
throw new IllegalArgumentException(
"@WorkflowVersioningBehavior annotation is not allowed on interface methods, only on"
+ " implementation methods: "
+ method);
}

if (isAnnotatedWorkflowMethod) {
// all methods explicitly marked with one of workflow method qualifiers
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,9 @@ public WorkflowTaskResult handleWorkflowTask(
if (workflowStateMachines.sdkVersionToWrite() != null) {
result.setWriteSdkVersion(workflowStateMachines.sdkVersionToWrite());
}
if (workflow.getWorkflowContext() != null) {
result.setVersioningBehavior(workflow.getWorkflowContext().getVersioningBehavior());
}
return result.build();
} finally {
lock.unlock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,8 @@ private Result createCompletedWFTRequest(
result.getNonfirstLocalActivityAttempts())
.build())
.setReturnNewWorkflowTask(result.isForceWorkflowTask())
.setVersioningBehavior(
WorkerVersioningProtoUtils.behaviorToProto(result.getVersioningBehavior()))
.setCapabilities(
RespondWorkflowTaskCompletedRequest.Capabilities.newBuilder()
.setDiscardSpeculativeWorkflowTaskWithEvents(true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package io.temporal.internal.replay;

import io.temporal.api.failure.v1.Failure;
import io.temporal.common.VersioningBehavior;
import io.temporal.common.context.ContextPropagator;
import io.temporal.internal.sync.SignalHandlerInfo;
import io.temporal.internal.sync.UpdateHandlerInfo;
Expand Down Expand Up @@ -72,4 +73,6 @@ public interface WorkflowContext {
Map<Long, SignalHandlerInfo> getRunningSignalHandlers();

Map<String, UpdateHandlerInfo> getRunningUpdateHandlers();

VersioningBehavior getVersioningBehavior();
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.temporal.api.command.v1.Command;
import io.temporal.api.protocol.v1.Message;
import io.temporal.api.query.v1.WorkflowQueryResult;
import io.temporal.common.VersioningBehavior;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand All @@ -43,6 +44,7 @@ public static final class Builder {
private List<Integer> sdkFlags;
private String writeSdkName;
private String writeSdkVersion;
private VersioningBehavior versioningBehavior;

public Builder setCommands(List<Command> commands) {
this.commands = commands;
Expand Down Expand Up @@ -89,6 +91,11 @@ public Builder setWriteSdkVersion(String writeSdkVersion) {
return this;
}

public Builder setVersioningBehavior(VersioningBehavior versioningBehavior) {
this.versioningBehavior = versioningBehavior;
return this;
}

public WorkflowTaskResult build() {
return new WorkflowTaskResult(
commands == null ? Collections.emptyList() : commands,
Expand All @@ -99,7 +106,8 @@ public WorkflowTaskResult build() {
nonfirstLocalActivityAttempts,
sdkFlags == null ? Collections.emptyList() : sdkFlags,
writeSdkName,
writeSdkVersion);
writeSdkVersion,
versioningBehavior == null ? VersioningBehavior.UNSPECIFIED : versioningBehavior);
}
}

Expand All @@ -112,6 +120,7 @@ public WorkflowTaskResult build() {
private final List<Integer> sdkFlags;
private final String writeSdkName;
private final String writeSdkVersion;
private final VersioningBehavior versioningBehavior;

private WorkflowTaskResult(
List<Command> commands,
Expand All @@ -122,7 +131,8 @@ private WorkflowTaskResult(
int nonfirstLocalActivityAttempts,
List<Integer> sdkFlags,
String writeSdkName,
String writeSdkVersion) {
String writeSdkVersion,
VersioningBehavior versioningBehavior) {
this.commands = commands;
this.messages = messages;
this.nonfirstLocalActivityAttempts = nonfirstLocalActivityAttempts;
Expand All @@ -135,6 +145,7 @@ private WorkflowTaskResult(
this.sdkFlags = sdkFlags;
this.writeSdkName = writeSdkName;
this.writeSdkVersion = writeSdkVersion;
this.versioningBehavior = versioningBehavior;
}

public List<Command> getCommands() {
Expand Down Expand Up @@ -173,4 +184,8 @@ public String getWriteSdkName() {
public String getWriteSdkVersion() {
return writeSdkVersion;
}

public VersioningBehavior getVersioningBehavior() {
return versioningBehavior;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package io.temporal.internal.sync;

import io.temporal.api.common.v1.Payloads;
import io.temporal.common.VersioningBehavior;
import io.temporal.common.converter.DataConverter;
import io.temporal.common.converter.EncodedValues;
import io.temporal.common.converter.Values;
Expand Down Expand Up @@ -57,6 +58,7 @@ public void initialize(Optional<Payloads> input) {
SyncWorkflowContext workflowContext = WorkflowInternal.getRootWorkflowContext();
RootWorkflowInboundCallsInterceptor rootWorkflowInvoker =
new RootWorkflowInboundCallsInterceptor(workflowContext, input);
this.rootWorkflowInvoker = rootWorkflowInvoker;
workflowInvoker = rootWorkflowInvoker;
for (WorkerInterceptor workerInterceptor : workerInterceptors) {
workflowInvoker = workerInterceptor.interceptWorkflow(workflowInvoker);
Expand All @@ -81,6 +83,14 @@ public Object getInstance() {
return rootWorkflowInvoker.getInstance();
}

@Override
public VersioningBehavior getVersioningBehavior() {
if (rootWorkflowInvoker == null || rootWorkflowInvoker.workflow == null) {
return VersioningBehavior.UNSPECIFIED;
}
return rootWorkflowInvoker.workflow.getVersioningBehavior();
}

class RootWorkflowInboundCallsInterceptor extends BaseRootWorkflowInboundCallsInterceptor {
private DynamicWorkflow workflow;
private Optional<Payloads> input;
Expand Down
Loading
Loading