Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batcher will now by default retry it's activity 5 times #6534

Merged
merged 1 commit into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions service/worker/batcher/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ import (
"go.uber.org/cadence/.gen/go/shared"

"github.com/uber/cadence/client"
"github.com/uber/cadence/common/cluster"
"github.com/uber/cadence/common/config"
"github.com/uber/cadence/common/log/testlogger"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/resource"
Expand Down Expand Up @@ -60,5 +62,19 @@ func setuptest(t *testing.T) (*Batcher, *resource.Test) {
ServiceClient: sdkClient,
ClientBean: mockClientBean,
TallyScope: tally.TestScope(nil),
Config: Config{
ClusterMetadata: cluster.NewMetadata(
12,
"test-primary-cluster",
"test-primary-cluster",
map[string]config.ClusterInformation{
"test-primary-cluster": {},
"test-secondary-cluster": {},
},
nil,
metrics.NewClient(tally.NoopScope, metrics.Worker),
testlogger.New(t),
),
},
}), mockResource
}
115 changes: 115 additions & 0 deletions service/worker/batcher/entities.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// The MIT License (MIT)

// Copyright (c) 2017-2020 Uber Technologies Inc.

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package batcher

import (
"time"

"github.com/uber/cadence/common/types"
)

// TerminateParams is the parameters for terminating workflow
type TerminateParams struct {
// this indicates whether to terminate children workflow. Default to true.
// TODO https://github.com/uber/cadence/issues/2159
// Ideally default should be childPolicy of the workflow. But it's currently totally broken.
TerminateChildren *bool
}

// CancelParams is the parameters for canceling workflow
type CancelParams struct {
// this indicates whether to cancel children workflow. Default to true.
// TODO https://github.com/uber/cadence/issues/2159
// Ideally default should be childPolicy of the workflow. But it's currently totally broken.
CancelChildren *bool
}

// SignalParams is the parameters for signaling workflow
type SignalParams struct {
SignalName string
Input string
}

// ReplicateParams is the parameters for replicating workflow
type ReplicateParams struct {
SourceCluster string
TargetCluster string
}

// BatchParams is the parameters for batch operation workflow
type BatchParams struct {
// Target domain to execute batch operation
DomainName string
// To get the target workflows for processing
Query string
// Reason for the operation
Reason string
// Supporting: reset,terminate
BatchType string

// Below are all optional
// TerminateParams is params only for BatchTypeTerminate
TerminateParams TerminateParams
// CancelParams is params only for BatchTypeCancel
CancelParams CancelParams
// SignalParams is params only for BatchTypeSignal
SignalParams SignalParams
// ReplicateParams is params only for BatchTypeReplicate
ReplicateParams ReplicateParams
// RPS of processing. Default to DefaultRPS
// TODO we will implement smarter way than this static rate limiter: https://github.com/uber/cadence/issues/2138
RPS int
// Number of goroutines running in parallel to process
Concurrency int
// Number of workflows processed in a batch
PageSize int
// Number of attempts for each workflow to process in case of retryable error before giving up
AttemptsOnRetryableError int
// timeout for activity heartbeat
ActivityHeartBeatTimeout time.Duration
// Max number of attempts for the activity to retry. Default to 0 (unlimited)
MaxActivityRetries int
// errors that will not retry which consumes AttemptsOnRetryableError. Default to empty
NonRetryableErrors []string
// internal conversion for NonRetryableErrors
_nonRetryableErrors map[string]struct{}
}

// HeartBeatDetails is the struct for heartbeat details
type HeartBeatDetails struct {
PageToken []byte
CurrentPage int
// This is just an estimation for visibility
TotalEstimate int64
// Number of workflows processed successfully
SuccessCount int
// Number of workflows that give up due to errors.
ErrorCount int
}

type taskDetail struct {
execution types.WorkflowExecution
attempts int
// passing along the current heartbeat details to make heartbeat within a task so that it won't timeout
hbd HeartBeatDetails
}
124 changes: 24 additions & 100 deletions service/worker/batcher/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ const (
DefaultAttemptsOnRetryableError = 50
// DefaultActivityHeartBeatTimeout is the default value for ActivityHeartBeatTimeout
DefaultActivityHeartBeatTimeout = time.Second * 10
// DefaultMaxActivityRetries is the default value for MaxActivityRetries
DefaultMaxActivityRetries = 4
)

