Skip to content

Commit

Permalink
Create domain filter for fixer workflow (cadence-workflow#3771)
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewjdawson2016 authored Nov 19, 2020
1 parent b0316fa commit 8a6df38
Show file tree
Hide file tree
Showing 9 changed files with 370 additions and 64 deletions.
11 changes: 11 additions & 0 deletions common/reconciliation/entity/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,20 @@ func (curre *CurrentExecution) GetShardID() int {
return curre.Execution.ShardID
}

// GetDomainID returns the domain id
func (ce *ConcreteExecution) GetDomainID() string {
return ce.DomainID
}

// GetDomainID returns the domain id
func (curre *CurrentExecution) GetDomainID() string {
return curre.DomainID
}

// Entity allows to deserialize and validate different type of executions
type Entity interface {
Validate() error
Clone() Entity
GetShardID() int
GetDomainID() string
}
6 changes: 6 additions & 0 deletions common/service/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,8 @@ var keys = map[Key]string{
CurrentExecutionsScannerPersistencePageSize: "worker.currentExecutionsPersistencePageSize",
CurrentExecutionsScannerInvariantCollectionHistory: "worker.currentExecutionsScannerInvariantCollectionHistory",
CurrentExecutionsScannerInvariantCollectionMutableState: "worker.currentExecutionsInvariantCollectionMutableState",
ConcreteExecutionFixerDomainAllow: "worker.concreteExecutionFixerDomainAllow",
CurrentExecutionFixerDomainAllow: "worker.currentExecutionFixerDomainAllow",
}

const (
Expand Down Expand Up @@ -856,6 +858,10 @@ const (
CurrentExecutionsScannerInvariantCollectionHistory
// CurrentExecutionsScannerInvariantCollectionMutableState indicates if mutable state invariant checks should be run
CurrentExecutionsScannerInvariantCollectionMutableState
// ConcreteExecutionFixerDomainAllow indicates which domains are allowed to be fixed by concrete fixer workflow
ConcreteExecutionFixerDomainAllow
// CurrentExecutionFixerDomainAllow indicates which domains are allowed to be fixed by current fixer workflow
CurrentExecutionFixerDomainAllow
// EnableBatcher decides whether start batcher in our worker
EnableBatcher
// EnableParentClosePolicyWorker decides whether or not enable system workers for processing parent close policy task
Expand Down
2 changes: 2 additions & 0 deletions service/worker/scanner/executions/activities.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,8 @@ func fixShard(
resources.GetBlobstoreClient(),
params.ResolvedFixerWorkflowConfig.BlobstoreFlushThreshold,
func() { activity.RecordHeartbeat(activityCtx, heartbeatDetails) },
resources.GetDomainCache(),
ctx.FixerWorkflowDynamicConfig.AllowDomain,
)
report := fixer.Fix()
if report.Result.ControlFlowFailure != nil {
Expand Down
5 changes: 5 additions & 0 deletions service/worker/scanner/executions/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ type (
DynamicConfigInvariantCollections DynamicConfigInvariantCollections
}

// FixerWorkflowDynamicConfig is the dynamic config for the fixer workflow
FixerWorkflowDynamicConfig struct {
AllowDomain dynamicconfig.BoolPropertyFnWithDomainFilter
}

// DynamicConfigInvariantCollections is the portion of ScannerWorkflowDynamicConfig
// which indicates which collections of invariants should be run
DynamicConfigInvariantCollections struct {
Expand Down
27 changes: 26 additions & 1 deletion service/worker/scanner/executions/shard/fixer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,11 @@ import (
"github.com/pborman/uuid"

"github.com/uber/cadence/common/blobstore"
"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/reconciliation/entity"
"github.com/uber/cadence/common/reconciliation/invariant"
"github.com/uber/cadence/common/reconciliation/store"
"github.com/uber/cadence/common/service/dynamicconfig"
)

type (
Expand All @@ -43,6 +46,8 @@ type (
fixedWriter store.ExecutionWriter
invariantManager invariant.Manager
progressReportFn func()
domainCache cache.DomainCache
allowDomain dynamicconfig.BoolPropertyFnWithDomainFilter
}
)

Expand All @@ -55,6 +60,8 @@ func NewFixer(
blobstoreClient blobstore.Client,
blobstoreFlushThreshold int,
progressReportFn func(),
domainCache cache.DomainCache,
allowDomain dynamicconfig.BoolPropertyFnWithDomainFilter,
) Fixer {
id := uuid.New()

Expand All @@ -67,6 +74,8 @@ func NewFixer(
fixedWriter: store.NewBlobstoreWriter(id, store.FixedExtension, blobstoreClient, blobstoreFlushThreshold),
invariantManager: manager,
progressReportFn: progressReportFn,
domainCache: domainCache,
allowDomain: allowDomain,
}
}

Expand All @@ -85,7 +94,23 @@ func (f *fixer) Fix() FixReport {
}
return result
}
fixResult := f.invariantManager.RunFixes(f.ctx, soe.Execution)
de, err := f.domainCache.GetDomainByID(soe.Execution.(entity.Entity).GetDomainID())
if err != nil {
result.Result.ControlFlowFailure = &ControlFlowFailure{
Info: "failed to get domain name",
InfoDetails: err.Error(),
}
return result
}

var fixResult invariant.ManagerFixResult
if f.allowDomain(de.GetInfo().Name) {
fixResult = f.invariantManager.RunFixes(f.ctx, soe.Execution)
} else {
fixResult = invariant.ManagerFixResult{
FixResultType: invariant.FixResultTypeSkipped,
}
}
result.Stats.ExecutionCount++
foe := store.FixOutputEntity{
Execution: soe.Execution,
Expand Down
Loading

0 comments on commit 8a6df38

Please sign in to comment.