Skip to content

Commit

Permalink
Improve failover coordinator error logging (uber#4811)
Browse files Browse the repository at this point in the history
  • Loading branch information
mantas-sidlauskas authored May 11, 2022
1 parent a51b613 commit 400bbe4
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 16 deletions.
21 changes: 11 additions & 10 deletions common/cache/domainCache.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ func (c *domainCache) GetCacheSize() (sizeOfCacheByName int64, sizeOfCacheByID i
return int64(c.cacheByID.Load().(Cache).Size()), int64(c.cacheNameToID.Load().(Cache).Size())
}

// Start start the background refresh of domain
// Start starts the background refresh of domain
func (c *domainCache) Start() {
if !atomic.CompareAndSwapInt32(&c.status, domainCacheInitialized, domainCacheStarted) {
return
Expand All @@ -266,7 +266,7 @@ func (c *domainCache) Start() {
go c.refreshLoop()
}

// Start start the background refresh of domain
// Stop stops background refresh of domain
func (c *domainCache) Stop() {
if !atomic.CompareAndSwapInt32(&c.status, domainCacheStarted, domainCacheStopped) {
return
Expand Down Expand Up @@ -348,8 +348,9 @@ func (c *domainCache) GetDomain(
) (*DomainCacheEntry, error) {

if name == "" {
return nil, &types.BadRequestError{Message: "Domain is empty."}
return nil, &types.BadRequestError{Message: "Domain name is empty"}
}

return c.getDomain(name)
}

Expand Down Expand Up @@ -452,7 +453,7 @@ func (c *domainCache) refreshDomainsLocked() error {
if err != nil {
return err
}
domainNotificationVersion := metadata.NotificationVersion

var token []byte
request := &persistence.ListDomainsRequest{PageSize: domainCacheRefreshPageSize}
var domains DomainCacheEntries
Expand Down Expand Up @@ -487,23 +488,25 @@ func (c *domainCache) refreshDomainsLocked() error {
newCacheByID.Put(domain.info.ID, domain)
}

scopedMetrics := c.metricsClient.Scope(metrics.DomainCacheScope)

UpdateLoop:
for _, domain := range domains {
if domain.notificationVersion >= domainNotificationVersion {
if domain.notificationVersion >= metadata.NotificationVersion {
// this guarantee that domain change events before the
// domainNotificationVersion is loaded into the cache.

// the domain change events after the domainNotificationVersion
// will be loaded into cache in the next refresh
c.logger.Info("Received larger domain notification version", tag.WorkflowDomainName(domain.GetInfo().Name))
c.logger.Info("Domain notification is not less than than metadata notification version", tag.WorkflowDomainName(domain.GetInfo().Name))
break UpdateLoop
}
triggerCallback, nextEntry, err := c.updateIDToDomainCache(newCacheByID, domain.info.ID, domain)
if err != nil {
return err
}

c.metricsClient.Scope(metrics.DomainCacheScope).Tagged(
scopedMetrics.Tagged(
metrics.DomainTag(nextEntry.info.Name),
metrics.ActiveClusterTag(nextEntry.replicationConfig.ActiveClusterName),
).UpdateGauge(metrics.ActiveClusterGauge, 1)
Expand All @@ -528,9 +531,7 @@ UpdateLoop:
c.lastRefreshTime = now
if now.Sub(c.lastCallbackEmitTime) > 30*time.Minute {
c.lastCallbackEmitTime = now
for range c.callbacks {
c.metricsClient.IncCounter(metrics.DomainCacheScope, metrics.DomainCacheCallbacksCount)
}
scopedMetrics.AddCounter(metrics.DomainCacheCallbacksCount, int64(len(c.callbacks)))
}

return nil
Expand Down
8 changes: 4 additions & 4 deletions common/domain/failover_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ package domain

import (
"context"
"fmt"
"sync/atomic"
"time"

Expand Down Expand Up @@ -183,13 +184,12 @@ func CleanPendingActiveState(
// this call has to be made
metadata, err := domainManager.GetMetadata(context.Background())
if err != nil {
return err
return fmt.Errorf("getting metadata: %w", err)
}
notificationVersion := metadata.NotificationVersion

getResponse, err := domainManager.GetDomain(context.Background(), &persistence.GetDomainRequest{ID: domainID})
if err != nil {
return err
return fmt.Errorf("getting domain: %w", err)
}
localFailoverVersion := getResponse.FailoverVersion
isGlobalDomain := getResponse.IsGlobalDomain
Expand All @@ -205,7 +205,7 @@ func CleanPendingActiveState(
FailoverVersion: localFailoverVersion,
FailoverNotificationVersion: getResponse.FailoverNotificationVersion,
FailoverEndTime: nil,
NotificationVersion: notificationVersion,
NotificationVersion: metadata.NotificationVersion,
}
op := func() error {
return domainManager.UpdateDomain(context.Background(), updateReq)
Expand Down
9 changes: 7 additions & 2 deletions service/history/failover/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,10 @@ func (c *coordinatorImpl) handleFailoverMarkers(
domainName, err := c.domainCache.GetDomainName(domainID)
if err != nil {
c.logger.Error("Coordinator failed to get domain after receiving all failover markers",
tag.WorkflowDomainID(domainID))
tag.WorkflowDomainID(domainID),
tag.Error(err),
)

c.scope.Tagged(metrics.DomainTag(domainName)).IncCounter(metrics.CadenceFailures)
return
}
Expand All @@ -310,7 +313,9 @@ func (c *coordinatorImpl) handleFailoverMarkers(
c.retryPolicy,
); err != nil {
c.logger.Error("Coordinator failed to update domain after receiving all failover markers",
tag.WorkflowDomainID(domainID))
tag.WorkflowDomainID(domainID),
tag.Error(err),
)
c.scope.IncCounter(metrics.CadenceFailures)
return
}
Expand Down

0 comments on commit 400bbe4

Please sign in to comment.