Skip to content

Commit

Permalink
Add fixer workflow triggered by remote (cadence-workflow#4482)
Browse files Browse the repository at this point in the history
* Add fixer workflow triggered by remote
  • Loading branch information
yux0 authored Oct 12, 2021
1 parent b21e5e0 commit e13da58
Show file tree
Hide file tree
Showing 10 changed files with 434 additions and 20 deletions.
11 changes: 11 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1166,6 +1166,8 @@ const (
ParentClosePolicyProcessorScope
// ShardScannerScope is scope used by all metrics emitted by worker.shardscanner module
ShardScannerScope
// CheckDataCorruptionWorkflowScope is scope used by the data corruption workflow
CheckDataCorruptionWorkflowScope

NumWorkerScopes
)
Expand Down Expand Up @@ -1688,6 +1690,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
TaskListScavengerScope: {operation: "tasklistscavenger"},
ExecutionsScannerScope: {operation: "ExecutionsScanner"},
ShardScannerScope: {operation: "ShardScanner"},
CheckDataCorruptionWorkflowScope: {operation: "CheckDataCorruptionWorkflow"},
ExecutionsFixerScope: {operation: "ExecutionsFixer"},
HistoryScavengerScope: {operation: "historyscavenger"},
BatcherScope: {operation: "batcher"},
Expand Down Expand Up @@ -2201,6 +2204,10 @@ const (
ScannerShardSizeTenGauge
ShardScannerScan
ShardScannerFix
DataCorruptionWorkflowCount
DataCorruptionWorkflowFailure
DataCorruptionWorkflowSuccessCount
DataCorruptionWorkflowSkipCount

NumWorkerMetrics
)
Expand Down Expand Up @@ -2718,6 +2725,10 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
ScannerShardSizeTenGauge: {metricName: "scanner_shard_size_ten", metricType: Gauge},
ShardScannerScan: {metricName: "shardscanner_scan", metricType: Counter},
ShardScannerFix: {metricName: "shardscanner_fix", metricType: Counter},
DataCorruptionWorkflowFailure: {metricName: "data_corruption_workflow_failure", metricType: Counter},
DataCorruptionWorkflowSuccessCount: {metricName: "data_corruption_workflow_success", metricType: Counter},
DataCorruptionWorkflowCount: {metricName: "data_corruption_workflow_count", metricType: Counter},
DataCorruptionWorkflowSkipCount: {metricName: "data_corruption_workflow_skips", metricType: Counter},
},
}

Expand Down
12 changes: 6 additions & 6 deletions common/reconciliation/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ package reconciliation
// Execution fixer workflow relates

const (
ExecutionFixerWorkflowType = "execution-fixer-workflow"
ExecutionFixerWorkflowTaskList = "execution-fixer-tl"
ExecutionFixerWorkflowSignalName = "execution-fixer-signal"
ExecutionFixerWorkflowID = "execution-fixer-workflow-id"
ExecutionFixerWorkflowTimeout = 24 * 60 * 60
ExecutionFixerWorkflowTaskTimeoutInSeconds = 60
CheckDataCorruptionWorkflowType = "check-data-corruption-workflow"
CheckDataCorruptionWorkflowTaskList = "check-data-corruption-workflow-tl"
CheckDataCorruptionWorkflowSignalName = "check-data-corruption-workflow-signal"
CheckDataCorruptionWorkflowID = "check-data-corruption-workflow-id"
CheckDataCorruptionWorkflowTimeoutInSeconds = 24 * 60 * 60
CheckDataCorruptionWorkflowTaskTimeoutInSeconds = 60
)
2 changes: 1 addition & 1 deletion common/reconciliation/fetcher/timer.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
"github.com/uber/cadence/common/reconciliation/entity"
)

