From 147172c1d788d54a82ece3311bbd3bf8026f0943 Mon Sep 17 00:00:00 2001 From: allenchen2244 <102192478+allenchen2244@users.noreply.github.com> Date: Fri, 1 Apr 2022 12:37:42 -0700 Subject: [PATCH] Feature/cdnc 2263 Add toggle which can block domain failovers (#4786) Add support for cluster lockdown to prevent domain failovers in case target cluster is already at capacity. --- common/dynamicconfig/constants.go | 7 ++ service/frontend/service.go | 2 + service/frontend/workflowHandler.go | 13 +++ service/frontend/workflowHandler_test.go | 132 +++++++++++++++++++++++ 4 files changed, 154 insertions(+) diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go index 9ded07bfa63..40cfcba0bf4 100644 --- a/common/dynamicconfig/constants.go +++ b/common/dynamicconfig/constants.go @@ -2193,6 +2193,12 @@ const ( // Default value: false CorruptWorkflowWatchdogPause + // Lockdown defines if we want to allow failovers of domains to this cluster + // KeyName: system.Lockdown + // Value type: bool + // Default value: false + Lockdown + // LastKeyForTest must be the last one in this const group for testing purpose LastKeyForTest ) @@ -2245,6 +2251,7 @@ var Keys = map[Key]string{ EnableGRPCOutbound: "system.enableGRPCOutbound", GRPCMaxSizeInByte: "system.grpcMaxSizeInByte", EnableWatchDog: "system.EnableWatchDog", + Lockdown: "system.Lockdown", // size limit BlobSizeLimitError: "limit.blobSize.error", diff --git a/service/frontend/service.go b/service/frontend/service.go index 8e04570760e..0a8f3cd15fd 100644 --- a/service/frontend/service.go +++ b/service/frontend/service.go @@ -59,6 +59,7 @@ type Config struct { EnableClientVersionCheck dynamicconfig.BoolPropertyFn DisallowQuery dynamicconfig.BoolPropertyFnWithDomainFilter ShutdownDrainDuration dynamicconfig.DurationPropertyFn + Lockdown dynamicconfig.BoolPropertyFnWithDomainFilter // id length limits MaxIDLengthWarnLimit dynamicconfig.IntPropertyFn @@ -160,6 +161,7 @@ func NewConfig(dc *dynamicconfig.Collection, numHistoryShards int, enableReadFro SendRawWorkflowHistory: dc.GetBoolPropertyFilteredByDomain(dynamicconfig.SendRawWorkflowHistory, sendRawWorkflowHistory), DecisionResultCountLimit: dc.GetIntPropertyFilteredByDomain(dynamicconfig.FrontendDecisionResultCountLimit, 0), EmitSignalNameMetricsTag: dc.GetBoolPropertyFilteredByDomain(dynamicconfig.FrontendEmitSignalNameMetricsTag, false), + Lockdown: dc.GetBoolPropertyFilteredByDomain(dynamicconfig.Lockdown, false), domainConfig: domain.Config{ MaxBadBinaryCount: dc.GetIntPropertyFilteredByDomain(dynamicconfig.FrontendMaxBadBinaries, domain.MaxBadBinaries), MinRetentionDays: dc.GetIntProperty(dynamicconfig.MinRetentionDays, domain.DefaultMinWorkflowRetentionInDays), diff --git a/service/frontend/workflowHandler.go b/service/frontend/workflowHandler.go index e11ab9ac20d..0ae7ee659a3 100644 --- a/service/frontend/workflowHandler.go +++ b/service/frontend/workflowHandler.go @@ -132,6 +132,7 @@ var ( errClusterNameNotSet = &types.BadRequestError{Message: "Cluster name is not set."} errEmptyReplicationInfo = &types.BadRequestError{Message: "Replication task info is not set."} errEmptyQueueType = &types.BadRequestError{Message: "Queue type is not set."} + errDomainInLockdown = &types.BadRequestError{Message: "Domain is not accepting fail overs at this time due to lockdown."} errShuttingDown = &types.InternalServiceError{Message: "Shutting down"} // err for archival @@ -408,6 +409,11 @@ func (wh *WorkflowHandler) UpdateDomain( if err := checkPermission(wh.config, updateRequest.SecurityToken); err != nil { return nil, err } + } else { + // reject the failover if the cluster is in lockdown + if err := checkFailOverPermission(wh.config, updateRequest.Name); err != nil { + return nil, err + } } if isGraceFailoverRequest(updateRequest) { @@ -4370,6 +4376,13 @@ func checkPermission( return nil } +func checkFailOverPermission(config *Config, domainName string) error { + if config.Lockdown(domainName) { + return errDomainInLockdown + } + return nil +} + type domainWrapper struct { domain string } diff --git a/service/frontend/workflowHandler_test.go b/service/frontend/workflowHandler_test.go index c059518c9d1..0428ad9162c 100644 --- a/service/frontend/workflowHandler_test.go +++ b/service/frontend/workflowHandler_test.go @@ -886,6 +886,62 @@ func (s *workflowHandlerSuite) TestUpdateDomain_Success_ArchivalNeverEnabledToEn s.Equal(types.ArchivalStatusEnabled, result.Configuration.GetVisibilityArchivalStatus()) s.Equal(testVisibilityArchivalURI, result.Configuration.GetVisibilityArchivalURI()) } +func (s *workflowHandlerSuite) TestUpdateDomain_Success_FailOver() { + s.mockMetadataMgr.On("GetMetadata", mock.Anything).Return(&persistence.GetMetadataResponse{ + NotificationVersion: int64(0), + }, nil) + getDomainResp := persistenceGetDomainResponseForFailoverTest( + &domain.ArchivalState{Status: types.ArchivalStatusDisabled, URI: ""}, + &domain.ArchivalState{Status: types.ArchivalStatusDisabled, URI: ""}, + ) + + s.mockMetadataMgr.On("GetDomain", mock.Anything, mock.Anything).Return(getDomainResp, nil) + s.mockMetadataMgr.On("UpdateDomain", mock.Anything, mock.Anything).Return(nil) + s.mockClusterMetadata.EXPECT().GetAllClusterInfo().Return(cluster.TestAllClusterInfo).AnyTimes() + s.mockClusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestAlternativeClusterName).AnyTimes() + s.mockClusterMetadata.EXPECT().GetNextFailoverVersion(gomock.Any(), gomock.Any()).Return(int64(123)).AnyTimes() + s.mockArchivalMetadata.On("GetHistoryConfig").Return(archiver.NewArchivalConfig("enabled", dc.GetStringPropertyFn("disabled"), dc.GetBoolPropertyFn(false), "disabled", "some random URI")) + s.mockArchivalMetadata.On("GetVisibilityConfig").Return(archiver.NewArchivalConfig("enabled", dc.GetStringPropertyFn("disabled"), dc.GetBoolPropertyFn(false), "disabled", "some random URI")) + s.mockProducer.On("Publish", mock.Anything, mock.Anything).Return(nil).Once() + s.mockResource.RemoteFrontendClient.EXPECT().DescribeDomain(gomock.Any(), gomock.Any()). + Return(describeDomainResponseServer, nil).AnyTimes() + + wh := s.getWorkflowHandler(s.newConfig(dc.NewInMemoryClient())) + + updateReq := updateFailoverRequest( + common.StringPtr(testHistoryArchivalURI), + types.ArchivalStatusEnabled.Ptr(), + common.StringPtr(testVisibilityArchivalURI), + types.ArchivalStatusEnabled.Ptr(), + common.Int32Ptr(1), + common.StringPtr(cluster.TestAlternativeClusterName), + ) + result, err := wh.UpdateDomain(context.Background(), updateReq) + + s.NoError(err) + s.NotNil(result) + s.NotNil(result.Configuration) + s.Equal(result.ReplicationConfiguration.ActiveClusterName, cluster.TestAlternativeClusterName) +} + +func (s *workflowHandlerSuite) TestUpdateDomain_Failure_FailoverLockdown() { + + dynamicClient := dc.NewInMemoryClient() + dynamicClient.UpdateValue(dc.Lockdown, map[string]interface{}{"Lockdown": true}) + wh := s.getWorkflowHandler(s.newConfig(dynamicClient)) + + updateReq := updateFailoverRequest( + common.StringPtr(testHistoryArchivalURI), + types.ArchivalStatusEnabled.Ptr(), + common.StringPtr(testVisibilityArchivalURI), + types.ArchivalStatusEnabled.Ptr(), + common.Int32Ptr(1), + common.StringPtr(cluster.TestAlternativeClusterName), + ) + resp, err := wh.UpdateDomain(context.Background(), updateReq) + s.Nil(resp) + s.Error(err) +} func (s *workflowHandlerSuite) TestHistoryArchived() { wh := s.getWorkflowHandler(s.newConfig(dc.NewInMemoryClient())) @@ -1488,6 +1544,25 @@ func updateRequest( } } +func updateFailoverRequest( + historyArchivalURI *string, + historyArchivalStatus *types.ArchivalStatus, + visibilityArchivalURI *string, + visibilityArchivalStatus *types.ArchivalStatus, + failoverTimeoutInSeconds *int32, + activeClusterName *string, +) *types.UpdateDomainRequest { + return &types.UpdateDomainRequest{ + Name: "test-name", + HistoryArchivalStatus: historyArchivalStatus, + HistoryArchivalURI: historyArchivalURI, + VisibilityArchivalStatus: visibilityArchivalStatus, + VisibilityArchivalURI: visibilityArchivalURI, + FailoverTimeoutInSeconds: failoverTimeoutInSeconds, + ActiveClusterName: activeClusterName, + } +} + func persistenceGetDomainResponse(historyArchivalState, visibilityArchivalState *domain.ArchivalState) *persistence.GetDomainResponse { return &persistence.GetDomainResponse{ Info: &persistence.DomainInfo{ @@ -1522,6 +1597,40 @@ func persistenceGetDomainResponse(historyArchivalState, visibilityArchivalState } } +func persistenceGetDomainResponseForFailoverTest(historyArchivalState, visibilityArchivalState *domain.ArchivalState) *persistence.GetDomainResponse { + return &persistence.GetDomainResponse{ + Info: &persistence.DomainInfo{ + ID: "test-id", + Name: "test-name", + Status: 0, + Description: "test-description", + OwnerEmail: "test-owner-email", + Data: make(map[string]string), + }, + Config: &persistence.DomainConfig{ + Retention: 1, + EmitMetric: true, + HistoryArchivalStatus: historyArchivalState.Status, + HistoryArchivalURI: historyArchivalState.URI, + VisibilityArchivalStatus: visibilityArchivalState.Status, + VisibilityArchivalURI: visibilityArchivalState.URI, + }, + ReplicationConfig: &persistence.DomainReplicationConfig{ + ActiveClusterName: cluster.TestCurrentClusterName, + Clusters: []*persistence.ClusterReplicationConfig{ + { + ClusterName: cluster.TestAlternativeClusterName, + }, + }, + }, + IsGlobalDomain: true, + ConfigVersion: 0, + FailoverVersion: 0, + FailoverNotificationVersion: 0, + NotificationVersion: 0, + } +} + func registerDomainRequest( historyArchivalStatus *types.ArchivalStatus, historyArchivalURI string, @@ -1567,3 +1676,26 @@ func listArchivedWorkflowExecutionsTestRequest() *types.ListArchivedWorkflowExec Query: "some random query string", } } + +var describeDomainResponseServer = &types.DescribeDomainResponse{ + DomainInfo: &types.DomainInfo{ + Name: "test-domain", + Description: "a test domain", + OwnerEmail: "test@uber.com", + }, + Configuration: &types.DomainConfiguration{ + WorkflowExecutionRetentionPeriodInDays: 3, + EmitMetric: true, + }, + ReplicationConfiguration: &types.DomainReplicationConfiguration{ + ActiveClusterName: cluster.TestCurrentClusterName, + Clusters: []*types.ClusterReplicationConfiguration{ + { + ClusterName: cluster.TestCurrentClusterName, + }, + { + ClusterName: cluster.TestAlternativeClusterName, + }, + }, + }, +}