Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable archival config per domain #1351

Merged
merged 26 commits into from
Dec 21, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
3892b28
make schema changes
andrewjdawson2016 Dec 18, 2018
6c8706f
Update IDL
andrewjdawson2016 Dec 18, 2018
4c39ff3
plumb through persistence changes and update workflow handler
andrewjdawson2016 Dec 19, 2018
497f1d5
make bins works and make fmt
andrewjdawson2016 Dec 19, 2018
b3fb11b
code complete
andrewjdawson2016 Dec 19, 2018
b01418a
make fmt clean bins
andrewjdawson2016 Dec 19, 2018
63a24d6
remove constant
andrewjdawson2016 Dec 19, 2018
f310e5c
rename thrift field
andrewjdawson2016 Dec 19, 2018
3424944
clean up code in workflow handler and add correct error cases
andrewjdawson2016 Dec 19, 2018
63a34af
run make fmt
andrewjdawson2016 Dec 19, 2018
3ea96dc
code complete
andrewjdawson2016 Dec 19, 2018
c462b29
plumb through deployment group through config
andrewjdawson2016 Dec 19, 2018
10b7c09
update metadataPersistenceTests
andrewjdawson2016 Dec 19, 2018
9c8e01f
update metadataPersistenceTestsV2
andrewjdawson2016 Dec 19, 2018
6ac9617
persistence tests passes
andrewjdawson2016 Dec 19, 2018
b6215da
make domain cache tests pass
andrewjdawson2016 Dec 19, 2018
e79700e
all unit tests pass
andrewjdawson2016 Dec 20, 2018
8becaf6
Get started adding unit tests for workflow handler
andrewjdawson2016 Dec 20, 2018
b2c332e
finish workflow handler tests for register
andrewjdawson2016 Dec 20, 2018
62ccfae
add unit tests for describe domain
andrewjdawson2016 Dec 20, 2018
cacf53a
make progress on update domain unit tests
andrewjdawson2016 Dec 20, 2018
f951cda
finish unit testing workflow handler
andrewjdawson2016 Dec 20, 2018
cf07df4
run make fmt
andrewjdawson2016 Dec 20, 2018
1873918
update cli command comment
andrewjdawson2016 Dec 20, 2018
f01b198
run make fmt
andrewjdawson2016 Dec 21, 2018
b292265
regenerate thrift files
andrewjdawson2016 Dec 21, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .gen/go/shared/idl.go

Large diffs are not rendered by default.

446 changes: 440 additions & 6 deletions .gen/go/shared/types.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ func (s *server) startService() common.Daemon {
s.cfg.ClustersInfo.CurrentClusterName,
s.cfg.ClustersInfo.ClusterInitialFailoverVersions,
s.cfg.ClustersInfo.ClusterAddress,
s.cfg.ClustersInfo.DeploymentGroup,
)
params.DispatcherProvider = client.NewIPYarpcDispatcherProvider()
// TODO: We need to switch Cadence to use zap logger, until then just pass zap.NewNop
Expand Down
14 changes: 13 additions & 1 deletion common/cluster/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ type (
ClusterNameForFailoverVersion(failoverVersion int64) string
// GetAllClientAddress return the frontend address for each cluster name
GetAllClientAddress() map[string]config.Address
// GetDeploymentGroup returns the deployment group of cluster
GetDeploymentGroup() string
}

metadataImpl struct {
Expand All @@ -68,21 +70,25 @@ type (
initialFailoverVersionClusters map[int64]string
// clusterToAddress contains the cluster name to corresponding frontend client
clusterToAddress map[string]config.Address
// deploymentGroup is the deployment group name of cluster
deploymentGroup string
}
)

