Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Deployment Version strings in all user-facing APIs #7219

Merged
merged 4 commits into from
Feb 3, 2025
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions api/deployment/v1/message.go-helpers.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2,207 changes: 1,134 additions & 1,073 deletions api/deployment/v1/message.pb.go

Large diffs are not rendered by default.

851 changes: 426 additions & 425 deletions api/matchingservice/v1/request_response.pb.go

Large diffs are not rendered by default.

2,301 changes: 1,153 additions & 1,148 deletions api/persistence/v1/executions.pb.go

Large diffs are not rendered by default.

274 changes: 139 additions & 135 deletions api/taskqueue/v1/message.pb.go

Large diffs are not rendered by default.

26 changes: 12 additions & 14 deletions common/testing/testvars/test_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
taskqueuepb "go.temporal.io/api/taskqueue/v1"
updatepb "go.temporal.io/api/update/v1"
workflowpb "go.temporal.io/api/workflow/v1"
deploymentspb "go.temporal.io/server/api/deployment/v1"
persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/primitives/timestamp"
Expand Down Expand Up @@ -260,30 +261,27 @@ func (tv *TestVars) Deployment() *deploymentpb.Deployment {
}
}

func (tv *TestVars) DeploymentVersion() *deploymentpb.WorkerDeploymentVersion {
return &deploymentpb.WorkerDeploymentVersion{
func (tv *TestVars) DeploymentVersion() *deploymentspb.WorkerDeploymentVersion {
return &deploymentspb.WorkerDeploymentVersion{
BuildId: tv.BuildID(),
DeploymentName: tv.DeploymentSeries(),
}
}

func (tv *TestVars) DeploymentVersionString() string {
return worker_versioning.WorkerDeploymentVersionToString(&deploymentpb.WorkerDeploymentVersion{
BuildId: tv.BuildID(),
DeploymentName: tv.DeploymentSeries(),
})
return worker_versioning.WorkerDeploymentVersionToString(tv.DeploymentVersion())
}

func (tv *TestVars) DeploymentVersionTransition() *workflowpb.DeploymentVersionTransition {
return &workflowpb.DeploymentVersionTransition{
DeploymentVersion: worker_versioning.DeploymentVersionFromDeployment(tv.Deployment()),
Version: tv.DeploymentVersionString(),
}
}

func (tv *TestVars) VersioningOverridePinned() *workflowpb.VersioningOverride {
return &workflowpb.VersioningOverride{
Behavior: enumspb.VERSIONING_BEHAVIOR_PINNED,
Deployment: tv.Deployment(),
Behavior: enumspb.VERSIONING_BEHAVIOR_PINNED,
PinnedVersion: tv.DeploymentVersionString(),
}
}

Expand Down Expand Up @@ -417,13 +415,13 @@ func (tv *TestVars) Global() Global {
}

func (tv *TestVars) WorkerDeploymentOptions(versioned bool) *deploymentpb.WorkerDeploymentOptions {
m := enumspb.WORKFLOW_VERSIONING_MODE_UNVERSIONED
m := enumspb.WORKER_VERSIONING_MODE_UNVERSIONED
if versioned {
m = enumspb.WORKFLOW_VERSIONING_MODE_VERSIONING_BEHAVIORS
m = enumspb.WORKER_VERSIONING_MODE_VERSIONED
}
return &deploymentpb.WorkerDeploymentOptions{
BuildId: tv.BuildID(),
DeploymentName: tv.DeploymentSeries(),
WorkflowVersioningMode: m,
BuildId: tv.BuildID(),
DeploymentName: tv.DeploymentSeries(),
WorkerVersioningMode: m,
}
}
104 changes: 57 additions & 47 deletions common/worker_versioning/worker_versioning.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,12 @@ import (
"math/rand"
"strings"

farm "github.com/dgryski/go-farm"
"github.com/dgryski/go-farm"
"github.com/temporalio/sqlparser"
commonpb "go.temporal.io/api/common/v1"
deploymentpb "go.temporal.io/api/deployment/v1"
enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/api/serviceerror"
taskqueuepb "go.temporal.io/api/taskqueue/v1"
workflowpb "go.temporal.io/api/workflow/v1"
deploymentspb "go.temporal.io/server/api/deployment/v1"
persistencespb "go.temporal.io/server/api/persistence/v1"
Expand Down Expand Up @@ -172,7 +171,7 @@ func BuildIdIfUsingVersioning(stamp *commonpb.WorkerVersionStamp) string {
// DeploymentFromCapabilities returns the deployment if it is using versioning V3, otherwise nil.
// It returns the deployment from the `options` if present, otherwise, from `capabilities`,
func DeploymentFromCapabilities(capabilities *commonpb.WorkerVersionCapabilities, options *deploymentpb.WorkerDeploymentOptions) *deploymentpb.Deployment {
if options.GetWorkflowVersioningMode() != enumspb.WORKFLOW_VERSIONING_MODE_UNVERSIONED &&
if options.GetWorkerVersioningMode() != enumspb.WORKER_VERSIONING_MODE_UNVERSIONED &&
options.GetDeploymentName() != "" &&
options.GetBuildId() != "" {
return &deploymentpb.Deployment{
Expand All @@ -189,9 +188,19 @@ func DeploymentFromCapabilities(capabilities *commonpb.WorkerVersionCapabilities
return nil
}

func DeploymentVersionFromOptions(options *deploymentpb.WorkerDeploymentOptions) *deploymentspb.WorkerDeploymentVersion {
if options.GetWorkerVersioningMode() == enumspb.WORKER_VERSIONING_MODE_VERSIONED {
return &deploymentspb.WorkerDeploymentVersion{
DeploymentName: options.GetDeploymentName(),
BuildId: options.GetBuildId(),
}
}
return nil
}

// DeploymentOrVersion Temporary helper function to return a Deployment based on passed Deployment
// or WorkerDeploymentVersion objects, if `v` is not nil, it'll take precedence.
func DeploymentOrVersion(d *deploymentpb.Deployment, v *deploymentpb.WorkerDeploymentVersion) *deploymentpb.Deployment {
func DeploymentOrVersion(d *deploymentpb.Deployment, v *deploymentspb.WorkerDeploymentVersion) *deploymentpb.Deployment {
if v != nil {
return DeploymentIfValid(DeploymentFromDeploymentVersion(v))
}
Expand Down Expand Up @@ -253,19 +262,19 @@ func MakeDirectiveForWorkflowTask(

// DeploymentVersionFromDeployment Temporary helper function to convert Deployment to
// WorkerDeploymentVersion proto until we update code to use the new proto in all places.
func DeploymentVersionFromDeployment(deployment *deploymentpb.Deployment) *deploymentpb.WorkerDeploymentVersion {
func DeploymentVersionFromDeployment(deployment *deploymentpb.Deployment) *deploymentspb.WorkerDeploymentVersion {
if deployment == nil {
return nil
}
return &deploymentpb.WorkerDeploymentVersion{
return &deploymentspb.WorkerDeploymentVersion{
BuildId: deployment.GetBuildId(),
DeploymentName: deployment.GetSeriesName(),
}
}

// DeploymentFromDeploymentVersion Temporary helper function to convert WorkerDeploymentVersion to
// Deployment proto until we update code to use the new proto in all places.
func DeploymentFromDeploymentVersion(dv *deploymentpb.WorkerDeploymentVersion) *deploymentpb.Deployment {
func DeploymentFromDeploymentVersion(dv *deploymentspb.WorkerDeploymentVersion) *deploymentpb.Deployment {
if dv == nil {
return nil
}
Expand Down Expand Up @@ -318,19 +327,32 @@ func ValidateDeployment(deployment *deploymentpb.Deployment) error {

// ValidateDeploymentVersion returns error if the deployment version is nil or it has empty version
// or deployment name.
func ValidateDeploymentVersion(version *deploymentpb.WorkerDeploymentVersion) error {
func ValidateDeploymentVersion(version *deploymentspb.WorkerDeploymentVersion) error {
if version == nil {
return serviceerror.NewInvalidArgument("deployment cannot be nil")
ShahabT marked this conversation as resolved.
Show resolved Hide resolved
}
if version.GetDeploymentName() == "" {
return serviceerror.NewInvalidArgument("deployment name name cannot be empty")
return serviceerror.NewInvalidArgument("deployment name cannot be empty")
}
if version.GetBuildId() == "" {
return serviceerror.NewInvalidArgument("build id cannot be empty")
}
return nil
}

// ValidateDeploymentVersionString returns error if the deployment version is nil or it has empty version
// or deployment name.
func ValidateDeploymentVersionString(version string) (*deploymentspb.WorkerDeploymentVersion, error) {
if version == "" {
return nil, serviceerror.NewInvalidArgument("version is required")
}
v, err := WorkerDeploymentVersionFromString(version)
if err != nil {
return nil, serviceerror.NewInvalidArgument(fmt.Sprintf("invalid version string %q, expected format is \"<deployment_name>/<build_id>\"", version))
carlydf marked this conversation as resolved.
Show resolved Hide resolved
}
return v, nil
}

func ValidateVersioningOverride(override *workflowpb.VersioningOverride) error {
if override == nil {
return nil
Expand All @@ -339,16 +361,17 @@ func ValidateVersioningOverride(override *workflowpb.VersioningOverride) error {
case enumspb.VERSIONING_BEHAVIOR_PINNED:
if override.GetDeployment() != nil {
return ValidateDeployment(override.GetDeployment())
} else if override.GetPinnedVersion() != nil {
return ValidateDeploymentVersion(override.GetPinnedVersion())
} else if override.GetPinnedVersion() != "" {
_, err := ValidateDeploymentVersionString(override.GetPinnedVersion())
return err
} else {
return serviceerror.NewInvalidArgument("must provide deployment if behavior is 'PINNED'")
ShahabT marked this conversation as resolved.
Show resolved Hide resolved
}
case enumspb.VERSIONING_BEHAVIOR_AUTO_UPGRADE:
if override.GetDeployment() != nil {
return serviceerror.NewInvalidArgument("only provide deployment if behavior is 'PINNED'")
}
if override.GetPinnedVersion() != nil {
if override.GetPinnedVersion() != "" {
return serviceerror.NewInvalidArgument("only provide pinned version if behavior is 'PINNED'")
}
case enumspb.VERSIONING_BEHAVIOR_UNSPECIFIED:
Expand All @@ -361,23 +384,29 @@ func ValidateVersioningOverride(override *workflowpb.VersioningOverride) error {

// FindDeploymentVersionForWorkflowID returns the deployment version that should be used for a
// particular workflow ID based on the versioning info of the task queue. Nil means unversioned.
func FindDeploymentVersionForWorkflowID(versioningInfo *taskqueuepb.TaskQueueVersioningInfo, workflowId string) *deploymentpb.WorkerDeploymentVersion {
if versioningInfo == nil {
return nil // unversioned
func FindDeploymentVersionForWorkflowID(
current *deploymentspb.DeploymentVersionData,
ramping *deploymentspb.DeploymentVersionData,
workflowId string,
) *deploymentspb.WorkerDeploymentVersion {
ramp := ramping.GetRampPercentage()
rampingVersion := ramping.GetVersion()
if rampingVersion.GetBuildId() == "" {
// Ramping to unversioned
rampingVersion = nil
}
ramp := versioningInfo.GetRampingVersionPercentage()
if ramp <= 0 {
// No ramp
return versioningInfo.GetCurrentVersion()
return current.GetVersion()
} else if ramp == 100 {
return versioningInfo.GetRampingVersion()
return rampingVersion
}
// Partial ramp. Decide based on workflow ID
wfRampThreshold := calcRampThreshold(workflowId)
if wfRampThreshold <= float64(ramp) {
return versioningInfo.GetRampingVersion()
return rampingVersion
}
return versioningInfo.GetCurrentVersion()
return current.GetVersion()
}

// calcRampThreshold returns a number in [0, 100) that is deterministically calculated based on the
Expand All @@ -391,9 +420,9 @@ func calcRampThreshold(id string) float64 {
}

//revive:disable-next-line:cognitive-complexity
func CalculateTaskQueueVersioningInfo(deployments *persistencespb.DeploymentData) *taskqueuepb.TaskQueueVersioningInfo {
func CalculateTaskQueueVersioningInfo(deployments *persistencespb.DeploymentData) (*deploymentspb.DeploymentVersionData, *deploymentspb.DeploymentVersionData) {
if deployments == nil {
return nil
return nil, nil
}

var current *deploymentspb.DeploymentVersionData
Expand Down Expand Up @@ -426,27 +455,7 @@ func CalculateTaskQueueVersioningInfo(deployments *persistencespb.DeploymentData
}
}

if current == nil && ramping == nil {
return nil // TODO (Shahab): __unversioned__
}

info := &taskqueuepb.TaskQueueVersioningInfo{
CurrentVersion: current.GetVersion(),
UpdateTime: current.GetRoutingUpdateTime(),
}
if ramping.GetRampPercentage() > 0 {
info.RampingVersionPercentage = ramping.GetRampPercentage()
if ramping.GetVersion().GetBuildId() != "" {
// If version is "" it means it's ramping to unversioned, so we do not set RampingVersion.
// todo (carly): handle unversioned
info.RampingVersion = ramping.GetVersion()
}
if info.GetUpdateTime().AsTime().Before(ramping.GetRoutingUpdateTime().AsTime()) {
info.UpdateTime = ramping.GetRoutingUpdateTime()
}
}

return info
return current, ramping
}

func ValidateTaskVersionDirective(
Expand Down Expand Up @@ -491,21 +500,22 @@ func DirectiveDeployment(directive *taskqueuespb.TaskVersionDirective) *deployme
return directive.GetDeployment()
}

func WorkerDeploymentVersionToString(v *deploymentpb.WorkerDeploymentVersion) string {
func WorkerDeploymentVersionToString(v *deploymentspb.WorkerDeploymentVersion) string {
if v == nil {
return "__unversioned__"
}
return v.GetDeploymentName() + WorkerDeploymentVersionIdDelimiter + v.GetBuildId()
}
func WorkerDeploymentVersionFromString(s string) (*deploymentpb.WorkerDeploymentVersion, error) {

func WorkerDeploymentVersionFromString(s string) (*deploymentspb.WorkerDeploymentVersion, error) {
if s == "__unversioned__" {
return nil, nil
}
before, after, found := strings.Cut(s, WorkerDeploymentVersionIdDelimiter)
if !found {
return nil, fmt.Errorf("expected delimiter %s not found in version string %s", WorkerDeploymentVersionIdDelimiter, s)
}
return &deploymentpb.WorkerDeploymentVersion{
return &deploymentspb.WorkerDeploymentVersion{
DeploymentName: before,
BuildId: after,
}, nil
Expand All @@ -522,7 +532,7 @@ func GenerateDeploymentWorkflowID(deploymentName string) string {
// GenerateVersionWorkflowID is a helper that generates a system accepted
// workflowID which are used in our Worker Deployment Version workflows
func GenerateVersionWorkflowID(deploymentName string, buildID string) string {
escapedVersionString := escapeChar(WorkerDeploymentVersionToString(&deploymentpb.WorkerDeploymentVersion{
escapedVersionString := escapeChar(WorkerDeploymentVersionToString(&deploymentspb.WorkerDeploymentVersion{
DeploymentName: deploymentName,
BuildId: buildID,
}), WorkerDeploymentVersionWorkflowIDEscape, WorkerDeploymentVersionWorkflowIDDelimeter)
Expand Down
Loading
Loading