Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deprecate clustersInfo configuration in favor of new clusterMetadata… #1809

Merged
merged 5 commits into from
May 8, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions client/clientBean.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,14 +86,14 @@ func NewClientBean(factory Factory, dispatcherProvider DispatcherProvider, clust

remoteAdminClients := map[string]admin.Client{}
remoteFrontendClients := map[string]frontend.Client{}
for cluster, address := range clusterMetadata.GetAllClientAddress() {
dispatcher, err := dispatcherProvider.Get(address.RPCName, address.RPCAddress)
for cluster, info := range clusterMetadata.GetAllClusterInfo() {
dispatcher, err := dispatcherProvider.Get(info.RPCName, info.RPCAddress)
if err != nil {
return nil, err
}

adminClient, err := factory.NewAdminClientWithTimeoutAndDispatcher(
address.RPCName,
info.RPCName,
admin.DefaultTimeout,
dispatcher,
)
Expand All @@ -102,7 +102,7 @@ func NewClientBean(factory Factory, dispatcherProvider DispatcherProvider, clust
}

frontendclient, err := factory.NewFrontendClientWithTimeoutAndDispatcher(
address.RPCName,
info.RPCName,
frontend.DefaultTimeout,
frontend.DefaultLongPollTimeout,
dispatcher,
Expand Down
25 changes: 17 additions & 8 deletions cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@ import (
"log"
"time"

"github.com/uber/cadence/common/cluster"

"github.com/uber/cadence/client"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/blobstore/filestore"
"github.com/uber/cadence/common/cluster"
"github.com/uber/cadence/common/elasticsearch"
"github.com/uber/cadence/common/log/loggerimpl"
"github.com/uber/cadence/common/log/tag"
Expand Down Expand Up @@ -123,22 +124,30 @@ func (s *server) startService() common.Daemon {
params.MetricScope = svcCfg.Metrics.NewScope(params.Logger)
params.RPCFactory = svcCfg.RPC.NewFactory(params.Name, params.Logger)
params.PProfInitializer = svcCfg.PProf.NewInitializer(params.Logger)
enableGlobalDomain := dc.GetBoolProperty(dynamicconfig.EnableGlobalDomain, s.cfg.ClustersInfo.EnableGlobalDomain)

archivalStatus := dc.GetStringProperty(dynamicconfig.ArchivalStatus, s.cfg.Archival.Status)
enableReadFromArchival := dc.GetBoolProperty(dynamicconfig.EnableReadFromArchival, s.cfg.Archival.EnableReadFromArchival)

params.DCRedirectionPolicy = s.cfg.DCRedirectionPolicy

params.MetricsClient = metrics.NewClient(params.MetricScope, service.GetMetricsServiceIdx(params.Name, params.Logger))

clusterMetadata := s.cfg.ClusterMetadata
// TODO remove when ClustersInfo is fully deprecated
if len(s.cfg.ClustersInfo.CurrentClusterName) != 0 && len(s.cfg.ClusterMetadata.CurrentClusterName) != 0 {
log.Fatalf("cannot config both clustersInfo and clusterMetadata")
}
if len(s.cfg.ClustersInfo.CurrentClusterName) != 0 {
clusterMetadata = s.cfg.ClustersInfo.ToClusterMetadata()
}
params.ClusterMetadata = cluster.NewMetadata(
params.Logger,
params.MetricsClient,
enableGlobalDomain,
s.cfg.ClustersInfo.FailoverVersionIncrement,
s.cfg.ClustersInfo.MasterClusterName,
s.cfg.ClustersInfo.CurrentClusterName,
s.cfg.ClustersInfo.ClusterInitialFailoverVersions,
s.cfg.ClustersInfo.ClusterAddress,
dc.GetBoolProperty(dynamicconfig.EnableGlobalDomain, clusterMetadata.EnableGlobalDomain),
clusterMetadata.FailoverVersionIncrement,
clusterMetadata.MasterClusterName,
clusterMetadata.CurrentClusterName,
clusterMetadata.ClusterInformation,
archivalStatus,
s.cfg.Archival.DefaultBucket,
enableReadFromArchival,
Expand Down
7 changes: 6 additions & 1 deletion common/cache/domainCache.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,12 +150,17 @@ func newDomainCacheEntry(clusterMetadata cluster.Metadata) *DomainCacheEntry {
}

// NewDomainCacheEntryWithReplicationForTest returns an entry with test data
func NewDomainCacheEntryWithReplicationForTest(info *persistence.DomainInfo, config *persistence.DomainConfig, repConfig *persistence.DomainReplicationConfig, clusterMetadata cluster.Metadata) *DomainCacheEntry {
func NewDomainCacheEntryWithReplicationForTest(info *persistence.DomainInfo,
config *persistence.DomainConfig,
repConfig *persistence.DomainReplicationConfig,
failoverVersion int64,
clusterMetadata cluster.Metadata) *DomainCacheEntry {
return &DomainCacheEntry{
info: info,
config: config,
isGlobalDomain: true,
replicationConfig: repConfig,
failoverVersion: failoverVersion,
clusterMetadata: clusterMetadata,
}
}
Expand Down
106 changes: 49 additions & 57 deletions common/cluster/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package cluster

import (
"fmt"

"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/metrics"
Expand All @@ -45,12 +46,10 @@ type (
GetMasterClusterName() string
// GetCurrentClusterName return the current cluster name
GetCurrentClusterName() string
// GetAllClusterFailoverVersions return the all cluster name -> corresponding initial failover version
GetAllClusterFailoverVersions() map[string]int64
// GetAllClusterInfo return the all cluster name -> corresponding info
GetAllClusterInfo() map[string]config.ClusterInformation
// ClusterNameForFailoverVersion return the corresponding cluster name for a given failover version
ClusterNameForFailoverVersion(failoverVersion int64) string
// GetAllClientAddress return the frontend address for each cluster name
GetAllClientAddress() map[string]config.Address

// ArchivalConfig returns the archival config of the cluster
ArchivalConfig() *ArchivalConfig
Expand All @@ -62,17 +61,17 @@ type (
// EnableGlobalDomain whether the global domain is enabled,
// this attr should be discarded when cross DC is made public
enableGlobalDomain dynamicconfig.BoolPropertyFn
// failoverVersionIncrement is the increment of each cluster failover version
// failoverVersionIncrement is the increment of each cluster's version when failover happen
failoverVersionIncrement int64
// masterClusterName is the name of the master cluster, only the master cluster can register / update domain
// all clusters can do domain failover
masterClusterName string
// currentClusterName is the name of the current cluster
currentClusterName string
// clusterInitialFailoverVersions contains all cluster name -> corresponding initial failover version
clusterInitialFailoverVersions map[string]int64
// clusterInitialFailoverVersions contains all initial failover version -> corresponding cluster name
initialFailoverVersionClusters map[int64]string
// clusterInfo contains all cluster name -> corresponding information
clusterInfo map[string]config.ClusterInformation
// versionToClusterName contains all initial version -> corresponding cluster name
versionToClusterName map[int64]string
// clusterToAddress contains the cluster name to corresponding frontend client
clusterToAddress map[string]config.Address

Expand All @@ -93,50 +92,49 @@ func NewMetadata(
failoverVersionIncrement int64,
masterClusterName string,
currentClusterName string,
clusterInitialFailoverVersions map[string]int64,
clusterToAddress map[string]config.Address,
clusterInfo map[string]config.ClusterInformation,
archivalStatus dynamicconfig.StringPropertyFn,
defaultBucket string,
enableReadFromArchival dynamicconfig.BoolPropertyFn,
) Metadata {

if len(clusterInitialFailoverVersions) == 0 {
panic("Empty initial failover versions for cluster")
if len(clusterInfo) == 0 {
panic("Empty cluster information")
} else if len(masterClusterName) == 0 {
panic("Master cluster name is empty")
} else if len(currentClusterName) == 0 {
panic("Current cluster name is empty")
} else if failoverVersionIncrement == 0 {
panic("Version increment is 0")
}
initialFailoverVersionClusters := make(map[int64]string)
for clusterName, initialFailoverVersion := range clusterInitialFailoverVersions {
if failoverVersionIncrement <= initialFailoverVersion {

versionToClusterName := make(map[int64]string)
for clusterName, info := range clusterInfo {
if failoverVersionIncrement <= info.InitialFailoverVersion || info.InitialFailoverVersion < 0 {
panic(fmt.Sprintf(
"Failover version increment %v is smaller than initial value: %v.",
"Version increment %v is smaller than initial version: %v.",
failoverVersionIncrement,
clusterInitialFailoverVersions,
info.InitialFailoverVersion,
))
}
if len(clusterName) == 0 {
panic("Cluster name in all cluster names is empty")
}
initialFailoverVersionClusters[initialFailoverVersion] = clusterName
versionToClusterName[info.InitialFailoverVersion] = clusterName

if info.Enabled && (len(info.RPCName) == 0 || len(info.RPCAddress) == 0) {
panic(fmt.Sprintf("Cluster %v: rpc name / address is empty", clusterName))
}
}

if _, ok := clusterInitialFailoverVersions[currentClusterName]; !ok {
panic("Current cluster is not specified in all cluster names")
if _, ok := clusterInfo[currentClusterName]; !ok {
panic("Current cluster is not specified in cluster info")
}
if _, ok := clusterInitialFailoverVersions[masterClusterName]; !ok {
panic("Master cluster is not specified in all cluster names")
if _, ok := clusterInfo[masterClusterName]; !ok {
panic("Master cluster is not specified in cluster info")
}
if len(initialFailoverVersionClusters) != len(clusterInitialFailoverVersions) {
panic("Cluster to initial failover versions have duplicate initial versions")
}

// only check whether a cluster in cluster -> initial failover versions exists in cluster -> address
for clusterName := range clusterInitialFailoverVersions {
if _, ok := clusterToAddress[clusterName]; !ok {
panic("Cluster -> initial failover version does not have an address")
}
if len(versionToClusterName) != len(clusterInfo) {
panic("Cluster info initial versions have duplicates")
}

status, err := getArchivalStatus(archivalStatus())
Expand All @@ -149,18 +147,17 @@ func NewMetadata(
}

return &metadataImpl{
logger: logger,
metricsClient: metricsClient,
enableGlobalDomain: enableGlobalDomain,
failoverVersionIncrement: failoverVersionIncrement,
masterClusterName: masterClusterName,
currentClusterName: currentClusterName,
clusterInitialFailoverVersions: clusterInitialFailoverVersions,
initialFailoverVersionClusters: initialFailoverVersionClusters,
clusterToAddress: clusterToAddress,
archivalStatus: archivalStatus,
defaultBucket: defaultBucket,
enableReadFromArchival: enableReadFromArchival,
logger: logger,
metricsClient: metricsClient,
enableGlobalDomain: enableGlobalDomain,
failoverVersionIncrement: failoverVersionIncrement,
masterClusterName: masterClusterName,
currentClusterName: currentClusterName,
clusterInfo: clusterInfo,
versionToClusterName: versionToClusterName,
archivalStatus: archivalStatus,
defaultBucket: defaultBucket,
enableReadFromArchival: enableReadFromArchival,
}
}

Expand All @@ -172,15 +169,15 @@ func (metadata *metadataImpl) IsGlobalDomainEnabled() bool {

// GetNextFailoverVersion return the next failover version based on input
func (metadata *metadataImpl) GetNextFailoverVersion(cluster string, currentFailoverVersion int64) int64 {
initialFailoverVersion, ok := metadata.clusterInitialFailoverVersions[cluster]
info, ok := metadata.clusterInfo[cluster]
if !ok {
panic(fmt.Sprintf(
"Unknown cluster name: %v with given cluster initial failover version map: %v.",
cluster,
metadata.clusterInitialFailoverVersions,
metadata.clusterInfo,
))
}
failoverVersion := currentFailoverVersion/metadata.failoverVersionIncrement*metadata.failoverVersionIncrement + initialFailoverVersion
failoverVersion := currentFailoverVersion/metadata.failoverVersionIncrement*metadata.failoverVersionIncrement + info.InitialFailoverVersion
if failoverVersion < currentFailoverVersion {
return failoverVersion + metadata.failoverVersionIncrement
}
Expand All @@ -206,31 +203,26 @@ func (metadata *metadataImpl) GetCurrentClusterName() string {
return metadata.currentClusterName
}

// GetAllClusterFailoverVersions return the all cluster name -> corresponding initial failover version
func (metadata *metadataImpl) GetAllClusterFailoverVersions() map[string]int64 {
return metadata.clusterInitialFailoverVersions
// GetAllClusterInfo return the all cluster name -> corresponding information
func (metadata *metadataImpl) GetAllClusterInfo() map[string]config.ClusterInformation {
return metadata.clusterInfo
}

// ClusterNameForFailoverVersion return the corresponding cluster name for a given failover version
func (metadata *metadataImpl) ClusterNameForFailoverVersion(failoverVersion int64) string {
initialFailoverVersion := failoverVersion % metadata.failoverVersionIncrement
clusterName, ok := metadata.initialFailoverVersionClusters[initialFailoverVersion]
clusterName, ok := metadata.versionToClusterName[initialFailoverVersion]
if !ok {
panic(fmt.Sprintf(
"Unknown initial failover version %v with given cluster initial failover version map: %v and failover version increment %v.",
initialFailoverVersion,
metadata.clusterInitialFailoverVersions,
metadata.clusterInfo,
metadata.failoverVersionIncrement,
))
}
return clusterName
}

// GetAllClientAddress return the frontend address for each cluster name
func (metadata *metadataImpl) GetAllClientAddress() map[string]config.Address {
return metadata.clusterToAddress
}

// ArchivalConfig returns the archival config of the cluster.
// This method always return a well formed ArchivalConfig (this means ArchivalConfig().IsValid always returns true).
func (metadata *metadataImpl) ArchivalConfig() (retCfg *ArchivalConfig) {
Expand Down
44 changes: 24 additions & 20 deletions common/cluster/metadataTestBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,26 +48,32 @@ const (
var (
// TestAllClusterNames is the all cluster names used for test
TestAllClusterNames = []string{TestCurrentClusterName, TestAlternativeClusterName}
// TestAllClusterFailoverVersions is the same as above, juse convinent for test mocking
TestAllClusterFailoverVersions = map[string]int64{
TestCurrentClusterName: TestCurrentClusterInitialFailoverVersion,
TestAlternativeClusterName: TestAlternativeClusterInitialFailoverVersion,
}
// TestAllClusterAddress is the same as above, juse convinent for test mocking
TestAllClusterAddress = map[string]config.Address{
TestCurrentClusterName: config.Address{RPCName: common.FrontendServiceName, RPCAddress: TestCurrentClusterFrontendAddress},
TestAlternativeClusterName: config.Address{RPCName: common.FrontendServiceName, RPCAddress: TestAlternativeClusterFrontendAddress},
// TestAllClusterInfo is the same as above, just convenient for test mocking
TestAllClusterInfo = map[string]config.ClusterInformation{
TestCurrentClusterName: config.ClusterInformation{
Enabled: true,
InitialFailoverVersion: TestCurrentClusterInitialFailoverVersion,
RPCName: common.FrontendServiceName,
RPCAddress: TestCurrentClusterFrontendAddress,
},
TestAlternativeClusterName: config.ClusterInformation{
Enabled: true,
InitialFailoverVersion: TestAlternativeClusterInitialFailoverVersion,
RPCName: common.FrontendServiceName,
RPCAddress: TestAlternativeClusterFrontendAddress,
},
}

// TestSingleDCAllClusterNames is the all cluster names used for test
TestSingleDCAllClusterNames = []string{TestCurrentClusterName}
// TestSingleDCAllClusterFailoverVersions is the same as above, juse convinent for test mocking
TestSingleDCAllClusterFailoverVersions = map[string]int64{
TestCurrentClusterName: TestCurrentClusterInitialFailoverVersion,
}
// TestSingleDCAllClusterAddress is the same as above, juse convinent for test mocking
TestSingleDCAllClusterAddress = map[string]config.Address{
TestCurrentClusterName: config.Address{RPCName: common.FrontendServiceName, RPCAddress: TestCurrentClusterFrontendAddress},
// TestSingleDCClusterInfo is the same as above, just convenient for test mocking
TestSingleDCClusterInfo = map[string]config.ClusterInformation{
TestCurrentClusterName: config.ClusterInformation{
Enabled: true,
InitialFailoverVersion: TestCurrentClusterInitialFailoverVersion,
RPCName: common.FrontendServiceName,
RPCAddress: TestCurrentClusterFrontendAddress,
},
}
)

Expand All @@ -92,8 +98,7 @@ func GetTestClusterMetadata(enableGlobalDomain bool, isMasterCluster bool, enabl
TestFailoverVersionIncrement,
masterClusterName,
TestCurrentClusterName,
TestAllClusterFailoverVersions,
TestAllClusterAddress,
TestAllClusterInfo,
dynamicconfig.GetStringPropertyFn(archivalStatus),
clusterDefaultBucket,
dynamicconfig.GetBoolPropertyFn(enableArchival),
Expand All @@ -107,8 +112,7 @@ func GetTestClusterMetadata(enableGlobalDomain bool, isMasterCluster bool, enabl
TestFailoverVersionIncrement,
TestCurrentClusterName,
TestCurrentClusterName,
TestSingleDCAllClusterFailoverVersions,
TestSingleDCAllClusterAddress,
TestSingleDCClusterInfo,
dynamicconfig.GetStringPropertyFn(archivalStatus),
clusterDefaultBucket,
dynamicconfig.GetBoolPropertyFn(enableArchival),
Expand Down
1 change: 0 additions & 1 deletion common/messaging/kafkaConfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ func (k *KafkaConfig) Validate(checkCluster bool, checkApp bool) {
}
for _, topics := range k.ClusterToTopic {
validateTopicsFn(topics.Topic)
validateTopicsFn(topics.RetryTopic)
validateTopicsFn(topics.DLQTopic)
}
}
Expand Down
Loading