Skip to content

Commit

Permalink
Batcher now only retries the activity 5 times with bad heartbeats bef…
Browse files Browse the repository at this point in the history
…ore retrying
  • Loading branch information
jakobht committed Dec 2, 2024
1 parent fcd56c6 commit f24c067
Show file tree
Hide file tree
Showing 5 changed files with 302 additions and 226 deletions.
16 changes: 16 additions & 0 deletions service/worker/batcher/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
"github.com/uber-go/tally"
"github.com/uber/cadence/common/cluster"
"github.com/uber/cadence/common/config"
"go.uber.org/cadence/.gen/go/shared"

"github.com/uber/cadence/client"
Expand Down Expand Up @@ -60,5 +62,19 @@ func setuptest(t *testing.T) (*Batcher, *resource.Test) {
ServiceClient: sdkClient,
ClientBean: mockClientBean,
TallyScope: tally.TestScope(nil),
Config: Config{
ClusterMetadata: cluster.NewMetadata(
12,
"test-primary-cluster",
"test-primary-cluster",
map[string]config.ClusterInformation{
"test-primary-cluster": {},
"test-secondary-cluster": {},
},
nil,
metrics.NewClient(tally.NoopScope, metrics.Worker),
testlogger.New(t),
),
},
}), mockResource
}
93 changes: 93 additions & 0 deletions service/worker/batcher/entities.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package batcher

import (
"time"

"github.com/uber/cadence/common/types"
)

// TerminateParams holds the options that apply only to terminate batch operations.
type TerminateParams struct {
// TerminateChildren indicates whether child workflows should be terminated as well.
// Defaults to true when left unset.
// TODO https://github.com/uber/cadence/issues/2159
// Ideally the default should be the childPolicy of the workflow, but that is currently broken.
TerminateChildren *bool
}

// CancelParams holds the options that apply only to cancel batch operations.
type CancelParams struct {
// CancelChildren indicates whether child workflows should be canceled as well.
// Defaults to true when left unset.
// TODO https://github.com/uber/cadence/issues/2159
// Ideally the default should be the childPolicy of the workflow, but that is currently broken.
CancelChildren *bool
}

// SignalParams holds the options that apply only to signal batch operations.
type SignalParams struct {
// SignalName is the name of the signal to send.
SignalName string
// Input is the signal payload, carried as a string.
// NOTE(review): the expected encoding of Input is not visible here; confirm against the signal handler.
Input string
}

// ReplicateParams holds the options that apply only to replicate batch operations.
type ReplicateParams struct {
// SourceCluster names the cluster to replicate from.
// NOTE(review): presumably a cluster name known to the cluster metadata; confirm against the caller.
SourceCluster string
// TargetCluster names the cluster to replicate to.
TargetCluster string
}

// BatchParams is the input of the batch operation workflow and of the batch activity it runs.
type BatchParams struct {
// DomainName is the target domain in which the batch operation is executed.
DomainName string
// Query is the visibility query used to find the target workflows to process.
Query string
// Reason is the reason recorded for the operation.
Reason string
// BatchType selects the operation to perform; the supported values are listed in AllBatchTypes.
BatchType string

// Everything below is optional.
// TerminateParams is used only for BatchTypeTerminate.
TerminateParams TerminateParams
// CancelParams is used only for BatchTypeCancel.
CancelParams CancelParams
// SignalParams is used only for BatchTypeSignal.
SignalParams SignalParams
// ReplicateParams is used only for BatchTypeReplicate.
ReplicateParams ReplicateParams
// RPS is the processing rate limit. Defaults to DefaultRPS.
// TODO we will implement smarter way than this static rate limiter: https://github.com/uber/cadence/issues/2138
RPS int
// Concurrency is the number of goroutines running in parallel to process workflows.
Concurrency int
// PageSize is the number of workflows fetched and processed per page.
PageSize int
// AttemptsOnRetryableError is the number of attempts made for each workflow on a retryable error before giving up.
AttemptsOnRetryableError int
// ActivityHeartBeatTimeout is the heartbeat timeout applied to the batch activity.
ActivityHeartBeatTimeout time.Duration
// MaxActivityRetries is the maximum number of attempts for the batch activity; it is used as the
// MaximumAttempts of the activity's retry policy. Defaults to 0 (unlimited).
MaxActivityRetries int32
// NonRetryableErrors lists errors that are not retried, so they do not consume AttemptsOnRetryableError.
// Defaults to empty.
NonRetryableErrors []string
// _nonRetryableErrors is the internal set form of NonRetryableErrors.
// NOTE(review): it is populated outside this file; confirm where before relying on it.
_nonRetryableErrors map[string]struct{}
}