// NewMetadata create a new instance of Metadata
func NewMetadata(enableGlobalDomain dynamicconfig.BoolPropertyFn, failoverVersionIncrement int64,
masterClusterName string, currentClusterName string,
clusterInitialFailoverVersions map[string]int64,
clusterToAddress map[string]config.Address) Metadata {
clusterToAddress map[string]config.Address, deploymentGroup string) Metadata {

if len(clusterInitialFailoverVersions) < 0 {
panic("Empty initial failover versions for cluster")
} else if len(masterClusterName) == 0 {
panic("Master cluster name is empty")
} else if len(currentClusterName) == 0 {
panic("Current cluster name is empty")
} else if len(deploymentGroup) == 0 {
panic("Deployment group name is empty")
}
initialFailoverVersionClusters := make(map[int64]string)
for clusterName, initialFailoverVersion := range clusterInitialFailoverVersions {
Expand Down Expand Up @@ -120,6 +126,7 @@ func NewMetadata(enableGlobalDomain dynamicconfig.BoolPropertyFn, failoverVersio
clusterInitialFailoverVersions: clusterInitialFailoverVersions,
initialFailoverVersionClusters: initialFailoverVersionClusters,
clusterToAddress: clusterToAddress,
deploymentGroup: deploymentGroup,
}
}

Expand Down Expand Up @@ -189,3 +196,8 @@ func (metadata *metadataImpl) ClusterNameForFailoverVersion(failoverVersion int6
func (metadata *metadataImpl) GetAllClientAddress() map[string]config.Address {
return metadata.clusterToAddress
}

// GetDeploymentGroup returns the deployment group name for cluster
func (metadata *metadataImpl) GetDeploymentGroup() string {
return metadata.deploymentGroup
}
3 changes: 3 additions & 0 deletions common/cluster/metadataTestBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ const (
TestCurrentClusterFrontendAddress = "127.0.0.1:7104"
// TestAlternativeClusterFrontendAddress is the ip port address of alternative cluster
TestAlternativeClusterFrontendAddress = "127.0.0.1:8104"
// TestDeploymentGroup is alternative deployment group used for test
TestDeploymentGroup = "test"
)

var (
Expand Down Expand Up @@ -69,5 +71,6 @@ func GetTestClusterMetadata(enableGlobalDomain bool, isMasterCluster bool) Metad
TestCurrentClusterName: config.Address{RPCName: common.FrontendServiceName, RPCAddress: TestCurrentClusterFrontendAddress},
TestAlternativeClusterName: config.Address{RPCName: common.FrontendServiceName, RPCAddress: TestAlternativeClusterFrontendAddress},
},
TestDeploymentGroup,
)
}
5 changes: 5 additions & 0 deletions common/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,11 @@ func ChildWorkflowExecutionFailedCausePtr(t s.ChildWorkflowExecutionFailedCause)
return &t
}

// ArchivalStatusPtr makes a copy and returns the pointer to an ArchivalStatus.
func ArchivalStatusPtr(t s.ArchivalStatus) *s.ArchivalStatus {
return &t
}

// StringDefault returns value if string pointer is set otherwise default value of string
func StringDefault(v *string) string {
var defaultString string
Expand Down
14 changes: 14 additions & 0 deletions common/mocks/ClusterMetadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,20 @@ func (_m *ClusterMetadata) GetMasterClusterName() string {
return r0
}

// GetDeploymentGroup provides a mock function with given fields:
func (_m *ClusterMetadata) GetDeploymentGroup() string {
ret := _m.Called()

var r0 string
if rf, ok := ret.Get(0).(func() string); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(string)
}

return r0
}

