Removed the mutex.
The mutex's original purpose was to prevent concurrent addition/removal of keyspace groups
during critical periods such as service shutdown and online keyspace group membership/distribution
changes. Service shutdown requires snapshot isolation to ensure that all keyspace groups are properly
closed and that no new keyspace group is added/initialized afterwards. This is now guaranteed by
always cancelling the watch loop before closing the keyspace groups in Close().
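
For illustration only, a minimal sketch of the shutdown ordering this relies on. Everything below
(manager, group, startWatchLoop) is a hypothetical stand-in rather than the actual
KeyspaceGroupManager code: the watch loop is the only path that adds/initializes groups, so
cancelling it and waiting for it to exit makes the teardown safe without a mutex.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// group is a hypothetical stand-in for a keyspace group and its allocator manager.
type group struct{ id int }

func (g *group) close() { fmt.Printf("closed group %d\n", g.id) }

// manager is a hypothetical, stripped-down stand-in for KeyspaceGroupManager.
type manager struct {
	ctx    context.Context
	cancel context.CancelFunc
	wg     sync.WaitGroup
	groups sync.Map // id -> *group
}

func newManager() *manager {
	ctx, cancel := context.WithCancel(context.Background())
	return &manager{ctx: ctx, cancel: cancel}
}

// startWatchLoop stands in for the keyspace-group meta watch loop: it is the only
// goroutine that adds/initializes groups, and it exits once ctx is cancelled.
func (m *manager) startWatchLoop() {
	m.wg.Add(1)
	go func() {
		defer m.wg.Done()
		for i := 0; ; i++ {
			select {
			case <-m.ctx.Done():
				return
			case <-time.After(10 * time.Millisecond):
				m.groups.Store(i, &group{id: i})
			}
		}
	}()
}

// Close mirrors the ordering the commit relies on: cancel first, wait for the watch
// loop to exit, then close the groups. No lock is needed because no group can be
// added/initialized after wg.Wait() returns.
func (m *manager) Close() {
	m.cancel()
	m.wg.Wait()
	m.groups.Range(func(_, v any) bool {
		v.(*group).close()
		return true
	})
}

func main() {
	m := newManager()
	m.startWatchLoop()
	time.Sleep(35 * time.Millisecond) // let a few groups be added
	m.Close()
}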

Signed-off-by: Bin Shi <binshi.bing@gmail.com>
binshi-bing committed Apr 3, 2023
1 parent 9a23fbf commit b870c5f
Showing 1 changed file with 22 additions and 48 deletions.
70 changes: 22 additions & 48 deletions pkg/tso/keyspace_group_manager.go
@@ -45,7 +45,7 @@ const (
primaryElectionSuffix = "primary"
// defaultLoadKeyspaceGroupsTimeout is the default timeout for loading the initial
// keyspace group assignment
defaultLoadKeyspaceGroupsTimeout = 3 * time.Minute
defaultLoadKeyspaceGroupsTimeout = 30 * time.Second
defaultLoadKeyspaceGroupsBatchSize = int64(400)
loadFromEtcdMaxRetryTimes = 6
loadFromEtcdRetryInterval = 500 * time.Millisecond
@@ -56,12 +56,7 @@ const (
// The replicas campaign for the leaders which provide the tso service for the corresponding
// keyspace groups.
type KeyspaceGroupManager struct {
// mu protects the `ams` and `ksgs` data structures from concurrent addition/removal of
// keyspace groups during critical periods such as service shutdown and online keyspace group
// membership/distribution changes.
// It's important to note that accessing `ams[i]` does not require acquiring this lock.
mu sync.Mutex
// ams stores the allocator managers of the keyspace groups. Each keyspace group is assigned
// ams stores the allocator managers of the keyspace groups. Each keyspace group is assigned
// with an allocator manager managing its global/local tso allocators.
// Use a fixed size array to maximize the efficiency of concurrent access to
// different keyspace groups for tso service.
@@ -113,7 +108,7 @@ type KeyspaceGroupManager struct {
// cfg is the TSO config
cfg ServiceConfig
maxResetTSGap func() time.Duration
// loadKeyspaceGroupsTimeout is the timeout for loading the initial keyspace group assignment
// loadKeyspaceGroupsTimeout is the timeout for loading the initial keyspace group assignment.
loadKeyspaceGroupsTimeout time.Duration
loadKeyspaceGroupsBatchSize int64
}
@@ -171,7 +166,7 @@ func (kgm *KeyspaceGroupManager) Initialize(loadFromStorage bool) error {
Members: []endpoint.KeyspaceGroupMember{{Address: kgm.tsoServiceID.ServiceAddr}},
Keyspaces: []uint32{mcsutils.DefaultKeyspaceID},
}
kgm.updateKeyspaceGroup(kgm.ctx, group)
kgm.updateKeyspaceGroup(group)
return nil
}

@@ -198,16 +193,20 @@ func (kgm *KeyspaceGroupManager) Initialize(loadFromStorage bool) error {
// Close this KeyspaceGroupManager
func (kgm *KeyspaceGroupManager) Close() {
log.Info("closing keyspace group manager")

// Note: don't change the order. We need to cancel all service loops in the keyspace group manager
// before closing all keyspace groups. This prevents concurrent addition/removal of keyspace groups
// during critical periods such as service shutdown and online keyspace group membership/distribution
// changes. Service shutdown requires snapshot isolation to ensure all keyspace groups are properly
// closed and that no new keyspace group is added/initialized after that.
kgm.cancel()
kgm.wg.Wait()
kgm.closeKeyspaceGroups()

log.Info("keyspace group manager closed")
}

func (kgm *KeyspaceGroupManager) closeKeyspaceGroups() {
kgm.mu.Lock()
defer kgm.mu.Unlock()

log.Info("closing all keyspace groups")

wg := sync.WaitGroup{}
@@ -276,7 +275,7 @@ func (kgm *KeyspaceGroupManager) initAssignment(ctx context.Context) (int64, err

if kgm.isAssignedToMe(group) {
keyspaceGroupsApplied++
kgm.updateKeyspaceGroup(ctx, group)
kgm.updateKeyspaceGroup(group)
}
}

@@ -331,7 +330,7 @@ func (kgm *KeyspaceGroupManager) loadKeyspaceGroups(
return revison, kgs, resp.More, nil
}

// watchKeyspaceGroupsMetaChangeLoop watches any change in keyspace group membership/distribution
// startKeyspaceGroupsMetaWatchLoop repeatedly watches any change in keyspace group membership/distribution
// and apply the change dynamically.
func (kgm *KeyspaceGroupManager) startKeyspaceGroupsMetaWatchLoop(revision int64) {
defer logutil.LogPanic()
@@ -374,8 +373,8 @@ func (kgm *KeyspaceGroupManager) watchKeyspaceGroupsMetaChange(revision int64) (
revision = wresp.CompactRevision
break
}
if wresp.Canceled {
log.Error("watcher is canceled",
if wresp.Err() != nil {
log.Error("watch is canceled or closed",
zap.Int64("required-revision", revision),
errs.ZapError(errs.ErrEtcdWatcherCancel, wresp.Err()))
return revision, wresp.Err()
@@ -396,7 +395,7 @@ func (kgm *KeyspaceGroupManager) watchKeyspaceGroupsMetaChange(revision int64) (
zap.Uint32("keysapce-group-id", id),
zap.Error(errs.ErrJSONUnmarshal.Wrap(err).FastGenWithCause()))
} else if kgm.isAssignedToMe(group) {
kgm.updateKeyspaceGroup(kgm.ctx, group)
kgm.updateKeyspaceGroup(group)
}
case clientv3.EventTypeDelete:
kgm.deleteKeyspaceGroup(id)
@@ -414,28 +413,17 @@ }
}

func (kgm *KeyspaceGroupManager) isAssignedToMe(group *endpoint.KeyspaceGroup) bool {
var assignedToMe = false
for _, member := range group.Members {
if len(member.Address) > 0 && member.Address == kgm.tsoServiceID.ServiceAddr {
assignedToMe = true
break
return true
}
}
return assignedToMe
return false
}

// updateKeyspaceGroup applies the given keyspace group. If the keyspace group is just assigned to
// this host/pod, it will join the primary election.
func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(ctx context.Context, group *endpoint.KeyspaceGroup) {
kgm.mu.Lock()
defer kgm.mu.Unlock()

select {
case <-ctx.Done():
return
default:
}

func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGroup) {
if group.ID >= uint32(len(kgm.ams)) {
log.Warn("keyspace group ID is out of range, ignore it",
zap.Uint32("keyspace-group-id", group.ID), zap.Int("max-keyspace-group-id", len(kgm.ams)-1))
@@ -485,27 +473,13 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(ctx context.Context, group

// deleteKeyspaceGroup deletes the given keyspace group.
func (kgm *KeyspaceGroupManager) deleteKeyspaceGroup(id uint32) {
kgm.mu.Lock()
defer kgm.mu.Unlock()

select {
case <-kgm.ctx.Done():
return
default:
}

am := kgm.ams[id].Load()
kgm.ksgs[id].Store(nil)
am := kgm.ams[id].Swap(nil)
if am == nil {
log.Info("keyspace group already deleted, so do nothing",
zap.Uint32("keyspace-group-id", id))
return
}

log.Info("leaving primary election", zap.Uint32("keyspace-group-id", id))

am.close()
kgm.ams[id].Store(nil)
kgm.ksgs[id].Store(nil)
log.Info("deleted keyspace group", zap.Uint32("keyspace-group-id", id))
}

// GetAllocatorManager returns the AllocatorManager of the given keyspace group
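A side note on the deleteKeyspaceGroup change above: Swap(nil) atomically clears the slot and hands
back the previous value, so a group's allocator manager is closed at most once even without the
mutex. Below is a minimal, hypothetical sketch of that pattern (not the actual pd code).

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// am is a hypothetical stand-in for a keyspace group's allocator manager.
type am struct{ id uint32 }

func (a *am) close() { fmt.Printf("allocator manager %d closed\n", a.id) }

func main() {
	var slot atomic.Pointer[am] // stand-in for one kgm.ams[id] entry
	slot.Store(&am{id: 1})

	// Swap(nil) atomically clears the slot and returns the previous value, so even
	// if several deletions race on the same id, exactly one goroutine sees a
	// non-nil value and closes it; the others see nil and do nothing.
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if old := slot.Swap(nil); old != nil {
				old.close()
			}
		}()
	}
	wg.Wait()
}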