Skip to content

Commit

Permalink
Do not get workflow execution from database when shard is closed (ube…
Browse files Browse the repository at this point in the history
  • Loading branch information
Shaddoll authored Mar 1, 2024
1 parent f75fc71 commit 3f64176
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 2 deletions.
2 changes: 1 addition & 1 deletion service/history/execution/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -1193,7 +1193,7 @@ func (c *contextImpl) getWorkflowExecutionWithRetry(
var resp *persistence.GetWorkflowExecutionResponse
op := func() error {
var err error
resp, err = c.executionManager.GetWorkflowExecution(ctx, request)
resp, err = c.shard.GetWorkflowExecution(ctx, request)

return err
}
Expand Down
2 changes: 1 addition & 1 deletion service/history/ndc/transaction_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ func (r *transactionManagerImpl) checkWorkflowExists(
if errorDomainName != nil {
return false, errorDomainName
}
_, err := r.shard.GetExecutionManager().GetWorkflowExecution(
_, err := r.shard.GetWorkflowExecution(
ctx,
&persistence.GetWorkflowExecutionRequest{
DomainID: domainID,
Expand Down
11 changes: 11 additions & 0 deletions service/history/shard/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ type (
GetDomainNotificationVersion() int64
UpdateDomainNotificationVersion(domainNotificationVersion int64) error

GetWorkflowExecution(ctx context.Context, request *persistence.GetWorkflowExecutionRequest) (*persistence.GetWorkflowExecutionResponse, error)
CreateWorkflowExecution(ctx context.Context, request *persistence.CreateWorkflowExecutionRequest) (*persistence.CreateWorkflowExecutionResponse, error)
UpdateWorkflowExecution(ctx context.Context, request *persistence.UpdateWorkflowExecutionRequest) (*persistence.UpdateWorkflowExecutionResponse, error)
ConflictResolveWorkflowExecution(ctx context.Context, request *persistence.ConflictResolveWorkflowExecutionRequest) (*persistence.ConflictResolveWorkflowExecutionResponse, error)
Expand Down Expand Up @@ -578,6 +579,16 @@ func (s *contextImpl) UpdateTimerMaxReadLevel(cluster string) time.Time {
return s.timerMaxReadLevelMap[cluster]
}

func (s *contextImpl) GetWorkflowExecution(
ctx context.Context,
request *persistence.GetWorkflowExecutionRequest,
) (*persistence.GetWorkflowExecutionResponse, error) {
if s.isClosed() {
return nil, ErrShardClosed
}
return s.executionManager.GetWorkflowExecution(ctx, request)
}

func (s *contextImpl) CreateWorkflowExecution(
ctx context.Context,
request *persistence.CreateWorkflowExecutionRequest,
Expand Down
79 changes: 79 additions & 0 deletions service/history/shard/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"time"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
Expand Down Expand Up @@ -229,3 +230,81 @@ func (s *contextTestSuite) TestGetAndUpdateProcessingQueueStates() {
s.Equal(updatedTransferQueueStates[0].GetAckLevel(), s.context.GetTransferClusterAckLevel(clusterName))
s.Equal(time.Unix(0, updatedTimerQueueStates[0].GetAckLevel()), s.context.GetTimerClusterAckLevel(clusterName))
}

func TestGetWorkflowExecution(t *testing.T) {
testCases := []struct {
name string
isClosed bool
request *persistence.GetWorkflowExecutionRequest
mockSetup func(*mocks.ExecutionManager)
expectedResult *persistence.GetWorkflowExecutionResponse
expectedError error
}{
{
name: "Success",
request: &persistence.GetWorkflowExecutionRequest{
DomainID: "testDomain",
Execution: types.WorkflowExecution{WorkflowID: "testWorkflowID", RunID: "testRunID"},
},
mockSetup: func(mgr *mocks.ExecutionManager) {
mgr.On("GetWorkflowExecution", mock.Anything, mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{
State: &persistence.WorkflowMutableState{
ExecutionInfo: &persistence.WorkflowExecutionInfo{
DomainID: "testDomain",
WorkflowID: "testWorkflowID",
RunID: "testRunID",
},
},
}, nil)
},
expectedResult: &persistence.GetWorkflowExecutionResponse{
State: &persistence.WorkflowMutableState{
ExecutionInfo: &persistence.WorkflowExecutionInfo{
DomainID: "testDomain",
WorkflowID: "testWorkflowID",
RunID: "testRunID",
},
},
},
expectedError: nil,
},
{
name: "Error",
request: &persistence.GetWorkflowExecutionRequest{
DomainID: "testDomain",
Execution: types.WorkflowExecution{WorkflowID: "testWorkflowID", RunID: "testRunID"},
},
mockSetup: func(mgr *mocks.ExecutionManager) {
mgr.On("GetWorkflowExecution", mock.Anything, mock.Anything).Return(nil, errors.New("some random error"))
},
expectedResult: nil,
expectedError: errors.New("some random error"),
},
{
name: "Shard closed",
isClosed: true,
request: &persistence.GetWorkflowExecutionRequest{
DomainID: "testDomain",
Execution: types.WorkflowExecution{WorkflowID: "testWorkflowID", RunID: "testRunID"},
},
mockSetup: func(mgr *mocks.ExecutionManager) {},
expectedResult: nil,
expectedError: ErrShardClosed,
},
}

for _, tc := range testCases {
mockExecutionMgr := &mocks.ExecutionManager{}
shardContext := &contextImpl{
executionManager: mockExecutionMgr,
}
if tc.isClosed {
shardContext.closed = 1
}
tc.mockSetup(mockExecutionMgr)

result, err := shardContext.GetWorkflowExecution(context.Background(), tc.request)
assert.Equal(t, tc.expectedResult, result)
assert.Equal(t, tc.expectedError, err)
}
}

0 comments on commit 3f64176

Please sign in to comment.