// GetNextFailoverVersion provides a mock function with given fields: _a0, _a1
func (_m *ClusterMetadata) GetNextFailoverVersion(_a0 string, _a1 int64) int64 {
ret := _m.Called(_a0, _a1)
Expand Down
13 changes: 11 additions & 2 deletions common/persistence/cassandra/cassandraMetadataPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ const (

templateDomainConfigType = `{` +
`retention: ?, ` +
`emit_metric: ?` +
`emit_metric: ?, ` +
`archival_bucket: ?, ` +
`archival_status: ?` +
`}`

templateDomainReplicationConfigType = `{` +
Expand All @@ -64,6 +66,7 @@ const (

templateGetDomainByNameQuery = `SELECT domain.id, domain.name, domain.status, domain.description, ` +
`domain.owner_email, domain.data, config.retention, config.emit_metric, ` +
`config.archival_bucket, config.archival_status, ` +
`replication_config.active_cluster_name, replication_config.clusters, ` +
`is_global_domain, ` +
`config_version, ` +
Expand Down Expand Up @@ -152,6 +155,8 @@ func (m *cassandraMetadataPersistence) CreateDomain(request *p.CreateDomainReque
request.Info.Data,
request.Config.Retention,
request.Config.EmitMetric,
request.Config.ArchivalBucket,
request.Config.ArchivalStatus,
request.ReplicationConfig.ActiveClusterName,
p.SerializeClusterConfigs(request.ReplicationConfig.Clusters),
request.IsGlobalDomain,
Expand Down Expand Up @@ -245,6 +250,8 @@ func (m *cassandraMetadataPersistence) GetDomain(request *p.GetDomainRequest) (*
&info.Data,
&config.Retention,
&config.EmitMetric,
&config.ArchivalBucket,
&config.ArchivalStatus,
&replicationConfig.ActiveClusterName,
&replicationClusters,
&isGlobalDomain,
Expand Down Expand Up @@ -289,6 +296,8 @@ func (m *cassandraMetadataPersistence) UpdateDomain(request *p.UpdateDomainReque
request.Info.Data,
request.Config.Retention,
request.Config.EmitMetric,
request.Config.ArchivalBucket,
request.Config.ArchivalStatus,
request.ReplicationConfig.ActiveClusterName,
p.SerializeClusterConfigs(request.ReplicationConfig.Clusters),
request.ConfigVersion,
Expand Down Expand Up @@ -330,7 +339,7 @@ func (m *cassandraMetadataPersistence) DeleteDomain(request *p.DeleteDomainReque
func (m *cassandraMetadataPersistence) DeleteDomainByName(request *p.DeleteDomainByNameRequest) error {
var ID string
query := m.session.Query(templateGetDomainByNameQuery, request.Name)
err := query.Scan(&ID, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
err := query.Scan(&ID, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
if err != nil {
if err == gocql.ErrNotFound {
return nil
Expand Down
13 changes: 11 additions & 2 deletions common/persistence/cassandra/cassandraMetadataPersistenceV2.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ const (

templateGetDomainByNameQueryV2 = `SELECT domain.id, domain.name, domain.status, domain.description, ` +
`domain.owner_email, domain.data, config.retention, config.emit_metric, ` +
`config.archival_bucket, config.archival_status, ` +
`replication_config.active_cluster_name, replication_config.clusters, ` +
`is_global_domain, ` +
`config_version, ` +
Expand Down Expand Up @@ -78,6 +79,7 @@ const (

templateListDomainQueryV2 = `SELECT name, domain.id, domain.name, domain.status, domain.description, ` +
`domain.owner_email, domain.data, config.retention, config.emit_metric, ` +
`config.archival_bucket, config.archival_status, ` +
`replication_config.active_cluster_name, replication_config.clusters, ` +
`is_global_domain, ` +
`config_version, ` +
Expand Down Expand Up @@ -157,6 +159,8 @@ func (m *cassandraMetadataPersistenceV2) CreateDomain(request *p.CreateDomainReq
request.Info.Data,
request.Config.Retention,
request.Config.EmitMetric,
request.Config.ArchivalBucket,
request.Config.ArchivalStatus,
request.ReplicationConfig.ActiveClusterName,
p.SerializeClusterConfigs(request.ReplicationConfig.Clusters),
request.IsGlobalDomain,
Expand Down Expand Up @@ -213,6 +217,8 @@ func (m *cassandraMetadataPersistenceV2) UpdateDomain(request *p.UpdateDomainReq
request.Info.Data,
request.Config.Retention,
request.Config.EmitMetric,
request.Config.ArchivalBucket,
request.Config.ArchivalStatus,
request.ReplicationConfig.ActiveClusterName,
p.SerializeClusterConfigs(request.ReplicationConfig.Clusters),
request.ConfigVersion,
Expand Down Expand Up @@ -303,6 +309,8 @@ func (m *cassandraMetadataPersistenceV2) GetDomain(request *p.GetDomainRequest)
&info.Data,
&config.Retention,
&config.EmitMetric,
&config.ArchivalBucket,
&config.ArchivalStatus,
&replicationConfig.ActiveClusterName,
&replicationClusters,
&isGlobalDomain,
Expand Down Expand Up @@ -360,12 +368,13 @@ func (m *cassandraMetadataPersistenceV2) ListDomains(request *p.ListDomainsReque
&name,
&domain.Info.ID, &domain.Info.Name, &domain.Info.Status, &domain.Info.Description, &domain.Info.OwnerEmail, &domain.Info.Data,
&domain.Config.Retention, &domain.Config.EmitMetric,
&domain.Config.ArchivalBucket, &domain.Config.ArchivalStatus,
&domain.ReplicationConfig.ActiveClusterName, &replicationClusters,
&domain.IsGlobalDomain, &domain.ConfigVersion, &domain.FailoverVersion,
&domain.FailoverNotificationVersion, &domain.NotificationVersion,
) {
if name != domainMetadataRecordName {
// do not inlcude the metadata record
// do not include the metadata record
if domain.Info.Data == nil {
domain.Info.Data = map[string]string{}
}
Expand Down Expand Up @@ -411,7 +420,7 @@ func (m *cassandraMetadataPersistenceV2) DeleteDomain(request *p.DeleteDomainReq
func (m *cassandraMetadataPersistenceV2) DeleteDomainByName(request *p.DeleteDomainByNameRequest) error {
var ID string
query := m.session.Query(templateGetDomainByNameQueryV2, constDomainPartition, request.Name)
err := query.Scan(&ID, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
err := query.Scan(&ID, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
if err != nil {
if err == gocql.ErrNotFound {
return nil
Expand Down
6 changes: 4 additions & 2 deletions common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -969,8 +969,10 @@ type (
// DomainConfig describes the domain configuration
DomainConfig struct {
// NOTE: this retention is in days, not in seconds
Retention int32
EmitMetric bool
Retention int32
EmitMetric bool
ArchivalBucket string
ArchivalStatus workflow.ArchivalStatus
}

// DomainReplicationConfig describes the cross DC domain replication configuration
Expand Down
Loading