Skip to content

Commit

Permalink
Only update maxReadLevel after successful re-acquire of shard (uber#4799)
Browse files Browse the repository at this point in the history
  • Loading branch information
Shaddoll authored Apr 20, 2022
1 parent b49002d commit 6ea8658
Show file tree
Hide file tree
Showing 2 changed files with 157 additions and 132 deletions.
3 changes: 3 additions & 0 deletions service/history/historyEngine2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ type (
historyEngine *historyEngineImpl
mockExecutionMgr *mocks.ExecutionManager
mockHistoryV2Mgr *mocks.HistoryV2Manager
mockShardManager *mocks.ShardManager

config *config.Config
logger log.Logger
Expand Down Expand Up @@ -115,6 +116,7 @@ func (s *engine2Suite) SetupTest() {
s.mockDomainCache = s.mockShard.Resource.DomainCache
s.mockExecutionMgr = s.mockShard.Resource.ExecutionMgr
s.mockHistoryV2Mgr = s.mockShard.Resource.HistoryMgr
s.mockShardManager = s.mockShard.Resource.ShardMgr
s.mockClusterMetadata = s.mockShard.Resource.ClusterMetadata
s.mockEventsCache = s.mockShard.MockEventsCache
testDomainEntry := cache.NewLocalDomainCacheEntryForTest(
Expand Down Expand Up @@ -1300,6 +1302,7 @@ func (s *engine2Suite) TestSignalWithStartWorkflowExecution_CreateTimeout() {
s.mockExecutionMgr.On("GetCurrentExecution", mock.Anything, mock.Anything).Return(nil, notExistErr).Once()
s.mockHistoryV2Mgr.On("AppendHistoryNodes", mock.Anything, mock.Anything).Return(&p.AppendHistoryNodesResponse{Size: 0}, nil).Once()
s.mockExecutionMgr.On("CreateWorkflowExecution", mock.Anything, mock.Anything).Return(nil, &p.TimeoutError{}).Once()
s.mockShardManager.On("UpdateShard", mock.Anything, mock.Anything).Return(nil).Once()

resp, err := s.historyEngine.SignalWithStartWorkflowExecution(context.Background(), sRequest)
s.True(p.IsTimeoutError(err))
Expand Down
286 changes: 154 additions & 132 deletions service/history/shard/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -623,7 +623,6 @@ func (s *contextImpl) CreateWorkflowExecution(
); err != nil {
return nil, err
}
defer s.updateMaxReadLevelLocked(transferMaxReadLevel)

Create_Loop:
for attempt := 0; attempt < conditionalRetryCount && ctx.Err() == nil; attempt++ {
Expand All @@ -634,55 +633,62 @@ Create_Loop:
request.RangeID = currentRangeID

response, err := s.executionManager.CreateWorkflowExecution(ctx, request)
if err != nil {
switch err.(type) {
case *types.WorkflowExecutionAlreadyStartedError,
*persistence.WorkflowExecutionAlreadyStartedError,
*persistence.CurrentWorkflowConditionFailedError,
*types.ServiceBusyError,
*persistence.TimeoutError,
*types.LimitExceededError:
// No special handling required for these errors
case *persistence.ShardOwnershipLostError:
{
// RangeID might have been renewed by the same host while this update was in flight
// Retry the operation if we still have the shard ownership
if currentRangeID != s.getRangeID() {
continue Create_Loop
} else {
// Shard is stolen, trigger shutdown of history engine
s.logger.Warn(
"Closing shard: CreateWorkflowExecution failed due to stolen shard.",
tag.ShardID(s.GetShardID()),
tag.Error(err),
)
s.closeShard()
return nil, err
}
switch err.(type) {
case nil:
// Update MaxReadLevel if write to DB succeeds
s.updateMaxReadLevelLocked(transferMaxReadLevel)
return response, nil
case *types.WorkflowExecutionAlreadyStartedError,
*persistence.WorkflowExecutionAlreadyStartedError,
*persistence.CurrentWorkflowConditionFailedError,
*types.ServiceBusyError:
// No special handling required for these errors
// These errors guarantee that the write to the DB failed, so it is
// harmless to update MaxReadLevel in this case
s.updateMaxReadLevelLocked(transferMaxReadLevel)
return nil, err
case *persistence.ShardOwnershipLostError:
{
// RangeID might have been renewed by the same host while this update was in flight
// Retry the operation if we still have the shard ownership
if currentRangeID != s.getRangeID() {
continue Create_Loop
} else {
// Shard is stolen, trigger shutdown of history engine
s.logger.Warn(
"Closing shard: CreateWorkflowExecution failed due to stolen shard.",
tag.ShardID(s.GetShardID()),
tag.Error(err),
)
s.closeShard()
return nil, err
}
default:
{
// We have no idea if the write failed or will eventually make it to
// persistence. Increment RangeID to guarantee that subsequent reads
// will either see that write, or know for certain that it failed.
// This allows the callers to reliably check the outcome by performing
// a read.
err1 := s.renewRangeLocked(false)
if err1 != nil {
// At this point we have no choice but to unload the shard, so that it
// gets a new RangeID when it's reloaded.
s.logger.Warn(
"Closing shard: CreateWorkflowExecution failed due to unknown error.",
tag.ShardID(s.GetShardID()),
tag.Error(err),
)
s.closeShard()
break Create_Loop
}
}
default:
{
// We have no idea if the write failed or will eventually make it to
// persistence. Increment RangeID to guarantee that subsequent reads
// will either see that write, or know for certain that it failed.
// This allows the callers to reliably check the outcome by performing
// a read.
err1 := s.renewRangeLocked(false)
if err1 != nil {
// At this point we have no choice but to unload the shard, so that it
// gets a new RangeID when it's reloaded.
s.logger.Warn(
"Closing shard: CreateWorkflowExecution failed due to unknown error.",
tag.ShardID(s.GetShardID()),
tag.Error(err),
)
s.closeShard()
break Create_Loop
} else {
// The shard was re-acquired successfully, so the in-flight write is guaranteed to
// have either landed or failed; it is now safe to update MaxReadLevel
s.updateMaxReadLevelLocked(transferMaxReadLevel)
}
}
}

return response, err
}

Expand Down Expand Up @@ -747,7 +753,6 @@ func (s *contextImpl) UpdateWorkflowExecution(
return nil, err
}
}
defer s.updateMaxReadLevelLocked(transferMaxReadLevel)

Update_Loop:
for attempt := 0; attempt < conditionalRetryCount && ctx.Err() == nil; attempt++ {
Expand All @@ -758,52 +763,60 @@ Update_Loop:
request.RangeID = currentRangeID

resp, err := s.executionManager.UpdateWorkflowExecution(ctx, request)
if err != nil {
switch err.(type) {
case *persistence.ConditionFailedError,
*types.ServiceBusyError,
*types.LimitExceededError:
// No special handling required for these errors
case *persistence.ShardOwnershipLostError:
{
// RangeID might have been renewed by the same host while this update was in flight
// Retry the operation if we still have the shard ownership
if currentRangeID != s.getRangeID() {
continue Update_Loop
} else {
// Shard is stolen, trigger shutdown of history engine
s.logger.Warn(
"Closing shard: UpdateWorkflowExecution failed due to stolen shard.",
tag.ShardID(s.GetShardID()),
tag.Error(err),
)
s.closeShard()
break Update_Loop
}
switch err.(type) {
case nil:
// Update MaxReadLevel if write to DB succeeds
s.updateMaxReadLevelLocked(transferMaxReadLevel)
return resp, nil
case *persistence.ConditionFailedError,
*types.ServiceBusyError:
// No special handling required for these errors
// These errors guarantee that the write to the DB failed, so it is
// harmless to update MaxReadLevel in this case
s.updateMaxReadLevelLocked(transferMaxReadLevel)
return nil, err
case *persistence.ShardOwnershipLostError:
{
// RangeID might have been renewed by the same host while this update was in flight
// Retry the operation if we still have the shard ownership
if currentRangeID != s.getRangeID() {
continue Update_Loop
} else {
// Shard is stolen, trigger shutdown of history engine
s.logger.Warn(
"Closing shard: UpdateWorkflowExecution failed due to stolen shard.",
tag.ShardID(s.GetShardID()),
tag.Error(err),
)
s.closeShard()
break Update_Loop
}
default:
{
// We have no idea if the write failed or will eventually make it to
// persistence. Increment RangeID to guarantee that subsequent reads
// will either see that write, or know for certain that it failed.
// This allows the callers to reliably check the outcome by performing
// a read.
err1 := s.renewRangeLocked(false)
if err1 != nil {
// At this point we have no choice but to unload the shard, so that it
// gets a new RangeID when it's reloaded.
s.logger.Warn(
"Closing shard: UpdateWorkflowExecution failed due to unknown error.",
tag.ShardID(s.GetShardID()),
tag.Error(err),
)
s.closeShard()
break Update_Loop
}
}
default:
{
// We have no idea if the write failed or will eventually make it to
// persistence. Increment RangeID to guarantee that subsequent reads
// will either see that write, or know for certain that it failed.
// This allows the callers to reliably check the outcome by performing
// a read.
err1 := s.renewRangeLocked(false)
if err1 != nil {
// At this point we have no choice but to unload the shard, so that it
// gets a new RangeID when it's reloaded.
s.logger.Warn(
"Closing shard: UpdateWorkflowExecution failed due to unknown error.",
tag.ShardID(s.GetShardID()),
tag.Error(err),
)
s.closeShard()
break Update_Loop
} else {
// The shard was re-acquired successfully, so the in-flight write is guaranteed to
// have either landed or failed; it is now safe to update MaxReadLevel
s.updateMaxReadLevelLocked(transferMaxReadLevel)
}
}
}

return resp, err
}

Expand Down Expand Up @@ -877,7 +890,6 @@ func (s *contextImpl) ConflictResolveWorkflowExecution(
return nil, err
}
}
defer s.updateMaxReadLevelLocked(transferMaxReadLevel)

Conflict_Resolve_Loop:
for attempt := 0; attempt < conditionalRetryCount && ctx.Err() == nil; attempt++ {
Expand All @@ -887,48 +899,57 @@ Conflict_Resolve_Loop:
currentRangeID := s.getRangeID()
request.RangeID = currentRangeID
resp, err := s.executionManager.ConflictResolveWorkflowExecution(ctx, request)
if err != nil {
switch err.(type) {
case *persistence.ConditionFailedError,
*types.ServiceBusyError,
*types.LimitExceededError:
// No special handling required for these errors
case *persistence.ShardOwnershipLostError:
{
// RangeID might have been renewed by the same host while this update was in flight
// Retry the operation if we still have the shard ownership
if currentRangeID != s.getRangeID() {
continue Conflict_Resolve_Loop
} else {
// Shard is stolen, trigger shutdown of history engine
s.logger.Warn(
"Closing shard: ConflictResolveWorkflowExecution failed due to stolen shard.",
tag.ShardID(s.GetShardID()),
tag.Error(err),
)
s.closeShard()
break Conflict_Resolve_Loop
}
switch err.(type) {
case nil:
// Update MaxReadLevel if write to DB succeeds
s.updateMaxReadLevelLocked(transferMaxReadLevel)
return resp, nil
case *persistence.ConditionFailedError,
*types.ServiceBusyError:
// No special handling required for these errors
// These errors guarantee that the write to the DB failed, so it is
// harmless to update MaxReadLevel in this case
s.updateMaxReadLevelLocked(transferMaxReadLevel)
return nil, err
case *persistence.ShardOwnershipLostError:
{
// RangeID might have been renewed by the same host while this update was in flight
// Retry the operation if we still have the shard ownership
if currentRangeID != s.getRangeID() {
continue Conflict_Resolve_Loop
} else {
// Shard is stolen, trigger shutdown of history engine
s.logger.Warn(
"Closing shard: ConflictResolveWorkflowExecution failed due to stolen shard.",
tag.ShardID(s.GetShardID()),
tag.Error(err),
)
s.closeShard()
break Conflict_Resolve_Loop
}
default:
{
// We have no idea if the write failed or will eventually make it to
// persistence. Increment RangeID to guarantee that subsequent reads
// will either see that write, or know for certain that it failed.
// This allows the callers to reliably check the outcome by performing
// a read.
err1 := s.renewRangeLocked(false)
if err1 != nil {
// At this point we have no choice but to unload the shard, so that it
// gets a new RangeID when it's reloaded.
s.logger.Warn(
"Closing shard: ConflictResolveWorkflowExecution failed due to unknown error.",
tag.ShardID(s.GetShardID()),
tag.Error(err),
)
s.closeShard()
break Conflict_Resolve_Loop
}
}
default:
{
// We have no idea if the write failed or will eventually make it to
// persistence. Increment RangeID to guarantee that subsequent reads
// will either see that write, or know for certain that it failed.
// This allows the callers to reliably check the outcome by performing
// a read.
err1 := s.renewRangeLocked(false)
if err1 != nil {
// At this point we have no choice but to unload the shard, so that it
// gets a new RangeID when it's reloaded.
s.logger.Warn(
"Closing shard: ConflictResolveWorkflowExecution failed due to unknown error.",
tag.ShardID(s.GetShardID()),
tag.Error(err),
)
s.closeShard()
break Conflict_Resolve_Loop
} else {
// The shard was re-acquired successfully, so the in-flight write is guaranteed to
// have either landed or failed; it is now safe to update MaxReadLevel
s.updateMaxReadLevelLocked(transferMaxReadLevel)
}
}
}
Expand Down Expand Up @@ -1426,7 +1447,6 @@ func (s *contextImpl) ReplicateFailoverMarkers(
); err != nil {
return err
}
defer s.updateMaxReadLevelLocked(transferMaxReadLevel)

var err error
Retry_Loop:
Expand All @@ -1443,6 +1463,8 @@ Retry_Loop:
)
switch err.(type) {
case nil:
// Update MaxReadLevel if write to DB succeeds
s.updateMaxReadLevelLocked(transferMaxReadLevel)
break Retry_Loop
case *persistence.ShardOwnershipLostError:
// do not retry on ShardOwnershipLostError
Expand Down

0 comments on commit 6ea8658

Please sign in to comment.