diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index 0f8a4fa3dbab..08f418824833 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -45,7 +45,7 @@ const ( primaryElectionSuffix = "primary" // defaultLoadKeyspaceGroupsTimeout is the default timeout for loading the initial // keyspace group assignment - defaultLoadKeyspaceGroupsTimeout = 3 * time.Minute + defaultLoadKeyspaceGroupsTimeout = 30 * time.Second defaultLoadKeyspaceGroupsBatchSize = int64(400) loadFromEtcdMaxRetryTimes = 6 loadFromEtcdRetryInterval = 500 * time.Millisecond @@ -56,12 +56,7 @@ const ( // The replicas campaign for the leaders which provide the tso service for the corresponding // keyspace groups. type KeyspaceGroupManager struct { - // mu protects the `ams` and `ksgs` data structures from concurrent addition/removal of - // keyspace groups during critical periods such as service shutdown and online keyspace group - // membership/distribution changes. - // It's important to note that accessing `ams[i]` does not require acquiring this lock. - mu sync.Mutex - // ams stores the allocator managers of the keyspace groups. Each keyspace group is assigned + // ams stores the allocator managers of the keyspace groups. Each keyspace group is assigned // with an allocator manager managing its global/local tso allocators. // Use a fixed size array to maximize the efficiency of concurrent access to // different keyspace groups for tso service. @@ -113,7 +108,7 @@ type KeyspaceGroupManager struct { // cfg is the TSO config cfg ServiceConfig maxResetTSGap func() time.Duration - // loadKeyspaceGroupsTimeout is the timeout for loading the initial keyspace group assignment + // loadKeyspaceGroupsTimeout is the timeout for loading the initial keyspace group assignment. loadKeyspaceGroupsTimeout time.Duration loadKeyspaceGroupsBatchSize int64 } @@ -171,7 +166,7 @@ func (kgm *KeyspaceGroupManager) Initialize(loadFromStorage bool) error { Members: []endpoint.KeyspaceGroupMember{{Address: kgm.tsoServiceID.ServiceAddr}}, Keyspaces: []uint32{mcsutils.DefaultKeyspaceID}, } - kgm.updateKeyspaceGroup(kgm.ctx, group) + kgm.updateKeyspaceGroup(group) return nil } @@ -198,16 +193,20 @@ func (kgm *KeyspaceGroupManager) Initialize(loadFromStorage bool) error { // Close this KeyspaceGroupManager func (kgm *KeyspaceGroupManager) Close() { log.Info("closing keyspace group manager") + + // Note: don't change the order. We need to cancel all service loops in the keyspace group manager + // before closing all keyspace groups. It's to prevent concurrent addition/removal of keyspace groups + // during critical periods such as service shutdown and online keyspace group, while the former requires + // snapshot isolation to ensure all keyspace groups are properly closed and no new keyspace group is + // added/initialized after that. kgm.cancel() kgm.wg.Wait() kgm.closeKeyspaceGroups() + log.Info("keyspace group manager closed") } func (kgm *KeyspaceGroupManager) closeKeyspaceGroups() { - kgm.mu.Lock() - defer kgm.mu.Unlock() - log.Info("closing all keyspace groups") wg := sync.WaitGroup{} @@ -276,7 +275,7 @@ func (kgm *KeyspaceGroupManager) initAssignment(ctx context.Context) (int64, err if kgm.isAssignedToMe(group) { keyspaceGroupsApplied++ - kgm.updateKeyspaceGroup(ctx, group) + kgm.updateKeyspaceGroup(group) } } @@ -331,7 +330,7 @@ func (kgm *KeyspaceGroupManager) loadKeyspaceGroups( return revison, kgs, resp.More, nil } -// watchKeyspaceGroupsMetaChangeLoop watches any change in keyspace group membership/distribution +// startKeyspaceGroupsMetaWatchLoop Repeatedly watches any change in keyspace group membership/distribution // and apply the change dynamically. func (kgm *KeyspaceGroupManager) startKeyspaceGroupsMetaWatchLoop(revision int64) { defer logutil.LogPanic() @@ -374,8 +373,8 @@ func (kgm *KeyspaceGroupManager) watchKeyspaceGroupsMetaChange(revision int64) ( revision = wresp.CompactRevision break } - if wresp.Canceled { - log.Error("watcher is canceled", + if wresp.Err() != nil { + log.Error("watch is canceled or closed", zap.Int64("required-revision", revision), errs.ZapError(errs.ErrEtcdWatcherCancel, wresp.Err())) return revision, wresp.Err() @@ -396,7 +395,7 @@ func (kgm *KeyspaceGroupManager) watchKeyspaceGroupsMetaChange(revision int64) ( zap.Uint32("keysapce-group-id", id), zap.Error(errs.ErrJSONUnmarshal.Wrap(err).FastGenWithCause())) } else if kgm.isAssignedToMe(group) { - kgm.updateKeyspaceGroup(kgm.ctx, group) + kgm.updateKeyspaceGroup(group) } case clientv3.EventTypeDelete: kgm.deleteKeyspaceGroup(id) @@ -414,28 +413,17 @@ func (kgm *KeyspaceGroupManager) watchKeyspaceGroupsMetaChange(revision int64) ( } func (kgm *KeyspaceGroupManager) isAssignedToMe(group *endpoint.KeyspaceGroup) bool { - var assignedToMe = false for _, member := range group.Members { if len(member.Address) > 0 && member.Address == kgm.tsoServiceID.ServiceAddr { - assignedToMe = true - break + return true } } - return assignedToMe + return false } // updateKeyspaceGroup applies the given keyspace group. If the keyspace group is just assigned to // this host/pod, it will join the primary election. -func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(ctx context.Context, group *endpoint.KeyspaceGroup) { - kgm.mu.Lock() - defer kgm.mu.Unlock() - - select { - case <-ctx.Done(): - return - default: - } - +func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGroup) { if group.ID >= uint32(len(kgm.ams)) { log.Warn("keyspace group ID is out of range, ignore it", zap.Uint32("keyspace-group-id", group.ID), zap.Int("max-keyspace-group-id", len(kgm.ams)-1)) @@ -485,27 +473,13 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(ctx context.Context, group // deleteKeyspaceGroup deletes the given keyspace group. func (kgm *KeyspaceGroupManager) deleteKeyspaceGroup(id uint32) { - kgm.mu.Lock() - defer kgm.mu.Unlock() - - select { - case <-kgm.ctx.Done(): - return - default: - } - - am := kgm.ams[id].Load() + kgm.ksgs[id].Store(nil) + am := kgm.ams[id].Swap(nil) if am == nil { - log.Info("keyspace group already deleted, so do nothing", - zap.Uint32("keyspace-group-id", id)) return } - - log.Info("leaving primary election", zap.Uint32("keyspace-group-id", id)) - am.close() - kgm.ams[id].Store(nil) - kgm.ksgs[id].Store(nil) + log.Info("deleted keyspace group", zap.Uint32("keyspace-group-id", id)) } // GetAllocatorManager returns the AllocatorManager of the given keyspace group