Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Store mutable state checksum in SQL storage #5649

Merged
merged 3 commits into from
Feb 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 128 additions & 4 deletions .gen/go/sqlblobs/sqlblobs.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion cmd/server/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ require (
github.com/startreedata/pinot-client-go v0.0.0-20230303070132-3b84c28a9e95 // latest release doesn't support pinot v0.12, so use master branch
github.com/stretchr/testify v1.8.3
github.com/uber-go/tally v3.3.15+incompatible // indirect
github.com/uber/cadence-idl v0.0.0-20240206193658-7cafd96fa80e
github.com/uber/cadence-idl v0.0.0-20240208213432-2e3c661bfb7c
github.com/uber/ringpop-go v0.8.5 // indirect
github.com/uber/tchannel-go v1.22.2 // indirect
github.com/urfave/cli v1.22.4
Expand Down
4 changes: 2 additions & 2 deletions cmd/server/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -394,8 +394,8 @@ github.com/uber-go/tally v3.3.12+incompatible/go.mod h1:YDTIBxdXyOU/sCWilKB4bgyu
github.com/uber-go/tally v3.3.15+incompatible h1:9hLSgNBP28CjIaDmAuRTq9qV+UZY+9PcvAkXO4nNMwg=
github.com/uber-go/tally v3.3.15+incompatible/go.mod h1:YDTIBxdXyOU/sCWilKB4bgyufu1cEi0jdVnRdxvjnmU=
github.com/uber/cadence-idl v0.0.0-20211111101836-d6b70b60eb8c/go.mod h1:oyUK7GCNCRHCCyWyzifSzXpVrRYVBbAMHAzF5dXiKws=
github.com/uber/cadence-idl v0.0.0-20240206193658-7cafd96fa80e h1:/Ie1HeCKuIrCMiMqf9I1TxETqUt2x6jeo5aw0+j2yDw=
github.com/uber/cadence-idl v0.0.0-20240206193658-7cafd96fa80e/go.mod h1:oyUK7GCNCRHCCyWyzifSzXpVrRYVBbAMHAzF5dXiKws=
github.com/uber/cadence-idl v0.0.0-20240208213432-2e3c661bfb7c h1:PSZBXcnbY0+cDeAZ9GocEEJgX+ZBvLYDL7qdes9/bfg=
github.com/uber/cadence-idl v0.0.0-20240208213432-2e3c661bfb7c/go.mod h1:oyUK7GCNCRHCCyWyzifSzXpVrRYVBbAMHAzF5dXiKws=
github.com/uber/jaeger-client-go v2.22.1+incompatible h1:NHcubEkVbahf9t3p75TOCR83gdUHXjRJvjoBh1yACsM=
github.com/uber/jaeger-client-go v2.22.1+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk=
github.com/uber/jaeger-lib v2.2.0+incompatible h1:MxZXOiR2JuoANZ3J6DE/U0kSFv/eJ/GfSYVCjK7dyaw=
Expand Down
11 changes: 8 additions & 3 deletions common/persistence/dataStoreInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,10 @@ type (
SignalRequestedIDs map[string]struct{}
BufferedEvents []*DataBlob

Checksum checksum.Checksum
// Checksum field is used by Cassandra storage
// ChecksumData is used by all SQL storage
Checksum checksum.Checksum
ChecksumData *DataBlob
Shaddoll marked this conversation as resolved.
Show resolved Hide resolved
}

// InternalActivityInfo details for Persistence Interface
Expand Down Expand Up @@ -465,7 +468,8 @@ type (

Condition int64

Checksum checksum.Checksum
Checksum checksum.Checksum
ChecksumData *DataBlob
}

// InternalWorkflowSnapshot is used as generic workflow execution state snapshot for Persistence Interface
Expand All @@ -489,7 +493,8 @@ type (

Condition int64

Checksum checksum.Checksum
Checksum checksum.Checksum
ChecksumData *DataBlob
}

// InternalAppendHistoryEventsRequest is used to append new events to workflow execution history for Persistence Interface
Expand Down
26 changes: 22 additions & 4 deletions common/persistence/executionManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,13 @@ func (m *executionManagerImpl) GetWorkflowExecution(
newResponse.State.VersionHistories = versionHistories
newResponse.MutableStateStats = m.statsComputer.computeMutableStateStats(response)

if len(newResponse.State.Checksum.Value) == 0 {
newResponse.State.Checksum, err = m.serializer.DeserializeChecksum(response.State.ChecksumData)
if err != nil {
return nil, err
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the err indicate that it was checksum deserialization that failed, or will it be just an "invalid character etc." kind of error? If it is the latter, maybe wrap the err with more context.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The serializer provides the context.

}
}

return newResponse, nil
}

Expand Down Expand Up @@ -635,6 +642,10 @@ func (m *executionManagerImpl) SerializeWorkflowMutation(
if err != nil {
return nil, err
}
checksumData, err := m.serializer.SerializeChecksum(input.Checksum, common.EncodingTypeJSON)
if err != nil {
return nil, err
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here about err message

}

return &InternalWorkflowMutation{
ExecutionInfo: serializedExecutionInfo,
Expand Down Expand Up @@ -662,8 +673,9 @@ func (m *executionManagerImpl) SerializeWorkflowMutation(
ReplicationTasks: input.ReplicationTasks,
TimerTasks: input.TimerTasks,

Condition: input.Condition,
Checksum: input.Checksum,
Condition: input.Condition,
Checksum: input.Checksum,
ChecksumData: checksumData,
}, nil
}

Expand Down Expand Up @@ -702,6 +714,11 @@ func (m *executionManagerImpl) SerializeWorkflowSnapshot(
return nil, err
}

checksumData, err := m.serializer.SerializeChecksum(input.Checksum, common.EncodingTypeJSON)
if err != nil {
return nil, err
}

return &InternalWorkflowSnapshot{
ExecutionInfo: serializedExecutionInfo,
VersionHistories: serializedVersionHistories,
Expand All @@ -720,8 +737,9 @@ func (m *executionManagerImpl) SerializeWorkflowSnapshot(
ReplicationTasks: input.ReplicationTasks,
TimerTasks: input.TimerTasks,

Condition: input.Condition,
Checksum: input.Checksum,
Condition: input.Condition,
Checksum: input.Checksum,
ChecksumData: checksumData,
}, nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,6 @@ func (s *ExecutionManagerSuite) newRandomChecksum() checksum.Checksum {
}

// assertChecksumsEqual asserts that actual matches expected. When actual's
// flavor is not valid, expected is first replaced by the zero Checksum, so
// actual is then compared against the zero value instead.
func (s *ExecutionManagerSuite) assertChecksumsEqual(expected checksum.Checksum, actual checksum.Checksum) {
if !actual.Flavor.IsValid() {
// not all stores support checksum persistence today
// if it's not supported, assert that everything is zero'd out
expected = checksum.Checksum{}
}
s.EqualValues(expected, actual)
}

Expand Down
2 changes: 2 additions & 0 deletions common/persistence/persistence-tests/persistenceTestBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -1256,6 +1256,7 @@ func (s *TestBase) UpdateWorkflowExecutionForBufferEvents(
Condition: condition,
ClearBufferedEvents: clearBufferedEvents,
VersionHistories: versionHistories,
Checksum: testWorkflowChecksum,
},
RangeID: s.ShardInfo.RangeID,
Encoding: pickRandomEncoding(),
Expand Down Expand Up @@ -1307,6 +1308,7 @@ func (s *TestBase) UpdateAllMutableState(ctx context.Context, updatedMutableStat
UpsertSignalInfos: sInfos,
UpsertSignalRequestedIDs: srIDs,
VersionHistories: updatedMutableState.VersionHistories,
Checksum: updatedMutableState.Checksum,
},
Encoding: pickRandomEncoding(),
})
Expand Down
16 changes: 16 additions & 0 deletions common/persistence/serialization/getters.go
Original file line number Diff line number Diff line change
Expand Up @@ -884,6 +884,22 @@ func (w *WorkflowExecutionInfo) GetPartitionConfig() (o map[string]string) {
return
}

// GetChecksum internal sql blob getter.
// It returns the Checksum bytes, or nil when the receiver is nil.
func (w *WorkflowExecutionInfo) GetChecksum() (o []byte) {
	if w != nil {
		o = w.Checksum
	}
	return o
}

// GetChecksumEncoding internal sql blob getter.
// It returns the ChecksumEncoding string, or "" when the receiver is nil.
func (w *WorkflowExecutionInfo) GetChecksumEncoding() (o string) {
	if w != nil {
		o = w.ChecksumEncoding
	}
	return o
}

// GetVersion internal sql blob getter
func (a *ActivityInfo) GetVersion() (o int64) {
if a != nil {
Expand Down
6 changes: 6 additions & 0 deletions common/persistence/serialization/getters_fixtures_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ var expectedNil = map[string]map[string]any{
"GetVersionHistoriesEncoding": "",
"GetWorkflowTimeout": time.Duration(0),
"GetWorkflowTypeName": "",
"GetChecksum": []uint8(nil),
"GetChecksumEncoding": "",
},
"*serialization.TimerTaskInfo": {
"GetDomainID": []uint8(nil),
Expand Down Expand Up @@ -320,6 +322,8 @@ var expectedEmpty = map[string]map[string]any{
"GetVersionHistoriesEncoding": "",
"GetWorkflowTimeout": time.Duration(0),
"GetWorkflowTypeName": "",
"GetChecksum": []uint8(nil),
"GetChecksumEncoding": "",
},
"*serialization.TimerTaskInfo": {
"GetDomainID": []uint8(nil),
Expand Down Expand Up @@ -547,6 +551,8 @@ var expectedNonEmpty = map[string]map[string]any{
"GetVersionHistoriesEncoding": "",
"GetWorkflowTimeout": time.Duration(3),
"GetWorkflowTypeName": "workflowTypeName",
"GetChecksum": []uint8(nil),
"GetChecksumEncoding": "",
},
"*serialization.TimerTaskInfo": {
"GetDomainID": []byte(taskDomainID),
Expand Down
2 changes: 2 additions & 0 deletions common/persistence/serialization/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,8 @@ type (
VersionHistoriesEncoding string
FirstExecutionRunID UUID
PartitionConfig map[string]string
Checksum []byte
ChecksumEncoding string
}

// ActivityInfo blob in a serialization agnostic format
Expand Down
4 changes: 4 additions & 0 deletions common/persistence/serialization/thrift_mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,8 @@ func workflowExecutionInfoToThrift(info *WorkflowExecutionInfo) *sqlblobs.Workfl
VersionHistoriesEncoding: &info.VersionHistoriesEncoding,
FirstExecutionRunID: info.FirstExecutionRunID,
PartitionConfig: info.PartitionConfig,
Checksum: info.Checksum,
ChecksumEncoding: &info.ChecksumEncoding,
}
}

Expand Down Expand Up @@ -325,6 +327,8 @@ func workflowExecutionInfoFromThrift(info *sqlblobs.WorkflowExecutionInfo) *Work
FirstExecutionRunID: info.FirstExecutionRunID,
PartitionConfig: info.PartitionConfig,
IsCron: info.GetCronSchedule() != "",
Checksum: info.Checksum,
ChecksumEncoding: info.GetChecksumEncoding(),
}
}

Expand Down
4 changes: 4 additions & 0 deletions common/persistence/serialization/thrift_mapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,8 @@ func TestWorkflowExecutionInfo(t *testing.T) {
VersionHistoriesEncoding: "VersionHistoriesEncoding",
FirstExecutionRunID: UUID(uuid.New()),
PartitionConfig: map[string]string{"zone": "dca1"},
Checksum: []byte("Checksum"),
ChecksumEncoding: "ChecksumEncoding",
}
actual := workflowExecutionInfoFromThrift(workflowExecutionInfoToThrift(expected))
assert.Equal(t, expected.ParentDomainID, actual.ParentDomainID)
Expand Down Expand Up @@ -293,6 +295,8 @@ func TestWorkflowExecutionInfo(t *testing.T) {
assert.True(t, (expected.RetryExpiration-actual.RetryExpiration) < time.Second)
assert.Equal(t, expected.FirstExecutionRunID, actual.FirstExecutionRunID)
assert.Equal(t, expected.PartitionConfig, actual.PartitionConfig)
assert.Equal(t, expected.Checksum, actual.Checksum)
assert.Equal(t, expected.ChecksumEncoding, actual.ChecksumEncoding)
}

func TestActivityInfo(t *testing.T) {
Expand Down
26 changes: 26 additions & 0 deletions common/persistence/serializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/uber/cadence/.gen/go/replicator"
workflow "github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/checksum"
"github.com/uber/cadence/common/codec"
"github.com/uber/cadence/common/types"
"github.com/uber/cadence/common/types/mapper/thrift"
Expand Down Expand Up @@ -81,6 +82,10 @@ type (
// serialize/deserialize async workflow configuration
SerializeAsyncWorkflowsConfig(config *types.AsyncWorkflowConfiguration, encodingType common.EncodingType) (*DataBlob, error)
DeserializeAsyncWorkflowsConfig(data *DataBlob) (*types.AsyncWorkflowConfiguration, error)

// serialize/deserialize checksum
SerializeChecksum(sum checksum.Checksum, encodingType common.EncodingType) (*DataBlob, error)
DeserializeChecksum(data *DataBlob) (checksum.Checksum, error)
}

// CadenceSerializationError is an error type for cadence serialization
Expand Down Expand Up @@ -298,6 +303,27 @@ func (t *serializerImpl) DeserializeAsyncWorkflowsConfig(data *DataBlob) (*types
return &cfg, err
}

// SerializeChecksum encodes sum into a DataBlob with the requested
// encodingType by delegating to the serializer's serialize helper. The blob
// and error from that helper are returned unchanged.
func (t *serializerImpl) SerializeChecksum(sum checksum.Checksum, encodingType common.EncodingType) (*DataBlob, error) {
	blob, err := t.serialize(sum, encodingType)
	return blob, err
}

// DeserializeChecksum decodes a checksum from data.
//
// A nil blob, or a blob that carries no payload bytes, yields the zero
// Checksum and a nil error. If decoding fails, the zero Checksum is returned
// together with the error from the deserialize helper.
func (t *serializerImpl) DeserializeChecksum(data *DataBlob) (checksum.Checksum, error) {
	// Nothing stored: treat it as "no checksum" rather than an error.
	if data == nil || len(data.Data) == 0 {
		return checksum.Checksum{}, nil
	}

	var decoded checksum.Checksum
	if err := t.deserialize(data, &decoded); err != nil {
		// Do not hand back a partially populated value on failure.
		return checksum.Checksum{}, err
	}
	return decoded, nil
}

func (t *serializerImpl) serialize(input interface{}, encodingType common.EncodingType) (*DataBlob, error) {
if input == nil {
return nil, nil
Expand Down
26 changes: 26 additions & 0 deletions common/persistence/serializer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/stretchr/testify/assert"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/checksum"
"github.com/uber/cadence/common/dynamicconfig"
"github.com/uber/cadence/common/types"
)
Expand Down Expand Up @@ -210,13 +211,30 @@ func TestSerializers(t *testing.T) {
return serializer.DeserializeAsyncWorkflowsConfig(data)
},
},
{
name: "checksum",
payloads: map[string]any{
"normal": generateChecksum(),
},
serializeFn: func(payload any, encoding common.EncodingType) (*DataBlob, error) {
return serializer.SerializeChecksum(payload.(checksum.Checksum), encoding)
},
deserializeFn: func(data *DataBlob) (any, error) {
return serializer.DeserializeChecksum(data)
},
},
}

// generate runnable test cases here so actual test body is not 3 level nested
var runnableTests []runnableTest
for _, td := range tests {
for encoding, supported := range encodingTypes {
for payloadName, payload := range td.payloads {
if _, ok := payload.(checksum.Checksum); ok {
if encoding != common.EncodingTypeJSON {
continue
}
}
runnableTests = append(runnableTests, runnableTest{
testDef: td,
encoding: encoding,
Expand Down Expand Up @@ -468,3 +486,11 @@ func generateAsyncWorkflowConfig() *types.AsyncWorkflowConfiguration {
},
}
}

// generateChecksum builds a fixed checksum fixture for the serializer tests:
// flavor FlavorIEEECRC32OverThriftBinary, version 1, and the value bytes
// "test-checksum".
func generateChecksum() checksum.Checksum {
	var sum checksum.Checksum
	sum.Flavor = checksum.FlavorIEEECRC32OverThriftBinary
	sum.Version = 1
	sum.Value = []byte("test-checksum")
	return sum
}
7 changes: 7 additions & 0 deletions common/persistence/sql/sql_execution_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -1446,6 +1446,13 @@ func (m *sqlExecutionStore) populateWorkflowMutableState(
)
}

if info.GetChecksum() != nil {
state.ChecksumData = p.NewDataBlob(
info.GetChecksum(),
common.EncodingType(info.GetChecksumEncoding()),
)
}

return state, nil
}

Expand Down
Loading