Skip to content

Commit

Permalink
Fix data conversion from serialization.WorkflowExecutionInfo to persi…
Browse files Browse the repository at this point in the history
…stence.InternalWorkflowExecutionInfo (cadence-workflow#4758)
  • Loading branch information
Shaddoll authored Mar 2, 2022
1 parent 6980508 commit f1a0983
Show file tree
Hide file tree
Showing 4 changed files with 323 additions and 145 deletions.
174 changes: 174 additions & 0 deletions common/persistence/serialization/persistence_mapper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
// The MIT License (MIT)
//
// Copyright (c) 2017-2020 Uber Technologies Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package serialization

import (
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/persistence"
)

func ToInternalWorkflowExecutionInfo(info *WorkflowExecutionInfo) *persistence.InternalWorkflowExecutionInfo {
result := &persistence.InternalWorkflowExecutionInfo{
CompletionEventBatchID: common.EmptyEventID,
TaskList: info.GetTaskList(),
WorkflowTypeName: info.GetWorkflowTypeName(),
WorkflowTimeout: info.GetWorkflowTimeout(),
DecisionStartToCloseTimeout: info.GetDecisionTaskTimeout(),
ExecutionContext: info.GetExecutionContext(),
State: int(info.GetState()),
CloseStatus: int(info.GetCloseStatus()),
LastFirstEventID: info.GetLastFirstEventID(),
LastEventTaskID: info.GetLastEventTaskID(),
LastProcessedEvent: info.GetLastProcessedEvent(),
StartTimestamp: info.GetStartTimestamp(),
LastUpdatedTimestamp: info.GetLastUpdatedTimestamp(),
CreateRequestID: info.GetCreateRequestID(),
SignalCount: int32(info.GetSignalCount()),
DecisionVersion: info.GetDecisionVersion(),
DecisionScheduleID: info.GetDecisionScheduleID(),
DecisionStartedID: info.GetDecisionStartedID(),
DecisionRequestID: info.GetDecisionRequestID(),
DecisionTimeout: info.GetDecisionTimeout(),
DecisionAttempt: info.GetDecisionAttempt(),
DecisionStartedTimestamp: info.GetDecisionStartedTimestamp(),
DecisionScheduledTimestamp: info.GetDecisionScheduledTimestamp(),
DecisionOriginalScheduledTimestamp: info.GetDecisionOriginalScheduledTimestamp(),
StickyTaskList: info.GetStickyTaskList(),
StickyScheduleToStartTimeout: info.GetStickyScheduleToStartTimeout(),
ClientLibraryVersion: info.GetClientLibraryVersion(),
ClientFeatureVersion: info.GetClientFeatureVersion(),
ClientImpl: info.GetClientImpl(),
Attempt: int32(info.GetRetryAttempt()),
HasRetryPolicy: info.GetHasRetryPolicy(),
InitialInterval: info.GetRetryInitialInterval(),
BackoffCoefficient: info.GetRetryBackoffCoefficient(),
MaximumInterval: info.GetRetryMaximumInterval(),
ExpirationTime: info.GetRetryExpirationTimestamp(),
MaximumAttempts: info.GetRetryMaximumAttempts(),
NonRetriableErrors: info.GetRetryNonRetryableErrors(),
BranchToken: info.GetEventBranchToken(),
CronSchedule: info.GetCronSchedule(),
ExpirationInterval: info.GetRetryExpiration(),
Memo: info.GetMemo(),
SearchAttributes: info.GetSearchAttributes(),
HistorySize: info.GetHistorySize(),
}
if info.ParentDomainID != nil {
result.ParentDomainID = info.ParentDomainID.String()
result.ParentWorkflowID = info.GetParentWorkflowID()
result.ParentRunID = info.ParentRunID.String()
result.InitiatedID = info.GetInitiatedID()
}

if info.GetCancelRequested() {
result.CancelRequested = true
result.CancelRequestID = info.GetCancelRequestID()
}

if info.CompletionEventBatchID != nil {
result.CompletionEventBatchID = info.GetCompletionEventBatchID()
}

if info.CompletionEvent != nil {
result.CompletionEvent = persistence.NewDataBlob(info.CompletionEvent,
common.EncodingType(info.GetCompletionEventEncoding()))
}

if info.AutoResetPoints != nil {
result.AutoResetPoints = persistence.NewDataBlob(info.AutoResetPoints,
common.EncodingType(info.GetAutoResetPointsEncoding()))
}
return result
}

func FromInternalWorkflowExecutionInfo(executionInfo *persistence.InternalWorkflowExecutionInfo) *WorkflowExecutionInfo {
info := &WorkflowExecutionInfo{
TaskList: executionInfo.TaskList,
WorkflowTypeName: executionInfo.WorkflowTypeName,
WorkflowTimeout: executionInfo.WorkflowTimeout,
DecisionTaskTimeout: executionInfo.DecisionStartToCloseTimeout,
ExecutionContext: executionInfo.ExecutionContext,
State: int32(executionInfo.State),
CloseStatus: int32(executionInfo.CloseStatus),
LastFirstEventID: executionInfo.LastFirstEventID,
LastEventTaskID: executionInfo.LastEventTaskID,
LastProcessedEvent: executionInfo.LastProcessedEvent,
StartTimestamp: executionInfo.StartTimestamp,
LastUpdatedTimestamp: executionInfo.LastUpdatedTimestamp,
CreateRequestID: executionInfo.CreateRequestID,
DecisionVersion: executionInfo.DecisionVersion,
DecisionScheduleID: executionInfo.DecisionScheduleID,
DecisionStartedID: executionInfo.DecisionStartedID,
DecisionRequestID: executionInfo.DecisionRequestID,
DecisionTimeout: executionInfo.DecisionTimeout,
DecisionAttempt: executionInfo.DecisionAttempt,
DecisionStartedTimestamp: executionInfo.DecisionStartedTimestamp,
DecisionScheduledTimestamp: executionInfo.DecisionScheduledTimestamp,
DecisionOriginalScheduledTimestamp: executionInfo.DecisionOriginalScheduledTimestamp,
StickyTaskList: executionInfo.StickyTaskList,
StickyScheduleToStartTimeout: executionInfo.StickyScheduleToStartTimeout,
ClientLibraryVersion: executionInfo.ClientLibraryVersion,
ClientFeatureVersion: executionInfo.ClientFeatureVersion,
ClientImpl: executionInfo.ClientImpl,
SignalCount: int64(executionInfo.SignalCount),
HistorySize: executionInfo.HistorySize,
CronSchedule: executionInfo.CronSchedule,
CompletionEventBatchID: &executionInfo.CompletionEventBatchID,
HasRetryPolicy: executionInfo.HasRetryPolicy,
RetryAttempt: int64(executionInfo.Attempt),
RetryInitialInterval: executionInfo.InitialInterval,
RetryBackoffCoefficient: executionInfo.BackoffCoefficient,
RetryMaximumInterval: executionInfo.MaximumInterval,
RetryMaximumAttempts: executionInfo.MaximumAttempts,
RetryExpiration: executionInfo.ExpirationInterval,
RetryExpirationTimestamp: executionInfo.ExpirationTime,
RetryNonRetryableErrors: executionInfo.NonRetriableErrors,
EventStoreVersion: persistence.EventStoreVersion,
EventBranchToken: executionInfo.BranchToken,
AutoResetPoints: executionInfo.AutoResetPoints.Data,
AutoResetPointsEncoding: string(executionInfo.AutoResetPoints.GetEncoding()),
SearchAttributes: executionInfo.SearchAttributes,
Memo: executionInfo.Memo,
CompletionEventEncoding: string(common.EncodingTypeEmpty),
VersionHistoriesEncoding: string(common.EncodingTypeEmpty),
InitiatedID: common.EmptyEventID,
}

if executionInfo.CompletionEvent != nil {
info.CompletionEvent = executionInfo.CompletionEvent.Data
info.CompletionEventEncoding = string(executionInfo.CompletionEvent.Encoding)
}

if executionInfo.ParentDomainID != "" {
info.ParentDomainID = MustParseUUID(executionInfo.ParentDomainID)
info.ParentWorkflowID = executionInfo.ParentWorkflowID
info.ParentRunID = MustParseUUID(executionInfo.ParentRunID)
info.InitiatedID = executionInfo.InitiatedID
}

if executionInfo.CancelRequested {
info.CancelRequested = true
info.CancelRequestID = executionInfo.CancelRequestID
}
return info
}
143 changes: 143 additions & 0 deletions common/persistence/serialization/persistence_mapper_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
// The MIT License (MIT)

// Copyright (c) 2017-2020 Uber Technologies Inc.

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package serialization

import (
"math/rand"
"testing"
"time"

"github.com/pborman/uuid"
"github.com/stretchr/testify/assert"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/persistence"
)

func TestInternalWorkflowExecutionInfo(t *testing.T) {
expected := &persistence.InternalWorkflowExecutionInfo{
ParentDomainID: uuid.New(),
ParentWorkflowID: "ParentWorkflowID",
ParentRunID: uuid.New(),
InitiatedID: int64(rand.Intn(1000)),
CompletionEventBatchID: int64(rand.Intn(1000)),
CompletionEvent: persistence.NewDataBlob([]byte(`CompletionEvent`), common.EncodingTypeJSON),
TaskList: "TaskList",
WorkflowTypeName: "WorkflowTypeName",
WorkflowTimeout: time.Minute * time.Duration(rand.Intn(10)),
DecisionStartToCloseTimeout: time.Minute * time.Duration(rand.Intn(10)),
ExecutionContext: []byte("ExecutionContext"),
State: rand.Intn(1000),
CloseStatus: rand.Intn(1000),
LastFirstEventID: int64(rand.Intn(1000)),
LastEventTaskID: int64(rand.Intn(1000)),
LastProcessedEvent: int64(rand.Intn(1000)),
StartTimestamp: time.Now(),
LastUpdatedTimestamp: time.Now(),
CreateRequestID: "CreateRequestID",
SignalCount: int32(rand.Intn(1000)),
DecisionVersion: int64(rand.Intn(1000)),
DecisionScheduleID: int64(rand.Intn(1000)),
DecisionStartedID: int64(rand.Intn(1000)),
DecisionRequestID: "DecisionRequestID",
DecisionTimeout: time.Minute * time.Duration(rand.Intn(10)),
DecisionAttempt: int64(rand.Intn(1000)),
DecisionStartedTimestamp: time.Now(),
DecisionScheduledTimestamp: time.Now(),
DecisionOriginalScheduledTimestamp: time.Now(),
CancelRequested: true,
CancelRequestID: "CancelRequestID",
StickyTaskList: "StickyTaskList",
StickyScheduleToStartTimeout: time.Minute * time.Duration(rand.Intn(10)),
ClientLibraryVersion: "ClientLibraryVersion",
ClientFeatureVersion: "ClientFeatureVersion",
ClientImpl: "ClientImpl",
AutoResetPoints: persistence.NewDataBlob([]byte("AutoResetPoints"), common.EncodingTypeJSON),
Attempt: int32(rand.Intn(1000)),
HasRetryPolicy: true,
InitialInterval: time.Minute * time.Duration(rand.Intn(10)),
BackoffCoefficient: rand.Float64() * 1000,
MaximumInterval: time.Minute * time.Duration(rand.Intn(10)),
ExpirationTime: time.Now(),
MaximumAttempts: int32(rand.Intn(1000)),
NonRetriableErrors: []string{"RetryNonRetryableErrors"},
BranchToken: []byte("EventBranchToken"),
CronSchedule: "CronSchedule",
ExpirationInterval: time.Minute * time.Duration(rand.Intn(10)),
Memo: map[string][]byte{"key_1": []byte("Memo")},
SearchAttributes: map[string][]byte{"key_1": []byte("SearchAttributes")},
HistorySize: int64(rand.Intn(1000)),
}
actual := ToInternalWorkflowExecutionInfo(FromInternalWorkflowExecutionInfo(expected))
assert.Equal(t, expected.ParentDomainID, actual.ParentDomainID)
assert.Equal(t, expected.ParentWorkflowID, actual.ParentWorkflowID)
assert.Equal(t, expected.ParentRunID, actual.ParentRunID)
assert.Equal(t, expected.InitiatedID, actual.InitiatedID)
assert.Equal(t, expected.CompletionEventBatchID, actual.CompletionEventBatchID)
assert.Equal(t, expected.CompletionEvent, actual.CompletionEvent)
assert.Equal(t, expected.TaskList, actual.TaskList)
assert.Equal(t, expected.WorkflowTypeName, actual.WorkflowTypeName)
assert.True(t, (expected.WorkflowTimeout-actual.WorkflowTimeout) < time.Second)
assert.True(t, (expected.DecisionStartToCloseTimeout-actual.DecisionStartToCloseTimeout) < time.Second)
assert.Equal(t, expected.ExecutionContext, actual.ExecutionContext)
assert.Equal(t, expected.State, actual.State)
assert.Equal(t, expected.CloseStatus, actual.CloseStatus)
assert.Equal(t, expected.LastFirstEventID, actual.LastFirstEventID)
assert.Equal(t, expected.LastEventTaskID, actual.LastEventTaskID)
assert.Equal(t, expected.LastProcessedEvent, actual.LastProcessedEvent)
assert.Equal(t, expected.StartTimestamp.Sub(actual.StartTimestamp), time.Duration(0))
assert.Equal(t, expected.LastUpdatedTimestamp.Sub(actual.LastUpdatedTimestamp), time.Duration(0))
assert.Equal(t, expected.CreateRequestID, actual.CreateRequestID)
assert.Equal(t, expected.SignalCount, actual.SignalCount)
assert.Equal(t, expected.DecisionVersion, actual.DecisionVersion)
assert.Equal(t, expected.DecisionScheduleID, actual.DecisionScheduleID)
assert.Equal(t, expected.DecisionStartedID, actual.DecisionStartedID)
assert.Equal(t, expected.DecisionRequestID, actual.DecisionRequestID)
assert.True(t, (expected.DecisionTimeout-actual.DecisionTimeout) < time.Second)
assert.Equal(t, expected.DecisionAttempt, actual.DecisionAttempt)
assert.Equal(t, expected.DecisionStartedTimestamp.Sub(actual.DecisionStartedTimestamp), time.Duration(0))
assert.Equal(t, expected.DecisionScheduledTimestamp.Sub(actual.DecisionScheduledTimestamp), time.Duration(0))
assert.Equal(t, expected.DecisionOriginalScheduledTimestamp.Sub(actual.DecisionOriginalScheduledTimestamp), time.Duration(0))
assert.Equal(t, expected.CancelRequested, actual.CancelRequested)
assert.Equal(t, expected.CancelRequestID, actual.CancelRequestID)
assert.Equal(t, expected.StickyTaskList, actual.StickyTaskList)
assert.True(t, (expected.StickyScheduleToStartTimeout-actual.StickyScheduleToStartTimeout) < time.Second)
assert.Equal(t, expected.ClientLibraryVersion, actual.ClientLibraryVersion)
assert.Equal(t, expected.ClientFeatureVersion, actual.ClientFeatureVersion)
assert.Equal(t, expected.ClientImpl, actual.ClientImpl)
assert.Equal(t, expected.AutoResetPoints, actual.AutoResetPoints)
assert.Equal(t, expected.Attempt, actual.Attempt)
assert.Equal(t, expected.HasRetryPolicy, actual.HasRetryPolicy)
assert.True(t, (expected.InitialInterval-actual.InitialInterval) < time.Second)
assert.Equal(t, expected.BackoffCoefficient, actual.BackoffCoefficient)
assert.True(t, (expected.MaximumInterval-actual.MaximumInterval) < time.Second)
assert.Equal(t, expected.ExpirationTime.Sub(actual.ExpirationTime), time.Duration(0))
assert.Equal(t, expected.MaximumAttempts, actual.MaximumAttempts)
assert.Equal(t, expected.NonRetriableErrors, actual.NonRetriableErrors)
assert.Equal(t, expected.BranchToken, actual.BranchToken)
assert.Equal(t, expected.CronSchedule, actual.CronSchedule)
assert.True(t, (expected.ExpirationInterval-actual.ExpirationInterval) < time.Second)
assert.Equal(t, expected.Memo, actual.Memo)
assert.Equal(t, expected.SearchAttributes, actual.SearchAttributes)
assert.Equal(t, expected.HistorySize, actual.HistorySize)
}
Loading

0 comments on commit f1a0983

Please sign in to comment.