Skip to content

Commit

Permalink
Check on-going failover across clusters (uber#3206)
Browse files Browse the repository at this point in the history
  • Loading branch information
yux0 authored May 14, 2020
1 parent 0f34983 commit cd7ec20
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 7 deletions.
1 change: 1 addition & 0 deletions common/domain/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ var (
errActiveClusterNotInClusters = &workflow.BadRequestError{Message: "Active cluster is not contained in all clusters."}
errCannotDoDomainFailoverAndUpdate = &workflow.BadRequestError{Message: "Cannot set active cluster to current cluster when other parameters are set."}
errCannotDoGracefulFailoverFromCluster = &workflow.BadRequestError{Message: "Cannot start the graceful failover from a to-be-passive cluster."}
errGracefulFailoverInActiveCluster = &workflow.BadRequestError{Message: "Cannot start the graceful failover from an active cluster to an active cluster."}
errOngoingGracefulFailover = &workflow.BadRequestError{Message: "Cannot start concurrent graceful failover."}
errInvalidGracefulFailover = &workflow.BadRequestError{Message: "Cannot start graceful failover without updating active cluster or in local domain."}

Expand Down
4 changes: 4 additions & 0 deletions common/domain/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,7 @@ func (d *HandlerImpl) UpdateDomain(
failoverNotificationVersion := getResponse.FailoverNotificationVersion
isGlobalDomain := getResponse.IsGlobalDomain
gracefulFailoverEndTime := getResponse.FailoverEndTime
currentActiveCluster := replicationConfig.ActiveClusterName

// whether history archival config changed
historyArchivalConfigChanged := false
Expand Down Expand Up @@ -442,6 +443,9 @@ func (d *HandlerImpl) UpdateDomain(
if replicationConfig.ActiveClusterName != d.clusterMetadata.GetCurrentClusterName() {
return nil, errCannotDoGracefulFailoverFromCluster
}
if replicationConfig.ActiveClusterName == currentActiveCluster {
return nil, errGracefulFailoverInActiveCluster
}
// cannot have concurrent failover
if gracefulFailoverEndTime != nil {
return nil, errOngoingGracefulFailover
Expand Down
4 changes: 4 additions & 0 deletions common/domain/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,10 @@ func (s *domainHandlerCommonSuite) TestUpdateDomain_GracefulFailover_Success() {
}
err := s.handler.RegisterDomain(context.Background(), registerRequest)
s.NoError(err)
resp1, _ := s.metadataMgr.GetDomain(&persistence.GetDomainRequest{
Name: domain,
})
s.Equal(resp1.ReplicationConfig.ActiveClusterName, "standby")

updateRequest := &workflow.UpdateDomainRequest{
Name: common.StringPtr(domain),
Expand Down
73 changes: 73 additions & 0 deletions service/frontend/workflowHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"context"
"encoding/json"
"fmt"
"sync"
"sync/atomic"
"time"

Expand All @@ -37,6 +38,7 @@ import (
h "github.com/uber/cadence/.gen/go/history"
m "github.com/uber/cadence/.gen/go/matching"
gen "github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/client/frontend"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/archiver"
"github.com/uber/cadence/common/backoff"
Expand Down Expand Up @@ -370,6 +372,15 @@ func (wh *WorkflowHandler) UpdateDomain(
}
}

if isGraceFailoverRequest(updateRequest) {
if err := wh.checkOngoingFailover(
ctx,
updateRequest.Name,
); err != nil {
return nil, err
}
}

if updateRequest.GetName() == "" {
return nil, errDomainNotSet
}
Expand Down Expand Up @@ -3706,6 +3717,68 @@ func isFailoverRequest(updateRequest *gen.UpdateDomainRequest) bool {
return updateRequest.ReplicationConfiguration != nil && updateRequest.ReplicationConfiguration.ActiveClusterName != nil
}

func isGraceFailoverRequest(updateRequest *gen.UpdateDomainRequest) bool {
return updateRequest.IsSetFailoverTimeoutInSeconds()
}

func (wh *WorkflowHandler) checkOngoingFailover(
ctx context.Context,
domainName *string,
) error {

clusterMetadata := wh.GetClusterMetadata()
respChan := make(chan *gen.DescribeDomainResponse, len(clusterMetadata.GetAllClusterInfo()))
wg := &sync.WaitGroup{}

describeDomain := func(
ctx context.Context,
client frontend.Client,
domainName *string,
) {
defer wg.Done()
resp, _ := client.DescribeDomain(
ctx,
&gen.DescribeDomainRequest{
Name: domainName,
},
)
respChan <- resp
}

for clusterName, cluster := range clusterMetadata.GetAllClusterInfo() {
if !cluster.Enabled {
continue
}
frontendClient := wh.GetRemoteFrontendClient(clusterName)
wg.Add(1)
go describeDomain(
ctx,
frontendClient,
domainName,
)
}
wg.Wait()
close(respChan)

var failoverVersion *int64
for resp := range respChan {
if resp == nil {
return &gen.InternalServiceError{
Message: "Failed to verify failover version from all clusters",
}
}
if failoverVersion == nil {
failoverVersion = resp.FailoverVersion
}
if failoverVersion != resp.FailoverVersion {
return &gen.BadRequestError{
Message: "Concurrent failover is not allow.",
}
}
}
return nil
}

func (wh *WorkflowHandler) historyArchived(ctx context.Context, request *gen.GetWorkflowExecutionHistoryRequest, domainID string) bool {
if request.GetExecution() == nil || request.GetExecution().GetRunId() == "" {
return false
Expand Down
15 changes: 8 additions & 7 deletions tools/cli/domainCommands.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,9 +172,17 @@ func (d *domainCLIImpl) UpdateDomain(c *cli.Context) {
replicationConfig := &shared.DomainReplicationConfiguration{
ActiveClusterName: common.StringPtr(activeCluster),
}

var failoverTimeout *int32
if c.String(FlagFailoverType) == gracefulFailoverType {
timeout := int32(c.Int(FlagFailoverTimeout))
failoverTimeout = &timeout
}

updateRequest = &shared.UpdateDomainRequest{
Name: common.StringPtr(domainName),
ReplicationConfiguration: replicationConfig,
FailoverTimeoutInSeconds: failoverTimeout,
}
} else {
resp, err := d.describeDomain(ctx, &shared.DescribeDomainRequest{
Expand Down Expand Up @@ -247,12 +255,6 @@ func (d *domainCLIImpl) UpdateDomain(c *cli.Context) {
badBinaryToDelete = common.StringPtr(c.String(FlagRemoveBadBinary))
}

var failoverTimeout *int32
if c.String(FlagFailoverType) == gracefulFailoverType {
timeout := int32(c.Int(FlagFailoverTimeout))
failoverTimeout = &timeout
}

updateInfo := &shared.UpdateDomainInfo{
Description: common.StringPtr(description),
OwnerEmail: common.StringPtr(ownerEmail),
Expand All @@ -276,7 +278,6 @@ func (d *domainCLIImpl) UpdateDomain(c *cli.Context) {
Configuration: updateConfig,
ReplicationConfiguration: replicationConfig,
DeleteBadBinary: badBinaryToDelete,
FailoverTimeoutInSeconds: failoverTimeout,
}
}

Expand Down

0 comments on commit cd7ec20

Please sign in to comment.