Skip to content

Commit

Permalink
Merge branch 'master' into 3+DC
Browse files Browse the repository at this point in the history
  • Loading branch information
yux0 authored Sep 30, 2019
2 parents b88ecb6 + ac3eba1 commit a74819b
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 12 deletions.
5 changes: 1 addition & 4 deletions common/archiver/filestore/queryParser.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,7 @@ func (p *queryParser) convertAndExpr(andExpr *sqlparser.AndExpr, parsedQuery *pa
if err := p.convertWhereExpr(andExpr.Left, parsedQuery); err != nil {
return err
}
if err := p.convertWhereExpr(andExpr.Right, parsedQuery); err != nil {
return err
}
return nil
return p.convertWhereExpr(andExpr.Right, parsedQuery)
}

func (p *queryParser) convertComparisonExpr(compExpr *sqlparser.ComparisonExpr, parsedQuery *parsedQuery) error {
Expand Down
43 changes: 35 additions & 8 deletions service/worker/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
package worker

import (
"context"
"fmt"
"strings"
"sync/atomic"
"time"

Expand All @@ -43,6 +46,8 @@ import (
"github.com/uber/cadence/service/worker/parentclosepolicy"
"github.com/uber/cadence/service/worker/replicator"
"github.com/uber/cadence/service/worker/scanner"
cshared "go.uber.org/cadence/.gen/go/shared"
cclient "go.uber.org/cadence/client"
)

type (
Expand Down Expand Up @@ -72,7 +77,7 @@ type (
}
)

const domainRefreshInterval = time.Second * 30
const domainRefreshInterval = time.Second * 11

// NewService builds a new cadence-worker service
func NewService(params *service.BootstrapParams) common.Daemon {
Expand Down Expand Up @@ -327,17 +332,39 @@ func (s *Service) ensureSystemDomainExists(pFactory persistencefactory.Factory,
}
defer metadataProxy.Close()
_, err = metadataProxy.GetDomain(&persistence.GetDomainRequest{Name: common.SystemLocalDomainName})
switch err.(type) {
case nil:
return
case *shared.EntityNotExistsError:
s.logger.Info("cadence-system domain does not exist, attempting to register domain")
s.registerSystemDomain(pFactory, clusterName)
default:
if err == nil {
s.ensureDomainAvailable()
} else {
if _, ok := err.(*shared.EntityNotExistsError); ok {
s.logger.Info("cadence-system domain does not exist, attempting to register domain")
s.registerSystemDomain(pFactory, clusterName)
}
s.logger.Fatal("failed to verify if cadence system domain exists", tag.Error(err))
}
}

func (s *Service) ensureDomainAvailable() {
client := cclient.NewClient(s.params.PublicClient, common.SystemLocalDomainName, &cclient.Options{})
// Use TerminateWorkflow to check whether domain is refreshed in cache or not
err := client.TerminateWorkflow(context.Background(), "wid-not-exist", "", "test reason", nil)
retryCount := 0
for err != nil && retryCount <= 10 {
nonExistErr, ok := err.(*cshared.EntityNotExistsError)
if ok && isErrSystemDomainNotExist(nonExistErr) {
s.logger.Info(fmt.Sprintf("cadence-system domain is not ready, waiting %v for domain refresh", domainRefreshInterval), tag.Attempt(int32(retryCount)))
time.Sleep(domainRefreshInterval)
err = client.TerminateWorkflow(context.Background(), "wid-not-exist", "", "test reason", nil)
retryCount++
} else {
break
}
}
}

func isErrSystemDomainNotExist(err *cshared.EntityNotExistsError) bool {
return strings.Contains(err.Message, common.SystemLocalDomainName)
}

func (s *Service) registerSystemDomain(pFactory persistencefactory.Factory, clusterName string) {
metadataV2, err := pFactory.NewMetadataManager(persistencefactory.MetadataV2)
if err != nil {
Expand Down

0 comments on commit a74819b

Please sign in to comment.