const (
Expand All @@ -82,94 +84,8 @@ const (
// AllBatchTypes is the batch types we supported
var AllBatchTypes = []string{BatchTypeTerminate, BatchTypeCancel, BatchTypeSignal, BatchTypeReplicate}

type (
// TerminateParams is the parameters for terminating workflow
TerminateParams struct {
// this indicates whether to terminate children workflow. Default to true.
// TODO https://github.com/uber/cadence/issues/2159
// Ideally default should be childPolicy of the workflow. But it's currently totally broken.
TerminateChildren *bool
}

// CancelParams is the parameters for canceling workflow
CancelParams struct {
// this indicates whether to cancel children workflow. Default to true.
// TODO https://github.com/uber/cadence/issues/2159
// Ideally default should be childPolicy of the workflow. But it's currently totally broken.
CancelChildren *bool
}

// SignalParams is the parameters for signaling workflow
SignalParams struct {
SignalName string
Input string
}

// ReplicateParams is the parameters for replicating workflow
ReplicateParams struct {
SourceCluster string
TargetCluster string
}

// BatchParams is the parameters for batch operation workflow
BatchParams struct {
// Target domain to execute batch operation
DomainName string
// To get the target workflows for processing
Query string
// Reason for the operation
Reason string
// Supporting: reset,terminate
BatchType string

// Below are all optional
// TerminateParams is params only for BatchTypeTerminate
TerminateParams TerminateParams
// CancelParams is params only for BatchTypeCancel
CancelParams CancelParams
// SignalParams is params only for BatchTypeSignal
SignalParams SignalParams
// ReplicateParams is params only for BatchTypeReplicate
ReplicateParams ReplicateParams
// RPS of processing. Default to DefaultRPS
// TODO we will implement smarter way than this static rate limiter: https://github.com/uber/cadence/issues/2138
RPS int
// Number of goroutines running in parallel to process
Concurrency int
// Number of workflows processed in a batch
PageSize int
// Number of attempts for each workflow to process in case of retryable error before giving up
AttemptsOnRetryableError int
// timeout for activity heartbeat
ActivityHeartBeatTimeout time.Duration
// errors that will not retry which consumes AttemptsOnRetryableError. Default to empty
NonRetryableErrors []string
// internal conversion for NonRetryableErrors
_nonRetryableErrors map[string]struct{}
}

// HeartBeatDetails is the struct for heartbeat details
HeartBeatDetails struct {
PageToken []byte
CurrentPage int
// This is just an estimation for visibility
TotalEstimate int64
// Number of workflows processed successfully
SuccessCount int
// Number of workflows that give up due to errors.
ErrorCount int
}

taskDetail struct {
execution types.WorkflowExecution
attempts int
// passing along the current heartbeat details to make heartbeat within a task so that it won't timeout
hbd HeartBeatDetails
}
)

var (
batchActivityRetryPolicy = cadence.RetryPolicy{
BatchActivityRetryPolicy = cadence.RetryPolicy{
InitialInterval: 10 * time.Second,
BackoffCoefficient: 1.7,
MaximumInterval: 5 * time.Minute,
Expand All @@ -180,7 +96,7 @@ var (
batchActivityOptions = workflow.ActivityOptions{
ScheduleToStartTimeout: 5 * time.Minute,
StartToCloseTimeout: InfiniteDuration,
RetryPolicy: &batchActivityRetryPolicy,
RetryPolicy: &BatchActivityRetryPolicy,
}
)

Expand All @@ -197,6 +113,7 @@ func BatchWorkflow(ctx workflow.Context, batchParams BatchParams) (HeartBeatDeta
return HeartBeatDetails{}, err
}
batchActivityOptions.HeartbeatTimeout = batchParams.ActivityHeartBeatTimeout
batchActivityOptions.RetryPolicy.MaximumAttempts = int32(batchParams.MaxActivityRetries)
opt := workflow.WithActivityOptions(ctx, batchActivityOptions)
var result HeartBeatDetails
err = workflow.ExecuteActivity(opt, batchActivityName, batchParams).Get(ctx, &result)
Expand Down Expand Up @@ -258,6 +175,9 @@ func setDefaultParams(params BatchParams) BatchParams {
if params.TerminateParams.TerminateChildren == nil {
params.TerminateParams.TerminateChildren = common.BoolPtr(true)
}
if params.MaxActivityRetries < 0 {
params.MaxActivityRetries = DefaultMaxActivityRetries
}
return params
}

Expand All @@ -281,19 +201,9 @@ func BatchActivity(ctx context.Context, batchParams BatchParams) (HeartBeatDetai
return HeartBeatDetails{}, err
}
domainID := domainResp.GetDomainInfo().GetUUID()
hbd := HeartBeatDetails{}
startOver := true
if activity.HasHeartbeatDetails(ctx) {
if err := activity.GetHeartbeatDetails(ctx, &hbd); err == nil {
startOver = false
} else {
batcher := ctx.Value(batcherContextKey).(*Batcher)
batcher.metricsClient.IncCounter(metrics.BatcherScope, metrics.BatcherProcessorFailures)
getActivityLogger(ctx).Error("Failed to recover from last heartbeat, start over from beginning", tag.Error(err))
}
}
hbd, ok := getHeartBeatDetails(ctx)

if startOver {
if !ok {
resp, err := client.CountWorkflowExecutions(ctx, &types.CountWorkflowExecutionsRequest{
Domain: batchParams.DomainName,
Query: batchParams.Query,
Expand Down Expand Up @@ -371,6 +281,20 @@ func BatchActivity(ctx context.Context, batchParams BatchParams) (HeartBeatDetai
return hbd, nil
}

func getHeartBeatDetails(ctx context.Context) (hbd HeartBeatDetails, ok bool) {
if activity.HasHeartbeatDetails(ctx) {
if err := activity.GetHeartbeatDetails(ctx, &hbd); err != nil {
batcher := ctx.Value(batcherContextKey).(*Batcher)
batcher.metricsClient.IncCounter(metrics.BatcherScope, metrics.BatcherProcessorFailures)
getActivityLogger(ctx).Error("Failed to recover from last heartbeat, start over from beginning", tag.Error(err))
return HeartBeatDetails{}, false
}
return hbd, true
}

return hbd, false
}

func startTaskProcessor(
ctx context.Context,
batchParams BatchParams,
Expand Down
Loading