// HeartBeatDetails is the progress snapshot the batch activity records with each heartbeat
// and returns as its result.
type HeartBeatDetails struct {
// PageToken is the token of the next page to scan; an empty token ends the scan loop.
PageToken []byte
// CurrentPage is the page counter of the scan.
// NOTE(review): where it is incremented is not visible here; confirm whether it is zero- or one-based.
CurrentPage int
// TotalEstimate is an estimated total number of workflows, for visibility only.
TotalEstimate int64
// SuccessCount is the number of workflows processed successfully.
SuccessCount int
// ErrorCount is the number of workflows given up on due to errors.
ErrorCount int
}

// taskDetail describes one unit of work: a single workflow execution to process.
type taskDetail struct {
// execution identifies the workflow execution to operate on.
execution types.WorkflowExecution
// attempts counts processing attempts for this execution.
// NOTE(review): presumably compared against AttemptsOnRetryableError; confirm in the task processor.
attempts int
// hbd carries the current heartbeat details so a heartbeat can be sent from within the task,
// which keeps the activity from timing out.
hbd HeartBeatDetails
}
119 changes: 21 additions & 98 deletions service/worker/batcher/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ const (
batchActivityName = "cadence-sys-batch-activity"
// InfiniteDuration is a long duration(20 yrs) we used for infinite workflow running
InfiniteDuration = 20 * 365 * 24 * time.Hour
// MaxActivityFailures is the maximum number of activity failures.
MaxActivityFailures = 5

_nonRetriableReason = "non-retriable-error"

Expand Down Expand Up @@ -82,92 +84,6 @@ const (
// AllBatchTypes is the batch types we supported
var AllBatchTypes = []string{BatchTypeTerminate, BatchTypeCancel, BatchTypeSignal, BatchTypeReplicate}

// Parameter and progress types used by the batch workflow and its activity.
type (
// TerminateParams holds the options that apply only to terminate batch operations.
TerminateParams struct {
// TerminateChildren indicates whether child workflows should be terminated as well.
// Defaults to true when left unset.
// TODO https://github.com/uber/cadence/issues/2159
// Ideally the default should be the childPolicy of the workflow, but that is currently broken.
TerminateChildren *bool
}

// CancelParams holds the options that apply only to cancel batch operations.
CancelParams struct {
// CancelChildren indicates whether child workflows should be canceled as well.
// Defaults to true when left unset.
// TODO https://github.com/uber/cadence/issues/2159
// Ideally the default should be the childPolicy of the workflow, but that is currently broken.
CancelChildren *bool
}

// SignalParams holds the options that apply only to signal batch operations.
SignalParams struct {
// SignalName is the name of the signal to send.
SignalName string
// Input is the signal payload, carried as a string.
Input string
}

// ReplicateParams holds the options that apply only to replicate batch operations.
ReplicateParams struct {
// SourceCluster names the cluster to replicate from.
SourceCluster string
// TargetCluster names the cluster to replicate to.
TargetCluster string
}

// BatchParams is the input of the batch operation workflow and of the batch activity it runs.
BatchParams struct {
// DomainName is the target domain in which the batch operation is executed.
DomainName string
// Query is the visibility query used to find the target workflows to process.
Query string
// Reason is the reason recorded for the operation.
Reason string
// BatchType selects the operation to perform; the supported values are listed in AllBatchTypes.
BatchType string

// Everything below is optional.
// TerminateParams is used only for BatchTypeTerminate.
TerminateParams TerminateParams
// CancelParams is used only for BatchTypeCancel.
CancelParams CancelParams
// SignalParams is used only for BatchTypeSignal.
SignalParams SignalParams
// ReplicateParams is used only for BatchTypeReplicate.
ReplicateParams ReplicateParams
// RPS is the processing rate limit. Defaults to DefaultRPS.
// TODO we will implement smarter way than this static rate limiter: https://github.com/uber/cadence/issues/2138
RPS int
// Concurrency is the number of goroutines running in parallel to process workflows.
Concurrency int
// PageSize is the number of workflows fetched and processed per page.
PageSize int
// AttemptsOnRetryableError is the number of attempts made for each workflow on a retryable error before giving up.
AttemptsOnRetryableError int
// ActivityHeartBeatTimeout is the heartbeat timeout applied to the batch activity.
ActivityHeartBeatTimeout time.Duration
// NonRetryableErrors lists errors that are not retried, so they do not consume AttemptsOnRetryableError.
// Defaults to empty.
NonRetryableErrors []string
// _nonRetryableErrors is the internal set form of NonRetryableErrors.
_nonRetryableErrors map[string]struct{}
}

// HeartBeatDetails is the progress snapshot the batch activity records with each heartbeat
// and returns as its result.
HeartBeatDetails struct {
// PageToken is the token of the next page to scan; an empty token ends the scan loop.
PageToken []byte
// CurrentPage is the page counter of the scan.
CurrentPage int
// TotalEstimate is an estimated total number of workflows, for visibility only.
TotalEstimate int64
// SuccessCount is the number of workflows processed successfully.
SuccessCount int
// ErrorCount is the number of workflows given up on due to errors.
ErrorCount int
}

// taskDetail describes one unit of work: a single workflow execution to process.
taskDetail struct {
// execution identifies the workflow execution to operate on.
execution types.WorkflowExecution
// attempts counts processing attempts for this execution.
attempts int
// hbd carries the current heartbeat details so a heartbeat can be sent from within the task,
// which keeps the activity from timing out.
hbd HeartBeatDetails
}
)

var (
batchActivityRetryPolicy = cadence.RetryPolicy{
InitialInterval: 10 * time.Second,
Expand Down Expand Up @@ -197,6 +113,7 @@ func BatchWorkflow(ctx workflow.Context, batchParams BatchParams) (HeartBeatDeta
return HeartBeatDetails{}, err
}
batchActivityOptions.HeartbeatTimeout = batchParams.ActivityHeartBeatTimeout
batchActivityOptions.RetryPolicy.MaximumAttempts = batchParams.MaxActivityRetries
opt := workflow.WithActivityOptions(ctx, batchActivityOptions)
var result HeartBeatDetails
err = workflow.ExecuteActivity(opt, batchActivityName, batchParams).Get(ctx, &result)
Expand Down Expand Up @@ -281,19 +198,9 @@ func BatchActivity(ctx context.Context, batchParams BatchParams) (HeartBeatDetai
return HeartBeatDetails{}, err
}
domainID := domainResp.GetDomainInfo().GetUUID()
hbd := HeartBeatDetails{}
startOver := true
if activity.HasHeartbeatDetails(ctx) {
if err := activity.GetHeartbeatDetails(ctx, &hbd); err == nil {
startOver = false
} else {
batcher := ctx.Value(batcherContextKey).(*Batcher)
batcher.metricsClient.IncCounter(metrics.BatcherScope, metrics.BatcherProcessorFailures)
getActivityLogger(ctx).Error("Failed to recover from last heartbeat, start over from beginning", tag.Error(err))
}
}
hbd, ok := getHeartBeatDetails(ctx)

if startOver {
if !ok {
resp, err := client.CountWorkflowExecutions(ctx, &types.CountWorkflowExecutionsRequest{
Domain: batchParams.DomainName,
Query: batchParams.Query,
Expand All @@ -314,6 +221,7 @@ func BatchActivity(ctx context.Context, batchParams BatchParams) (HeartBeatDetai
// TODO https://github.com/uber/cadence/issues/2154
// Need to improve scan concurrency because it will hold an ES resource until the workflow finishes.
// And we can't use list API because terminate / reset will mutate the result.
fmt.Printf("Scanning workflows for batch operation: %v\n", hbd.PageToken)
resp, err := client.ScanWorkflowExecutions(ctx, &types.ListWorkflowExecutionsRequest{
Domain: batchParams.DomainName,
PageSize: int32(batchParams.PageSize),
Expand Down Expand Up @@ -362,6 +270,7 @@ func BatchActivity(ctx context.Context, batchParams BatchParams) (HeartBeatDetai
hbd.SuccessCount += succCount
hbd.ErrorCount += errCount
activity.RecordHeartbeat(ctx, hbd)
fmt.Printf("Batch operation progress: %v\n", hbd)

if len(hbd.PageToken) == 0 {
break
Expand All @@ -371,6 +280,20 @@ func BatchActivity(ctx context.Context, batchParams BatchParams) (HeartBeatDetai
return hbd, nil
}

// getHeartBeatDetails loads the heartbeat details recorded on the activity context.
//
// It returns the decoded details and true when details are present and decode
// successfully. When no details are present it returns the zero value and false.
// When details are present but cannot be decoded, it increments the
// BatcherProcessorFailures counter, logs the error, and returns the zero value
// and false so the caller starts over from the beginning.
func getHeartBeatDetails(ctx context.Context) (hbd HeartBeatDetails, ok bool) {
	if !activity.HasHeartbeatDetails(ctx) {
		// No heartbeat details are available on this context.
		return HeartBeatDetails{}, false
	}

	decodeErr := activity.GetHeartbeatDetails(ctx, &hbd)
	if decodeErr == nil {
		return hbd, true
	}

	// Decoding failed: report it and discard whatever was partially decoded.
	b := ctx.Value(batcherContextKey).(*Batcher)
	b.metricsClient.IncCounter(metrics.BatcherScope, metrics.BatcherProcessorFailures)
	getActivityLogger(ctx).Error("Failed to recover from last heartbeat, start over from beginning", tag.Error(decodeErr))
	return HeartBeatDetails{}, false
}

func startTaskProcessor(
ctx context.Context,
batchParams BatchParams,
Expand Down
Loading

0 comments on commit f24c067

Please sign in to comment.