Skip to content

Commit

Permalink
Fix shard instability (uber#3271)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt authored May 21, 2020
1 parent 74b40ed commit 1f3404b
Showing 1 changed file with 27 additions and 13 deletions.
40 changes: 27 additions & 13 deletions service/history/shard/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,13 +234,17 @@ func (c *controller) ShardIDs() []int32 {
func (c *controller) removeEngineForShard(shardID int, shardItem *historyShardsItem) {
sw := c.metricsScope.StartTimer(metrics.RemoveEngineForShardLatency)
defer sw.Stop()
item, _ := c.removeHistoryShardItem(shardID)
// the shardItem comparison is just a defensive check to make sure we are deleting
// what we intend to delete. In the event that multiple callers call removeEngine / getEngine
// concurrently, it is possible to reorder a delete/delete/add sequence into a delete/add/delete
// sequence. This check is to protect against those scenarios.
if item != nil && (item == shardItem || shardItem == nil) {
item.stopEngine()
currentShardItem, _ := c.removeHistoryShardItem(shardID, shardItem)
if shardItem != nil {
// if shardItem is not nil, then currentShardItem either equals to shardItem or is nil
// in both cases, we need to stop the engine in shardItem
shardItem.stopEngine()
return
}

// if shardItem is nil, then stop the engine for the current shardItem, if exists
if currentShardItem != nil {
currentShardItem.stopEngine()
}
}

Expand Down Expand Up @@ -299,22 +303,28 @@ func (c *controller) getOrCreateHistoryShardItem(shardID int) (*historyShardsIte
return nil, CreateShardOwnershipLostError(c.GetHostInfo().Identity(), info.GetAddress())
}

func (c *controller) removeHistoryShardItem(shardID int) (*historyShardsItem, error) {
func (c *controller) removeHistoryShardItem(shardID int, shardItem *historyShardsItem) (*historyShardsItem, error) {
nShards := 0
c.Lock()
shardItem, ok := c.historyShards[shardID]
defer c.Unlock()

currentShardItem, ok := c.historyShards[shardID]
if !ok {
c.Unlock()
return nil, fmt.Errorf("No item found to remove for shard: %v", shardID)
}
if shardItem != nil && currentShardItem != shardItem {
// the shardItem comparison is a defensive check to make sure we are deleting
// what we intend to delete.
return nil, fmt.Errorf("Current shardItem doesn't match the one we intend to delete for shard: %v", shardID)
}

delete(c.historyShards, shardID)
nShards = len(c.historyShards)
c.Unlock()

c.metricsScope.IncCounter(metrics.ShardItemRemovedCounter)

shardItem.logger.Info("", tag.LifeCycleStopped, tag.ComponentShardItem, tag.Number(int64(nShards)))
return shardItem, nil
currentShardItem.logger.Info("", tag.LifeCycleStopped, tag.ComponentShardItem, tag.Number(int64(nShards)))
return currentShardItem, nil
}

// shardManagementPump is the main event loop for
Expand Down Expand Up @@ -428,6 +438,10 @@ func (i *historyShardsItem) getOrCreateEngine(
i.logger.Info("", tag.LifeCycleStarting, tag.ComponentShardEngine)
context, err := acquireShard(i, closeCallback)
if err != nil {
// invalidate the shardItem so that the same shardItem won't be
// used to create another shardContext
i.logger.Info("", tag.LifeCycleStopped, tag.ComponentShardEngine)
i.status = historyShardsItemStatusStopped
return nil, err
}
if context.PreviousShardOwnerWasDifferent() {
Expand Down

0 comments on commit 1f3404b

Please sign in to comment.