Skip to content

Commit

Permalink
Add invariant manager (uber#3263)
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewjdawson2016 authored and yux0 committed May 4, 2021
1 parent 562c638 commit 9ab9771
Show file tree
Hide file tree
Showing 12 changed files with 675 additions and 20 deletions.
1 change: 1 addition & 0 deletions service/history/resource/resourceTest.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package resource

import (
"github.com/golang/mock/gomock"

"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/resource"
"github.com/uber/cadence/service/history/events"
Expand Down
6 changes: 3 additions & 3 deletions service/worker/scanner/executions/common/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ type (
// It can be used to run a group of invariant checks or fixes.
// It is responsible for running invariants in their dependency order.
InvariantManager interface {
RunChecks(Execution) CheckResult
RunFixes(Execution) FixResult
RunChecks(Execution) ManagerCheckResult
RunFixes(Execution) ManagerFixResult
InvariantTypes() []InvariantType
}

Expand All @@ -49,7 +49,7 @@ type (
// It can also be used to fix the invariant for an execution.
Invariant interface {
Check(Execution, *InvariantResourceBag) CheckResult
Fix(Execution) FixResult
Fix(Execution, *InvariantResourceBag) FixResult
InvariantType() InvariantType
}

Expand Down
76 changes: 76 additions & 0 deletions service/worker/scanner/executions/common/invariant_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 24 additions & 3 deletions service/worker/scanner/executions/common/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ type (
FixResultType string
// InvariantType is the type of an invariant
InvariantType string
// InvariantCollection is a type which indicates a sorted collection of invariants
InvariantCollection int
)

const (
Expand All @@ -65,6 +67,11 @@ const (
ValidFirstEventInvariantType = "valid_first_event"
// OpenCurrentExecution asserts that an open concrete execution must have a valid current execution
OpenCurrentExecution = "open_current_execution"

// InvariantCollectionMutableState is the collection of invariants relating to mutable state
InvariantCollectionMutableState InvariantCollection = iota
// InvariantCollectionHistory is the collection of invariants relating to history
InvariantCollectionHistory
)

// The following are types related to Invariant.
Expand Down Expand Up @@ -96,9 +103,22 @@ type (
// FixResult is the result of running Fix.
FixResult struct {
FixResultType FixResultType
CheckResult CheckResult
Info string
InfoDetails string
}

// ManagerCheckResult is the result of running a sorted list of checks
ManagerCheckResult struct {
CheckResultType CheckResultType
CheckResults []CheckResult
}

// ManagerFixResult is the result of running a sorted list of fixes
ManagerFixResult struct {
FixResultType FixResultType
FixResults []FixResult
}
)

// The following are serializable types that represent the reports returns by Scan and Fix.
Expand Down Expand Up @@ -186,13 +206,14 @@ type (
// ScanOutputEntity represents a single execution that should be durably recorded by Scan.
ScanOutputEntity struct {
Execution Execution
Result CheckResult
Result ManagerCheckResult
}

// FixOutputEntity represents a single execution that should be durably recorded by fix.
// It contains the ScanOutputEntity that was given as input to fix.
FixOutputEntity struct {
ScanOutputEntity ScanOutputEntity
Result FixResult
Execution Execution
Input ScanOutputEntity
Result ManagerFixResult
}
)
8 changes: 4 additions & 4 deletions service/worker/scanner/executions/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,13 +140,13 @@ func Open(state int) bool {
func DeleteExecution(
exec *Execution,
pr PersistenceRetryer,
) FixResult {
) *FixResult {
if err := pr.DeleteWorkflowExecution(&persistence.DeleteWorkflowExecutionRequest{
DomainID: exec.DomainID,
WorkflowID: exec.WorkflowID,
RunID: exec.RunID,
}); err != nil {
return FixResult{
return &FixResult{
FixResultType: FixResultTypeFailed,
Info: "failed to delete concrete workflow execution",
InfoDetails: err.Error(),
Expand All @@ -157,13 +157,13 @@ func DeleteExecution(
WorkflowID: exec.WorkflowID,
RunID: exec.RunID,
}); err != nil {
return FixResult{
return &FixResult{
FixResultType: FixResultTypeFailed,
Info: "failed to delete current workflow execution",
InfoDetails: err.Error(),
}
}
return FixResult{
return &FixResult{
FixResultType: FixResultTypeFixed,
}
}
8 changes: 4 additions & 4 deletions service/worker/scanner/executions/common/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,26 +383,26 @@ func (s *UtilSuite) TestDeleteExecution() {
testCases := []struct {
deleteConcreteErr error
deleteCurrentErr error
expectedFixResult FixResult
expectedFixResult *FixResult
}{
{
deleteConcreteErr: errors.New("error deleting concrete execution"),
expectedFixResult: FixResult{
expectedFixResult: &FixResult{
FixResultType: FixResultTypeFailed,
Info: "failed to delete concrete workflow execution",
InfoDetails: "error deleting concrete execution",
},
},
{
deleteCurrentErr: errors.New("error deleting current execution"),
expectedFixResult: FixResult{
expectedFixResult: &FixResult{
FixResultType: FixResultTypeFailed,
Info: "failed to delete current workflow execution",
InfoDetails: "error deleting current execution",
},
},
{
expectedFixResult: FixResult{
expectedFixResult: &FixResult{
FixResultType: FixResultTypeFixed,
},
},
Expand Down
10 changes: 8 additions & 2 deletions service/worker/scanner/executions/invariants/historyExists.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,14 @@ func (h *historyExists) Check(execution common.Execution, resources *common.Inva
}
}

func (h *historyExists) Fix(execution common.Execution) common.FixResult {
return common.DeleteExecution(&execution, h.pr)
func (h *historyExists) Fix(execution common.Execution, resources *common.InvariantResourceBag) common.FixResult {
fixResult, checkResult := checkBeforeFix(h, execution, resources)
if fixResult != nil {
return *fixResult
}
fixResult = common.DeleteExecution(&execution, h.pr)
fixResult.CheckResult = *checkResult
return *fixResult
}

func (h *historyExists) InvariantType() common.InvariantType {
Expand Down
132 changes: 132 additions & 0 deletions service/worker/scanner/executions/invariants/invariantManager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
// The MIT License (MIT)
//
// Copyright (c) 2017-2020 Uber Technologies Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package invariants

import (
"github.com/uber/cadence/service/worker/scanner/executions/common"
)

type (
invariantManager struct {
invariants []common.Invariant
types []common.InvariantType
}
)

// NewInvariantManager handles running a collection of invariants according to the invariant collection provided.
// InvariantManager takes care of ensuring invariants are run in their correct dependency order.
func NewInvariantManager(
invariantCollections []common.InvariantCollection,
pr common.PersistenceRetryer,
) common.InvariantManager {
manager := &invariantManager{}
manager.invariants, manager.types = getSortedInvariants(invariantCollections, pr)
return manager
}

// RunChecks runs the Check method of all managed invariants.
// Stops on the first check which indicates corruption or failure.
// Only returns CheckResultTypeHealthy if all managed checks indicate healthy.
func (i *invariantManager) RunChecks(execution common.Execution) common.ManagerCheckResult {
resources := &common.InvariantResourceBag{}
var checkResults []common.CheckResult
for _, iv := range i.invariants {
checkResult := iv.Check(execution, resources)
checkResults = append(checkResults, checkResult)
if checkResult.CheckResultType != common.CheckResultTypeHealthy {
return common.ManagerCheckResult{
CheckResultType: checkResult.CheckResultType,
CheckResults: checkResults,
}
}
}
return common.ManagerCheckResult{
CheckResultType: common.CheckResultTypeHealthy,
CheckResults: checkResults,
}
}

// RunFixes runs the Fix method of all managed invariants.
// Stops on the first fix which indicates a failure.
// Returns FixResultTypeSkipped if all invariants where skipped, if at least one was fixed returns FixResultTypeFixed.
func (i *invariantManager) RunFixes(execution common.Execution) common.ManagerFixResult {
resources := &common.InvariantResourceBag{}
encounteredFix := false
var fixResults []common.FixResult
for _, iv := range i.invariants {
fixResult := iv.Fix(execution, resources)
fixResults = append(fixResults, fixResult)
if fixResult.FixResultType == common.FixResultTypeFailed {
return common.ManagerFixResult{
FixResultType: common.FixResultTypeFailed,
FixResults: fixResults,
}
}
if fixResult.FixResultType == common.FixResultTypeFixed {
encounteredFix = true
}
}
if encounteredFix {
return common.ManagerFixResult{
FixResultType: common.FixResultTypeFixed,
FixResults: fixResults,
}
}
return common.ManagerFixResult{
FixResultType: common.FixResultTypeSkipped,
FixResults: fixResults,
}
}

// InvariantTypes returns sorted list of all invariants that manager will run.
func (i *invariantManager) InvariantTypes() []common.InvariantType {
return i.types
}

func getSortedInvariants(
collections []common.InvariantCollection,
pr common.PersistenceRetryer,
) ([]common.Invariant, []common.InvariantType) {
var ivs []common.Invariant
for _, collection := range collections {
switch collection {
case common.InvariantCollectionHistory:
ivs = append(ivs, getHistoryCollection(pr)...)
case common.InvariantCollectionMutableState:
ivs = append(ivs, getMutableStateCollection(pr)...)
}
}
types := make([]common.InvariantType, len(ivs), len(ivs))
for i, iv := range ivs {
types[i] = iv.InvariantType()
}
return ivs, types
}

func getHistoryCollection(pr common.PersistenceRetryer) []common.Invariant {
return []common.Invariant{NewHistoryExists(pr), NewValidFirstEvent(pr)}
}

func getMutableStateCollection(pr common.PersistenceRetryer) []common.Invariant {
return []common.Invariant{NewOpenCurrentExecution(pr)}
}
Loading

0 comments on commit 9ab9771

Please sign in to comment.