Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wrap underlying cause for conditional update error #4797

Merged
merged 6 commits into from
Apr 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion service/history/decision/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,7 @@ Update_History_Loop:
}

if updateErr != nil {
if updateErr == execution.ErrConflict {
if execution.IsConflictError(updateErr) {
handler.metricsClient.IncCounter(metrics.HistoryRespondDecisionTaskCompletedScope, metrics.ConcurrencyUpdateFailureCounter)
continue Update_History_Loop
}
Expand Down
33 changes: 26 additions & 7 deletions service/history/execution/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"context"
"errors"
"fmt"
"testing"
"time"

"github.com/uber/cadence/common"
Expand All @@ -45,10 +46,29 @@ const (
defaultRemoteCallTimeout = 30 * time.Second
)

var (
// ErrConflict is exported temporarily for integration test
ErrConflict = errors.New("conditional update failed")
)
type conflictError struct {
Cause error
}

func (e *conflictError) Error() string {
return fmt.Sprintf("conditional update failed: %v", e.Cause)
}

func (e *conflictError) Unwrap() error {
return e.Cause
}

// NewConflictError is only public because it used in workflow/util_test.go
// TODO: refactor those tests
func NewConflictError(_ *testing.T, cause error) error {
return &conflictError{cause}
}

// IsConflictError checks whether a conflict has occurred while updating a workflow execution
func IsConflictError(err error) bool {
var e *conflictError
return errors.As(err, &e)
}

type (
// Context is the processing context for all operations on workflow execution
Expand Down Expand Up @@ -1114,8 +1134,7 @@ func (c *contextImpl) updateWorkflowExecutionWithRetry(
case nil:
return resp, nil
case *persistence.ConditionFailedError:
// TODO get rid of ErrConflict
return nil, ErrConflict
return nil, &conflictError{err}
default:
c.logger.Error(
"Persistent store operation failure",
Expand Down Expand Up @@ -1274,6 +1293,6 @@ func (c *contextImpl) isPersistenceTimeoutError(
case *persistence.TimeoutError:
return true
default:
return err != ErrConflict
return !IsConflictError(err)
}
}
4 changes: 2 additions & 2 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -870,7 +870,7 @@ UpdateWorkflowLoop:
newMutableState,
)
if updateErr != nil {
if updateErr == execution.ErrConflict {
if execution.IsConflictError(updateErr) {
e.metricsClient.IncCounter(metrics.HistoryStartWorkflowExecutionScope, metrics.ConcurrencyUpdateFailureCounter)
continue UpdateWorkflowLoop
}
Expand Down Expand Up @@ -2391,7 +2391,7 @@ func (e *historyEngineImpl) SignalWithStartWorkflowExecution(
// We apply the update to execution using optimistic concurrency. If it fails due to a conflict then reload
// the history and try the operation again.
if err := wfContext.UpdateWorkflowExecutionAsActive(ctx, e.shard.GetTimeSource().Now()); err != nil {
if err == execution.ErrConflict {
if execution.IsConflictError(err) {
continue Just_Signal_Loop
}
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion service/history/workflow/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ UpdateHistoryLoop:
}

err = workflowContext.GetContext().UpdateWorkflowExecutionAsActive(ctx, now)
if err == execution.ErrConflict {
if execution.IsConflictError(err) {
if attempt != ConditionalRetryCount-1 {
_, err = workflowContext.ReloadMutableState(ctx)
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion service/history/workflow/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"time"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -71,7 +72,7 @@ func TestUpdateHelper(t *testing.T) {
{
msg: "update workflow conflict",
mockSetupFn: func(mockContext *execution.MockContext, mockMutableState *execution.MockMutableState) {
mockContext.EXPECT().UpdateWorkflowExecutionAsActive(gomock.Any(), gomock.Any()).Return(execution.ErrConflict).Times(ConditionalRetryCount - 1)
mockContext.EXPECT().UpdateWorkflowExecutionAsActive(gomock.Any(), gomock.Any()).Return(execution.NewConflictError(t, assert.AnError)).Times(ConditionalRetryCount - 1)
mockContext.EXPECT().UpdateWorkflowExecutionAsActive(gomock.Any(), gomock.Any()).Return(nil).Times(1)
},
actionFn: func(context execution.Context, mutableState execution.MutableState) (*UpdateAction, error) {
Expand Down