// ConcreteExecutionIterator is used to retrieve Concrete executions.
// TimerIterator is used to retrieve timer tasks.
func TimerIterator(
ctx context.Context,
retryer persistence.Retryer,
Expand Down
13 changes: 7 additions & 6 deletions service/history/replication/task_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,7 @@ func (p *taskProcessorImpl) triggerDataInconsistencyScan(replicationTask *types.
DomainID: domainID,
WorkflowID: workflowID,
RunID: runID,
ShardID: p.shard.GetShardID(),
}
fixExecutionInput, err := json.Marshal(fixExecution)
if err != nil {
Expand All @@ -591,15 +592,15 @@ func (p *taskProcessorImpl) triggerDataInconsistencyScan(replicationTask *types.
// Assume the workflow is corrupted, rely on invariant to validate it
_, err = client.SignalWithStartWorkflowExecution(context.Background(), &types.SignalWithStartWorkflowExecutionRequest{
Domain: common.SystemLocalDomainName,
WorkflowID: reconciliation.ExecutionFixerWorkflowID,
WorkflowType: &types.WorkflowType{Name: reconciliation.ExecutionFixerWorkflowType},
TaskList: &types.TaskList{Name: reconciliation.ExecutionFixerWorkflowTaskList},
ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(reconciliation.ExecutionFixerWorkflowTimeout),
TaskStartToCloseTimeoutSeconds: common.Int32Ptr(reconciliation.ExecutionFixerWorkflowTaskTimeoutInSeconds),
WorkflowID: reconciliation.CheckDataCorruptionWorkflowID,
WorkflowType: &types.WorkflowType{Name: reconciliation.CheckDataCorruptionWorkflowType},
TaskList: &types.TaskList{Name: reconciliation.CheckDataCorruptionWorkflowTaskList},
ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(reconciliation.CheckDataCorruptionWorkflowTimeoutInSeconds),
TaskStartToCloseTimeoutSeconds: common.Int32Ptr(reconciliation.CheckDataCorruptionWorkflowTaskTimeoutInSeconds),
Identity: "cadence-history-replication",
RequestID: uuid.New(),
WorkflowIDReusePolicy: types.WorkflowIDReusePolicyAllowDuplicate.Ptr(),
SignalName: reconciliation.ExecutionFixerWorkflowSignalName,
SignalName: reconciliation.CheckDataCorruptionWorkflowSignalName,
SignalInput: fixExecutionInput,
})
return err
Expand Down
9 changes: 5 additions & 4 deletions service/history/replication/task_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,17 +319,18 @@ func (s *taskProcessorSuite) TestTriggerDataInconsistencyScan_Success() {
DomainID: domainID,
WorkflowID: workflowID,
RunID: runID,
ShardID: s.mockShard.GetShardID(),
}
jsArray, err := json.Marshal(fixExecution)
s.NoError(err)
s.mockFrontendClient.EXPECT().SignalWithStartWorkflowExecution(gomock.Any(), gomock.Any()).DoAndReturn(
func(_ context.Context, request *types.SignalWithStartWorkflowExecutionRequest) {
s.Equal(common.SystemLocalDomainName, request.GetDomain())
s.Equal(reconciliation.ExecutionFixerWorkflowID, request.GetWorkflowID())
s.Equal(reconciliation.ExecutionFixerWorkflowType, request.GetWorkflowType().GetName())
s.Equal(reconciliation.ExecutionFixerWorkflowTaskList, request.GetTaskList().GetName())
s.Equal(reconciliation.CheckDataCorruptionWorkflowID, request.GetWorkflowID())
s.Equal(reconciliation.CheckDataCorruptionWorkflowType, request.GetWorkflowType().GetName())
s.Equal(reconciliation.CheckDataCorruptionWorkflowTaskList, request.GetTaskList().GetName())
s.Equal(types.WorkflowIDReusePolicyAllowDuplicate.String(), request.GetWorkflowIDReusePolicy().String())
s.Equal(reconciliation.ExecutionFixerWorkflowSignalName, request.GetSignalName())
s.Equal(reconciliation.CheckDataCorruptionWorkflowSignalName, request.GetSignalName())
s.Equal(jsArray, request.GetSignalInput())
}).Return(&types.StartWorkflowExecutionResponse{}, nil)
s.clusterMetadata.EXPECT().ClusterNameForFailoverVersion(int64(100)).Return("active")
Expand Down
251 changes: 251 additions & 0 deletions service/worker/scanner/data_corruption_workflow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,251 @@
// Copyright (c) 2021 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package scanner

import (
"context"
"fmt"
"time"

"go.uber.org/cadence/activity"
"go.uber.org/cadence/worker"
"go.uber.org/cadence/workflow"
"go.uber.org/zap"

c "github.com/uber/cadence/common"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/reconciliation"
"github.com/uber/cadence/common/reconciliation/entity"
"github.com/uber/cadence/common/reconciliation/fetcher"
"github.com/uber/cadence/common/reconciliation/invariant"
"github.com/uber/cadence/common/resource"
)

// init registers the check-data-corruption workflow and its activities with
// the Cadence client library, so any worker started in this process can
// execute them by their registered names.
func init() {
	workflow.RegisterWithOptions(
		CheckDataCorruptionWorkflow,
		// Register under the shared constant so remote clusters can target the
		// workflow by type name via SignalWithStartWorkflowExecution.
		workflow.RegisterOptions{Name: reconciliation.CheckDataCorruptionWorkflowType},
	)
	activity.Register(ExecutionFixerActivity)
	activity.Register(EmitResultMetricsActivity)
}

const (
	// workflowTimer is how long the workflow waits for incoming signals
	// before re-checking whether there is anything to fix.
	workflowTimer = 5 * time.Minute
	// maxSignalNumber is the number of consumed signals after which the
	// workflow continues-as-new to keep its history size bounded.
	maxSignalNumber = 1000
	// fixerActivityTimeout is the per-execution time budget used to size the
	// fixer activity's start-to-close and heartbeat timeouts.
	fixerActivityTimeout = time.Minute
)

// CheckDataCorruptionWorkflow is invoked by remote cluster via signals
func CheckDataCorruptionWorkflow(ctx workflow.Context, fixList []entity.Execution) error {

logger := workflow.GetLogger(ctx)
signalCh := workflow.GetSignalChannel(ctx, reconciliation.CheckDataCorruptionWorkflowSignalName)
signalCount := 0
maxReceivedSignalNumber := workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} {
return maxSignalNumber
})

for {
selector := workflow.NewSelector(ctx)
// timer
timerCtx, timerCancel := workflow.WithCancel(ctx)
waitTimer := workflow.NewTimer(timerCtx, workflowTimer)
selector.AddFuture(waitTimer, func(f workflow.Future) {
// do nothing. Unblock the selector
})

// signal
selector.AddReceive(signalCh, func(c workflow.Channel, more bool) {
var fixExecution entity.Execution
for c.ReceiveAsync(&fixExecution) {
signalCount++
fixList = append(fixList, fixExecution)
}
})

selector.Select(ctx)
if len(fixList) == 0 {
return nil
}
timerCancel()

timeout := fixerActivityTimeout * time.Duration(len(fixList))
activityOptions = workflow.ActivityOptions{
ScheduleToStartTimeout: time.Minute,
StartToCloseTimeout: timeout,
HeartbeatTimeout: fixerActivityTimeout,
RetryPolicy: &activityRetryPolicy,
}
activityCtx := workflow.WithActivityOptions(ctx, activityOptions)
var fixResults []invariant.FixResult
err := workflow.ExecuteActivity(activityCtx, ExecutionFixerActivity, fixList).Get(activityCtx, &fixResults)
if err != nil {
logger.Error("failed to run execution fixer activity", zap.Error(err))
return err
}

activityOptions = workflow.ActivityOptions{
ScheduleToStartTimeout: time.Minute,
StartToCloseTimeout: time.Minute,
}
activityCtx = workflow.WithActivityOptions(ctx, activityOptions)
err = workflow.ExecuteActivity(activityCtx, EmitResultMetricsActivity, fixResults).Get(activityCtx, nil)
if err != nil {
logger.Error("failed to run execution fixer activity", zap.Error(err))
return err
}

fixList = []entity.Execution{}
workflow.GetMetricsScope(ctx)
var maxSignalCount int
if err := maxReceivedSignalNumber.Get(&maxSignalCount); err != nil {
logger.Error("failed to get max supported signal number")
return err
}

if signalCount > maxSignalCount {
var fixExecution entity.Execution
for signalCh.ReceiveAsync(&fixExecution) {
fixList = append(fixList, fixExecution)
}
return workflow.NewContinueAsNewError(ctx, reconciliation.CheckDataCorruptionWorkflowType, fixList)
}
}
}

// ExecutionFixerActivity validates and fixes each execution in fixList: it
// loads the concrete execution record from the execution's shard and applies
// the OpenCurrentExecution and HistoryExists invariant fixes, returning one
// FixResult per invariant per execution.
//
// Progress is heartbeated as the index of the NEXT execution to process, so a
// retried attempt resumes exactly where the previous one stopped instead of
// re-fixing the last completed execution.
func ExecutionFixerActivity(ctx context.Context, fixList []entity.Execution) ([]invariant.FixResult, error) {
	var result []invariant.FixResult
	index := 0
	if activity.HasHeartbeatDetails(ctx) {
		// Best effort: on failure index stays 0 and we reprocess from the start.
		activity.GetHeartbeatDetails(ctx, &index)
	}

	for index < len(fixList) {
		execution := fixList[index]
		pr, err := getDefaultDAO(ctx, execution.ShardID)
		if err != nil {
			return nil, err
		}
		request := fetcher.ExecutionRequest{
			DomainID:   execution.DomainID,
			WorkflowID: execution.WorkflowID,
			RunID:      execution.RunID,
		}
		concreteExecution, err := fetcher.ConcreteExecution(ctx, pr, request)
		if err != nil {
			return nil, err
		}

		currentExecutionInvariant := invariant.NewOpenCurrentExecution(pr)
		result = append(result, currentExecutionInvariant.Fix(ctx, concreteExecution))
		historyInvariant := invariant.NewHistoryExists(pr)
		result = append(result, historyInvariant.Fix(ctx, concreteExecution))
		index++
		// Record the next index so a retry does not re-process this execution.
		activity.RecordHeartbeat(ctx, index)
	}
	return result, nil
}

// getDefaultDAO builds a persistence retryer (execution manager for the given
// shard plus the history manager) from the scanner resource stored in the
// activity context under the CheckDataCorruptionWorkflowType key.
func getDefaultDAO(
	ctx context.Context,
	shardID int,
) (persistence.Retryer, error) {
	sc, err := getScannerContext(ctx)
	if err != nil {
		// Wrap rather than discard the cause so it stays visible to callers.
		return nil, fmt.Errorf("cannot find key %v in context: %w", reconciliation.CheckDataCorruptionWorkflowType, err)
	}
	res := sc.resource

	execManager, err := res.GetExecutionManager(shardID)
	if err != nil {
		return nil, err
	}
	pr := persistence.NewPersistenceRetryer(execManager, res.GetHistoryManager(), c.CreatePersistenceRetryPolicy())
	return pr, nil
}

// EmitResultMetricsActivity emits counters for each fix result under the
// CheckDataCorruptionWorkflowScope, tagged with the invariant name: a total
// count for every result plus a failure/success/skip counter depending on the
// fix outcome.
func EmitResultMetricsActivity(ctx context.Context, fixResults []invariant.FixResult) error {
	sc, err := getScannerContext(ctx)
	if err != nil {
		// Wrap rather than discard the cause so it stays visible to callers.
		return fmt.Errorf("cannot find key %v in context: %w", reconciliation.CheckDataCorruptionWorkflowType, err)
	}
	res := sc.resource

	for _, result := range fixResults {
		scope := res.GetMetricsClient().Scope(
			metrics.CheckDataCorruptionWorkflowScope,
			metrics.InvariantTypeTag(string(result.InvariantName)))

		scope.IncCounter(metrics.DataCorruptionWorkflowCount)
		switch result.FixResultType {
		case invariant.FixResultTypeFailed:
			scope.IncCounter(metrics.DataCorruptionWorkflowFailure)
		case invariant.FixResultTypeFixed:
			scope.IncCounter(metrics.DataCorruptionWorkflowSuccessCount)
		case invariant.FixResultTypeSkipped:
			scope.IncCounter(metrics.DataCorruptionWorkflowSkipCount)
		}
	}
	return nil
}

// NewDataCorruptionWorkflowWorker creates a Scanner that hosts the
// check-data-corruption workflow and its activities. The parameter is named
// res (not resource) so it does not shadow the imported resource package.
func NewDataCorruptionWorkflowWorker(
	res resource.Resource,
	params *BootstrapParams,
) *Scanner {
	zapLogger, err := zap.NewProduction()
	if err != nil {
		// Without a logger the worker cannot run meaningfully; fail fast.
		res.GetLogger().Fatal("failed to initialize zap logger", tag.Error(err))
	}
	return &Scanner{
		context: scannerContext{
			resource: res,
		},
		tallyScope: params.TallyScope,
		zapLogger:  zapLogger.Named("data-corruption-workflow"),
	}
}

// StartDataCorruptionWorkflowWorker starts a Cadence worker polling the
// check-data-corruption task list in the system local domain. The scanner
// context is installed as the background activity context so activities can
// retrieve the shared resource via getScannerContext.
func (s *Scanner) StartDataCorruptionWorkflowWorker() error {
	backgroundCtx := context.WithValue(
		context.Background(),
		contextKey(reconciliation.CheckDataCorruptionWorkflowType),
		s.context,
	)
	return worker.New(
		s.context.resource.GetSDKClient(),
		c.SystemLocalDomainName,
		reconciliation.CheckDataCorruptionWorkflowTaskList,
		worker.Options{
			Logger:                                 s.zapLogger,
			MetricsScope:                           s.tallyScope,
			MaxConcurrentActivityExecutionSize:     maxConcurrentActivityExecutionSize,
			MaxConcurrentDecisionTaskExecutionSize: maxConcurrentDecisionTaskExecutionSize,
			BackgroundActivityContext:              backgroundCtx,
		},
	).Start()
}
Loading

0 comments on commit e13da58

Please sign in to comment.