Deprecate clustersInfo configuration in favor of new clustersInformation configuration, which provides better flexibility
Wenquan Xing committed May 8, 2019
1 parent 2236052 commit eb6742a
Showing 37 changed files with 588 additions and 426 deletions.
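
Before the per-file diffs, it helps to see the shape of the two configurations side by side. The struct definitions themselves are not part of the hunks shown on this page, so the following Go sketch is inferred from the fields referenced in cmd/server/server.go and common/cluster/metadataTestBase.go below; struct and field names are best-effort assumptions, not the authoritative definitions from the config package.

```go
// Sketch only: inferred from usages in this diff (s.cfg.ClustersInfo.*,
// s.cfg.ClustersInformation.*, config.ClusterInformation); not the actual
// definitions in the Cadence config package.
package config

// Address is the pre-existing RPC address pair used by the old ClustersInfo.
type Address struct {
	RPCName    string
	RPCAddress string
}

// ClustersInfo is the deprecated flat form, with per-cluster settings
// spread across two parallel maps.
type ClustersInfo struct {
	EnableGlobalDomain             bool
	FailoverVersionIncrement       int64
	MasterClusterName              string
	CurrentClusterName             string
	ClusterInitialFailoverVersions map[string]int64
	ClusterAddress                 map[string]Address
}

// ClustersInformation is the replacement; per-cluster settings are grouped.
type ClustersInformation struct {
	EnableGlobalDomain bool
	VersionIncrement   int64
	MasterClusterName  string
	CurrentClusterName string
	ClusterInformation map[string]ClusterInformation
}

// ClusterInformation holds the per-cluster settings that used to be split
// across ClusterInitialFailoverVersions and ClusterAddress.
type ClusterInformation struct {
	Enabled        bool
	InitialVersion int64
	RPCName        string
	RPCAddress     string
}
```

The practical difference is that the settings for one cluster now live in a single ClusterInformation entry instead of two parallel maps, which leaves room for additional per-cluster fields such as Enabled.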
8 changes: 4 additions & 4 deletions client/clientBean.go
@@ -86,14 +86,14 @@ func NewClientBean(factory Factory, dispatcherProvider DispatcherProvider, clust

remoteAdminClients := map[string]admin.Client{}
remoteFrontendClients := map[string]frontend.Client{}
for cluster, address := range clusterMetadata.GetAllClientAddress() {
dispatcher, err := dispatcherProvider.Get(address.RPCName, address.RPCAddress)
for cluster, info := range clusterMetadata.GetAllClusterInfo() {
dispatcher, err := dispatcherProvider.Get(info.RPCName, info.RPCAddress)
if err != nil {
return nil, err
}

adminClient, err := factory.NewAdminClientWithTimeoutAndDispatcher(
address.RPCName,
info.RPCName,
admin.DefaultTimeout,
dispatcher,
)
@@ -102,7 +102,7 @@ }
}

frontendclient, err := factory.NewFrontendClientWithTimeoutAndDispatcher(
address.RPCName,
info.RPCName,
frontend.DefaultTimeout,
frontend.DefaultLongPollTimeout,
dispatcher,
25 changes: 17 additions & 8 deletions cmd/server/server.go
@@ -24,10 +24,11 @@ import (
"log"
"time"

"github.com/uber/cadence/common/cluster"

"github.com/uber/cadence/client"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/blobstore/filestore"
"github.com/uber/cadence/common/cluster"
"github.com/uber/cadence/common/elasticsearch"
"github.com/uber/cadence/common/log/loggerimpl"
"github.com/uber/cadence/common/log/tag"
@@ -123,22 +124,30 @@ func (s *server) startService() common.Daemon {
params.MetricScope = svcCfg.Metrics.NewScope(params.Logger)
params.RPCFactory = svcCfg.RPC.NewFactory(params.Name, params.Logger)
params.PProfInitializer = svcCfg.PProf.NewInitializer(params.Logger)
enableGlobalDomain := dc.GetBoolProperty(dynamicconfig.EnableGlobalDomain, s.cfg.ClustersInfo.EnableGlobalDomain)

archivalStatus := dc.GetStringProperty(dynamicconfig.ArchivalStatus, s.cfg.Archival.Status)
enableReadFromArchival := dc.GetBoolProperty(dynamicconfig.EnableReadFromArchival, s.cfg.Archival.EnableReadFromArchival)

params.DCRedirectionPolicy = s.cfg.DCRedirectionPolicy

params.MetricsClient = metrics.NewClient(params.MetricScope, service.GetMetricsServiceIdx(params.Name, params.Logger))

clustersInformation := s.cfg.ClustersInformation
// TODO remove when ClustersInfo is fully deprecated
if len(s.cfg.ClustersInfo.CurrentClusterName) != 0 && len(s.cfg.ClustersInformation.CurrentClusterName) != 0 {
log.Fatalf("cannot config both clustersInfo and clustersInformation")
}
if len(s.cfg.ClustersInfo.CurrentClusterName) != 0 {
clustersInformation = s.cfg.ClustersInfo.ToClusterInformation()
}
params.ClusterMetadata = cluster.NewMetadata(
params.Logger,
params.MetricsClient,
enableGlobalDomain,
s.cfg.ClustersInfo.FailoverVersionIncrement,
s.cfg.ClustersInfo.MasterClusterName,
s.cfg.ClustersInfo.CurrentClusterName,
s.cfg.ClustersInfo.ClusterInitialFailoverVersions,
s.cfg.ClustersInfo.ClusterAddress,
dc.GetBoolProperty(dynamicconfig.EnableGlobalDomain, clustersInformation.EnableGlobalDomain),
clustersInformation.VersionIncrement,
clustersInformation.MasterClusterName,
clustersInformation.CurrentClusterName,
clustersInformation.ClusterInformation,
archivalStatus,
s.cfg.Archival.DefaultBucket,
enableReadFromArchival,
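
cmd/server/server.go above converts a legacy configuration with ClustersInfo.ToClusterInformation(); the body of that method is not among the shown hunks. Continuing the sketch from the top of the page, a minimal guess at the conversion it presumably performs (field names are the same assumptions as before):

```go
// Sketch only: the real implementation lives in the config package and is not
// shown in this commit; it extends the hypothetical types sketched above.
func (c *ClustersInfo) ToClusterInformation() ClustersInformation {
	clusters := make(map[string]ClusterInformation, len(c.ClusterInitialFailoverVersions))
	for name, initialVersion := range c.ClusterInitialFailoverVersions {
		addr := c.ClusterAddress[name] // the old format kept addresses in a parallel map
		clusters[name] = ClusterInformation{
			Enabled:        true, // clusters in the old format were implicitly enabled
			InitialVersion: initialVersion,
			RPCName:        addr.RPCName,
			RPCAddress:     addr.RPCAddress,
		}
	}
	return ClustersInformation{
		EnableGlobalDomain: c.EnableGlobalDomain,
		VersionIncrement:   c.FailoverVersionIncrement,
		MasterClusterName:  c.MasterClusterName,
		CurrentClusterName: c.CurrentClusterName,
		ClusterInformation: clusters,
	}
}
```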
7 changes: 6 additions & 1 deletion common/cache/domainCache.go
@@ -150,12 +150,17 @@ func newDomainCacheEntry(clusterMetadata cluster.Metadata) *DomainCacheEntry {
}

// NewDomainCacheEntryWithReplicationForTest returns an entry with test data
func NewDomainCacheEntryWithReplicationForTest(info *persistence.DomainInfo, config *persistence.DomainConfig, repConfig *persistence.DomainReplicationConfig, clusterMetadata cluster.Metadata) *DomainCacheEntry {
func NewDomainCacheEntryWithReplicationForTest(info *persistence.DomainInfo,
config *persistence.DomainConfig,
repConfig *persistence.DomainReplicationConfig,
failoverVersion int64,
clusterMetadata cluster.Metadata) *DomainCacheEntry {
return &DomainCacheEntry{
info: info,
config: config,
isGlobalDomain: true,
replicationConfig: repConfig,
failoverVersion: failoverVersion,
clusterMetadata: clusterMetadata,
}
}
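
The test helper in common/cache/domainCache.go now takes the failover version explicitly. A hypothetical call site, with zero-valued persistence structs standing in for real fixture data, might look like this:

```go
// Illustrative only: shows the updated parameter list, not a real test.
package cache_test

import (
	"github.com/uber/cadence/common/cache"
	"github.com/uber/cadence/common/cluster"
	"github.com/uber/cadence/common/persistence"
)

func newEntryForIllustration(metadata cluster.Metadata) *cache.DomainCacheEntry {
	return cache.NewDomainCacheEntryWithReplicationForTest(
		&persistence.DomainInfo{},
		&persistence.DomainConfig{},
		&persistence.DomainReplicationConfig{},
		123, // failoverVersion: the newly added argument
		metadata,
	)
}
```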
122 changes: 57 additions & 65 deletions common/cluster/metadata.go
@@ -22,6 +22,7 @@ package cluster

import (
"fmt"

"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/metrics"
@@ -45,12 +46,10 @@
GetMasterClusterName() string
// GetCurrentClusterName return the current cluster name
GetCurrentClusterName() string
// GetAllClusterFailoverVersions return the all cluster name -> corresponding initial failover version
GetAllClusterFailoverVersions() map[string]int64
// GetAllClusterInfo return the all cluster name -> corresponding info
GetAllClusterInfo() map[string]config.ClusterInformation
// ClusterNameForFailoverVersion return the corresponding cluster name for a given failover version
ClusterNameForFailoverVersion(failoverVersion int64) string
// GetAllClientAddress return the frontend address for each cluster name
GetAllClientAddress() map[string]config.Address

// ArchivalConfig returns the archival config of the cluster
ArchivalConfig() *ArchivalConfig
@@ -62,17 +61,17 @@
// EnableGlobalDomain whether the global domain is enabled,
// this attr should be discarded when cross DC is made public
enableGlobalDomain dynamicconfig.BoolPropertyFn
// failoverVersionIncrement is the increment of each cluster failover version
failoverVersionIncrement int64
// versionIncrement is the increment of each cluster's version when failover happen
versionIncrement int64
// masterClusterName is the name of the master cluster, only the master cluster can register / update domain
// all clusters can do domain failover
masterClusterName string
// currentClusterName is the name of the current cluster
currentClusterName string
// clusterInitialFailoverVersions contains all cluster name -> corresponding initial failover version
clusterInitialFailoverVersions map[string]int64
// clusterInitialFailoverVersions contains all initial failover version -> corresponding cluster name
initialFailoverVersionClusters map[int64]string
// clusterInfo contains all cluster name -> corresponding information
clusterInfo map[string]config.ClusterInformation
// versionToClusterName contains all initial version -> corresponding cluster name
versionToClusterName map[int64]string
// clusterToAddress contains the cluster name to corresponding frontend client
clusterToAddress map[string]config.Address

@@ -90,53 +89,52 @@ func NewMetadata(
logger log.Logger,
metricsClient metrics.Client,
enableGlobalDomain dynamicconfig.BoolPropertyFn,
failoverVersionIncrement int64,
versionIncrement int64,
masterClusterName string,
currentClusterName string,
clusterInitialFailoverVersions map[string]int64,
clusterToAddress map[string]config.Address,
clusterInfo map[string]config.ClusterInformation,
archivalStatus dynamicconfig.StringPropertyFn,
defaultBucket string,
enableReadFromArchival dynamicconfig.BoolPropertyFn,
) Metadata {

if len(clusterInitialFailoverVersions) == 0 {
panic("Empty initial failover versions for cluster")
if len(clusterInfo) == 0 {
panic("Empty cluster information")
} else if len(masterClusterName) == 0 {
panic("Master cluster name is empty")
} else if len(currentClusterName) == 0 {
panic("Current cluster name is empty")
} else if versionIncrement == 0 {
panic("Version increment is 0")
}
initialFailoverVersionClusters := make(map[int64]string)
for clusterName, initialFailoverVersion := range clusterInitialFailoverVersions {
if failoverVersionIncrement <= initialFailoverVersion {

versionToClusterName := make(map[int64]string)
for clusterName, info := range clusterInfo {
if versionIncrement <= info.InitialVersion || info.InitialVersion < 0 {
panic(fmt.Sprintf(
"Failover version increment %v is smaller than initial value: %v.",
failoverVersionIncrement,
clusterInitialFailoverVersions,
"Version increment %v is smaller than initial version: %v.",
versionIncrement,
info.InitialVersion,
))
}
if len(clusterName) == 0 {
panic("Cluster name in all cluster names is empty")
}
initialFailoverVersionClusters[initialFailoverVersion] = clusterName
versionToClusterName[info.InitialVersion] = clusterName

if info.Enabled && (len(info.RPCName) == 0 || len(info.RPCAddress) == 0) {
panic(fmt.Sprintf("Cluster %v: rpc name / address is empty", clusterName))
}
}

if _, ok := clusterInitialFailoverVersions[currentClusterName]; !ok {
panic("Current cluster is not specified in all cluster names")
if _, ok := clusterInfo[currentClusterName]; !ok {
panic("Current cluster is not specified in cluster info")
}
if _, ok := clusterInitialFailoverVersions[masterClusterName]; !ok {
panic("Master cluster is not specified in all cluster names")
if _, ok := clusterInfo[masterClusterName]; !ok {
panic("Master cluster is not specified in cluster info")
}
if len(initialFailoverVersionClusters) != len(clusterInitialFailoverVersions) {
panic("Cluster to initial failover versions have duplicate initial versions")
}

// only check whether a cluster in cluster -> initial failover versions exists in cluster -> address
for clusterName := range clusterInitialFailoverVersions {
if _, ok := clusterToAddress[clusterName]; !ok {
panic("Cluster -> initial failover version does not have an address")
}
if len(versionToClusterName) != len(clusterInfo) {
panic("Cluster info initial versions have duplicates")
}

status, err := getArchivalStatus(archivalStatus())
@@ -149,18 +147,17 @@ }
}

return &metadataImpl{
logger: logger,
metricsClient: metricsClient,
enableGlobalDomain: enableGlobalDomain,
failoverVersionIncrement: failoverVersionIncrement,
masterClusterName: masterClusterName,
currentClusterName: currentClusterName,
clusterInitialFailoverVersions: clusterInitialFailoverVersions,
initialFailoverVersionClusters: initialFailoverVersionClusters,
clusterToAddress: clusterToAddress,
archivalStatus: archivalStatus,
defaultBucket: defaultBucket,
enableReadFromArchival: enableReadFromArchival,
logger: logger,
metricsClient: metricsClient,
enableGlobalDomain: enableGlobalDomain,
versionIncrement: versionIncrement,
masterClusterName: masterClusterName,
currentClusterName: currentClusterName,
clusterInfo: clusterInfo,
versionToClusterName: versionToClusterName,
archivalStatus: archivalStatus,
defaultBucket: defaultBucket,
enableReadFromArchival: enableReadFromArchival,
}
}

@@ -172,24 +169,24 @@ func (metadata *metadataImpl) IsGlobalDomainEnabled() bool {

// GetNextFailoverVersion return the next failover version based on input
func (metadata *metadataImpl) GetNextFailoverVersion(cluster string, currentFailoverVersion int64) int64 {
initialFailoverVersion, ok := metadata.clusterInitialFailoverVersions[cluster]
info, ok := metadata.clusterInfo[cluster]
if !ok {
panic(fmt.Sprintf(
"Unknown cluster name: %v with given cluster initial failover version map: %v.",
cluster,
metadata.clusterInitialFailoverVersions,
metadata.clusterInfo,
))
}
failoverVersion := currentFailoverVersion/metadata.failoverVersionIncrement*metadata.failoverVersionIncrement + initialFailoverVersion
failoverVersion := currentFailoverVersion/metadata.versionIncrement*metadata.versionIncrement + info.InitialVersion
if failoverVersion < currentFailoverVersion {
return failoverVersion + metadata.failoverVersionIncrement
return failoverVersion + metadata.versionIncrement
}
return failoverVersion
}

// IsVersionFromSameCluster return true if 2 version are used for the same cluster
func (metadata *metadataImpl) IsVersionFromSameCluster(version1 int64, version2 int64) bool {
return (version1-version2)%metadata.failoverVersionIncrement == 0
return (version1-version2)%metadata.versionIncrement == 0
}

func (metadata *metadataImpl) IsMasterCluster() bool {
@@ -206,31 +203,26 @@ func (metadata *metadataImpl) GetCurrentClusterName() string {
return metadata.currentClusterName
}

// GetAllClusterFailoverVersions return the all cluster name -> corresponding initial failover version
func (metadata *metadataImpl) GetAllClusterFailoverVersions() map[string]int64 {
return metadata.clusterInitialFailoverVersions
// GetAllClusterInfo return the all cluster name -> corresponding information
func (metadata *metadataImpl) GetAllClusterInfo() map[string]config.ClusterInformation {
return metadata.clusterInfo
}

// ClusterNameForFailoverVersion return the corresponding cluster name for a given failover version
func (metadata *metadataImpl) ClusterNameForFailoverVersion(failoverVersion int64) string {
initialFailoverVersion := failoverVersion % metadata.failoverVersionIncrement
clusterName, ok := metadata.initialFailoverVersionClusters[initialFailoverVersion]
initialVersion := failoverVersion % metadata.versionIncrement
clusterName, ok := metadata.versionToClusterName[initialVersion]
if !ok {
panic(fmt.Sprintf(
"Unknown initial failover version %v with given cluster initial failover version map: %v and failover version increment %v.",
initialFailoverVersion,
metadata.clusterInitialFailoverVersions,
metadata.failoverVersionIncrement,
initialVersion,
metadata.clusterInfo,
metadata.versionIncrement,
))
}
return clusterName
}

// GetAllClientAddress return the frontend address for each cluster name
func (metadata *metadataImpl) GetAllClientAddress() map[string]config.Address {
return metadata.clusterToAddress
}

// ArchivalConfig returns the archival config of the cluster.
// This method always return a well formed ArchivalConfig (this means ArchivalConfig().IsValid always returns true).
func (metadata *metadataImpl) ArchivalConfig() (retCfg *ArchivalConfig) {
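
The renamed fields in metadata.go do not change the failover-version arithmetic, only its naming. A small standalone Go sketch of the two calculations, with made-up numbers (versionIncrement = 10, initial versions 1 and 2), may make it concrete:

```go
// Standalone illustration of the formulas in GetNextFailoverVersion and
// ClusterNameForFailoverVersion; values are made up for the example.
package main

import "fmt"

func main() {
	const versionIncrement = int64(10)
	initialVersions := map[string]int64{"clusterA": 1, "clusterB": 2}
	versionToCluster := map[int64]string{1: "clusterA", 2: "clusterB"}

	// Next failover version for clusterB when the current version is 21:
	// 21/10*10 + 2 = 22, which is not below 21, so 22 is returned.
	current := int64(21)
	next := current/versionIncrement*versionIncrement + initialVersions["clusterB"]
	if next < current {
		next += versionIncrement
	}
	fmt.Println(next) // 22

	// Next failover version for clusterA when the current version is 22:
	// 22/10*10 + 1 = 21 < 22, so the increment is added and 31 is returned.
	current = int64(22)
	next = current/versionIncrement*versionIncrement + initialVersions["clusterA"]
	if next < current {
		next += versionIncrement
	}
	fmt.Println(next) // 31

	// Mapping a failover version back to its owning cluster: 31 % 10 = 1.
	fmt.Println(versionToCluster[31%versionIncrement]) // clusterA
}
```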
44 changes: 24 additions & 20 deletions common/cluster/metadataTestBase.go
@@ -48,26 +48,32 @@ const (
var (
// TestAllClusterNames is the all cluster names used for test
TestAllClusterNames = []string{TestCurrentClusterName, TestAlternativeClusterName}
// TestAllClusterFailoverVersions is the same as above, juse convinent for test mocking
TestAllClusterFailoverVersions = map[string]int64{
TestCurrentClusterName: TestCurrentClusterInitialFailoverVersion,
TestAlternativeClusterName: TestAlternativeClusterInitialFailoverVersion,
}
// TestAllClusterAddress is the same as above, juse convinent for test mocking
TestAllClusterAddress = map[string]config.Address{
TestCurrentClusterName: config.Address{RPCName: common.FrontendServiceName, RPCAddress: TestCurrentClusterFrontendAddress},
TestAlternativeClusterName: config.Address{RPCName: common.FrontendServiceName, RPCAddress: TestAlternativeClusterFrontendAddress},
// TestAllClusterInfo is the same as above, just convenient for test mocking
TestAllClusterInfo = map[string]config.ClusterInformation{
TestCurrentClusterName: config.ClusterInformation{
Enabled: true,
InitialVersion: TestCurrentClusterInitialFailoverVersion,
RPCName: common.FrontendServiceName,
RPCAddress: TestCurrentClusterFrontendAddress,
},
TestAlternativeClusterName: config.ClusterInformation{
Enabled: true,
InitialVersion: TestAlternativeClusterInitialFailoverVersion,
RPCName: common.FrontendServiceName,
RPCAddress: TestAlternativeClusterFrontendAddress,
},
}

// TestSingleDCAllClusterNames is the all cluster names used for test
TestSingleDCAllClusterNames = []string{TestCurrentClusterName}
// TestSingleDCAllClusterFailoverVersions is the same as above, juse convinent for test mocking
TestSingleDCAllClusterFailoverVersions = map[string]int64{
TestCurrentClusterName: TestCurrentClusterInitialFailoverVersion,
}
// TestSingleDCAllClusterAddress is the same as above, juse convinent for test mocking
TestSingleDCAllClusterAddress = map[string]config.Address{
TestCurrentClusterName: config.Address{RPCName: common.FrontendServiceName, RPCAddress: TestCurrentClusterFrontendAddress},
// TestSingleDCClusterInfo is the same as above, just convenient for test mocking
TestSingleDCClusterInfo = map[string]config.ClusterInformation{
TestCurrentClusterName: config.ClusterInformation{
Enabled: true,
InitialVersion: TestCurrentClusterInitialFailoverVersion,
RPCName: common.FrontendServiceName,
RPCAddress: TestCurrentClusterFrontendAddress,
},
}
)

@@ -92,8 +98,7 @@ func GetTestClusterMetadata(enableGlobalDomain bool, isMasterCluster bool, enabl
TestFailoverVersionIncrement,
masterClusterName,
TestCurrentClusterName,
TestAllClusterFailoverVersions,
TestAllClusterAddress,
TestAllClusterInfo,
dynamicconfig.GetStringPropertyFn(archivalStatus),
clusterDefaultBucket,
dynamicconfig.GetBoolPropertyFn(enableArchival),
@@ -107,8 +112,7 @@
TestFailoverVersionIncrement,
TestCurrentClusterName,
TestCurrentClusterName,
TestSingleDCAllClusterFailoverVersions,
TestSingleDCAllClusterAddress,
TestSingleDCClusterInfo,
dynamicconfig.GetStringPropertyFn(archivalStatus),
clusterDefaultBucket,
dynamicconfig.GetBoolPropertyFn(enableArchival),
1 change: 0 additions & 1 deletion common/messaging/kafkaConfig.go
@@ -78,7 +78,6 @@ func (k *KafkaConfig) Validate(checkCluster bool, checkApp bool) {
}
for _, topics := range k.ClusterToTopic {
validateTopicsFn(topics.Topic)
validateTopicsFn(topics.RetryTopic)
validateTopicsFn(topics.DLQTopic)
}
}