diff --git a/service/worker/batcher/workflow.go b/service/worker/batcher/workflow.go index 2e58cc930f6..a9be88846df 100644 --- a/service/worker/batcher/workflow.go +++ b/service/worker/batcher/workflow.go @@ -53,7 +53,6 @@ const ( batchActivityName = "cadence-sys-batch-activity" // InfiniteDuration is a long duration(20 yrs) we used for infinite workflow running InfiniteDuration = 20 * 365 * 24 * time.Hour - pageSize = 1000 _nonRetriableReason = "non-retriable-error" @@ -61,6 +60,8 @@ const ( DefaultRPS = 50 // DefaultConcurrency is the default concurrency DefaultConcurrency = 5 + // DefaultPageSize is the default page size + DefaultPageSize = 1000 // DefaultAttemptsOnRetryableError is the default value for AttemptsOnRetryableError DefaultAttemptsOnRetryableError = 50 // DefaultActivityHeartBeatTimeout is the default value for ActivityHeartBeatTimeout @@ -135,6 +136,8 @@ type ( RPS int // Number of goroutines running in parallel to process Concurrency int + // Number of workflows processed in a batch + PageSize int // Number of attempts for each workflow to process in case of retryable error before giving up AttemptsOnRetryableError int // timeout for activity heartbeat @@ -237,6 +240,9 @@ func setDefaultParams(params BatchParams) BatchParams { if params.Concurrency <= 0 { params.Concurrency = DefaultConcurrency } + if params.PageSize <= 0 { + params.PageSize = DefaultPageSize + } if params.AttemptsOnRetryableError <= 0 { params.AttemptsOnRetryableError = DefaultAttemptsOnRetryableError } @@ -298,8 +304,8 @@ func BatchActivity(ctx context.Context, batchParams BatchParams) (HeartBeatDetai hbd.TotalEstimate = resp.GetCount() } rateLimiter := rate.NewLimiter(rate.Limit(batchParams.RPS), batchParams.RPS) - taskCh := make(chan taskDetail, pageSize) - respCh := make(chan error, pageSize) + taskCh := make(chan taskDetail, batchParams.PageSize) + respCh := make(chan error, batchParams.PageSize) for i := 0; i < batchParams.Concurrency; i++ { go startTaskProcessor(ctx, batchParams, domainID, taskCh, respCh, rateLimiter, client, adminClient) } @@ -310,7 +316,7 @@ func BatchActivity(ctx context.Context, batchParams BatchParams) (HeartBeatDetai // And we can't use list API because terminate / reset will mutate the result. resp, err := client.ScanWorkflowExecutions(ctx, &types.ListWorkflowExecutionsRequest{ Domain: batchParams.DomainName, - PageSize: int32(pageSize), + PageSize: int32(batchParams.PageSize), NextPageToken: hbd.PageToken, Query: batchParams.Query, }) diff --git a/tools/cli/flags.go b/tools/cli/flags.go index 2a09c648442..03514132bc9 100644 --- a/tools/cli/flags.go +++ b/tools/cli/flags.go @@ -253,6 +253,8 @@ const ( FlagFailoverTypeWithAlias = FlagFailoverType + ", ft" FlagFailoverTimeout = "failover_timeout_seconds" FlagFailoverTimeoutWithAlias = FlagFailoverTimeout + ", fts" + FlagActivityHeartBeatTimeout = "heart_beat_timeout_seconds" + FlagActivityHeartBeatTimeoutWithAlias = FlagActivityHeartBeatTimeout + ", hbts" FlagFailoverWaitTime = "failover_wait_time_second" FlagFailoverWaitTimeWithAlias = FlagFailoverWaitTime + ", fwts" FlagFailoverBatchSize = "failover_batch_size" diff --git a/tools/cli/workflow.go b/tools/cli/workflow.go index c55d5bd73bd..9c3488dc969 100644 --- a/tools/cli/workflow.go +++ b/tools/cli/workflow.go @@ -22,6 +22,7 @@ package cli import ( "strings" + "time" "github.com/urfave/cli" @@ -511,6 +512,26 @@ func newBatchCommands() []cli.Command { Name: FlagYes, Usage: "Optional flag to disable confirmation prompt", }, + cli.IntFlag{ + Name: FlagPageSize, + Value: batcher.DefaultPageSize, + Usage: "PageSize of processiing", + }, + cli.IntFlag{ + Name: FlagRetryAttempts, + Value: batcher.DefaultAttemptsOnRetryableError, + Usage: "Retry attempts for retriable errors", + }, + cli.IntFlag{ + Name: FlagActivityHeartBeatTimeoutWithAlias, + Value: int(batcher.DefaultActivityHeartBeatTimeout / time.Second), + Usage: "Heartbeat timeout for batcher activity in seconds", + }, + cli.IntFlag{ + Name: FlagConcurrency, + Value: batcher.DefaultConcurrency, + Usage: "Concurrency of batch activity", + }, }, Action: func(c *cli.Context) { StartBatchJob(c) diff --git a/tools/cli/workflowBatchCommands.go b/tools/cli/workflowBatchCommands.go index 63f8b21ca52..b35933d9db7 100644 --- a/tools/cli/workflowBatchCommands.go +++ b/tools/cli/workflowBatchCommands.go @@ -26,6 +26,7 @@ import ( "fmt" "os" "strings" + "time" "github.com/pborman/uuid" "github.com/urfave/cli" @@ -172,6 +173,10 @@ func StartBatchJob(c *cli.Context) { targetCluster = getRequiredOption(c, FlagTargetCluster) } rps := c.Int(FlagRPS) + pageSize := c.Int(FlagPageSize) + concurrency := c.Int(FlagConcurrency) + retryAttempt := c.Int(FlagRetryAttempts) + heartBeatTimeout := time.Duration(c.Int(FlagActivityHeartBeatTimeout)) * time.Second svcClient := cFactory.ServerFrontendClient(c) tcCtx, cancel := newContext(c) @@ -221,7 +226,11 @@ func StartBatchJob(c *cli.Context) { SourceCluster: sourceCluster, TargetCluster: targetCluster, }, - RPS: rps, + RPS: rps, + Concurrency: concurrency, + PageSize: pageSize, + AttemptsOnRetryableError: retryAttempt, + ActivityHeartBeatTimeout: heartBeatTimeout, } input, err := json.Marshal(params) if err != nil {