diff --git a/client/clientBean.go b/client/clientBean.go index fc3b8757f86..13eee22699f 100644 --- a/client/clientBean.go +++ b/client/clientBean.go @@ -86,14 +86,14 @@ func NewClientBean(factory Factory, dispatcherProvider DispatcherProvider, clust remoteAdminClients := map[string]admin.Client{} remoteFrontendClients := map[string]frontend.Client{} - for cluster, address := range clusterMetadata.GetAllClientAddress() { - dispatcher, err := dispatcherProvider.Get(address.RPCName, address.RPCAddress) + for cluster, info := range clusterMetadata.GetAllClusterInfo() { + dispatcher, err := dispatcherProvider.Get(info.RPCName, info.RPCAddress) if err != nil { return nil, err } adminClient, err := factory.NewAdminClientWithTimeoutAndDispatcher( - address.RPCName, + info.RPCName, admin.DefaultTimeout, dispatcher, ) @@ -102,7 +102,7 @@ func NewClientBean(factory Factory, dispatcherProvider DispatcherProvider, clust } frontendclient, err := factory.NewFrontendClientWithTimeoutAndDispatcher( - address.RPCName, + info.RPCName, frontend.DefaultTimeout, frontend.DefaultLongPollTimeout, dispatcher, diff --git a/cmd/server/server.go b/cmd/server/server.go index 1d4966b9b29..ff41f0c3f14 100644 --- a/cmd/server/server.go +++ b/cmd/server/server.go @@ -24,10 +24,11 @@ import ( "log" "time" + "github.com/uber/cadence/common/cluster" + "github.com/uber/cadence/client" "github.com/uber/cadence/common" "github.com/uber/cadence/common/blobstore/filestore" - "github.com/uber/cadence/common/cluster" "github.com/uber/cadence/common/elasticsearch" "github.com/uber/cadence/common/log/loggerimpl" "github.com/uber/cadence/common/log/tag" @@ -130,15 +131,19 @@ func (s *server) startService() common.Daemon { params.DCRedirectionPolicy = s.cfg.DCRedirectionPolicy params.MetricsClient = metrics.NewClient(params.MetricScope, service.GetMetricsServiceIdx(params.Name, params.Logger)) + clustersInformation := s.cfg.ClustersInformation + // TODO remove when ClustersInfo is fully deprecated + if len(s.cfg.ClustersInfo.CurrentClusterName) != 0 { + clustersInformation = s.cfg.ClustersInfo.ToClusterInformation() + } params.ClusterMetadata = cluster.NewMetadata( params.Logger, params.MetricsClient, enableGlobalDomain, - s.cfg.ClustersInfo.FailoverVersionIncrement, - s.cfg.ClustersInfo.MasterClusterName, - s.cfg.ClustersInfo.CurrentClusterName, - s.cfg.ClustersInfo.ClusterInitialFailoverVersions, - s.cfg.ClustersInfo.ClusterAddress, + clustersInformation.VersionIncrement, + clustersInformation.MasterClusterName, + clustersInformation.CurrentClusterName, + clustersInformation.ClusterInformation, archivalStatus, s.cfg.Archival.DefaultBucket, enableReadFromArchival, diff --git a/common/cache/domainCache.go b/common/cache/domainCache.go index 38d309351f6..bd58c1f5fd2 100644 --- a/common/cache/domainCache.go +++ b/common/cache/domainCache.go @@ -150,12 +150,17 @@ func newDomainCacheEntry(clusterMetadata cluster.Metadata) *DomainCacheEntry { } // NewDomainCacheEntryWithReplicationForTest returns an entry with test data -func NewDomainCacheEntryWithReplicationForTest(info *persistence.DomainInfo, config *persistence.DomainConfig, repConfig *persistence.DomainReplicationConfig, clusterMetadata cluster.Metadata) *DomainCacheEntry { +func NewDomainCacheEntryWithReplicationForTest(info *persistence.DomainInfo, + config *persistence.DomainConfig, + repConfig *persistence.DomainReplicationConfig, + failoverVersion int64, + clusterMetadata cluster.Metadata) *DomainCacheEntry { return &DomainCacheEntry{ info: info, config: config, isGlobalDomain: true, replicationConfig: repConfig, + failoverVersion: failoverVersion, clusterMetadata: clusterMetadata, } } diff --git a/common/cluster/metadata.go b/common/cluster/metadata.go index bc0995debb2..346b8d79564 100644 --- a/common/cluster/metadata.go +++ b/common/cluster/metadata.go @@ -22,6 +22,7 @@ package cluster import ( "fmt" + "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/log/tag" "github.com/uber/cadence/common/metrics" @@ -45,12 +46,10 @@ type ( GetMasterClusterName() string // GetCurrentClusterName return the current cluster name GetCurrentClusterName() string - // GetAllClusterFailoverVersions return the all cluster name -> corresponding initial failover version - GetAllClusterFailoverVersions() map[string]int64 + // GetAllClusterInfo return the all cluster name -> corresponding info + GetAllClusterInfo() map[string]config.ClusterInformation // ClusterNameForFailoverVersion return the corresponding cluster name for a given failover version ClusterNameForFailoverVersion(failoverVersion int64) string - // GetAllClientAddress return the frontend address for each cluster name - GetAllClientAddress() map[string]config.Address // ArchivalConfig returns the archival config of the cluster ArchivalConfig() *ArchivalConfig @@ -62,17 +61,17 @@ type ( // EnableGlobalDomain whether the global domain is enabled, // this attr should be discarded when cross DC is made public enableGlobalDomain dynamicconfig.BoolPropertyFn - // failoverVersionIncrement is the increment of each cluster failover version - failoverVersionIncrement int64 + // versionIncrement is the increment of each cluster's version when failover happen + versionIncrement int64 // masterClusterName is the name of the master cluster, only the master cluster can register / update domain // all clusters can do domain failover masterClusterName string // currentClusterName is the name of the current cluster currentClusterName string - // clusterInitialFailoverVersions contains all cluster name -> corresponding initial failover version - clusterInitialFailoverVersions map[string]int64 - // clusterInitialFailoverVersions contains all initial failover version -> corresponding cluster name - initialFailoverVersionClusters map[int64]string + // clusterInfo contains all cluster name -> corresponding information + clusterInfo map[string]config.ClusterInformation + // versionToClusterName contains all initial version -> corresponding cluster name + versionToClusterName map[int64]string // clusterToAddress contains the cluster name to corresponding frontend client clusterToAddress map[string]config.Address @@ -90,53 +89,52 @@ func NewMetadata( logger log.Logger, metricsClient metrics.Client, enableGlobalDomain dynamicconfig.BoolPropertyFn, - failoverVersionIncrement int64, + versionIncrement int64, masterClusterName string, currentClusterName string, - clusterInitialFailoverVersions map[string]int64, - clusterToAddress map[string]config.Address, + clusterInfo map[string]config.ClusterInformation, archivalStatus dynamicconfig.StringPropertyFn, defaultBucket string, enableReadFromArchival dynamicconfig.BoolPropertyFn, ) Metadata { - if len(clusterInitialFailoverVersions) == 0 { - panic("Empty initial failover versions for cluster") + if len(clusterInfo) == 0 { + panic("Empty cluster information") } else if len(masterClusterName) == 0 { panic("Master cluster name is empty") } else if len(currentClusterName) == 0 { panic("Current cluster name is empty") + } else if versionIncrement == 0 { + panic("Version increment is 0") } - initialFailoverVersionClusters := make(map[int64]string) - for clusterName, initialFailoverVersion := range clusterInitialFailoverVersions { - if failoverVersionIncrement <= initialFailoverVersion { + + versionToClusterName := make(map[int64]string) + for clusterName, info := range clusterInfo { + if versionIncrement <= info.InitialVersion || info.InitialVersion < 0 { panic(fmt.Sprintf( - "Failover version increment %v is smaller than initial value: %v.", - failoverVersionIncrement, - clusterInitialFailoverVersions, + "Version increment %v is smaller than initial version: %v.", + versionIncrement, + info.InitialVersion, )) } if len(clusterName) == 0 { panic("Cluster name in all cluster names is empty") } - initialFailoverVersionClusters[initialFailoverVersion] = clusterName + versionToClusterName[info.InitialVersion] = clusterName + + if info.Enabled && (len(info.RPCName) == 0 || len(info.RPCAddress) == 0) { + panic(fmt.Sprintf("Cluster %v: rpc name / address is empty", clusterName)) + } } - if _, ok := clusterInitialFailoverVersions[currentClusterName]; !ok { - panic("Current cluster is not specified in all cluster names") + if _, ok := clusterInfo[currentClusterName]; !ok { + panic("Current cluster is not specified in cluster info") } - if _, ok := clusterInitialFailoverVersions[masterClusterName]; !ok { - panic("Master cluster is not specified in all cluster names") + if _, ok := clusterInfo[masterClusterName]; !ok { + panic("Master cluster is not specified in cluster info") } - if len(initialFailoverVersionClusters) != len(clusterInitialFailoverVersions) { - panic("Cluster to initial failover versions have duplicate initial versions") - } - - // only check whether a cluster in cluster -> initial failover versions exists in cluster -> address - for clusterName := range clusterInitialFailoverVersions { - if _, ok := clusterToAddress[clusterName]; !ok { - panic("Cluster -> initial failover version does not have an address") - } + if len(versionToClusterName) != len(clusterInfo) { + panic("Cluster info initial versions have duplicates") } status, err := getArchivalStatus(archivalStatus()) @@ -149,18 +147,17 @@ func NewMetadata( } return &metadataImpl{ - logger: logger, - metricsClient: metricsClient, - enableGlobalDomain: enableGlobalDomain, - failoverVersionIncrement: failoverVersionIncrement, - masterClusterName: masterClusterName, - currentClusterName: currentClusterName, - clusterInitialFailoverVersions: clusterInitialFailoverVersions, - initialFailoverVersionClusters: initialFailoverVersionClusters, - clusterToAddress: clusterToAddress, - archivalStatus: archivalStatus, - defaultBucket: defaultBucket, - enableReadFromArchival: enableReadFromArchival, + logger: logger, + metricsClient: metricsClient, + enableGlobalDomain: enableGlobalDomain, + versionIncrement: versionIncrement, + masterClusterName: masterClusterName, + currentClusterName: currentClusterName, + clusterInfo: clusterInfo, + versionToClusterName: versionToClusterName, + archivalStatus: archivalStatus, + defaultBucket: defaultBucket, + enableReadFromArchival: enableReadFromArchival, } } @@ -172,24 +169,24 @@ func (metadata *metadataImpl) IsGlobalDomainEnabled() bool { // GetNextFailoverVersion return the next failover version based on input func (metadata *metadataImpl) GetNextFailoverVersion(cluster string, currentFailoverVersion int64) int64 { - initialFailoverVersion, ok := metadata.clusterInitialFailoverVersions[cluster] + info, ok := metadata.clusterInfo[cluster] if !ok { panic(fmt.Sprintf( "Unknown cluster name: %v with given cluster initial failover version map: %v.", cluster, - metadata.clusterInitialFailoverVersions, + metadata.clusterInfo, )) } - failoverVersion := currentFailoverVersion/metadata.failoverVersionIncrement*metadata.failoverVersionIncrement + initialFailoverVersion + failoverVersion := currentFailoverVersion/metadata.versionIncrement*metadata.versionIncrement + info.InitialVersion if failoverVersion < currentFailoverVersion { - return failoverVersion + metadata.failoverVersionIncrement + return failoverVersion + metadata.versionIncrement } return failoverVersion } // IsVersionFromSameCluster return true if 2 version are used for the same cluster func (metadata *metadataImpl) IsVersionFromSameCluster(version1 int64, version2 int64) bool { - return (version1-version2)%metadata.failoverVersionIncrement == 0 + return (version1-version2)%metadata.versionIncrement == 0 } func (metadata *metadataImpl) IsMasterCluster() bool { @@ -206,31 +203,26 @@ func (metadata *metadataImpl) GetCurrentClusterName() string { return metadata.currentClusterName } -// GetAllClusterFailoverVersions return the all cluster name -> corresponding initial failover version -func (metadata *metadataImpl) GetAllClusterFailoverVersions() map[string]int64 { - return metadata.clusterInitialFailoverVersions +// GetAllClusterInfo return the all cluster name -> corresponding information +func (metadata *metadataImpl) GetAllClusterInfo() map[string]config.ClusterInformation { + return metadata.clusterInfo } // ClusterNameForFailoverVersion return the corresponding cluster name for a given failover version func (metadata *metadataImpl) ClusterNameForFailoverVersion(failoverVersion int64) string { - initialFailoverVersion := failoverVersion % metadata.failoverVersionIncrement - clusterName, ok := metadata.initialFailoverVersionClusters[initialFailoverVersion] + initialVersion := failoverVersion % metadata.versionIncrement + clusterName, ok := metadata.versionToClusterName[initialVersion] if !ok { panic(fmt.Sprintf( "Unknown initial failover version %v with given cluster initial failover version map: %v and failover version increment %v.", - initialFailoverVersion, - metadata.clusterInitialFailoverVersions, - metadata.failoverVersionIncrement, + initialVersion, + metadata.clusterInfo, + metadata.versionIncrement, )) } return clusterName } -// GetAllClientAddress return the frontend address for each cluster name -func (metadata *metadataImpl) GetAllClientAddress() map[string]config.Address { - return metadata.clusterToAddress -} - // ArchivalConfig returns the archival config of the cluster. // This method always return a well formed ArchivalConfig (this means ArchivalConfig().IsValid always returns true). func (metadata *metadataImpl) ArchivalConfig() (retCfg *ArchivalConfig) { diff --git a/common/cluster/metadataTestBase.go b/common/cluster/metadataTestBase.go index fb5f4d684f3..5037f326dc2 100644 --- a/common/cluster/metadataTestBase.go +++ b/common/cluster/metadataTestBase.go @@ -48,26 +48,32 @@ const ( var ( // TestAllClusterNames is the all cluster names used for test TestAllClusterNames = []string{TestCurrentClusterName, TestAlternativeClusterName} - // TestAllClusterFailoverVersions is the same as above, juse convinent for test mocking - TestAllClusterFailoverVersions = map[string]int64{ - TestCurrentClusterName: TestCurrentClusterInitialFailoverVersion, - TestAlternativeClusterName: TestAlternativeClusterInitialFailoverVersion, - } - // TestAllClusterAddress is the same as above, juse convinent for test mocking - TestAllClusterAddress = map[string]config.Address{ - TestCurrentClusterName: config.Address{RPCName: common.FrontendServiceName, RPCAddress: TestCurrentClusterFrontendAddress}, - TestAlternativeClusterName: config.Address{RPCName: common.FrontendServiceName, RPCAddress: TestAlternativeClusterFrontendAddress}, + // TestAllClusterInfo is the same as above, just convenient for test mocking + TestAllClusterInfo = map[string]config.ClusterInformation{ + TestCurrentClusterName: config.ClusterInformation{ + Enabled: true, + InitialVersion: TestCurrentClusterInitialFailoverVersion, + RPCName: common.FrontendServiceName, + RPCAddress: TestCurrentClusterFrontendAddress, + }, + TestAlternativeClusterName: config.ClusterInformation{ + Enabled: true, + InitialVersion: TestAlternativeClusterInitialFailoverVersion, + RPCName: common.FrontendServiceName, + RPCAddress: TestAlternativeClusterFrontendAddress, + }, } // TestSingleDCAllClusterNames is the all cluster names used for test TestSingleDCAllClusterNames = []string{TestCurrentClusterName} - // TestSingleDCAllClusterFailoverVersions is the same as above, juse convinent for test mocking - TestSingleDCAllClusterFailoverVersions = map[string]int64{ - TestCurrentClusterName: TestCurrentClusterInitialFailoverVersion, - } - // TestSingleDCAllClusterAddress is the same as above, juse convinent for test mocking - TestSingleDCAllClusterAddress = map[string]config.Address{ - TestCurrentClusterName: config.Address{RPCName: common.FrontendServiceName, RPCAddress: TestCurrentClusterFrontendAddress}, + // TestSingleDCClusterInfo is the same as above, just convenient for test mocking + TestSingleDCClusterInfo = map[string]config.ClusterInformation{ + TestCurrentClusterName: config.ClusterInformation{ + Enabled: true, + InitialVersion: TestCurrentClusterInitialFailoverVersion, + RPCName: common.FrontendServiceName, + RPCAddress: TestCurrentClusterFrontendAddress, + }, } ) @@ -92,8 +98,7 @@ func GetTestClusterMetadata(enableGlobalDomain bool, isMasterCluster bool, enabl TestFailoverVersionIncrement, masterClusterName, TestCurrentClusterName, - TestAllClusterFailoverVersions, - TestAllClusterAddress, + TestAllClusterInfo, dynamicconfig.GetStringPropertyFn(archivalStatus), clusterDefaultBucket, dynamicconfig.GetBoolPropertyFn(enableArchival), @@ -107,8 +112,7 @@ func GetTestClusterMetadata(enableGlobalDomain bool, isMasterCluster bool, enabl TestFailoverVersionIncrement, TestCurrentClusterName, TestCurrentClusterName, - TestSingleDCAllClusterFailoverVersions, - TestSingleDCAllClusterAddress, + TestSingleDCClusterInfo, dynamicconfig.GetStringPropertyFn(archivalStatus), clusterDefaultBucket, dynamicconfig.GetBoolPropertyFn(enableArchival), diff --git a/common/messaging/kafkaConfig.go b/common/messaging/kafkaConfig.go index 1c9466cf696..caf3109451d 100644 --- a/common/messaging/kafkaConfig.go +++ b/common/messaging/kafkaConfig.go @@ -78,7 +78,6 @@ func (k *KafkaConfig) Validate(checkCluster bool, checkApp bool) { } for _, topics := range k.ClusterToTopic { validateTopicsFn(topics.Topic) - validateTopicsFn(topics.RetryTopic) validateTopicsFn(topics.DLQTopic) } } diff --git a/common/mocks/ClusterMetadata.go b/common/mocks/ClusterMetadata.go index c971cea51e0..5f34ab17171 100644 --- a/common/mocks/ClusterMetadata.go +++ b/common/mocks/ClusterMetadata.go @@ -21,7 +21,7 @@ package mocks import ( - mock "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/mock" "github.com/uber/cadence/common/cluster" "github.com/uber/cadence/common/service/config" ) @@ -45,16 +45,16 @@ func (_m *ClusterMetadata) ClusterNameForFailoverVersion(failoverVersion int64) return r0 } -// GetAllClusterFailoverVersions provides a mock function with given fields: -func (_m *ClusterMetadata) GetAllClusterFailoverVersions() map[string]int64 { +// GetAllClusterInfo provides a mock function with given fields: +func (_m *ClusterMetadata) GetAllClusterInfo() map[string]config.ClusterInformation { ret := _m.Called() - var r0 map[string]int64 - if rf, ok := ret.Get(0).(func() map[string]int64); ok { + var r0 map[string]config.ClusterInformation + if rf, ok := ret.Get(0).(func() map[string]config.ClusterInformation); ok { r0 = rf() } else { if ret.Get(0) != nil { - r0 = ret.Get(0).(map[string]int64) + r0 = ret.Get(0).(map[string]config.ClusterInformation) } } @@ -159,20 +159,6 @@ func (_m *ClusterMetadata) IsMasterCluster() bool { return r0 } -// GetAllClientAddress provides a mock function with given fields: -func (_m *ClusterMetadata) GetAllClientAddress() map[string]config.Address { - ret := _m.Called() - - var r0 map[string]config.Address - if rf, ok := ret.Get(0).(func() map[string]config.Address); ok { - r0 = rf() - } else { - r0 = ret.Get(0).(map[string]config.Address) - } - - return r0 -} - // ArchivalConfig provides a mock function with given fields: func (_m *ClusterMetadata) ArchivalConfig() *cluster.ArchivalConfig { ret := _m.Called() diff --git a/common/service/config/clusterInformation.go b/common/service/config/clusterInformation.go new file mode 100644 index 00000000000..bd19c063528 --- /dev/null +++ b/common/service/config/clusterInformation.go @@ -0,0 +1,49 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package config + +import "fmt" + +// ToClusterInformation convert deprecated ClustersInfo to ClustersInformation +// Deprecated: pleasee use ClustersInformation +func (c ClustersInfo) ToClusterInformation() ClustersInformation { + clustersInfo := ClustersInformation{} + clustersInfo.EnableGlobalDomain = c.EnableGlobalDomain + clustersInfo.VersionIncrement = c.FailoverVersionIncrement + clustersInfo.MasterClusterName = c.MasterClusterName + clustersInfo.CurrentClusterName = c.CurrentClusterName + clustersInfo.ClusterInformation = map[string]ClusterInformation{} + for k, v := range c.ClusterInitialFailoverVersions { + address, ok := c.ClusterAddress[k] + if !ok { + panic(fmt.Sprintf("unable to find address for cluster %v", k)) + } + + clusterInfo := ClusterInformation{} + clusterInfo.Enabled = true + clusterInfo.InitialVersion = v + clusterInfo.RPCName = address.RPCName + clusterInfo.RPCAddress = address.RPCAddress + clustersInfo.ClusterInformation[k] = clusterInfo + } + + return clustersInfo +} diff --git a/common/service/config/config.go b/common/service/config/config.go index b9799f637e6..e4d06821cf7 100644 --- a/common/service/config/config.go +++ b/common/service/config/config.go @@ -44,7 +44,10 @@ type ( // Log is the logging config Log Logger `yaml:"log"` // ClustersInfo is the config containing all valid clusters and active cluster + // Deprecated: please use ClusterInformation instead ClustersInfo ClustersInfo `yaml:"clustersInfo"` + // ClustersInformation is the config containing all valid clusters and active cluster + ClustersInformation ClustersInformation `yaml:"clustersInformation"` // DCRedirectionPolicy contains the frontend datacenter redirection policy DCRedirectionPolicy DCRedirectionPolicy `yaml:"dcRedirectionPolicy"` // Services is a map of service name to service config items @@ -219,6 +222,8 @@ type ( } // ClustersInfo contains the all cluster names and active cluster + // + // Deprecated: please use ClusterInformation instead ClustersInfo struct { // EnableGlobalDomain whether the global domain is enabled, this attr should be discarded when // cross DC is made public @@ -237,6 +242,8 @@ type ( } // Address indicate the remote cluster's service name and address + // + // Deprecated: please use ClusterInformation instead Address struct { // RPCName indicate the remote service name RPCName string `yaml:"rpcName"` @@ -244,6 +251,30 @@ type ( RPCAddress string `yaml:"rpcAddress"` } + // ClustersInformation contains the all cluster which participated in cross DC + ClustersInformation struct { + EnableGlobalDomain bool `yaml:"enableGlobalDomain"` + // VersionIncrement is the increment of each cluster version when failover happens + VersionIncrement int64 `yaml:"versionIncrement"` + // MasterClusterName is the master cluster name, only the master cluster can register / update domain + // all clusters can do domain failover + MasterClusterName string `yaml:"masterClusterName"` + // CurrentClusterName is the name of the current cluster + CurrentClusterName string `yaml:"currentClusterName"` + // ClusterInformation contains all cluster names to corresponding information about that cluster + ClusterInformation map[string]ClusterInformation `yaml:"clusterInformation"` + } + + // ClusterInformation contains the information about each cluster which participated in cross DC + ClusterInformation struct { + Enabled bool `yaml:"enabled"` + InitialVersion int64 `yaml:"initialVersion"` + // RPCName indicate the remote service name + RPCName string `yaml:"rpcName"` + // Address indicate the remote service IP address + RPCAddress string `yaml:"rpcAddress"` + } + // DCRedirectionPolicy contains the frontend datacenter redirection policy DCRedirectionPolicy struct { Policy string `yaml:"policy"` diff --git a/config/development.yaml b/config/development.yaml index bcdfb8d1575..12ee3b64fa5 100644 --- a/config/development.yaml +++ b/config/development.yaml @@ -65,6 +65,8 @@ services: pprof: port: 7940 +# use clustersInfo for testing compatibility +# NOTE: clustersInfo is deprecated, plz use clustersInformation below clustersInfo: enableGlobalDomain: false failoverVersionIncrement: 10 @@ -77,6 +79,18 @@ clustersInfo: rpcName: "cadence-frontend" rpcAddress: "127.0.0.1:7933" +#clustersInformation: +# enableGlobalDomain: false +# versionIncrement: 10 +# masterClusterName: "active" +# currentClusterName: "active" +# clusterInformation: +# active: +# enabled: true +# initialVersion: 0 +# rpcName: "cadence-frontend" +# rpcAddress: "127.0.0.1:7933" + dcRedirectionPolicy: policy: "noop" toDC: "" diff --git a/config/development_active.yaml b/config/development_active.yaml index 7f3d94e319c..e9559890217 100644 --- a/config/development_active.yaml +++ b/config/development_active.yaml @@ -65,19 +65,20 @@ services: pprof: port: 7941 -clustersInfo: +clustersInformation: enableGlobalDomain: true - failoverVersionIncrement: 10 + versionIncrement: 10 masterClusterName: "active" currentClusterName: "active" - clusterInitialFailoverVersion: - active: 1 - standby: 0 - clusterAddress: + clusterInformation: active: + enabled: true + initialVersion: 1 rpcName: "cadence-frontend" rpcAddress: "127.0.0.1:7933" standby: + enabled: true + initialVersion: 0 rpcName: "cadence-frontend" rpcAddress: "127.0.0.1:8933" @@ -93,24 +94,18 @@ kafka: topics: active: cluster: test - active-retry: - cluster: test active-dlq: cluster: test standby: cluster: test - standby-retry: - cluster: test standby-dlq: cluster: test cadence-cluster-topics: active: topic: active - retry-topic: active-retry dlq-topic: active-dlq standby: topic: standby - retry-topic: standby-retry dlq-topic: standby-dlq archival: diff --git a/config/development_prometheus.yaml b/config/development_prometheus.yaml index a6bf6844b7e..0f6a3458aa3 100644 --- a/config/development_prometheus.yaml +++ b/config/development_prometheus.yaml @@ -65,15 +65,15 @@ services: pprof: port: 7940 -clustersInfo: +clustersInformation: enableGlobalDomain: false - failoverVersionIncrement: 10 + versionIncrement: 10 masterClusterName: "active" currentClusterName: "active" - clusterInitialFailoverVersion: - active: 0 - clusterAddress: + clusterInformation: active: + enabled: true + initialVersion: 0 rpcName: "cadence-frontend" rpcAddress: "127.0.0.1:7933" diff --git a/config/development_standby.yaml b/config/development_standby.yaml index 1b16c2fa793..3b94b771ddf 100644 --- a/config/development_standby.yaml +++ b/config/development_standby.yaml @@ -65,19 +65,20 @@ services: pprof: port: 8941 -clustersInfo: +clustersInformation: enableGlobalDomain: true - failoverVersionIncrement: 10 + versionIncrement: 10 masterClusterName: "active" currentClusterName: "standby" - clusterInitialFailoverVersion: - active: 1 - standby: 0 - clusterAddress: + clusterInformation: active: + enabled: true + initialVersion: 1 rpcName: "cadence-frontend" rpcAddress: "127.0.0.1:7933" standby: + enabled: true + initialVersion: 0 rpcName: "cadence-frontend" rpcAddress: "127.0.0.1:8933" @@ -93,24 +94,18 @@ kafka: topics: active: cluster: test - active-retry: - cluster: test active-dlq: cluster: test standby: cluster: test - standby-retry: - cluster: test standby-dlq: cluster: test cadence-cluster-topics: active: topic: active - retry-topic: active-retry dlq-topic: active-dlq standby: topic: standby - retry-topic: standby-retry dlq-topic: standby-dlq archival: diff --git a/docker/config_template.yaml b/docker/config_template.yaml index 7fc7c99815f..95dc378eea7 100644 --- a/docker/config_template.yaml +++ b/docker/config_template.yaml @@ -61,15 +61,15 @@ services: hostPort: "${STATSD_ENDPOINT}" prefix: "cadence-worker" -clustersInfo: +clustersInformation: enableGlobalDomain: false - failoverVersionIncrement: 10 + versionIncrement: 10 masterClusterName: "active" currentClusterName: "active" - clusterInitialFailoverVersion: - active: 0 - clusterAddress: + clusterInformation: active: + enabled: true + initialVersion: 0 rpcName: "cadence-frontend" rpcAddress: "127.0.0.1:7933" diff --git a/docker/config_template_mysql.yaml b/docker/config_template_mysql.yaml index 00e1bac53e6..75f94278036 100644 --- a/docker/config_template_mysql.yaml +++ b/docker/config_template_mysql.yaml @@ -67,15 +67,15 @@ services: hostPort: "${STATSD_ENDPOINT}" prefix: "cadence-worker" -clustersInfo: +clustersInformation: enableGlobalDomain: false - failoverVersionIncrement: 10 + versionIncrement: 10 masterClusterName: "active" currentClusterName: "active" - clusterInitialFailoverVersion: - active: 0 - clusterAddress: + clusterInformation: active: + enabled: true + initialVersion: 0 rpcName: "cadence-frontend" rpcAddress: "127.0.0.1:7933" diff --git a/host/integration_cross_dc_domain_test.go b/host/integration_cross_dc_domain_test.go index 62f52e99c1e..b2bb9dfb766 100644 --- a/host/integration_cross_dc_domain_test.go +++ b/host/integration_cross_dc_domain_test.go @@ -94,7 +94,7 @@ func (s *integrationCrossDCSuite) setupTest(enableGlobalDomain bool, isMasterClu EnableIndexer: false, }, IsMasterCluster: isMasterCluster, - ClusterInfo: config.ClustersInfo{ + ClustersInformation: config.ClustersInformation{ EnableGlobalDomain: enableGlobalDomain, }, HistoryConfig: &HistoryConfig{ @@ -291,7 +291,7 @@ func (s *integrationCrossDCSuite) TestIntegrationRegisterGetDomain_GlobalDomainD activeClusterName := "" currentClusterName := s.ClusterMetadata.GetCurrentClusterName() var clusters []*workflow.ClusterReplicationConfiguration - for clusterName := range s.ClusterMetadata.GetAllClusterFailoverVersions() { + for clusterName := range s.ClusterMetadata.GetAllClusterInfo() { clusters = append(clusters, &workflow.ClusterReplicationConfiguration{ ClusterName: common.StringPtr(clusterName), }) @@ -342,7 +342,7 @@ func (s *integrationCrossDCSuite) TestIntegrationRegisterGetDomain_GlobalDomainE emitMetric := true activeClusterName := "" var clusters []*workflow.ClusterReplicationConfiguration - for clusterName := range s.ClusterMetadata.GetAllClusterFailoverVersions() { + for clusterName := range s.ClusterMetadata.GetAllClusterInfo() { clusters = append(clusters, &workflow.ClusterReplicationConfiguration{ ClusterName: common.StringPtr(clusterName), }) @@ -374,7 +374,7 @@ func (s *integrationCrossDCSuite) TestIntegrationRegisterListDomains() { emitMetric := true activeClusterName := "" var clusters []*workflow.ClusterReplicationConfiguration - for clusterName := range s.ClusterMetadata.GetAllClusterFailoverVersions() { + for clusterName := range s.ClusterMetadata.GetAllClusterInfo() { clusters = append(clusters, &workflow.ClusterReplicationConfiguration{ ClusterName: common.StringPtr(clusterName), }) @@ -447,7 +447,7 @@ func (s *integrationCrossDCSuite) TestIntegrationRegisterGetDomain_GlobalDomainE emitMetric := true activeClusterName := "" var clusters []*workflow.ClusterReplicationConfiguration - for clusterName := range s.ClusterMetadata.GetAllClusterFailoverVersions() { + for clusterName := range s.ClusterMetadata.GetAllClusterInfo() { clusters = append(clusters, &workflow.ClusterReplicationConfiguration{ ClusterName: common.StringPtr(clusterName), }) @@ -501,7 +501,7 @@ func (s *integrationCrossDCSuite) TestIntegrationUpdateGetDomain_GlobalDomainDis emitMetric := true currentClusterName := s.ClusterMetadata.GetCurrentClusterName() var clusters []*workflow.ClusterReplicationConfiguration - for clusterName := range s.ClusterMetadata.GetAllClusterFailoverVersions() { + for clusterName := range s.ClusterMetadata.GetAllClusterInfo() { clusters = append(clusters, &workflow.ClusterReplicationConfiguration{ ClusterName: common.StringPtr(clusterName), }) @@ -655,7 +655,7 @@ func (s *integrationCrossDCSuite) TestIntegrationUpdateGetDomain_GlobalDomainEna retention := int32(7) emitMetric := true var clusters []*workflow.ClusterReplicationConfiguration - for clusterName := range s.ClusterMetadata.GetAllClusterFailoverVersions() { + for clusterName := range s.ClusterMetadata.GetAllClusterInfo() { clusters = append(clusters, &workflow.ClusterReplicationConfiguration{ ClusterName: common.StringPtr(clusterName), }) @@ -685,7 +685,7 @@ func (s *integrationCrossDCSuite) TestIntegrationUpdateGetDomain_GlobalDomainEna domainName := "some random domain name" var clusters []*workflow.ClusterReplicationConfiguration - for clusterName := range s.ClusterMetadata.GetAllClusterFailoverVersions() { + for clusterName := range s.ClusterMetadata.GetAllClusterInfo() { clusters = append(clusters, &workflow.ClusterReplicationConfiguration{ ClusterName: common.StringPtr(clusterName), }) @@ -857,7 +857,7 @@ func (s *integrationCrossDCSuite) TestIntegrationUpdateGetDomain_GlobalDomainEna retention := int32(7) emitMetric := true var clusters []*workflow.ClusterReplicationConfiguration - for clusterName := range s.ClusterMetadata.GetAllClusterFailoverVersions() { + for clusterName := range s.ClusterMetadata.GetAllClusterInfo() { clusters = append(clusters, &workflow.ClusterReplicationConfiguration{ ClusterName: common.StringPtr(clusterName), }) @@ -916,7 +916,7 @@ func (s *integrationCrossDCSuite) TestIntegrationUpdateGetDomain_GlobalDomainEna activeClusterName := "" failoverVersion := int64(59) var persistenceClusters []*persistence.ClusterReplicationConfig - for clusterName := range s.ClusterMetadata.GetAllClusterFailoverVersions() { + for clusterName := range s.ClusterMetadata.GetAllClusterInfo() { clusters = append(clusters, &workflow.ClusterReplicationConfiguration{ ClusterName: common.StringPtr(clusterName), }) diff --git a/host/testcluster.go b/host/testcluster.go index 85e325aa8da..659378ce14e 100644 --- a/host/testcluster.go +++ b/host/testcluster.go @@ -21,6 +21,8 @@ package host import ( + "encoding/json" + "fmt" "io/ioutil" "os" @@ -58,7 +60,7 @@ type ( EnableArchival bool IsMasterCluster bool ClusterNo int - ClusterInfo config.ClustersInfo + ClustersInformation config.ClustersInformation MessagingClientConfig *MessagingClientConfig Persistence persistencetests.TestBaseOptions HistoryConfig *HistoryConfig @@ -84,18 +86,26 @@ const defaultTestValueOfESIndexMaxResultWindow = 5 // NewCluster creates and sets up the test cluster func NewCluster(options *TestClusterConfig, logger log.Logger) (*TestCluster, error) { - clusterInfo := options.ClusterInfo - clusterMetadata := cluster.GetTestClusterMetadata(clusterInfo.EnableGlobalDomain, options.IsMasterCluster, options.EnableArchival) - if !options.IsMasterCluster && options.ClusterInfo.MasterClusterName != "" { // xdc cluster metadata setup + clustersInformation := options.ClustersInformation + + print := func(value interface{}) string { + bytes, _ := json.MarshalIndent(value, "", " ") + return string(bytes) + } + fmt.Printf("++++++++++\n") + fmt.Printf("## WHAT:\n%v\n.", print(clustersInformation)) + fmt.Printf("++++++++++\n") + + clusterMetadata := cluster.GetTestClusterMetadata(clustersInformation.EnableGlobalDomain, options.IsMasterCluster, options.EnableArchival) + if !options.IsMasterCluster && clustersInformation.MasterClusterName != "" { // xdc cluster metadata setup clusterMetadata = cluster.NewMetadata( logger, &metricsmocks.Client{}, - dynamicconfig.GetBoolPropertyFn(clusterInfo.EnableGlobalDomain), - clusterInfo.FailoverVersionIncrement, - clusterInfo.MasterClusterName, - clusterInfo.CurrentClusterName, - clusterInfo.ClusterInitialFailoverVersions, - clusterInfo.ClusterAddress, + dynamicconfig.GetBoolPropertyFn(clustersInformation.EnableGlobalDomain), + clustersInformation.VersionIncrement, + clustersInformation.MasterClusterName, + clustersInformation.CurrentClusterName, + clustersInformation.ClusterInformation, dynamicconfig.GetStringPropertyFn("disabled"), "", dynamicconfig.GetBoolPropertyFn(false), diff --git a/hostxdc/testdata/integrationtestclusters.yaml b/hostxdc/testdata/integrationtestclusters.yaml index 907907e5ebd..7b0c00a858e 100644 --- a/hostxdc/testdata/integrationtestclusters.yaml +++ b/hostxdc/testdata/integrationtestclusters.yaml @@ -1,20 +1,21 @@ - persisence: dbname: integration_active - clusterinfo: + clustersinformation: enableGlobalDomain: true - failoverVersionIncrement: 10 - masterClusterName: active - currentClusterName: active - clusterInitialFailoverVersion: - active: 0 - standby: 1 - clusterAddress: + versionIncrement: 10 + masterClusterName: "active" + currentClusterName: "active" + clusterInformation: active: - rpcName: cadence-frontend - rpcAddress: 127.0.0.1:7104 + enabled: true + initialVersion: 0 + rpcName: "cadence-frontend" + rpcAddress: "127.0.0.1:7104" standby: - rpcName: cadence-frontend - rpcAddress: 127.0.0.1:8104 + enabled: true + initialVersion: 1 + rpcName: "cadence-frontend" + rpcAddress: "127.0.0.1:8104" enablearchival: false workerconfig: enablearchiver: false @@ -37,41 +38,36 @@ cluster: test active-dlq: cluster: test - active-retry: - cluster: test standby: cluster: test standby-dlq: cluster: test - standby-retry: - cluster: test cadence-cluster-topics: active: topic: active - retry-topic: active-retry dlq-topic: active-dlq standby: topic: standby - retry-topic: standby-retry dlq-topic: standby-dlq applications: {} - persistence: dbname: integration_standby - clusterinfo: + clustersinformation: enableGlobalDomain: true - failoverVersionIncrement: 10 - masterClusterName: active - currentClusterName: standby - clusterInitialFailoverVersion: - active: 0 - standby: 1 - clusterAddress: + versionIncrement: 10 + masterClusterName: "active" + currentClusterName: "standby" + clusterInformation: active: - rpcName: cadence-frontend - rpcAddress: 127.0.0.1:7104 + enabled: true + initialVersion: 0 + rpcName: "cadence-frontend" + rpcAddress: "127.0.0.1:7104" standby: - rpcName: cadence-frontend - rpcAddress: 127.0.0.1:8104 + enabled: true + initialVersion: 1 + rpcName: "cadence-frontend" + rpcAddress: "127.0.0.1:8104" enablearchival: false workerconfig: enablearchiver: false @@ -94,21 +90,15 @@ cluster: test active-dlq: cluster: test - active-retry: - cluster: test standby: cluster: test standby-dlq: cluster: test - standby-retry: - cluster: test cadence-cluster-topics: active: topic: active - retry-topic: active-retry dlq-topic: active-dlq standby: topic: standby - retry-topic: standby-retry dlq-topic: standby-dlq applications: {} diff --git a/service/frontend/dcRedirectionPolicy.go b/service/frontend/dcRedirectionPolicy.go index 49b4dfffccf..e25835e9ad0 100644 --- a/service/frontend/dcRedirectionPolicy.go +++ b/service/frontend/dcRedirectionPolicy.go @@ -70,8 +70,7 @@ func RedirectionPolicyGenerator(clusterMetadata cluster.Metadata, return NewNoopRedirectionPolicy(clusterMetadata.GetCurrentClusterName()) case DCRedirectionPolicyForwarding: currentClusterName := clusterMetadata.GetCurrentClusterName() - clusterAddress := clusterMetadata.GetAllClientAddress() - if _, ok := clusterAddress[policy.ToDC]; !ok { + if _, ok := clusterMetadata.GetAllClusterInfo()[policy.ToDC]; !ok { panic(fmt.Sprintf("Incorrect to DC: %v", policy.ToDC)) } return NewForwardingDCRedirectionPolicy( diff --git a/service/frontend/domainHandler.go b/service/frontend/domainHandler.go index d9a9ad09090..9afc5e4d7b8 100644 --- a/service/frontend/domainHandler.go +++ b/service/frontend/domainHandler.go @@ -663,7 +663,7 @@ func (d *domainHandlerImpl) mergeDomainData(old map[string]string, new map[strin } func (d *domainHandlerImpl) validateClusterName(clusterName string) error { - if _, ok := d.clusterMetadata.GetAllClusterFailoverVersions()[clusterName]; !ok { + if info, ok := d.clusterMetadata.GetAllClusterInfo()[clusterName]; !ok || !info.Enabled { errMsg := "Invalid cluster name: %s" return &shared.BadRequestError{Message: fmt.Sprintf(errMsg, clusterName)} } diff --git a/service/history/conflictResolver_test.go b/service/history/conflictResolver_test.go index 902149f5b08..dfad9075612 100644 --- a/service/history/conflictResolver_test.go +++ b/service/history/conflictResolver_test.go @@ -217,7 +217,7 @@ func (s *conflictResolverSuite) TestReset() { DecisionTaskScheduledEventAttributes: &shared.DecisionTaskScheduledEventAttributes{}, } - s.mockClusterMetadata.On("ClusterNameForFailoverVersion", event1.GetVersion()).Return(sourceCluster).Once() + s.mockClusterMetadata.On("ClusterNameForFailoverVersion", event1.GetVersion()).Return(sourceCluster) s.mockHistoryMgr.On("GetWorkflowExecutionHistory", &persistence.GetWorkflowExecutionHistoryRequest{ DomainID: domainID, Execution: execution, diff --git a/service/history/historyEngine2_test.go b/service/history/historyEngine2_test.go index e28f0f8faed..8614ce7697b 100644 --- a/service/history/historyEngine2_test.go +++ b/service/history/historyEngine2_test.go @@ -113,7 +113,7 @@ func (s *engine2Suite) SetupTest() { s.mockClientBean = &client.MockClientBean{} s.mockService = service.NewTestService(s.mockClusterMetadata, s.mockMessagingClient, metricsClient, s.mockClientBean) s.mockClusterMetadata.On("GetCurrentClusterName").Return(cluster.TestCurrentClusterName) - s.mockClusterMetadata.On("GetAllClusterFailoverVersions").Return(cluster.TestSingleDCAllClusterFailoverVersions) + s.mockClusterMetadata.On("GetAllClusterInfo").Return(cluster.TestSingleDCClusterInfo) s.mockClusterMetadata.On("IsGlobalDomainEnabled").Return(false) s.mockDomainCache = &cache.DomainCacheMock{} s.mockDomainCache.On("GetDomainByID", mock.Anything).Return(cache.NewDomainCacheEntryForTest(&p.DomainInfo{ID: validDomainID}, &p.DomainConfig{}), nil) diff --git a/service/history/historyEngine3_eventsv2_test.go b/service/history/historyEngine3_eventsv2_test.go index 60fae0c8137..d414d19f6c9 100644 --- a/service/history/historyEngine3_eventsv2_test.go +++ b/service/history/historyEngine3_eventsv2_test.go @@ -109,7 +109,7 @@ func (s *engine3Suite) SetupTest() { s.mockClientBean = &client.MockClientBean{} s.mockService = service.NewTestService(s.mockClusterMetadata, s.mockMessagingClient, metricsClient, s.mockClientBean) s.mockClusterMetadata.On("GetCurrentClusterName").Return(cluster.TestCurrentClusterName) - s.mockClusterMetadata.On("GetAllClusterFailoverVersions").Return(cluster.TestSingleDCAllClusterFailoverVersions) + s.mockClusterMetadata.On("GetAllClusterInfo").Return(cluster.TestSingleDCClusterInfo) s.mockClusterMetadata.On("IsGlobalDomainEnabled").Return(false) s.mockDomainCache = &cache.DomainCacheMock{} s.mockArchivalClient = &archiver.ClientMock{} diff --git a/service/history/historyEngine_test.go b/service/history/historyEngine_test.go index d2ed2791615..0102772c4c1 100644 --- a/service/history/historyEngine_test.go +++ b/service/history/historyEngine_test.go @@ -119,7 +119,7 @@ func (s *engineSuite) SetupTest() { s.mockClientBean = &client.MockClientBean{} s.mockService = service.NewTestService(s.mockClusterMetadata, s.mockMessagingClient, s.mockMetricClient, s.mockClientBean) s.mockClusterMetadata.On("GetCurrentClusterName").Return(cluster.TestCurrentClusterName) - s.mockClusterMetadata.On("GetAllClusterFailoverVersions").Return(cluster.TestSingleDCAllClusterFailoverVersions) + s.mockClusterMetadata.On("GetAllClusterInfo").Return(cluster.TestSingleDCClusterInfo) s.mockEventsCache = &MockEventsCache{} historyEventNotifier := newHistoryEventNotifier( @@ -156,7 +156,7 @@ func (s *engineSuite) SetupTest() { historyCache := newHistoryCache(shardContextWrapper) // this is used by shard context, not relevant to this test, so we do not care how many times "GetCurrentClusterName" os called s.mockClusterMetadata.On("GetCurrentClusterName").Return(cluster.TestCurrentClusterName) - s.mockClusterMetadata.On("GetAllClusterFailoverVersions").Return(cluster.TestSingleDCAllClusterFailoverVersions) + s.mockClusterMetadata.On("GetAllClusterInfo").Return(cluster.TestSingleDCClusterInfo) s.mockClusterMetadata.On("IsGlobalDomainEnabled").Return(false) h := &historyEngineImpl{ currentClusterName: currentClusterName, diff --git a/service/history/historyReplicator_test.go b/service/history/historyReplicator_test.go index 30613d6e303..ab24fc451ec 100644 --- a/service/history/historyReplicator_test.go +++ b/service/history/historyReplicator_test.go @@ -905,6 +905,7 @@ func (s *historyReplicatorSuite) TestApplyOtherEventsMissingMutableState_Incomin msBuilderCurrent.On("GetNextEventID").Return(currentNextEventID) msBuilderCurrent.On("IsWorkflowExecutionRunning").Return(true) // this is used to update the version on mutable state msBuilderCurrent.On("UpdateReplicationStateVersion", version, true).Once() + msBuilderCurrent.On("UpdateReplicationStateLastEventID", cluster, version, currentNextEventID).Once() s.mockExecutionMgr.On("GetCurrentExecution", &persistence.GetCurrentExecutionRequest{ DomainID: domainID, @@ -3346,6 +3347,7 @@ func (s *historyReplicatorSuite) TestConflictResolutionTerminateCurrentRunningIf msBuilderCurrent.On("GetNextEventID").Return(currentNextEventID) msBuilderCurrent.On("IsWorkflowExecutionRunning").Return(true) // this is used to update the version on mutable state msBuilderCurrent.On("UpdateReplicationStateVersion", incomingVersion, true).Once() + msBuilderCurrent.On("UpdateReplicationStateLastEventID", incomingCluster, incomingVersion, currentNextEventID).Once() s.mockExecutionMgr.On("GetCurrentExecution", &persistence.GetCurrentExecutionRequest{ DomainID: domainID, diff --git a/service/history/historyTestBase.go b/service/history/historyTestBase.go index 8879f8faca7..c118901f0a9 100644 --- a/service/history/historyTestBase.go +++ b/service/history/historyTestBase.go @@ -100,7 +100,11 @@ func newTestShardContext(shardInfo *persistence.ShardInfo, transferSequenceNumbe // initialize the cluster current time to be the same as ack level standbyClusterCurrentTime := make(map[string]time.Time) timerMaxReadLevelMap := make(map[string]time.Time) - for clusterName := range clusterMetadata.GetAllClusterFailoverVersions() { + for clusterName, info := range clusterMetadata.GetAllClusterInfo() { + if !info.Enabled { + continue + } + if clusterName != clusterMetadata.GetCurrentClusterName() { if currentTime, ok := shardInfo.ClusterTimerAckLevel[clusterName]; ok { standbyClusterCurrentTime[clusterName] = currentTime diff --git a/service/history/shardContext.go b/service/history/shardContext.go index 88ff3351120..ac782d54bc9 100644 --- a/service/history/shardContext.go +++ b/service/history/shardContext.go @@ -1130,7 +1130,11 @@ func acquireShard(shardItem *historyShardsItem, closeCh chan<- int) (ShardContex // initialize the cluster current time to be the same as ack level standbyClusterCurrentTime := make(map[string]time.Time) timerMaxReadLevelMap := make(map[string]time.Time) - for clusterName := range shardItem.service.GetClusterMetadata().GetAllClusterFailoverVersions() { + for clusterName, info := range shardItem.service.GetClusterMetadata().GetAllClusterInfo() { + if !info.Enabled { + continue + } + if clusterName != shardItem.service.GetClusterMetadata().GetCurrentClusterName() { if currentTime, ok := shardInfo.ClusterTimerAckLevel[clusterName]; ok { standbyClusterCurrentTime[clusterName] = currentTime diff --git a/service/history/shardController_test.go b/service/history/shardController_test.go index 15b3e5b35b6..3f93c981807 100644 --- a/service/history/shardController_test.go +++ b/service/history/shardController_test.go @@ -174,7 +174,7 @@ func (s *shardControllerSuite) TestAcquireShardSuccess() { // when shard is initialized, it will use the 2 mock function below to initialize the "current" time of each cluster s.mockClusterMetadata.On("GetCurrentClusterName").Return(cluster.TestCurrentClusterName) - s.mockClusterMetadata.On("GetAllClusterFailoverVersions").Return(cluster.TestSingleDCAllClusterFailoverVersions) + s.mockClusterMetadata.On("GetAllClusterInfo").Return(cluster.TestSingleDCClusterInfo) s.controller.acquireShards() count := 0 for _, shardID := range myShards { @@ -260,7 +260,7 @@ func (s *shardControllerSuite) TestAcquireShardRenewSuccess() { // when shard is initialized, it will use the 2 mock function below to initialize the "current" time of each cluster s.mockClusterMetadata.On("GetCurrentClusterName").Return(cluster.TestCurrentClusterName) - s.mockClusterMetadata.On("GetAllClusterFailoverVersions").Return(cluster.TestSingleDCAllClusterFailoverVersions) + s.mockClusterMetadata.On("GetAllClusterInfo").Return(cluster.TestSingleDCClusterInfo) s.controller.acquireShards() for shardID := 0; shardID < numShards; shardID++ { @@ -335,7 +335,7 @@ func (s *shardControllerSuite) TestAcquireShardRenewLookupFailed() { // when shard is initialized, it will use the 2 mock function below to initialize the "current" time of each cluster s.mockClusterMetadata.On("GetCurrentClusterName").Return(cluster.TestCurrentClusterName) - s.mockClusterMetadata.On("GetAllClusterFailoverVersions").Return(cluster.TestSingleDCAllClusterFailoverVersions) + s.mockClusterMetadata.On("GetAllClusterInfo").Return(cluster.TestSingleDCClusterInfo) s.controller.acquireShards() for shardID := 0; shardID < numShards; shardID++ { @@ -364,7 +364,7 @@ func (s *shardControllerSuite) TestHistoryEngineClosed() { mock.Anything).Return(nil) // when shard is initialized, it will use the 2 mock function below to initialize the "current" time of each cluster s.mockClusterMetadata.On("GetCurrentClusterName").Return(cluster.TestCurrentClusterName) - s.mockClusterMetadata.On("GetAllClusterFailoverVersions").Return(cluster.TestSingleDCAllClusterFailoverVersions) + s.mockClusterMetadata.On("GetAllClusterInfo").Return(cluster.TestSingleDCClusterInfo) s.controller.Start() var workerWG sync.WaitGroup for w := 0; w < 10; w++ { @@ -457,7 +457,7 @@ func (s *shardControllerSuite) TestRingUpdated() { mock.Anything).Return(nil) // when shard is initialized, it will use the 2 mock function below to initialize the "current" time of each cluster s.mockClusterMetadata.On("GetCurrentClusterName").Return(cluster.TestCurrentClusterName) - s.mockClusterMetadata.On("GetAllClusterFailoverVersions").Return(cluster.TestSingleDCAllClusterFailoverVersions) + s.mockClusterMetadata.On("GetAllClusterInfo").Return(cluster.TestSingleDCClusterInfo) s.controller.Start() differentHostInfo := membership.NewHostInfo("another-host", nil) @@ -537,7 +537,7 @@ func (s *shardControllerSuite) TestShardControllerClosed() { mock.Anything).Return(nil) // when shard is initialized, it will use the 2 mock function below to initialize the "current" time of each cluster s.mockClusterMetadata.On("GetCurrentClusterName").Return(cluster.TestCurrentClusterName) - s.mockClusterMetadata.On("GetAllClusterFailoverVersions").Return(cluster.TestSingleDCAllClusterFailoverVersions) + s.mockClusterMetadata.On("GetAllClusterInfo").Return(cluster.TestSingleDCClusterInfo) s.controller.Start() var workerWG sync.WaitGroup diff --git a/service/history/stateBuilder.go b/service/history/stateBuilder.go index 95f989e47dd..d271863fe42 100644 --- a/service/history/stateBuilder.go +++ b/service/history/stateBuilder.go @@ -117,6 +117,8 @@ func (b *stateBuilderImpl) applyEvents(domainID, requestID string, execution sha // NOTE: stateBuilder is also being used in the active side if b.msBuilder.GetReplicationState() != nil { b.msBuilder.UpdateReplicationStateVersion(event.GetVersion(), true) + sourceClusterName := b.clusterMetadata.ClusterNameForFailoverVersion(lastEvent.GetVersion()) + b.msBuilder.UpdateReplicationStateLastEventID(sourceClusterName, lastEvent.GetVersion(), lastEvent.GetEventId()) } b.msBuilder.GetExecutionInfo().LastEventTaskID = event.GetTaskId() diff --git a/service/history/stateBuilder_test.go b/service/history/stateBuilder_test.go index d5892824bcd..554c6a9eee0 100644 --- a/service/history/stateBuilder_test.go +++ b/service/history/stateBuilder_test.go @@ -60,6 +60,8 @@ type ( mockClientBean *client.MockClientBean mockEventsCache *MockEventsCache + sourceCluster string + stateBuilder *stateBuilderImpl } ) @@ -111,6 +113,7 @@ func (s *stateBuilderSuite) SetupTest() { s.stateBuilder = newStateBuilder(s.mockShard, s.mockMutableState, s.logger) s.mockClusterMetadata.On("GetCurrentClusterName").Return(cluster.TestCurrentClusterName) s.mockClusterMetadata.On("IsGlobalDomainEnabled").Return(true) + s.sourceCluster = "some random source cluster" } func (s *stateBuilderSuite) TearDownTest() { @@ -127,6 +130,8 @@ func (s *stateBuilderSuite) TearDownTest() { func (s *stateBuilderSuite) mockUpdateVersion(events ...*shared.HistoryEvent) { for _, event := range events { s.mockMutableState.On("UpdateReplicationStateVersion", event.GetVersion(), true).Once() + s.mockClusterMetadata.On("ClusterNameForFailoverVersion", event.GetVersion()).Return(s.sourceCluster).Once() + s.mockMutableState.On("UpdateReplicationStateLastEventID", s.sourceCluster, event.GetVersion(), event.GetEventId()).Once() } } @@ -405,7 +410,6 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeWorkflowExecutionFailed() { } func (s *stateBuilderSuite) TestApplyEvents_EventTypeWorkflowExecutionContinuedAsNew() { - sourceCluster := "some random source cluster" version := int64(1) requestID := uuid.New() domainName := "some random domain name" @@ -510,10 +514,10 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeWorkflowExecutionContinuedA TableVersion: persistence.DomainTableVersionV1, }, nil, ).Once() - s.mockClusterMetadata.On("ClusterNameForFailoverVersion", continueAsNewEvent.GetVersion()).Return(sourceCluster).Once() + s.mockClusterMetadata.On("ClusterNameForFailoverVersion", continueAsNewEvent.GetVersion()).Return(s.sourceCluster).Once() s.mockMutableState.On("ReplicateWorkflowExecutionContinuedAsNewEvent", continueAsNewEvent.GetEventId(), - sourceCluster, + s.sourceCluster, domainID, continueAsNewEvent, newRunStartedEvent, @@ -529,7 +533,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeWorkflowExecutionContinuedA mock.Anything, int32(0), mock.Anything, ).Return(nil) - s.mockUpdateVersion(continueAsNewEvent) + s.mockUpdateVersion(continueAsNewEvent, newRunStartedEvent, newRunSignalEvent, newRunDecisionEvent) s.mockMutableState.On("GetExecutionInfo").Return(&persistence.WorkflowExecutionInfo{}) newRunHistory := &shared.History{Events: []*shared.HistoryEvent{newRunStartedEvent, newRunSignalEvent, newRunDecisionEvent}} @@ -565,7 +569,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeWorkflowExecutionContinuedA expectedNewRunStateBuilder.GetExecutionInfo().LastFirstEventID = newRunStartedEvent.GetEventId() expectedNewRunStateBuilder.GetExecutionInfo().NextEventID = newRunDecisionEvent.GetEventId() + 1 expectedNewRunStateBuilder.SetHistoryBuilder(newHistoryBuilderFromEvents(newRunHistory.Events, s.logger)) - expectedNewRunStateBuilder.UpdateReplicationStateLastEventID(sourceCluster, newRunStartedEvent.GetVersion(), newRunDecisionEvent.GetEventId()) + expectedNewRunStateBuilder.UpdateReplicationStateLastEventID(s.sourceCluster, newRunStartedEvent.GetVersion(), newRunDecisionEvent.GetEventId()) s.Equal(expectedNewRunStateBuilder, newRunStateBuilder) s.Equal([]persistence.Task{&persistence.CloseExecutionTask{}}, s.stateBuilder.transferTasks) @@ -723,7 +727,6 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeWorkflowExecutionCancelRequ func (s *stateBuilderSuite) TestApplyEvents_EventTypeWorkflowExecutionContinuedAsNew_EventsV2() { - sourceCluster := "some random source cluster" version := int64(1) requestID := uuid.New() domainName := "some random domain name" @@ -838,10 +841,10 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeWorkflowExecutionContinuedA TableVersion: persistence.DomainTableVersionV1, }, nil, ).Once() - s.mockClusterMetadata.On("ClusterNameForFailoverVersion", continueAsNewEvent.GetVersion()).Return(sourceCluster).Once() + s.mockClusterMetadata.On("ClusterNameForFailoverVersion", continueAsNewEvent.GetVersion()).Return(s.sourceCluster).Once() s.mockMutableState.On("ReplicateWorkflowExecutionContinuedAsNewEvent", continueAsNewEvent.GetEventId(), - sourceCluster, + s.sourceCluster, domainID, continueAsNewEvent, newRunStartedEvent, @@ -857,7 +860,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeWorkflowExecutionContinuedA mock.Anything, int32(persistence.EventStoreVersionV2), mock.Anything, ).Return(nil) - s.mockUpdateVersion(continueAsNewEvent) + s.mockUpdateVersion(continueAsNewEvent, newRunStartedEvent, newRunSignalEvent, newRunDecisionEvent) s.mockMutableState.On("GetExecutionInfo").Return(&persistence.WorkflowExecutionInfo{}) newRunHistory := &shared.History{Events: []*shared.HistoryEvent{newRunStartedEvent, newRunSignalEvent, newRunDecisionEvent}} @@ -897,7 +900,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeWorkflowExecutionContinuedA expectedNewRunStateBuilder.GetExecutionInfo().EventStoreVersion = persistence.EventStoreVersionV2 expectedNewRunStateBuilder.GetExecutionInfo().BranchToken = newRunStateBuilder.GetCurrentBranch() expectedNewRunStateBuilder.SetHistoryBuilder(newHistoryBuilderFromEvents(newRunHistory.Events, s.logger)) - expectedNewRunStateBuilder.UpdateReplicationStateLastEventID(sourceCluster, newRunStartedEvent.GetVersion(), newRunDecisionEvent.GetEventId()) + expectedNewRunStateBuilder.UpdateReplicationStateLastEventID(s.sourceCluster, newRunStartedEvent.GetVersion(), newRunDecisionEvent.GetEventId()) s.Equal(expectedNewRunStateBuilder, newRunStateBuilder) s.Equal(int32(persistence.EventStoreVersionV2), newRunStateBuilder.GetEventStoreVersion()) diff --git a/service/history/timerQueueProcessor.go b/service/history/timerQueueProcessor.go index 2b87c1abd4e..a6401d06240 100644 --- a/service/history/timerQueueProcessor.go +++ b/service/history/timerQueueProcessor.go @@ -69,7 +69,11 @@ func newTimerQueueProcessor(shard ShardContext, historyService *historyEngineImp logger = logger.WithTags(tag.ComponentTimerQueue) taskAllocator := newTaskAllocator(shard) standbyTimerProcessors := make(map[string]*timerQueueStandbyProcessorImpl) - for clusterName := range shard.GetService().GetClusterMetadata().GetAllClusterFailoverVersions() { + for clusterName, info := range shard.GetService().GetClusterMetadata().GetAllClusterInfo() { + if !info.Enabled { + continue + } + if clusterName != shard.GetService().GetClusterMetadata().GetCurrentClusterName() { historyRereplicator := xdc.NewHistoryRereplicator( currentClusterName, @@ -151,11 +155,15 @@ func (t *timerQueueProcessorImpl) NotifyNewTimers(clusterName string, currentTim func (t *timerQueueProcessorImpl) FailoverDomain(domainIDs map[string]struct{}) { minLevel := t.shard.GetTimerClusterAckLevel(t.currentClusterName) standbyClusterName := t.currentClusterName - for cluster := range t.shard.GetService().GetClusterMetadata().GetAllClusterFailoverVersions() { - ackLevel := t.shard.GetTimerClusterAckLevel(cluster) + for clusterName, info := range t.shard.GetService().GetClusterMetadata().GetAllClusterInfo() { + if !info.Enabled { + continue + } + + ackLevel := t.shard.GetTimerClusterAckLevel(clusterName) if ackLevel.Before(minLevel) { minLevel = ackLevel - standbyClusterName = cluster + standbyClusterName = clusterName } } // the ack manager is exclusive, so just add a cassandra min precision diff --git a/service/history/timerQueueProcessor2_test.go b/service/history/timerQueueProcessor2_test.go index 6fb4890a17e..7441e95caed 100644 --- a/service/history/timerQueueProcessor2_test.go +++ b/service/history/timerQueueProcessor2_test.go @@ -137,7 +137,7 @@ func (s *timerQueueProcessor2Suite) SetupTest() { historyCache := newHistoryCache(s.mockShard) // this is used by shard context, not relevent to this test, so we do not care how many times "GetCurrentClusterName" os called s.mockClusterMetadata.On("GetCurrentClusterName").Return(cluster.TestCurrentClusterName) - s.mockClusterMetadata.On("GetAllClusterFailoverVersions").Return(cluster.TestSingleDCAllClusterFailoverVersions) + s.mockClusterMetadata.On("GetAllClusterInfo").Return(cluster.TestSingleDCClusterInfo) s.mockClusterMetadata.On("IsGlobalDomainEnabled").Return(false) s.mockClusterMetadata.On("IsArchivalEnabled").Return(false) h := &historyEngineImpl{ diff --git a/service/history/transferQueueProcessor.go b/service/history/transferQueueProcessor.go index 103710eef97..f99b9ebd570 100644 --- a/service/history/transferQueueProcessor.go +++ b/service/history/transferQueueProcessor.go @@ -72,7 +72,11 @@ func newTransferQueueProcessor(shard ShardContext, historyService *historyEngine currentClusterName := shard.GetService().GetClusterMetadata().GetCurrentClusterName() taskAllocator := newTaskAllocator(shard) standbyTaskProcessors := make(map[string]*transferQueueStandbyProcessorImpl) - for clusterName := range shard.GetService().GetClusterMetadata().GetAllClusterFailoverVersions() { + for clusterName, info := range shard.GetService().GetClusterMetadata().GetAllClusterInfo() { + if !info.Enabled { + continue + } + if clusterName != currentClusterName { historyRereplicator := xdc.NewHistoryRereplicator( currentClusterName, @@ -162,11 +166,14 @@ func (t *transferQueueProcessorImpl) NotifyNewTask(clusterName string, transferT func (t *transferQueueProcessorImpl) FailoverDomain(domainIDs map[string]struct{}) { minLevel := t.shard.GetTransferClusterAckLevel(t.currentClusterName) standbyClusterName := t.currentClusterName - for cluster := range t.shard.GetService().GetClusterMetadata().GetAllClusterFailoverVersions() { - ackLevel := t.shard.GetTransferClusterAckLevel(cluster) + for clusterName, info := range t.shard.GetService().GetClusterMetadata().GetAllClusterInfo() { + if !info.Enabled { + continue + } + ackLevel := t.shard.GetTransferClusterAckLevel(clusterName) if ackLevel < minLevel { minLevel = ackLevel - standbyClusterName = cluster + standbyClusterName = clusterName } } diff --git a/service/history/workflowResetor_test.go b/service/history/workflowResetor_test.go index 6220c8ec23d..af352746980 100644 --- a/service/history/workflowResetor_test.go +++ b/service/history/workflowResetor_test.go @@ -120,7 +120,7 @@ func (s *resetorSuite) SetupTest() { s.mockClientBean = &client.MockClientBean{} s.mockService = service.NewTestService(s.mockClusterMetadata, s.mockMessagingClient, metricsClient, s.mockClientBean) s.mockClusterMetadata.On("GetCurrentClusterName").Return(cluster.TestCurrentClusterName) - s.mockClusterMetadata.On("GetAllClusterFailoverVersions").Return(cluster.TestSingleDCAllClusterFailoverVersions) + s.mockClusterMetadata.On("GetAllClusterInfo").Return(cluster.TestSingleDCClusterInfo) s.mockClusterMetadata.On("IsGlobalDomainEnabled").Return(false) s.mockDomainCache = &cache.DomainCacheMock{} s.mockArchivalClient = &archiver.ClientMock{} @@ -1441,6 +1441,11 @@ func (s *resetorSuite) TestResetWorkflowExecution_NoReplication_WithRequestCance func (s *resetorSuite) TestResetWorkflowExecution_Replication_WithTerminatingCurrent() { domainName := "testDomainName" + beforeResetVersion := int64(100) + afterResetVersion := int64(101) + s.mockClusterMetadata.On("ClusterNameForFailoverVersion", beforeResetVersion).Return("standby") + s.mockClusterMetadata.On("ClusterNameForFailoverVersion", afterResetVersion).Return("active") + testDomainEntry := cache.NewDomainCacheEntryWithReplicationForTest( &p.DomainInfo{ID: validDomainID}, &p.DomainConfig{Retention: 1}, @@ -1453,7 +1458,10 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_WithTerminatingCur ClusterName: "standby", }, }, - }, cluster.GetTestClusterMetadata(true, true, false)) + }, + afterResetVersion, + cluster.GetTestClusterMetadata(true, true, false), + ) // override domain cache s.mockDomainCache.On("GetDomainByID", mock.Anything).Return(testDomainEntry, nil) s.mockDomainCache.On("GetDomain", mock.Anything).Return(testDomainEntry, nil) @@ -1519,10 +1527,10 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_WithTerminatingCur BranchToken: forkBranchToken, NextEventID: 35, } - currentVersion := int64(100) + forkRepState := &p.ReplicationState{ - CurrentVersion: currentVersion, - StartVersion: currentVersion, + CurrentVersion: beforeResetVersion, + StartVersion: beforeResetVersion, LastWriteEventID: common.EmptyEventID, LastWriteVersion: common.EmptyVersion, LastReplicationInfo: map[string]*p.ReplicationInfo{}, @@ -1578,7 +1586,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_WithTerminatingCur Events: []*workflow.HistoryEvent{ &workflow.HistoryEvent{ EventId: common.Int64Ptr(1), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeWorkflowExecutionStarted), WorkflowExecutionStartedEventAttributes: &workflow.WorkflowExecutionStartedEventAttributes{ WorkflowType: &workflow.WorkflowType{ @@ -1592,7 +1600,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_WithTerminatingCur }, { EventId: common.Int64Ptr(2), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeDecisionTaskScheduled), DecisionTaskScheduledEventAttributes: &workflow.DecisionTaskScheduledEventAttributes{ TaskList: taskList, @@ -1605,7 +1613,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_WithTerminatingCur Events: []*workflow.HistoryEvent{ { EventId: common.Int64Ptr(3), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeDecisionTaskStarted), DecisionTaskStartedEventAttributes: &workflow.DecisionTaskStartedEventAttributes{ ScheduledEventId: common.Int64Ptr(2), @@ -1617,7 +1625,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_WithTerminatingCur Events: []*workflow.HistoryEvent{ { EventId: common.Int64Ptr(4), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeDecisionTaskCompleted), DecisionTaskCompletedEventAttributes: &workflow.DecisionTaskCompletedEventAttributes{ ScheduledEventId: common.Int64Ptr(2), @@ -1626,7 +1634,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_WithTerminatingCur }, { EventId: common.Int64Ptr(5), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeMarkerRecorded), MarkerRecordedEventAttributes: &workflow.MarkerRecordedEventAttributes{ MarkerName: common.StringPtr("Version"), @@ -1636,7 +1644,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_WithTerminatingCur }, { EventId: common.Int64Ptr(6), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeActivityTaskScheduled), ActivityTaskScheduledEventAttributes: &workflow.ActivityTaskScheduledEventAttributes{ ActivityId: common.StringPtr(actIDCompleted1), @@ -1653,7 +1661,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_WithTerminatingCur }, { EventId: common.Int64Ptr(7), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeTimerStarted), TimerStartedEventAttributes: &workflow.TimerStartedEventAttributes{ TimerId: common.StringPtr(timerFiredID), @@ -1667,7 +1675,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_WithTerminatingCur Events: []*workflow.HistoryEvent{ { EventId: common.Int64Ptr(8), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeActivityTaskStarted), ActivityTaskStartedEventAttributes: &workflow.ActivityTaskStartedEventAttributes{ ScheduledEventId: common.Int64Ptr(6), @@ -1679,7 +1687,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_WithTerminatingCur Events: []*workflow.HistoryEvent{ { EventId: common.Int64Ptr(9), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeActivityTaskCompleted), ActivityTaskCompletedEventAttributes: &workflow.ActivityTaskCompletedEventAttributes{ ScheduledEventId: common.Int64Ptr(6), @@ -1688,7 +1696,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_WithTerminatingCur }, { EventId: common.Int64Ptr(10), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeDecisionTaskScheduled), DecisionTaskScheduledEventAttributes: &workflow.DecisionTaskScheduledEventAttributes{ TaskList: taskList, @@ -1701,7 +1709,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_WithTerminatingCur Events: []*workflow.HistoryEvent{ { EventId: common.Int64Ptr(11), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeDecisionTaskStarted), DecisionTaskStartedEventAttributes: &workflow.DecisionTaskStartedEventAttributes{ ScheduledEventId: common.Int64Ptr(10), @@ -1713,7 +1721,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_WithTerminatingCur Events: []*workflow.HistoryEvent{ { EventId: common.Int64Ptr(12), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeDecisionTaskCompleted), DecisionTaskCompletedEventAttributes: &workflow.DecisionTaskCompletedEventAttributes{ ScheduledEventId: common.Int64Ptr(10), @@ -1726,7 +1734,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_WithTerminatingCur Events: []*workflow.HistoryEvent{ { EventId: common.Int64Ptr(13), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeTimerFired), TimerFiredEventAttributes: &workflow.TimerFiredEventAttributes{ TimerId: common.StringPtr(timerFiredID), @@ -1734,7 +1742,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_WithTerminatingCur }, { EventId: common.Int64Ptr(14), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeDecisionTaskScheduled), DecisionTaskScheduledEventAttributes: &workflow.DecisionTaskScheduledEventAttributes{ TaskList: taskList, @@ -1747,7 +1755,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_WithTerminatingCur Events: []*workflow.HistoryEvent{ { EventId: common.Int64Ptr(15), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeDecisionTaskStarted), DecisionTaskStartedEventAttributes: &workflow.DecisionTaskStartedEventAttributes{ ScheduledEventId: common.Int64Ptr(14), @@ -1759,7 +1767,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_WithTerminatingCur Events: []*workflow.HistoryEvent{ { EventId: common.Int64Ptr(16), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeDecisionTaskCompleted), DecisionTaskCompletedEventAttributes: &workflow.DecisionTaskCompletedEventAttributes{ ScheduledEventId: common.Int64Ptr(14), @@ -1768,7 +1776,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_WithTerminatingCur }, { EventId: common.Int64Ptr(17), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeActivityTaskScheduled), ActivityTaskScheduledEventAttributes: &workflow.ActivityTaskScheduledEventAttributes{ ActivityId: common.StringPtr(actIDRetry), @@ -1792,7 +1800,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_WithTerminatingCur }, { EventId: common.Int64Ptr(18), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeActivityTaskScheduled), ActivityTaskScheduledEventAttributes: &workflow.ActivityTaskScheduledEventAttributes{ ActivityId: common.StringPtr(actIDNotStarted), @@ -1809,7 +1817,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_WithTerminatingCur }, { EventId: common.Int64Ptr(19), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeTimerStarted), TimerStartedEventAttributes: &workflow.TimerStartedEventAttributes{ TimerId: common.StringPtr(timerUnfiredID1), @@ -1819,7 +1827,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_WithTerminatingCur }, { EventId: common.Int64Ptr(20), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeTimerStarted), TimerStartedEventAttributes: &workflow.TimerStartedEventAttributes{ TimerId: common.StringPtr(timerUnfiredID2), @@ -1829,7 +1837,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_WithTerminatingCur }, { EventId: common.Int64Ptr(21), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeActivityTaskScheduled), ActivityTaskScheduledEventAttributes: &workflow.ActivityTaskScheduledEventAttributes{ ActivityId: common.StringPtr(actIDCompleted2), @@ -1846,7 +1854,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_WithTerminatingCur }, { EventId: common.Int64Ptr(22), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeActivityTaskScheduled), ActivityTaskScheduledEventAttributes: &workflow.ActivityTaskScheduledEventAttributes{ ActivityId: common.StringPtr(actIDStartedNoRetry), @@ -1863,7 +1871,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_WithTerminatingCur }, { EventId: common.Int64Ptr(23), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeWorkflowExecutionSignaled), WorkflowExecutionSignaledEventAttributes: &workflow.WorkflowExecutionSignaledEventAttributes{ SignalName: common.StringPtr(signalName3), @@ -1875,7 +1883,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_WithTerminatingCur Events: []*workflow.HistoryEvent{ { EventId: common.Int64Ptr(24), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeActivityTaskStarted), ActivityTaskStartedEventAttributes: &workflow.ActivityTaskStartedEventAttributes{ ScheduledEventId: common.Int64Ptr(21), @@ -1887,7 +1895,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_WithTerminatingCur Events: []*workflow.HistoryEvent{ { EventId: common.Int64Ptr(25), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeWorkflowExecutionSignaled), WorkflowExecutionSignaledEventAttributes: &workflow.WorkflowExecutionSignaledEventAttributes{ SignalName: common.StringPtr(signalName4), @@ -1899,7 +1907,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_WithTerminatingCur Events: []*workflow.HistoryEvent{ { EventId: common.Int64Ptr(26), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeActivityTaskStarted), ActivityTaskStartedEventAttributes: &workflow.ActivityTaskStartedEventAttributes{ ScheduledEventId: common.Int64Ptr(22), @@ -1911,7 +1919,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_WithTerminatingCur Events: []*workflow.HistoryEvent{ { EventId: common.Int64Ptr(27), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeActivityTaskCompleted), ActivityTaskCompletedEventAttributes: &workflow.ActivityTaskCompletedEventAttributes{ ScheduledEventId: common.Int64Ptr(21), @@ -1920,7 +1928,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_WithTerminatingCur }, { EventId: common.Int64Ptr(28), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeDecisionTaskScheduled), DecisionTaskScheduledEventAttributes: &workflow.DecisionTaskScheduledEventAttributes{ TaskList: taskList, @@ -1933,7 +1941,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_WithTerminatingCur Events: []*workflow.HistoryEvent{ { EventId: common.Int64Ptr(29), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeDecisionTaskStarted), DecisionTaskStartedEventAttributes: &workflow.DecisionTaskStartedEventAttributes{ ScheduledEventId: common.Int64Ptr(28), @@ -1946,7 +1954,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_WithTerminatingCur Events: []*workflow.HistoryEvent{ { EventId: common.Int64Ptr(30), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeDecisionTaskCompleted), DecisionTaskCompletedEventAttributes: &workflow.DecisionTaskCompletedEventAttributes{ ScheduledEventId: common.Int64Ptr(28), @@ -1955,7 +1963,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_WithTerminatingCur }, { EventId: common.Int64Ptr(31), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeTimerStarted), TimerStartedEventAttributes: &workflow.TimerStartedEventAttributes{ TimerId: common.StringPtr(timerAfterReset), @@ -1969,7 +1977,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_WithTerminatingCur Events: []*workflow.HistoryEvent{ { EventId: common.Int64Ptr(32), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeActivityTaskStarted), ActivityTaskStartedEventAttributes: &workflow.ActivityTaskStartedEventAttributes{ ScheduledEventId: common.Int64Ptr(18), @@ -1981,7 +1989,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_WithTerminatingCur Events: []*workflow.HistoryEvent{ { EventId: common.Int64Ptr(33), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeWorkflowExecutionSignaled), WorkflowExecutionSignaledEventAttributes: &workflow.WorkflowExecutionSignaledEventAttributes{ SignalName: common.StringPtr(signalName1), @@ -1993,7 +2001,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_WithTerminatingCur Events: []*workflow.HistoryEvent{ { EventId: common.Int64Ptr(34), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeWorkflowExecutionSignaled), WorkflowExecutionSignaledEventAttributes: &workflow.WorkflowExecutionSignaledEventAttributes{ SignalName: common.StringPtr(signalName2), @@ -2047,7 +2055,6 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_WithTerminatingCur s.mockHistoryV2Mgr.On("CompleteForkBranch", completeReqErr).Return(nil).Maybe() s.mockHistoryMgr.On("AppendHistoryEvents", mock.Anything).Return(appendV1Resp, nil).Once() s.mockHistoryV2Mgr.On("AppendHistoryNodes", mock.Anything).Return(appendV2Resp, nil).Once() - s.mockClusterMetadata.On("ClusterNameForFailoverVersion", mock.Anything).Return("active") s.mockExecutionMgr.On("ResetWorkflowExecution", mock.Anything).Return(nil).Once() s.mockEventsCache.On("putEvent", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return().Once() @@ -2136,8 +2143,16 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_WithTerminatingCur s.Equal(p.ReplicationTaskTypeHistory, resetReq.CurrReplicationTasks[0].GetType()) compareRepState := copyReplicationState(forkRepState) + compareRepState.StartVersion = beforeResetVersion + compareRepState.CurrentVersion = afterResetVersion compareRepState.LastWriteEventID = 34 - compareRepState.LastWriteVersion = currentVersion + compareRepState.LastWriteVersion = afterResetVersion + compareRepState.LastReplicationInfo = map[string]*p.ReplicationInfo{ + "standby": &p.ReplicationInfo{ + LastEventID: 29, + Version: beforeResetVersion, + }, + } s.Equal(compareRepState, resetReq.InsertReplicationState) // not supported feature @@ -2149,6 +2164,11 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_WithTerminatingCur func (s *resetorSuite) TestResetWorkflowExecution_Replication_NotActive() { domainName := "testDomainName" + beforeResetVersion := int64(100) + afterResetVersion := int64(101) + s.mockClusterMetadata.On("ClusterNameForFailoverVersion", beforeResetVersion).Return("active") + s.mockClusterMetadata.On("ClusterNameForFailoverVersion", afterResetVersion).Return("standby") + testDomainEntry := cache.NewDomainCacheEntryWithReplicationForTest( &p.DomainInfo{ID: validDomainID}, &p.DomainConfig{Retention: 1}, @@ -2161,7 +2181,10 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_NotActive() { ClusterName: "standby", }, }, - }, cluster.GetTestClusterMetadata(true, true, false)) + }, + afterResetVersion, + cluster.GetTestClusterMetadata(true, true, false), + ) // override domain cache s.mockDomainCache.On("GetDomainByID", mock.Anything).Return(testDomainEntry, nil) s.mockDomainCache.On("GetDomain", mock.Anything).Return(testDomainEntry, nil) @@ -2227,10 +2250,10 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_NotActive() { BranchToken: forkBranchToken, NextEventID: 35, } - currentVersion := int64(100) + forkRepState := &p.ReplicationState{ - CurrentVersion: currentVersion, - StartVersion: currentVersion, + CurrentVersion: beforeResetVersion, + StartVersion: beforeResetVersion, LastWriteEventID: common.EmptyEventID, LastWriteVersion: common.EmptyVersion, LastReplicationInfo: map[string]*p.ReplicationInfo{}, @@ -2285,7 +2308,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_NotActive() { Events: []*workflow.HistoryEvent{ &workflow.HistoryEvent{ EventId: common.Int64Ptr(1), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeWorkflowExecutionStarted), WorkflowExecutionStartedEventAttributes: &workflow.WorkflowExecutionStartedEventAttributes{ WorkflowType: &workflow.WorkflowType{ @@ -2299,7 +2322,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_NotActive() { }, { EventId: common.Int64Ptr(2), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeDecisionTaskScheduled), DecisionTaskScheduledEventAttributes: &workflow.DecisionTaskScheduledEventAttributes{ TaskList: taskList, @@ -2312,7 +2335,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_NotActive() { Events: []*workflow.HistoryEvent{ { EventId: common.Int64Ptr(3), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeDecisionTaskStarted), DecisionTaskStartedEventAttributes: &workflow.DecisionTaskStartedEventAttributes{ ScheduledEventId: common.Int64Ptr(2), @@ -2324,7 +2347,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_NotActive() { Events: []*workflow.HistoryEvent{ { EventId: common.Int64Ptr(4), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeDecisionTaskCompleted), DecisionTaskCompletedEventAttributes: &workflow.DecisionTaskCompletedEventAttributes{ ScheduledEventId: common.Int64Ptr(2), @@ -2333,7 +2356,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_NotActive() { }, { EventId: common.Int64Ptr(5), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeMarkerRecorded), MarkerRecordedEventAttributes: &workflow.MarkerRecordedEventAttributes{ MarkerName: common.StringPtr("Version"), @@ -2343,7 +2366,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_NotActive() { }, { EventId: common.Int64Ptr(6), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeActivityTaskScheduled), ActivityTaskScheduledEventAttributes: &workflow.ActivityTaskScheduledEventAttributes{ ActivityId: common.StringPtr(actIDCompleted1), @@ -2360,7 +2383,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_NotActive() { }, { EventId: common.Int64Ptr(7), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeTimerStarted), TimerStartedEventAttributes: &workflow.TimerStartedEventAttributes{ TimerId: common.StringPtr(timerFiredID), @@ -2374,7 +2397,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_NotActive() { Events: []*workflow.HistoryEvent{ { EventId: common.Int64Ptr(8), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeActivityTaskStarted), ActivityTaskStartedEventAttributes: &workflow.ActivityTaskStartedEventAttributes{ ScheduledEventId: common.Int64Ptr(6), @@ -2386,7 +2409,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_NotActive() { Events: []*workflow.HistoryEvent{ { EventId: common.Int64Ptr(9), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeActivityTaskCompleted), ActivityTaskCompletedEventAttributes: &workflow.ActivityTaskCompletedEventAttributes{ ScheduledEventId: common.Int64Ptr(6), @@ -2395,7 +2418,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_NotActive() { }, { EventId: common.Int64Ptr(10), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeDecisionTaskScheduled), DecisionTaskScheduledEventAttributes: &workflow.DecisionTaskScheduledEventAttributes{ TaskList: taskList, @@ -2408,7 +2431,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_NotActive() { Events: []*workflow.HistoryEvent{ { EventId: common.Int64Ptr(11), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeDecisionTaskStarted), DecisionTaskStartedEventAttributes: &workflow.DecisionTaskStartedEventAttributes{ ScheduledEventId: common.Int64Ptr(10), @@ -2420,7 +2443,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_NotActive() { Events: []*workflow.HistoryEvent{ { EventId: common.Int64Ptr(12), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeDecisionTaskCompleted), DecisionTaskCompletedEventAttributes: &workflow.DecisionTaskCompletedEventAttributes{ ScheduledEventId: common.Int64Ptr(10), @@ -2433,7 +2456,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_NotActive() { Events: []*workflow.HistoryEvent{ { EventId: common.Int64Ptr(13), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeTimerFired), TimerFiredEventAttributes: &workflow.TimerFiredEventAttributes{ TimerId: common.StringPtr(timerFiredID), @@ -2441,7 +2464,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_NotActive() { }, { EventId: common.Int64Ptr(14), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeDecisionTaskScheduled), DecisionTaskScheduledEventAttributes: &workflow.DecisionTaskScheduledEventAttributes{ TaskList: taskList, @@ -2454,7 +2477,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_NotActive() { Events: []*workflow.HistoryEvent{ { EventId: common.Int64Ptr(15), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeDecisionTaskStarted), DecisionTaskStartedEventAttributes: &workflow.DecisionTaskStartedEventAttributes{ ScheduledEventId: common.Int64Ptr(14), @@ -2466,7 +2489,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_NotActive() { Events: []*workflow.HistoryEvent{ { EventId: common.Int64Ptr(16), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeDecisionTaskCompleted), DecisionTaskCompletedEventAttributes: &workflow.DecisionTaskCompletedEventAttributes{ ScheduledEventId: common.Int64Ptr(14), @@ -2475,7 +2498,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_NotActive() { }, { EventId: common.Int64Ptr(17), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeActivityTaskScheduled), ActivityTaskScheduledEventAttributes: &workflow.ActivityTaskScheduledEventAttributes{ ActivityId: common.StringPtr(actIDRetry), @@ -2499,7 +2522,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_NotActive() { }, { EventId: common.Int64Ptr(18), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeActivityTaskScheduled), ActivityTaskScheduledEventAttributes: &workflow.ActivityTaskScheduledEventAttributes{ ActivityId: common.StringPtr(actIDNotStarted), @@ -2516,7 +2539,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_NotActive() { }, { EventId: common.Int64Ptr(19), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeTimerStarted), TimerStartedEventAttributes: &workflow.TimerStartedEventAttributes{ TimerId: common.StringPtr(timerUnfiredID1), @@ -2526,7 +2549,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_NotActive() { }, { EventId: common.Int64Ptr(20), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeTimerStarted), TimerStartedEventAttributes: &workflow.TimerStartedEventAttributes{ TimerId: common.StringPtr(timerUnfiredID2), @@ -2536,7 +2559,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_NotActive() { }, { EventId: common.Int64Ptr(21), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeActivityTaskScheduled), ActivityTaskScheduledEventAttributes: &workflow.ActivityTaskScheduledEventAttributes{ ActivityId: common.StringPtr(actIDCompleted2), @@ -2553,7 +2576,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_NotActive() { }, { EventId: common.Int64Ptr(22), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeActivityTaskScheduled), ActivityTaskScheduledEventAttributes: &workflow.ActivityTaskScheduledEventAttributes{ ActivityId: common.StringPtr(actIDStartedNoRetry), @@ -2570,7 +2593,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_NotActive() { }, { EventId: common.Int64Ptr(23), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeWorkflowExecutionSignaled), WorkflowExecutionSignaledEventAttributes: &workflow.WorkflowExecutionSignaledEventAttributes{ SignalName: common.StringPtr(signalName3), @@ -2582,7 +2605,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_NotActive() { Events: []*workflow.HistoryEvent{ { EventId: common.Int64Ptr(24), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeActivityTaskStarted), ActivityTaskStartedEventAttributes: &workflow.ActivityTaskStartedEventAttributes{ ScheduledEventId: common.Int64Ptr(21), @@ -2594,7 +2617,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_NotActive() { Events: []*workflow.HistoryEvent{ { EventId: common.Int64Ptr(25), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeWorkflowExecutionSignaled), WorkflowExecutionSignaledEventAttributes: &workflow.WorkflowExecutionSignaledEventAttributes{ SignalName: common.StringPtr(signalName4), @@ -2606,7 +2629,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_NotActive() { Events: []*workflow.HistoryEvent{ { EventId: common.Int64Ptr(26), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeActivityTaskStarted), ActivityTaskStartedEventAttributes: &workflow.ActivityTaskStartedEventAttributes{ ScheduledEventId: common.Int64Ptr(22), @@ -2618,7 +2641,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_NotActive() { Events: []*workflow.HistoryEvent{ { EventId: common.Int64Ptr(27), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeActivityTaskCompleted), ActivityTaskCompletedEventAttributes: &workflow.ActivityTaskCompletedEventAttributes{ ScheduledEventId: common.Int64Ptr(21), @@ -2627,7 +2650,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_NotActive() { }, { EventId: common.Int64Ptr(28), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeDecisionTaskScheduled), DecisionTaskScheduledEventAttributes: &workflow.DecisionTaskScheduledEventAttributes{ TaskList: taskList, @@ -2640,7 +2663,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_NotActive() { Events: []*workflow.HistoryEvent{ { EventId: common.Int64Ptr(29), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeDecisionTaskStarted), DecisionTaskStartedEventAttributes: &workflow.DecisionTaskStartedEventAttributes{ ScheduledEventId: common.Int64Ptr(28), @@ -2653,7 +2676,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_NotActive() { Events: []*workflow.HistoryEvent{ { EventId: common.Int64Ptr(30), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeDecisionTaskCompleted), DecisionTaskCompletedEventAttributes: &workflow.DecisionTaskCompletedEventAttributes{ ScheduledEventId: common.Int64Ptr(28), @@ -2662,7 +2685,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_NotActive() { }, { EventId: common.Int64Ptr(31), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeTimerStarted), TimerStartedEventAttributes: &workflow.TimerStartedEventAttributes{ TimerId: common.StringPtr(timerAfterReset), @@ -2676,7 +2699,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_NotActive() { Events: []*workflow.HistoryEvent{ { EventId: common.Int64Ptr(32), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeActivityTaskStarted), ActivityTaskStartedEventAttributes: &workflow.ActivityTaskStartedEventAttributes{ ScheduledEventId: common.Int64Ptr(18), @@ -2688,7 +2711,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_NotActive() { Events: []*workflow.HistoryEvent{ { EventId: common.Int64Ptr(33), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeWorkflowExecutionSignaled), WorkflowExecutionSignaledEventAttributes: &workflow.WorkflowExecutionSignaledEventAttributes{ SignalName: common.StringPtr(signalName1), @@ -2700,7 +2723,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_NotActive() { Events: []*workflow.HistoryEvent{ { EventId: common.Int64Ptr(34), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeWorkflowExecutionSignaled), WorkflowExecutionSignaledEventAttributes: &workflow.WorkflowExecutionSignaledEventAttributes{ SignalName: common.StringPtr(signalName2), @@ -2739,7 +2762,6 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_NotActive() { s.mockHistoryV2Mgr.On("ReadHistoryBranchByBatch", readHistoryReq).Return(readHistoryResp, nil).Once() s.mockHistoryV2Mgr.On("ForkHistoryBranch", mock.Anything).Return(forkResp, nil).Once() s.mockHistoryV2Mgr.On("CompleteForkBranch", completeReqErr).Return(nil).Once() - s.mockClusterMetadata.On("ClusterNameForFailoverVersion", mock.Anything).Return("standby") s.mockEventsCache.On("putEvent", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return().Once() _, err = s.historyEngine.ResetWorkflowExecution(context.Background(), request) @@ -2748,6 +2770,11 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_NotActive() { func (s *resetorSuite) TestResetWorkflowExecution_Replication_NoTerminatingCurrent() { domainName := "testDomainName" + beforeResetVersion := int64(100) + afterResetVersion := int64(101) + s.mockClusterMetadata.On("ClusterNameForFailoverVersion", beforeResetVersion).Return("standby") + s.mockClusterMetadata.On("ClusterNameForFailoverVersion", afterResetVersion).Return("active") + testDomainEntry := cache.NewDomainCacheEntryWithReplicationForTest( &p.DomainInfo{ID: validDomainID}, &p.DomainConfig{Retention: 1}, @@ -2760,7 +2787,10 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_NoTerminatingCurre ClusterName: "standby", }, }, - }, cluster.GetTestClusterMetadata(true, true, false)) + }, + afterResetVersion, + cluster.GetTestClusterMetadata(true, true, false), + ) // override domain cache s.mockDomainCache.On("GetDomainByID", mock.Anything).Return(testDomainEntry, nil) s.mockDomainCache.On("GetDomain", mock.Anything).Return(testDomainEntry, nil) @@ -2826,10 +2856,10 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_NoTerminatingCurre BranchToken: forkBranchToken, NextEventID: 35, } - currentVersion := int64(100) + forkRepState := &p.ReplicationState{ - CurrentVersion: currentVersion, - StartVersion: currentVersion, + CurrentVersion: beforeResetVersion, + StartVersion: beforeResetVersion, LastWriteEventID: common.EmptyEventID, LastWriteVersion: common.EmptyVersion, LastReplicationInfo: map[string]*p.ReplicationInfo{}, @@ -2886,7 +2916,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_NoTerminatingCurre Events: []*workflow.HistoryEvent{ &workflow.HistoryEvent{ EventId: common.Int64Ptr(1), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeWorkflowExecutionStarted), WorkflowExecutionStartedEventAttributes: &workflow.WorkflowExecutionStartedEventAttributes{ WorkflowType: &workflow.WorkflowType{ @@ -2900,7 +2930,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_NoTerminatingCurre }, { EventId: common.Int64Ptr(2), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeDecisionTaskScheduled), DecisionTaskScheduledEventAttributes: &workflow.DecisionTaskScheduledEventAttributes{ TaskList: taskList, @@ -2913,7 +2943,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_NoTerminatingCurre Events: []*workflow.HistoryEvent{ { EventId: common.Int64Ptr(3), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeDecisionTaskStarted), DecisionTaskStartedEventAttributes: &workflow.DecisionTaskStartedEventAttributes{ ScheduledEventId: common.Int64Ptr(2), @@ -2925,7 +2955,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_NoTerminatingCurre Events: []*workflow.HistoryEvent{ { EventId: common.Int64Ptr(4), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeDecisionTaskCompleted), DecisionTaskCompletedEventAttributes: &workflow.DecisionTaskCompletedEventAttributes{ ScheduledEventId: common.Int64Ptr(2), @@ -2934,7 +2964,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_NoTerminatingCurre }, { EventId: common.Int64Ptr(5), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeMarkerRecorded), MarkerRecordedEventAttributes: &workflow.MarkerRecordedEventAttributes{ MarkerName: common.StringPtr("Version"), @@ -2944,7 +2974,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_NoTerminatingCurre }, { EventId: common.Int64Ptr(6), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeActivityTaskScheduled), ActivityTaskScheduledEventAttributes: &workflow.ActivityTaskScheduledEventAttributes{ ActivityId: common.StringPtr(actIDCompleted1), @@ -2961,7 +2991,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_NoTerminatingCurre }, { EventId: common.Int64Ptr(7), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeTimerStarted), TimerStartedEventAttributes: &workflow.TimerStartedEventAttributes{ TimerId: common.StringPtr(timerFiredID), @@ -2975,7 +3005,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_NoTerminatingCurre Events: []*workflow.HistoryEvent{ { EventId: common.Int64Ptr(8), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeActivityTaskStarted), ActivityTaskStartedEventAttributes: &workflow.ActivityTaskStartedEventAttributes{ ScheduledEventId: common.Int64Ptr(6), @@ -2987,7 +3017,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_NoTerminatingCurre Events: []*workflow.HistoryEvent{ { EventId: common.Int64Ptr(9), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeActivityTaskCompleted), ActivityTaskCompletedEventAttributes: &workflow.ActivityTaskCompletedEventAttributes{ ScheduledEventId: common.Int64Ptr(6), @@ -2996,7 +3026,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_NoTerminatingCurre }, { EventId: common.Int64Ptr(10), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeDecisionTaskScheduled), DecisionTaskScheduledEventAttributes: &workflow.DecisionTaskScheduledEventAttributes{ TaskList: taskList, @@ -3009,7 +3039,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_NoTerminatingCurre Events: []*workflow.HistoryEvent{ { EventId: common.Int64Ptr(11), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeDecisionTaskStarted), DecisionTaskStartedEventAttributes: &workflow.DecisionTaskStartedEventAttributes{ ScheduledEventId: common.Int64Ptr(10), @@ -3021,7 +3051,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_NoTerminatingCurre Events: []*workflow.HistoryEvent{ { EventId: common.Int64Ptr(12), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeDecisionTaskCompleted), DecisionTaskCompletedEventAttributes: &workflow.DecisionTaskCompletedEventAttributes{ ScheduledEventId: common.Int64Ptr(10), @@ -3034,7 +3064,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_NoTerminatingCurre Events: []*workflow.HistoryEvent{ { EventId: common.Int64Ptr(13), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeTimerFired), TimerFiredEventAttributes: &workflow.TimerFiredEventAttributes{ TimerId: common.StringPtr(timerFiredID), @@ -3042,7 +3072,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_NoTerminatingCurre }, { EventId: common.Int64Ptr(14), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeDecisionTaskScheduled), DecisionTaskScheduledEventAttributes: &workflow.DecisionTaskScheduledEventAttributes{ TaskList: taskList, @@ -3055,7 +3085,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_NoTerminatingCurre Events: []*workflow.HistoryEvent{ { EventId: common.Int64Ptr(15), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeDecisionTaskStarted), DecisionTaskStartedEventAttributes: &workflow.DecisionTaskStartedEventAttributes{ ScheduledEventId: common.Int64Ptr(14), @@ -3067,7 +3097,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_NoTerminatingCurre Events: []*workflow.HistoryEvent{ { EventId: common.Int64Ptr(16), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeDecisionTaskCompleted), DecisionTaskCompletedEventAttributes: &workflow.DecisionTaskCompletedEventAttributes{ ScheduledEventId: common.Int64Ptr(14), @@ -3076,7 +3106,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_NoTerminatingCurre }, { EventId: common.Int64Ptr(17), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeActivityTaskScheduled), ActivityTaskScheduledEventAttributes: &workflow.ActivityTaskScheduledEventAttributes{ ActivityId: common.StringPtr(actIDRetry), @@ -3100,7 +3130,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_NoTerminatingCurre }, { EventId: common.Int64Ptr(18), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeActivityTaskScheduled), ActivityTaskScheduledEventAttributes: &workflow.ActivityTaskScheduledEventAttributes{ ActivityId: common.StringPtr(actIDNotStarted), @@ -3117,7 +3147,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_NoTerminatingCurre }, { EventId: common.Int64Ptr(19), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeTimerStarted), TimerStartedEventAttributes: &workflow.TimerStartedEventAttributes{ TimerId: common.StringPtr(timerUnfiredID1), @@ -3127,7 +3157,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_NoTerminatingCurre }, { EventId: common.Int64Ptr(20), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeTimerStarted), TimerStartedEventAttributes: &workflow.TimerStartedEventAttributes{ TimerId: common.StringPtr(timerUnfiredID2), @@ -3137,7 +3167,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_NoTerminatingCurre }, { EventId: common.Int64Ptr(21), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeActivityTaskScheduled), ActivityTaskScheduledEventAttributes: &workflow.ActivityTaskScheduledEventAttributes{ ActivityId: common.StringPtr(actIDCompleted2), @@ -3154,7 +3184,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_NoTerminatingCurre }, { EventId: common.Int64Ptr(22), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeActivityTaskScheduled), ActivityTaskScheduledEventAttributes: &workflow.ActivityTaskScheduledEventAttributes{ ActivityId: common.StringPtr(actIDStartedNoRetry), @@ -3171,7 +3201,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_NoTerminatingCurre }, { EventId: common.Int64Ptr(23), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeWorkflowExecutionSignaled), WorkflowExecutionSignaledEventAttributes: &workflow.WorkflowExecutionSignaledEventAttributes{ SignalName: common.StringPtr(signalName3), @@ -3183,7 +3213,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_NoTerminatingCurre Events: []*workflow.HistoryEvent{ { EventId: common.Int64Ptr(24), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeActivityTaskStarted), ActivityTaskStartedEventAttributes: &workflow.ActivityTaskStartedEventAttributes{ ScheduledEventId: common.Int64Ptr(21), @@ -3195,7 +3225,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_NoTerminatingCurre Events: []*workflow.HistoryEvent{ { EventId: common.Int64Ptr(25), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeWorkflowExecutionSignaled), WorkflowExecutionSignaledEventAttributes: &workflow.WorkflowExecutionSignaledEventAttributes{ SignalName: common.StringPtr(signalName4), @@ -3207,7 +3237,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_NoTerminatingCurre Events: []*workflow.HistoryEvent{ { EventId: common.Int64Ptr(26), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeActivityTaskStarted), ActivityTaskStartedEventAttributes: &workflow.ActivityTaskStartedEventAttributes{ ScheduledEventId: common.Int64Ptr(22), @@ -3219,7 +3249,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_NoTerminatingCurre Events: []*workflow.HistoryEvent{ { EventId: common.Int64Ptr(27), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeActivityTaskCompleted), ActivityTaskCompletedEventAttributes: &workflow.ActivityTaskCompletedEventAttributes{ ScheduledEventId: common.Int64Ptr(21), @@ -3228,7 +3258,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_NoTerminatingCurre }, { EventId: common.Int64Ptr(28), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeDecisionTaskScheduled), DecisionTaskScheduledEventAttributes: &workflow.DecisionTaskScheduledEventAttributes{ TaskList: taskList, @@ -3241,7 +3271,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_NoTerminatingCurre Events: []*workflow.HistoryEvent{ { EventId: common.Int64Ptr(29), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeDecisionTaskStarted), DecisionTaskStartedEventAttributes: &workflow.DecisionTaskStartedEventAttributes{ ScheduledEventId: common.Int64Ptr(28), @@ -3254,7 +3284,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_NoTerminatingCurre Events: []*workflow.HistoryEvent{ { EventId: common.Int64Ptr(30), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeDecisionTaskCompleted), DecisionTaskCompletedEventAttributes: &workflow.DecisionTaskCompletedEventAttributes{ ScheduledEventId: common.Int64Ptr(28), @@ -3263,7 +3293,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_NoTerminatingCurre }, { EventId: common.Int64Ptr(31), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeTimerStarted), TimerStartedEventAttributes: &workflow.TimerStartedEventAttributes{ TimerId: common.StringPtr(timerAfterReset), @@ -3277,7 +3307,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_NoTerminatingCurre Events: []*workflow.HistoryEvent{ { EventId: common.Int64Ptr(32), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeActivityTaskStarted), ActivityTaskStartedEventAttributes: &workflow.ActivityTaskStartedEventAttributes{ ScheduledEventId: common.Int64Ptr(18), @@ -3289,7 +3319,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_NoTerminatingCurre Events: []*workflow.HistoryEvent{ { EventId: common.Int64Ptr(33), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeWorkflowExecutionSignaled), WorkflowExecutionSignaledEventAttributes: &workflow.WorkflowExecutionSignaledEventAttributes{ SignalName: common.StringPtr(signalName1), @@ -3301,7 +3331,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_NoTerminatingCurre Events: []*workflow.HistoryEvent{ { EventId: common.Int64Ptr(34), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeWorkflowExecutionSignaled), WorkflowExecutionSignaledEventAttributes: &workflow.WorkflowExecutionSignaledEventAttributes{ SignalName: common.StringPtr(signalName2), @@ -3351,7 +3381,6 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_NoTerminatingCurre s.mockHistoryV2Mgr.On("CompleteForkBranch", completeReq).Return(nil).Once() s.mockHistoryV2Mgr.On("CompleteForkBranch", completeReqErr).Return(nil).Maybe() s.mockHistoryV2Mgr.On("AppendHistoryNodes", mock.Anything).Return(appendV2Resp, nil).Once() - s.mockClusterMetadata.On("ClusterNameForFailoverVersion", mock.Anything).Return("active") s.mockExecutionMgr.On("ResetWorkflowExecution", mock.Anything).Return(nil).Once() response, err := s.historyEngine.ResetWorkflowExecution(context.Background(), request) s.Nil(err) @@ -3426,8 +3455,16 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_NoTerminatingCurre s.Equal(p.ReplicationTaskTypeHistory, resetReq.InsertReplicationTasks[0].GetType()) compareRepState := copyReplicationState(forkRepState) + compareRepState.StartVersion = beforeResetVersion + compareRepState.CurrentVersion = afterResetVersion compareRepState.LastWriteEventID = 34 - compareRepState.LastWriteVersion = currentVersion + compareRepState.LastWriteVersion = afterResetVersion + compareRepState.LastReplicationInfo = map[string]*p.ReplicationInfo{ + "standby": &p.ReplicationInfo{ + LastEventID: 29, + Version: beforeResetVersion, + }, + } s.Equal(compareRepState, resetReq.InsertReplicationState) // not supported feature @@ -3439,6 +3476,11 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_NoTerminatingCurre func (s *resetorSuite) TestApplyReset() { domainID := validDomainID + beforeResetVersion := int64(100) + afterResetVersion := int64(101) + s.mockClusterMetadata.On("ClusterNameForFailoverVersion", beforeResetVersion).Return("standby") + s.mockClusterMetadata.On("ClusterNameForFailoverVersion", afterResetVersion).Return("active") + testDomainEntry := cache.NewDomainCacheEntryWithReplicationForTest( &p.DomainInfo{ID: validDomainID}, &p.DomainConfig{Retention: 1}, @@ -3451,7 +3493,10 @@ func (s *resetorSuite) TestApplyReset() { ClusterName: "standby", }, }, - }, cluster.GetTestClusterMetadata(true, true, false)) + }, + afterResetVersion, + cluster.GetTestClusterMetadata(true, true, false), + ) // override domain cache s.mockDomainCache.On("GetDomainByID", mock.Anything).Return(testDomainEntry, nil) s.mockDomainCache.On("GetDomain", mock.Anything).Return(testDomainEntry, nil) @@ -3507,10 +3552,10 @@ func (s *resetorSuite) TestApplyReset() { BranchToken: forkBranchToken, NextEventID: 35, } - currentVersion := int64(100) + forkRepState := &p.ReplicationState{ - CurrentVersion: currentVersion, - StartVersion: currentVersion, + CurrentVersion: beforeResetVersion, + StartVersion: beforeResetVersion, LastWriteEventID: common.EmptyEventID, LastWriteVersion: common.EmptyVersion, LastReplicationInfo: map[string]*p.ReplicationInfo{}, @@ -3564,7 +3609,7 @@ func (s *resetorSuite) TestApplyReset() { Events: []*workflow.HistoryEvent{ &workflow.HistoryEvent{ EventId: common.Int64Ptr(1), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeWorkflowExecutionStarted), WorkflowExecutionStartedEventAttributes: &workflow.WorkflowExecutionStartedEventAttributes{ WorkflowType: &workflow.WorkflowType{ @@ -3578,7 +3623,7 @@ func (s *resetorSuite) TestApplyReset() { }, { EventId: common.Int64Ptr(2), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeDecisionTaskScheduled), DecisionTaskScheduledEventAttributes: &workflow.DecisionTaskScheduledEventAttributes{ TaskList: taskList, @@ -3591,7 +3636,7 @@ func (s *resetorSuite) TestApplyReset() { Events: []*workflow.HistoryEvent{ { EventId: common.Int64Ptr(3), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeDecisionTaskStarted), DecisionTaskStartedEventAttributes: &workflow.DecisionTaskStartedEventAttributes{ ScheduledEventId: common.Int64Ptr(2), @@ -3603,7 +3648,7 @@ func (s *resetorSuite) TestApplyReset() { Events: []*workflow.HistoryEvent{ { EventId: common.Int64Ptr(4), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeDecisionTaskCompleted), DecisionTaskCompletedEventAttributes: &workflow.DecisionTaskCompletedEventAttributes{ ScheduledEventId: common.Int64Ptr(2), @@ -3612,7 +3657,7 @@ func (s *resetorSuite) TestApplyReset() { }, { EventId: common.Int64Ptr(5), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeMarkerRecorded), MarkerRecordedEventAttributes: &workflow.MarkerRecordedEventAttributes{ MarkerName: common.StringPtr("Version"), @@ -3622,7 +3667,7 @@ func (s *resetorSuite) TestApplyReset() { }, { EventId: common.Int64Ptr(6), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeActivityTaskScheduled), ActivityTaskScheduledEventAttributes: &workflow.ActivityTaskScheduledEventAttributes{ ActivityId: common.StringPtr(actIDCompleted1), @@ -3639,7 +3684,7 @@ func (s *resetorSuite) TestApplyReset() { }, { EventId: common.Int64Ptr(7), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeTimerStarted), TimerStartedEventAttributes: &workflow.TimerStartedEventAttributes{ TimerId: common.StringPtr(timerFiredID), @@ -3653,7 +3698,7 @@ func (s *resetorSuite) TestApplyReset() { Events: []*workflow.HistoryEvent{ { EventId: common.Int64Ptr(8), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeActivityTaskStarted), ActivityTaskStartedEventAttributes: &workflow.ActivityTaskStartedEventAttributes{ ScheduledEventId: common.Int64Ptr(6), @@ -3665,7 +3710,7 @@ func (s *resetorSuite) TestApplyReset() { Events: []*workflow.HistoryEvent{ { EventId: common.Int64Ptr(9), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeActivityTaskCompleted), ActivityTaskCompletedEventAttributes: &workflow.ActivityTaskCompletedEventAttributes{ ScheduledEventId: common.Int64Ptr(6), @@ -3674,7 +3719,7 @@ func (s *resetorSuite) TestApplyReset() { }, { EventId: common.Int64Ptr(10), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeDecisionTaskScheduled), DecisionTaskScheduledEventAttributes: &workflow.DecisionTaskScheduledEventAttributes{ TaskList: taskList, @@ -3687,7 +3732,7 @@ func (s *resetorSuite) TestApplyReset() { Events: []*workflow.HistoryEvent{ { EventId: common.Int64Ptr(11), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeDecisionTaskStarted), DecisionTaskStartedEventAttributes: &workflow.DecisionTaskStartedEventAttributes{ ScheduledEventId: common.Int64Ptr(10), @@ -3699,7 +3744,7 @@ func (s *resetorSuite) TestApplyReset() { Events: []*workflow.HistoryEvent{ { EventId: common.Int64Ptr(12), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeDecisionTaskCompleted), DecisionTaskCompletedEventAttributes: &workflow.DecisionTaskCompletedEventAttributes{ ScheduledEventId: common.Int64Ptr(10), @@ -3712,7 +3757,7 @@ func (s *resetorSuite) TestApplyReset() { Events: []*workflow.HistoryEvent{ { EventId: common.Int64Ptr(13), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeTimerFired), TimerFiredEventAttributes: &workflow.TimerFiredEventAttributes{ TimerId: common.StringPtr(timerFiredID), @@ -3720,7 +3765,7 @@ func (s *resetorSuite) TestApplyReset() { }, { EventId: common.Int64Ptr(14), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeDecisionTaskScheduled), DecisionTaskScheduledEventAttributes: &workflow.DecisionTaskScheduledEventAttributes{ TaskList: taskList, @@ -3733,7 +3778,7 @@ func (s *resetorSuite) TestApplyReset() { Events: []*workflow.HistoryEvent{ { EventId: common.Int64Ptr(15), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeDecisionTaskStarted), DecisionTaskStartedEventAttributes: &workflow.DecisionTaskStartedEventAttributes{ ScheduledEventId: common.Int64Ptr(14), @@ -3745,7 +3790,7 @@ func (s *resetorSuite) TestApplyReset() { Events: []*workflow.HistoryEvent{ { EventId: common.Int64Ptr(16), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeDecisionTaskCompleted), DecisionTaskCompletedEventAttributes: &workflow.DecisionTaskCompletedEventAttributes{ ScheduledEventId: common.Int64Ptr(14), @@ -3754,7 +3799,7 @@ func (s *resetorSuite) TestApplyReset() { }, { EventId: common.Int64Ptr(17), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeActivityTaskScheduled), ActivityTaskScheduledEventAttributes: &workflow.ActivityTaskScheduledEventAttributes{ ActivityId: common.StringPtr(actIDRetry), @@ -3778,7 +3823,7 @@ func (s *resetorSuite) TestApplyReset() { }, { EventId: common.Int64Ptr(18), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeActivityTaskScheduled), ActivityTaskScheduledEventAttributes: &workflow.ActivityTaskScheduledEventAttributes{ ActivityId: common.StringPtr(actIDNotStarted), @@ -3795,7 +3840,7 @@ func (s *resetorSuite) TestApplyReset() { }, { EventId: common.Int64Ptr(19), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeTimerStarted), TimerStartedEventAttributes: &workflow.TimerStartedEventAttributes{ TimerId: common.StringPtr(timerUnfiredID1), @@ -3805,7 +3850,7 @@ func (s *resetorSuite) TestApplyReset() { }, { EventId: common.Int64Ptr(20), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeTimerStarted), TimerStartedEventAttributes: &workflow.TimerStartedEventAttributes{ TimerId: common.StringPtr(timerUnfiredID2), @@ -3815,7 +3860,7 @@ func (s *resetorSuite) TestApplyReset() { }, { EventId: common.Int64Ptr(21), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeActivityTaskScheduled), ActivityTaskScheduledEventAttributes: &workflow.ActivityTaskScheduledEventAttributes{ ActivityId: common.StringPtr(actIDCompleted2), @@ -3832,7 +3877,7 @@ func (s *resetorSuite) TestApplyReset() { }, { EventId: common.Int64Ptr(22), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeActivityTaskScheduled), ActivityTaskScheduledEventAttributes: &workflow.ActivityTaskScheduledEventAttributes{ ActivityId: common.StringPtr(actIDStartedNoRetry), @@ -3849,7 +3894,7 @@ func (s *resetorSuite) TestApplyReset() { }, { EventId: common.Int64Ptr(23), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeWorkflowExecutionSignaled), WorkflowExecutionSignaledEventAttributes: &workflow.WorkflowExecutionSignaledEventAttributes{ SignalName: common.StringPtr(signalName3), @@ -3861,7 +3906,7 @@ func (s *resetorSuite) TestApplyReset() { Events: []*workflow.HistoryEvent{ { EventId: common.Int64Ptr(24), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeActivityTaskStarted), ActivityTaskStartedEventAttributes: &workflow.ActivityTaskStartedEventAttributes{ ScheduledEventId: common.Int64Ptr(21), @@ -3873,7 +3918,7 @@ func (s *resetorSuite) TestApplyReset() { Events: []*workflow.HistoryEvent{ { EventId: common.Int64Ptr(25), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeWorkflowExecutionSignaled), WorkflowExecutionSignaledEventAttributes: &workflow.WorkflowExecutionSignaledEventAttributes{ SignalName: common.StringPtr(signalName4), @@ -3885,7 +3930,7 @@ func (s *resetorSuite) TestApplyReset() { Events: []*workflow.HistoryEvent{ { EventId: common.Int64Ptr(26), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeActivityTaskStarted), ActivityTaskStartedEventAttributes: &workflow.ActivityTaskStartedEventAttributes{ ScheduledEventId: common.Int64Ptr(22), @@ -3897,7 +3942,7 @@ func (s *resetorSuite) TestApplyReset() { Events: []*workflow.HistoryEvent{ { EventId: common.Int64Ptr(27), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeActivityTaskCompleted), ActivityTaskCompletedEventAttributes: &workflow.ActivityTaskCompletedEventAttributes{ ScheduledEventId: common.Int64Ptr(21), @@ -3906,7 +3951,7 @@ func (s *resetorSuite) TestApplyReset() { }, { EventId: common.Int64Ptr(28), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeDecisionTaskScheduled), DecisionTaskScheduledEventAttributes: &workflow.DecisionTaskScheduledEventAttributes{ TaskList: taskList, @@ -3919,7 +3964,7 @@ func (s *resetorSuite) TestApplyReset() { Events: []*workflow.HistoryEvent{ { EventId: common.Int64Ptr(29), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(beforeResetVersion), EventType: common.EventTypePtr(workflow.EventTypeDecisionTaskStarted), DecisionTaskStartedEventAttributes: &workflow.DecisionTaskStartedEventAttributes{ ScheduledEventId: common.Int64Ptr(28), @@ -3968,7 +4013,7 @@ func (s *resetorSuite) TestApplyReset() { Events: []*workflow.HistoryEvent{ &workflow.HistoryEvent{ EventId: common.Int64Ptr(30), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(afterResetVersion), EventType: common.EventTypePtr(workflow.EventTypeDecisionTaskFailed), DecisionTaskFailedEventAttributes: &workflow.DecisionTaskFailedEventAttributes{ ScheduledEventId: common.Int64Ptr(int64(28)), @@ -3979,12 +4024,12 @@ func (s *resetorSuite) TestApplyReset() { Reason: common.StringPtr("resetWFtest"), BaseRunId: common.StringPtr(forkRunID), NewRunId: common.StringPtr(newRunID), - ForkEventVersion: common.Int64Ptr(currentVersion), + ForkEventVersion: common.Int64Ptr(beforeResetVersion), }, }, { EventId: common.Int64Ptr(31), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(afterResetVersion), EventType: common.EventTypePtr(workflow.EventTypeActivityTaskFailed), ActivityTaskFailedEventAttributes: &workflow.ActivityTaskFailedEventAttributes{ Reason: common.StringPtr("resetWF"), @@ -3995,7 +4040,7 @@ func (s *resetorSuite) TestApplyReset() { }, { EventId: common.Int64Ptr(32), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(afterResetVersion), EventType: common.EventTypePtr(workflow.EventTypeWorkflowExecutionSignaled), WorkflowExecutionSignaledEventAttributes: &workflow.WorkflowExecutionSignaledEventAttributes{ SignalName: common.StringPtr(signalName1), @@ -4003,7 +4048,7 @@ func (s *resetorSuite) TestApplyReset() { }, { EventId: common.Int64Ptr(33), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(afterResetVersion), EventType: common.EventTypePtr(workflow.EventTypeWorkflowExecutionSignaled), WorkflowExecutionSignaledEventAttributes: &workflow.WorkflowExecutionSignaledEventAttributes{ SignalName: common.StringPtr(signalName2), @@ -4011,7 +4056,7 @@ func (s *resetorSuite) TestApplyReset() { }, { EventId: common.Int64Ptr(34), - Version: common.Int64Ptr(currentVersion), + Version: common.Int64Ptr(afterResetVersion), EventType: common.EventTypePtr(workflow.EventTypeDecisionTaskScheduled), DecisionTaskScheduledEventAttributes: &workflow.DecisionTaskScheduledEventAttributes{ TaskList: taskList, @@ -4119,8 +4164,16 @@ func (s *resetorSuite) TestApplyReset() { s.assertActivityIDs([]string{actIDRetry, actIDNotStarted}, resetReq.InsertActivityInfos) compareRepState := copyReplicationState(forkRepState) + compareRepState.StartVersion = beforeResetVersion + compareRepState.CurrentVersion = afterResetVersion compareRepState.LastWriteEventID = 34 - compareRepState.LastWriteVersion = currentVersion + compareRepState.LastWriteVersion = afterResetVersion + compareRepState.LastReplicationInfo = map[string]*p.ReplicationInfo{ + "standby": &p.ReplicationInfo{ + LastEventID: 29, + Version: beforeResetVersion, + }, + } s.Equal(compareRepState, resetReq.InsertReplicationState) s.Equal(0, len(resetReq.InsertReplicationTasks)) diff --git a/service/worker/replicator/replicator.go b/service/worker/replicator/replicator.go index 15ad74cd840..0b3aa90e955 100644 --- a/service/worker/replicator/replicator.go +++ b/service/worker/replicator/replicator.go @@ -96,11 +96,15 @@ func NewReplicator(clusterMetadata cluster.Metadata, metadataManagerV2 persisten // Start is called to start replicator func (r *Replicator) Start() error { currentClusterName := r.clusterMetadata.GetCurrentClusterName() - for cluster := range r.clusterMetadata.GetAllClusterFailoverVersions() { - if cluster != currentClusterName { - consumerName := getConsumerName(currentClusterName, cluster) + for clusterName, info := range r.clusterMetadata.GetAllClusterInfo() { + if !info.Enabled { + continue + } + + if clusterName != currentClusterName { + consumerName := getConsumerName(currentClusterName, clusterName) adminClient := admin.NewRetryableClient( - r.clientBean.GetRemoteAdminClient(cluster), + r.clientBean.GetRemoteAdminClient(clusterName), common.CreateAdminServiceRetryPolicy(), common.IsWhitelistServiceTransientError, ) @@ -109,7 +113,7 @@ func (r *Replicator) Start() error { common.CreateHistoryServiceRetryPolicy(), common.IsWhitelistServiceTransientError, ) - logger := r.logger.WithTags(tag.ComponentReplicationTaskProcessor, tag.SourceCluster(cluster), tag.KafkaConsumerName(consumerName)) + logger := r.logger.WithTags(tag.ComponentReplicationTaskProcessor, tag.SourceCluster(clusterName), tag.KafkaConsumerName(consumerName)) historyRereplicator := xdc.NewHistoryRereplicator( currentClusterName, r.domainCache, @@ -122,7 +126,7 @@ func (r *Replicator) Start() error { r.logger, ) r.processors = append(r.processors, newReplicationTaskProcessor( - currentClusterName, cluster, consumerName, r.client, + currentClusterName, clusterName, consumerName, r.client, r.config, logger, r.metricsClient, r.domainReplicator, historyRereplicator, r.historyClient, task.NewSequentialTaskProcessor(