Skip to content

Commit

Permalink
Feature/cdnc 2263 Add toggle which can block domain failovers (uber#4786
Browse files Browse the repository at this point in the history
)

Add support for cluster lockdown to prevent domain failovers in case the target cluster is already at capacity.
  • Loading branch information
allenchen2244 authored Apr 1, 2022
1 parent 8c61641 commit 147172c
Show file tree
Hide file tree
Showing 4 changed files with 154 additions and 0 deletions.
7 changes: 7 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -2193,6 +2193,12 @@ const (
// Default value: false
CorruptWorkflowWatchdogPause

// Lockdown defines whether failovers of domains to this cluster are blocked (true rejects failover requests)
// KeyName: system.Lockdown
// Value type: bool
// Default value: false
Lockdown

// LastKeyForTest must be the last one in this const group for testing purpose
LastKeyForTest
)
Expand Down Expand Up @@ -2245,6 +2251,7 @@ var Keys = map[Key]string{
EnableGRPCOutbound: "system.enableGRPCOutbound",
GRPCMaxSizeInByte: "system.grpcMaxSizeInByte",
EnableWatchDog: "system.EnableWatchDog",
Lockdown: "system.Lockdown",

// size limit
BlobSizeLimitError: "limit.blobSize.error",
Expand Down
2 changes: 2 additions & 0 deletions service/frontend/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type Config struct {
EnableClientVersionCheck dynamicconfig.BoolPropertyFn
DisallowQuery dynamicconfig.BoolPropertyFnWithDomainFilter
ShutdownDrainDuration dynamicconfig.DurationPropertyFn
Lockdown dynamicconfig.BoolPropertyFnWithDomainFilter

// id length limits
MaxIDLengthWarnLimit dynamicconfig.IntPropertyFn
Expand Down Expand Up @@ -160,6 +161,7 @@ func NewConfig(dc *dynamicconfig.Collection, numHistoryShards int, enableReadFro
SendRawWorkflowHistory: dc.GetBoolPropertyFilteredByDomain(dynamicconfig.SendRawWorkflowHistory, sendRawWorkflowHistory),
DecisionResultCountLimit: dc.GetIntPropertyFilteredByDomain(dynamicconfig.FrontendDecisionResultCountLimit, 0),
EmitSignalNameMetricsTag: dc.GetBoolPropertyFilteredByDomain(dynamicconfig.FrontendEmitSignalNameMetricsTag, false),
Lockdown: dc.GetBoolPropertyFilteredByDomain(dynamicconfig.Lockdown, false),
domainConfig: domain.Config{
MaxBadBinaryCount: dc.GetIntPropertyFilteredByDomain(dynamicconfig.FrontendMaxBadBinaries, domain.MaxBadBinaries),
MinRetentionDays: dc.GetIntProperty(dynamicconfig.MinRetentionDays, domain.DefaultMinWorkflowRetentionInDays),
Expand Down
13 changes: 13 additions & 0 deletions service/frontend/workflowHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ var (
errClusterNameNotSet = &types.BadRequestError{Message: "Cluster name is not set."}
errEmptyReplicationInfo = &types.BadRequestError{Message: "Replication task info is not set."}
errEmptyQueueType = &types.BadRequestError{Message: "Queue type is not set."}
errDomainInLockdown = &types.BadRequestError{Message: "Domain is not accepting fail overs at this time due to lockdown."}
errShuttingDown = &types.InternalServiceError{Message: "Shutting down"}

// err for archival
Expand Down Expand Up @@ -408,6 +409,11 @@ func (wh *WorkflowHandler) UpdateDomain(
if err := checkPermission(wh.config, updateRequest.SecurityToken); err != nil {
return nil, err
}
} else {
// reject the failover if the cluster is in lockdown
if err := checkFailOverPermission(wh.config, updateRequest.Name); err != nil {
return nil, err
}
}

if isGraceFailoverRequest(updateRequest) {
Expand Down Expand Up @@ -4370,6 +4376,13 @@ func checkPermission(
return nil
}

// checkFailOverPermission decides whether a failover may proceed for the
// given domain. It evaluates the Lockdown setting on config for domainName
// and returns errDomainInLockdown when that setting is true, nil otherwise.
func checkFailOverPermission(config *Config, domainName string) error {
	if locked := config.Lockdown(domainName); !locked {
		return nil
	}
	return errDomainInLockdown
}

// domainWrapper wraps a single string value in a struct.
// NOTE(review): how this type is consumed is not visible in this chunk.
type domainWrapper struct {
// domain is the wrapped value, presumably a domain name; confirm against callers.
domain string
}
Expand Down
132 changes: 132 additions & 0 deletions service/frontend/workflowHandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -886,6 +886,62 @@ func (s *workflowHandlerSuite) TestUpdateDomain_Success_ArchivalNeverEnabledToEn
s.Equal(types.ArchivalStatusEnabled, result.Configuration.GetVisibilityArchivalStatus())
s.Equal(testVisibilityArchivalURI, result.Configuration.GetVisibilityArchivalURI())
}
// TestUpdateDomain_Success_FailOver verifies that a failover request for a
// global domain is accepted when no Lockdown value has been set on the
// dynamic config client, and that the response reports the requested cluster
// as the active one.
func (s *workflowHandlerSuite) TestUpdateDomain_Success_FailOver() {
	s.mockMetadataMgr.On("GetMetadata", mock.Anything).Return(&persistence.GetMetadataResponse{
		NotificationVersion: int64(0),
	}, nil)
	getDomainResp := persistenceGetDomainResponseForFailoverTest(
		&domain.ArchivalState{Status: types.ArchivalStatusDisabled, URI: ""},
		&domain.ArchivalState{Status: types.ArchivalStatusDisabled, URI: ""},
	)

	s.mockMetadataMgr.On("GetDomain", mock.Anything, mock.Anything).Return(getDomainResp, nil)
	s.mockMetadataMgr.On("UpdateDomain", mock.Anything, mock.Anything).Return(nil)
	s.mockClusterMetadata.EXPECT().GetAllClusterInfo().Return(cluster.TestAllClusterInfo).AnyTimes()
	s.mockClusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestAlternativeClusterName).AnyTimes()
	s.mockClusterMetadata.EXPECT().GetNextFailoverVersion(gomock.Any(), gomock.Any()).Return(int64(123)).AnyTimes()
	s.mockArchivalMetadata.On("GetHistoryConfig").Return(archiver.NewArchivalConfig("enabled", dc.GetStringPropertyFn("disabled"), dc.GetBoolPropertyFn(false), "disabled", "some random URI"))
	s.mockArchivalMetadata.On("GetVisibilityConfig").Return(archiver.NewArchivalConfig("enabled", dc.GetStringPropertyFn("disabled"), dc.GetBoolPropertyFn(false), "disabled", "some random URI"))
	s.mockProducer.On("Publish", mock.Anything, mock.Anything).Return(nil).Once()
	s.mockResource.RemoteFrontendClient.EXPECT().DescribeDomain(gomock.Any(), gomock.Any()).
		Return(describeDomainResponseServer, nil).AnyTimes()

	// A fresh in-memory client is used, so no Lockdown value is configured.
	wh := s.getWorkflowHandler(s.newConfig(dc.NewInMemoryClient()))

	updateReq := updateFailoverRequest(
		common.StringPtr(testHistoryArchivalURI),
		types.ArchivalStatusEnabled.Ptr(),
		common.StringPtr(testVisibilityArchivalURI),
		types.ArchivalStatusEnabled.Ptr(),
		common.Int32Ptr(1),
		common.StringPtr(cluster.TestAlternativeClusterName),
	)
	result, err := wh.UpdateDomain(context.Background(), updateReq)

	s.NoError(err)
	s.NotNil(result)
	s.NotNil(result.Configuration)
	// Expected value goes first so a failure message labels the values correctly.
	s.Equal(cluster.TestAlternativeClusterName, result.ReplicationConfiguration.ActiveClusterName)
}

// TestUpdateDomain_Failure_FailoverLockdown verifies that a failover request
// is rejected with errDomainInLockdown once a Lockdown value has been set on
// the dynamic config client.
func (s *workflowHandlerSuite) TestUpdateDomain_Failure_FailoverLockdown() {
	dynamicClient := dc.NewInMemoryClient()
	// NOTE(review): the stored value is a map rather than a plain bool; confirm
	// the in-memory client interprets it as "lockdown enabled".
	dynamicClient.UpdateValue(dc.Lockdown, map[string]interface{}{"Lockdown": true})
	wh := s.getWorkflowHandler(s.newConfig(dynamicClient))

	updateReq := updateFailoverRequest(
		common.StringPtr(testHistoryArchivalURI),
		types.ArchivalStatusEnabled.Ptr(),
		common.StringPtr(testVisibilityArchivalURI),
		types.ArchivalStatusEnabled.Ptr(),
		common.Int32Ptr(1),
		common.StringPtr(cluster.TestAlternativeClusterName),
	)
	resp, err := wh.UpdateDomain(context.Background(), updateReq)
	s.Nil(resp)
	s.Error(err)
	// Pin the exact error so the test cannot pass because of an unrelated
	// failure (for example, a missing mock expectation).
	s.Equal(errDomainInLockdown, err)
}

func (s *workflowHandlerSuite) TestHistoryArchived() {
wh := s.getWorkflowHandler(s.newConfig(dc.NewInMemoryClient()))
Expand Down Expand Up @@ -1488,6 +1544,25 @@ func updateRequest(
}
}

// updateFailoverRequest builds an UpdateDomainRequest for the domain
// "test-name" that carries the supplied archival settings, failover timeout
// and active cluster name. Each argument is stored in the request as given.
func updateFailoverRequest(
	historyArchivalURI *string,
	historyArchivalStatus *types.ArchivalStatus,
	visibilityArchivalURI *string,
	visibilityArchivalStatus *types.ArchivalStatus,
	failoverTimeoutInSeconds *int32,
	activeClusterName *string,
) *types.UpdateDomainRequest {
	req := new(types.UpdateDomainRequest)
	req.Name = "test-name"

	// Archival settings.
	req.HistoryArchivalStatus = historyArchivalStatus
	req.HistoryArchivalURI = historyArchivalURI
	req.VisibilityArchivalStatus = visibilityArchivalStatus
	req.VisibilityArchivalURI = visibilityArchivalURI

	// Failover settings.
	req.FailoverTimeoutInSeconds = failoverTimeoutInSeconds
	req.ActiveClusterName = activeClusterName

	return req
}

func persistenceGetDomainResponse(historyArchivalState, visibilityArchivalState *domain.ArchivalState) *persistence.GetDomainResponse {
return &persistence.GetDomainResponse{
Info: &persistence.DomainInfo{
Expand Down Expand Up @@ -1522,6 +1597,40 @@ func persistenceGetDomainResponse(historyArchivalState, visibilityArchivalState
}
}

// persistenceGetDomainResponseForFailoverTest returns a GetDomainResponse
// fixture for a global domain named "test-name". The active cluster is
// cluster.TestCurrentClusterName and the cluster list contains only
// cluster.TestAlternativeClusterName. Archival status and URI are copied from
// the two ArchivalState arguments.
func persistenceGetDomainResponseForFailoverTest(historyArchivalState, visibilityArchivalState *domain.ArchivalState) *persistence.GetDomainResponse {
	domainInfo := &persistence.DomainInfo{
		ID:          "test-id",
		Name:        "test-name",
		Status:      0,
		Description: "test-description",
		OwnerEmail:  "test-owner-email",
		Data:        map[string]string{},
	}

	domainConfig := &persistence.DomainConfig{
		Retention:                1,
		EmitMetric:               true,
		HistoryArchivalStatus:    historyArchivalState.Status,
		HistoryArchivalURI:       historyArchivalState.URI,
		VisibilityArchivalStatus: visibilityArchivalState.Status,
		VisibilityArchivalURI:    visibilityArchivalState.URI,
	}

	replicationConfig := &persistence.DomainReplicationConfig{
		ActiveClusterName: cluster.TestCurrentClusterName,
		Clusters: []*persistence.ClusterReplicationConfig{
			{ClusterName: cluster.TestAlternativeClusterName},
		},
	}

	// ConfigVersion, FailoverVersion, FailoverNotificationVersion and
	// NotificationVersion are left at their zero values.
	return &persistence.GetDomainResponse{
		Info:              domainInfo,
		Config:            domainConfig,
		ReplicationConfig: replicationConfig,
		IsGlobalDomain:    true,
	}
}

func registerDomainRequest(
historyArchivalStatus *types.ArchivalStatus,
historyArchivalURI string,
Expand Down Expand Up @@ -1567,3 +1676,26 @@ func listArchivedWorkflowExecutionsTestRequest() *types.ListArchivedWorkflowExec
Query: "some random query string",
}
}

// describeDomainResponseServer is a shared DescribeDomainResponse fixture.
// In this chunk it is returned by the mocked RemoteFrontendClient.DescribeDomain
// call in TestUpdateDomain_Success_FailOver.
var describeDomainResponseServer = &types.DescribeDomainResponse{
DomainInfo: &types.DomainInfo{
Name: "test-domain",
Description: "a test domain",
OwnerEmail: "test@uber.com",
},
Configuration: &types.DomainConfiguration{
WorkflowExecutionRetentionPeriodInDays: 3,
EmitMetric: true,
},
// The domain is active in the current test cluster and lists both test clusters.
ReplicationConfiguration: &types.DomainReplicationConfiguration{
ActiveClusterName: cluster.TestCurrentClusterName,
Clusters: []*types.ClusterReplicationConfiguration{
{
ClusterName: cluster.TestCurrentClusterName,
},
{
ClusterName: cluster.TestAlternativeClusterName,
},
},
},
}

0 comments on commit 147172c

Please sign in to comment.