Skip to content

Commit

Permalink
Support xdc domain archival config (uber#1373)
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewjdawson2016 authored Jan 7, 2019
1 parent ced0e3b commit 90a79cd
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 12 deletions.
2 changes: 2 additions & 0 deletions service/frontend/domainReplicationTaskHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ func (domainReplicator *domainReplicatorImpl) HandleTransmissionTask(domainOpera
Config: &shared.DomainConfiguration{
WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(config.Retention),
EmitMetric: common.BoolPtr(config.EmitMetric),
ArchivalBucketName: common.StringPtr(config.ArchivalBucket),
ArchivalStatus: common.ArchivalStatusPtr(config.ArchivalStatus),
},
ReplicationConfig: &shared.DomainReplicationConfiguration{
ActiveClusterName: common.StringPtr(replicationConfig.ActiveClusterName),
Expand Down
36 changes: 28 additions & 8 deletions service/frontend/domainReplicationTaskHandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ func (s *domainReplicatorSuite) TestHandleTransmissionTask_RegisterDomainTask_Is
data := map[string]string{"k": "v"}
retention := int32(10)
emitMetric := true
archivalBucket := "some random archival bucket name"
archivalStatus := shared.ArchivalStatusEnabled
clusterActive := "some random active cluster name"
clusterStandby := "some random standby cluster name"
configVersion := int64(0)
Expand All @@ -105,8 +107,10 @@ func (s *domainReplicatorSuite) TestHandleTransmissionTask_RegisterDomainTask_Is
Data: data,
}
config := &p.DomainConfig{
Retention: retention,
EmitMetric: emitMetric,
Retention: retention,
EmitMetric: emitMetric,
ArchivalBucket: archivalBucket,
ArchivalStatus: archivalStatus,
}
replicationConfig := &p.DomainReplicationConfig{
ActiveClusterName: clusterActive,
Expand All @@ -129,6 +133,8 @@ func (s *domainReplicatorSuite) TestHandleTransmissionTask_RegisterDomainTask_Is
Config: &shared.DomainConfiguration{
WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(retention),
EmitMetric: common.BoolPtr(emitMetric),
ArchivalBucketName: common.StringPtr(archivalBucket),
ArchivalStatus: common.ArchivalStatusPtr(archivalStatus),
},
ReplicationConfig: &shared.DomainReplicationConfiguration{
ActiveClusterName: common.StringPtr(clusterActive),
Expand All @@ -151,6 +157,8 @@ func (s *domainReplicatorSuite) TestHandleTransmissionTask_RegisterDomainTask_No
data := map[string]string{"k": "v"}
retention := int32(10)
emitMetric := true
archivalBucket := "some random archival bucket name"
archivalStatus := shared.ArchivalStatusEnabled
clusterActive := "some random active cluster name"
clusterStandby := "some random standby cluster name"
configVersion := int64(0)
Expand All @@ -174,8 +182,10 @@ func (s *domainReplicatorSuite) TestHandleTransmissionTask_RegisterDomainTask_No
Data: data,
}
config := &p.DomainConfig{
Retention: retention,
EmitMetric: emitMetric,
Retention: retention,
EmitMetric: emitMetric,
ArchivalBucket: archivalBucket,
ArchivalStatus: archivalStatus,
}
replicationConfig := &p.DomainReplicationConfig{
ActiveClusterName: clusterActive,
Expand All @@ -197,6 +207,8 @@ func (s *domainReplicatorSuite) TestHandleTransmissionTask_UpdateDomainTask_IsGl
data := map[string]string{"k": "v"}
retention := int32(10)
emitMetric := true
archivalBucket := "some random archival bucket name"
archivalStatus := shared.ArchivalStatusEnabled
clusterActive := "some random active cluster name"
clusterStandby := "some random standby cluster name"
configVersion := int64(0)
Expand All @@ -220,8 +232,10 @@ func (s *domainReplicatorSuite) TestHandleTransmissionTask_UpdateDomainTask_IsGl
Data: data,
}
config := &p.DomainConfig{
Retention: retention,
EmitMetric: emitMetric,
Retention: retention,
EmitMetric: emitMetric,
ArchivalBucket: archivalBucket,
ArchivalStatus: archivalStatus,
}
replicationConfig := &p.DomainReplicationConfig{
ActiveClusterName: clusterActive,
Expand All @@ -244,6 +258,8 @@ func (s *domainReplicatorSuite) TestHandleTransmissionTask_UpdateDomainTask_IsGl
Config: &shared.DomainConfiguration{
WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(retention),
EmitMetric: common.BoolPtr(emitMetric),
ArchivalBucketName: common.StringPtr(archivalBucket),
ArchivalStatus: common.ArchivalStatusPtr(archivalStatus),
},
ReplicationConfig: &shared.DomainReplicationConfiguration{
ActiveClusterName: common.StringPtr(clusterActive),
Expand All @@ -266,6 +282,8 @@ func (s *domainReplicatorSuite) TestHandleTransmissionTask_UpdateDomainTask_NotG
data := map[string]string{"k": "v"}
retention := int32(10)
emitMetric := true
archivalBucket := "some random archival bucket name"
archivalStatus := shared.ArchivalStatusEnabled
clusterActive := "some random active cluster name"
clusterStandby := "some random standby cluster name"
configVersion := int64(0)
Expand All @@ -289,8 +307,10 @@ func (s *domainReplicatorSuite) TestHandleTransmissionTask_UpdateDomainTask_NotG
Data: data,
}
config := &p.DomainConfig{
Retention: retention,
EmitMetric: emitMetric,
Retention: retention,
EmitMetric: emitMetric,
ArchivalBucket: archivalBucket,
ArchivalStatus: archivalStatus,
}
replicationConfig := &p.DomainReplicationConfig{
ActiveClusterName: clusterActive,
Expand Down
12 changes: 8 additions & 4 deletions service/worker/replicator/domainReplicationTaskHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,10 @@ func (domainReplicator *domainReplicatorImpl) handleDomainCreationReplicationTas
Data: task.Info.Data,
},
Config: &persistence.DomainConfig{
Retention: task.Config.GetWorkflowExecutionRetentionPeriodInDays(),
EmitMetric: task.Config.GetEmitMetric(),
Retention: task.Config.GetWorkflowExecutionRetentionPeriodInDays(),
EmitMetric: task.Config.GetEmitMetric(),
ArchivalBucket: task.Config.GetArchivalBucketName(),
ArchivalStatus: task.Config.GetArchivalStatus(),
},
ReplicationConfig: &persistence.DomainReplicationConfig{
ActiveClusterName: task.ReplicationConfig.GetActiveClusterName(),
Expand Down Expand Up @@ -168,8 +170,10 @@ func (domainReplicator *domainReplicatorImpl) handleDomainUpdateReplicationTask(
Data: task.Info.Data,
}
request.Config = &persistence.DomainConfig{
Retention: task.Config.GetWorkflowExecutionRetentionPeriodInDays(),
EmitMetric: task.Config.GetEmitMetric(),
Retention: task.Config.GetWorkflowExecutionRetentionPeriodInDays(),
EmitMetric: task.Config.GetEmitMetric(),
ArchivalBucket: task.Config.GetArchivalBucketName(),
ArchivalStatus: task.Config.GetArchivalStatus(),
}
request.ReplicationConfig.Clusters = domainReplicator.convertClusterReplicationConfigFromThrift(task.ReplicationConfig.Clusters)
request.ConfigVersion = task.GetConfigVersion()
Expand Down
47 changes: 47 additions & 0 deletions service/worker/replicator/domainReplicationTaskHandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ func (s *domainReplicatorSuite) TestHandleReceivingTask_RegisterDomainTask() {
data := map[string]string{"k": "v"}
retention := int32(10)
emitMetric := true
archivalBucket := "some random archival bucket name"
archivalStatus := shared.ArchivalStatusEnabled
clusterActive := "some random active cluster name"
clusterStandby := "some random standby cluster name"
configVersion := int64(0)
Expand All @@ -109,6 +111,8 @@ func (s *domainReplicatorSuite) TestHandleReceivingTask_RegisterDomainTask() {
Config: &shared.DomainConfiguration{
WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(retention),
EmitMetric: common.BoolPtr(emitMetric),
ArchivalBucketName: common.StringPtr(archivalBucket),
ArchivalStatus: common.ArchivalStatusPtr(archivalStatus),
},
ReplicationConfig: &shared.DomainReplicationConfiguration{
ActiveClusterName: common.StringPtr(clusterActive),
Expand All @@ -135,6 +139,8 @@ func (s *domainReplicatorSuite) TestHandleReceivingTask_RegisterDomainTask() {
s.Equal(data, resp.Info.Data)
s.Equal(retention, resp.Config.Retention)
s.Equal(emitMetric, resp.Config.EmitMetric)
s.Equal(archivalBucket, resp.Config.ArchivalBucket)
s.Equal(archivalStatus, resp.Config.ArchivalStatus)
s.Equal(clusterActive, resp.ReplicationConfig.ActiveClusterName)
s.Equal(s.domainReplicator.convertClusterReplicationConfigFromThrift(clusters), resp.ReplicationConfig.Clusters)
s.Equal(configVersion, resp.ConfigVersion)
Expand All @@ -152,6 +158,8 @@ func (s *domainReplicatorSuite) TestHandleReceivingTask_UpdateDomainTask_DomainN
ownerEmail := "some random test owner"
retention := int32(10)
emitMetric := true
archivalBucket := "some random archival bucket name"
archivalStatus := shared.ArchivalStatusEnabled
clusterActive := "some random active cluster name"
clusterStandby := "some random standby cluster name"
configVersion := int64(12)
Expand Down Expand Up @@ -179,6 +187,8 @@ func (s *domainReplicatorSuite) TestHandleReceivingTask_UpdateDomainTask_DomainN
Config: &shared.DomainConfiguration{
WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(retention),
EmitMetric: common.BoolPtr(emitMetric),
ArchivalBucketName: common.StringPtr(archivalBucket),
ArchivalStatus: common.ArchivalStatusPtr(archivalStatus),
},
ReplicationConfig: &shared.DomainReplicationConfiguration{
ActiveClusterName: common.StringPtr(clusterActive),
Expand All @@ -205,6 +215,8 @@ func (s *domainReplicatorSuite) TestHandleReceivingTask_UpdateDomainTask_DomainN
s.Equal(domainData, resp.Info.Data)
s.Equal(retention, resp.Config.Retention)
s.Equal(emitMetric, resp.Config.EmitMetric)
s.Equal(archivalBucket, resp.Config.ArchivalBucket)
s.Equal(archivalStatus, resp.Config.ArchivalStatus)
s.Equal(clusterActive, resp.ReplicationConfig.ActiveClusterName)
s.Equal(s.domainReplicator.convertClusterReplicationConfigFromThrift(clusters), resp.ReplicationConfig.Clusters)
s.Equal(configVersion, resp.ConfigVersion)
Expand All @@ -223,6 +235,8 @@ func (s *domainReplicatorSuite) TestHandleReceivingTask_UpdateDomainTask_UpdateC
data := map[string]string{"k": "v"}
retention := int32(10)
emitMetric := true
archivalBucket := "some random archival bucket name"
archivalStatus := shared.ArchivalStatusEnabled
clusterActive := "some random active cluster name"
clusterStandby := "some random standby cluster name"
configVersion := int64(0)
Expand All @@ -249,6 +263,8 @@ func (s *domainReplicatorSuite) TestHandleReceivingTask_UpdateDomainTask_UpdateC
Config: &shared.DomainConfiguration{
WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(retention),
EmitMetric: common.BoolPtr(emitMetric),
ArchivalBucketName: common.StringPtr(archivalBucket),
ArchivalStatus: common.ArchivalStatusPtr(archivalStatus),
},
ReplicationConfig: &shared.DomainReplicationConfiguration{
ActiveClusterName: common.StringPtr(clusterActive),
Expand All @@ -269,6 +285,7 @@ func (s *domainReplicatorSuite) TestHandleReceivingTask_UpdateDomainTask_UpdateC
updatedData := map[string]string{"k": "v1"}
updateRetention := int32(122)
updateEmitMetric := true
updateArchivalStatus := shared.ArchivalStatusDisabled
updateClusterActive := "other random active cluster name"
updateClusterStandby := "other random standby cluster name"
updateConfigVersion := configVersion + 1
Expand All @@ -294,6 +311,8 @@ func (s *domainReplicatorSuite) TestHandleReceivingTask_UpdateDomainTask_UpdateC
Config: &shared.DomainConfiguration{
WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(updateRetention),
EmitMetric: common.BoolPtr(updateEmitMetric),
ArchivalBucketName: common.StringPtr(archivalBucket),
ArchivalStatus: common.ArchivalStatusPtr(updateArchivalStatus),
},
ReplicationConfig: &shared.DomainReplicationConfiguration{
ActiveClusterName: common.StringPtr(updateClusterActive),
Expand All @@ -318,6 +337,8 @@ func (s *domainReplicatorSuite) TestHandleReceivingTask_UpdateDomainTask_UpdateC
s.Equal(updatedData, resp.Info.Data)
s.Equal(updateRetention, resp.Config.Retention)
s.Equal(updateEmitMetric, resp.Config.EmitMetric)
s.Equal(archivalBucket, resp.Config.ArchivalBucket)
s.Equal(updateArchivalStatus, resp.Config.ArchivalStatus)
s.Equal(updateClusterActive, resp.ReplicationConfig.ActiveClusterName)
s.Equal(s.domainReplicator.convertClusterReplicationConfigFromThrift(updateClusters), resp.ReplicationConfig.Clusters)
s.Equal(updateConfigVersion, resp.ConfigVersion)
Expand All @@ -336,6 +357,8 @@ func (s *domainReplicatorSuite) TestHandleReceivingTask_UpdateDomainTask_UpdateC
data := map[string]string{"k": "v"}
retention := int32(10)
emitMetric := true
archivalBucket := ""
archivalStatus := shared.ArchivalStatusNeverEnabled
clusterActive := "some random active cluster name"
clusterStandby := "some random standby cluster name"
configVersion := int64(0)
Expand All @@ -362,6 +385,8 @@ func (s *domainReplicatorSuite) TestHandleReceivingTask_UpdateDomainTask_UpdateC
Config: &shared.DomainConfiguration{
WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(retention),
EmitMetric: common.BoolPtr(emitMetric),
ArchivalBucketName: common.StringPtr(archivalBucket),
ArchivalStatus: common.ArchivalStatusPtr(archivalStatus),
},
ReplicationConfig: &shared.DomainReplicationConfiguration{
ActiveClusterName: common.StringPtr(clusterActive),
Expand All @@ -382,6 +407,8 @@ func (s *domainReplicatorSuite) TestHandleReceivingTask_UpdateDomainTask_UpdateC
updateData := map[string]string{"k": "v2"}
updateRetention := int32(122)
updateEmitMetric := true
updateArchivalBucket := "some random archival bucket name"
updateArchivalStatus := shared.ArchivalStatusEnabled
updateClusterActive := "other random active cluster name"
updateClusterStandby := "other random standby cluster name"
updateConfigVersion := configVersion + 1
Expand All @@ -407,6 +434,8 @@ func (s *domainReplicatorSuite) TestHandleReceivingTask_UpdateDomainTask_UpdateC
Config: &shared.DomainConfiguration{
WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(updateRetention),
EmitMetric: common.BoolPtr(updateEmitMetric),
ArchivalBucketName: common.StringPtr(updateArchivalBucket),
ArchivalStatus: common.ArchivalStatusPtr(updateArchivalStatus),
},
ReplicationConfig: &shared.DomainReplicationConfiguration{
ActiveClusterName: common.StringPtr(updateClusterActive),
Expand All @@ -431,6 +460,8 @@ func (s *domainReplicatorSuite) TestHandleReceivingTask_UpdateDomainTask_UpdateC
s.Equal(updateData, resp.Info.Data)
s.Equal(updateRetention, resp.Config.Retention)
s.Equal(updateEmitMetric, resp.Config.EmitMetric)
s.Equal(updateArchivalBucket, resp.Config.ArchivalBucket)
s.Equal(updateArchivalStatus, resp.Config.ArchivalStatus)
s.Equal(clusterActive, resp.ReplicationConfig.ActiveClusterName)
s.Equal(s.domainReplicator.convertClusterReplicationConfigFromThrift(updateClusters), resp.ReplicationConfig.Clusters)
s.Equal(updateConfigVersion, resp.ConfigVersion)
Expand All @@ -449,6 +480,8 @@ func (s *domainReplicatorSuite) TestHandleReceivingTask_UpdateDomainTask_NoUpdat
data := map[string]string{"k": "v"}
retention := int32(10)
emitMetric := true
archivalBucket := "some random archival bucket name"
archivalStatus := shared.ArchivalStatusEnabled
clusterActive := "some random active cluster name"
clusterStandby := "some random standby cluster name"
configVersion := int64(0)
Expand All @@ -475,6 +508,8 @@ func (s *domainReplicatorSuite) TestHandleReceivingTask_UpdateDomainTask_NoUpdat
Config: &shared.DomainConfiguration{
WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(retention),
EmitMetric: common.BoolPtr(emitMetric),
ArchivalBucketName: common.StringPtr(archivalBucket),
ArchivalStatus: common.ArchivalStatusPtr(archivalStatus),
},
ReplicationConfig: &shared.DomainReplicationConfiguration{
ActiveClusterName: common.StringPtr(clusterActive),
Expand Down Expand Up @@ -520,6 +555,8 @@ func (s *domainReplicatorSuite) TestHandleReceivingTask_UpdateDomainTask_NoUpdat
Config: &shared.DomainConfiguration{
WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(updateRetention),
EmitMetric: common.BoolPtr(updateEmitMetric),
ArchivalBucketName: common.StringPtr(archivalBucket),
ArchivalStatus: common.ArchivalStatusPtr(archivalStatus),
},
ReplicationConfig: &shared.DomainReplicationConfiguration{
ActiveClusterName: common.StringPtr(updateClusterActive),
Expand All @@ -544,6 +581,8 @@ func (s *domainReplicatorSuite) TestHandleReceivingTask_UpdateDomainTask_NoUpdat
s.Equal(data, resp.Info.Data)
s.Equal(retention, resp.Config.Retention)
s.Equal(emitMetric, resp.Config.EmitMetric)
s.Equal(archivalBucket, resp.Config.ArchivalBucket)
s.Equal(archivalStatus, resp.Config.ArchivalStatus)
s.Equal(updateClusterActive, resp.ReplicationConfig.ActiveClusterName)
s.Equal(s.domainReplicator.convertClusterReplicationConfigFromThrift(clusters), resp.ReplicationConfig.Clusters)
s.Equal(configVersion, resp.ConfigVersion)
Expand All @@ -562,6 +601,8 @@ func (s *domainReplicatorSuite) TestHandleReceivingTask_UpdateDomainTask_NoUpdat
data := map[string]string{"k": "v"}
retention := int32(10)
emitMetric := true
archivalBucket := "some random archival bucket name"
archivalStatus := shared.ArchivalStatusEnabled
clusterActive := "some random active cluster name"
clusterStandby := "some random standby cluster name"
configVersion := int64(0)
Expand All @@ -588,6 +629,8 @@ func (s *domainReplicatorSuite) TestHandleReceivingTask_UpdateDomainTask_NoUpdat
Config: &shared.DomainConfiguration{
WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(retention),
EmitMetric: common.BoolPtr(emitMetric),
ArchivalBucketName: common.StringPtr(archivalBucket),
ArchivalStatus: common.ArchivalStatusPtr(archivalStatus),
},
ReplicationConfig: &shared.DomainReplicationConfiguration{
ActiveClusterName: common.StringPtr(clusterActive),
Expand Down Expand Up @@ -635,6 +678,8 @@ func (s *domainReplicatorSuite) TestHandleReceivingTask_UpdateDomainTask_NoUpdat
Config: &shared.DomainConfiguration{
WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(updateRetention),
EmitMetric: common.BoolPtr(updateEmitMetric),
ArchivalBucketName: common.StringPtr(archivalBucket),
ArchivalStatus: common.ArchivalStatusPtr(archivalStatus),
},
ReplicationConfig: &shared.DomainReplicationConfiguration{
ActiveClusterName: common.StringPtr(updateClusterActive),
Expand All @@ -656,6 +701,8 @@ func (s *domainReplicatorSuite) TestHandleReceivingTask_UpdateDomainTask_NoUpdat
s.Equal(data, resp.Info.Data)
s.Equal(retention, resp.Config.Retention)
s.Equal(emitMetric, resp.Config.EmitMetric)
s.Equal(archivalBucket, resp.Config.ArchivalBucket)
s.Equal(archivalStatus, resp.Config.ArchivalStatus)
s.Equal(clusterActive, resp.ReplicationConfig.ActiveClusterName)
s.Equal(s.domainReplicator.convertClusterReplicationConfigFromThrift(clusters), resp.ReplicationConfig.Clusters)
s.Equal(configVersion, resp.ConfigVersion)
Expand Down

0 comments on commit 90a79cd

Please sign in to comment.