Skip to content

Commit

Permalink
Wrap shadow workflow config inside SideEffect (uber#4140)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt authored Apr 16, 2021
1 parent d0a8f7e commit d53e631
Showing 1 changed file with 52 additions and 16 deletions.
68 changes: 52 additions & 16 deletions service/worker/shadower/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,25 @@ import (
)

const (
defaultScanWorkflowPageSize = 2000

// NOTE: do not simply change following values as it may result in workflow non-deterministic errors
defaultReplayConcurrency = 1
defaultMaxReplayConcurrency = 50
defaultMaxShadowCountPerRun = 100000

defaultScanWorkflowPageSize = 2000
defaultSamplingRate = 1.0
defaultReplayConcurrency = 1
defaultMaxReplayConcurrency = 50
defaultMaxShadowCountPerRun = 100000
defaultWaitDurationPerIteration = 5 * time.Minute
)

type (
workflowConfig struct {
ScanWorkflowPageSize int32
DefaultSamplingRate float64
DefaultReplayConcurrency int32
MaxReplayConcurrency int32
MaxShadowCountPerRun int32
WaitDurationPerIteration time.Duration
}
)

func register(worker worker.Worker) {
worker.RegisterWorkflowWithOptions(
shadowWorkflow,
Expand All @@ -57,7 +66,13 @@ func shadowWorkflow(
) (shadower.WorkflowResult, error) {
profile := beginWorkflow(ctx, &params)

if err := validateAndFillWorkflowParams(&params); err != nil {
var config workflowConfig
config, err := getWorkflowConfig(ctx)
if err != nil {
return shadower.WorkflowResult{}, profile.endWorkflow(err)
}

if err := validateAndFillWorkflowParams(&params, &config); err != nil {
return shadower.WorkflowResult{}, profile.endWorkflow(err)
}

Expand Down Expand Up @@ -99,7 +114,7 @@ func shadowWorkflow(
Domain: params.Domain,
WorkflowQuery: params.WorkflowQuery,
NextPageToken: params.NextPageToken,
PageSize: common.Int32Ptr(defaultScanWorkflowPageSize),
PageSize: common.Int32Ptr(config.ScanWorkflowPageSize),
SamplingRate: params.SamplingRate,
}
for {
Expand Down Expand Up @@ -137,14 +152,14 @@ func shadowWorkflow(
break
}

if shouldContinueAsNew(shadowResult) {
if shouldContinueAsNew(shadowResult, &config) {
continueAsNewErr := getContinueAsNewError(ctx, params, profile.startTime, params.GetLastRunResult(), shadowResult, scanParams.NextPageToken)
return shadower.WorkflowResult{}, profile.endWorkflow(continueAsNewErr)
}
}

if params.GetShadowMode() == shadower.ModeContinuous {
if err := workflow.Sleep(ctx, defaultWaitDurationPerIteration); err != nil {
if err := workflow.Sleep(ctx, config.WaitDurationPerIteration); err != nil {
return shadower.WorkflowResult{}, profile.endWorkflow(err)
}
continueAsNewErr := getContinueAsNewError(ctx, params, profile.startTime, params.GetLastRunResult(), shadowResult, nil)
Expand All @@ -154,8 +169,28 @@ func shadowWorkflow(
return combineShadowResults(shadowResult, params.GetLastRunResult()), profile.endWorkflow(nil)
}

func getWorkflowConfig(
ctx workflow.Context,
) (workflowConfig, error) {
var config workflowConfig
if err := workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} {
return workflowConfig{
ScanWorkflowPageSize: defaultScanWorkflowPageSize,
DefaultSamplingRate: defaultSamplingRate,
DefaultReplayConcurrency: defaultReplayConcurrency,
MaxReplayConcurrency: defaultMaxReplayConcurrency,
MaxShadowCountPerRun: defaultMaxShadowCountPerRun,
WaitDurationPerIteration: defaultWaitDurationPerIteration,
}
}).Get(&config); err != nil {
return workflowConfig{}, err
}
return config, nil
}

func validateAndFillWorkflowParams(
params *shadower.WorkflowParams,
config *workflowConfig,
) error {
if len(params.GetDomain()) == 0 {
return errors.New("Domain is not set on shadower workflow params")
Expand All @@ -166,15 +201,15 @@ func validateAndFillWorkflowParams(
}

if params.GetSamplingRate() == 0 {
params.SamplingRate = common.Float64Ptr(1)
params.SamplingRate = common.Float64Ptr(config.DefaultSamplingRate)
}

if params.GetConcurrency() == 0 {
params.Concurrency = common.Int32Ptr(defaultReplayConcurrency)
params.Concurrency = common.Int32Ptr(config.DefaultReplayConcurrency)
}

if params.GetConcurrency() > defaultMaxReplayConcurrency {
params.Concurrency = common.Int32Ptr(defaultMaxReplayConcurrency)
if params.GetConcurrency() > config.MaxReplayConcurrency {
params.Concurrency = common.Int32Ptr(config.MaxReplayConcurrency)
}

return nil
Expand Down Expand Up @@ -223,8 +258,9 @@ func exitConditionMet(

func shouldContinueAsNew(
currentResult shadower.WorkflowResult,
config *workflowConfig,
) bool {
return currentResult.GetSucceeded()+currentResult.GetSkipped()+currentResult.GetFailed() >= defaultMaxShadowCountPerRun
return currentResult.GetSucceeded()+currentResult.GetSkipped()+currentResult.GetFailed() >= config.MaxShadowCountPerRun
}

func getContinueAsNewError(
Expand Down

0 comments on commit d53e631

Please sign in to comment.