diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index 4c725fb4ae88..0f8a4fa3dbab 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -49,7 +49,7 @@ const ( defaultLoadKeyspaceGroupsBatchSize = int64(400) loadFromEtcdMaxRetryTimes = 6 loadFromEtcdRetryInterval = 500 * time.Millisecond - watchKEtcdChangeRetryInterval = 500 * time.Millisecond + watchKEtcdChangeRetryInterval = 1 * time.Second ) // KeyspaceGroupManager manages the members of the keyspace groups assigned to this host. @@ -188,9 +188,9 @@ func (kgm *KeyspaceGroupManager) Initialize(loadFromStorage bool) error { return err } - // Watch keyspace group membership/distribution meta changes and apply dynamically. + // Watch/apply keyspace group membership/distribution meta changes dynamically. kgm.wg.Add(1) - go kgm.watchKeyspaceGroupsMetaChange(watchStartRevision) + go kgm.startKeyspaceGroupsMetaWatchLoop(watchStartRevision) return nil } @@ -331,92 +331,85 @@ func (kgm *KeyspaceGroupManager) loadKeyspaceGroups( return revison, kgs, resp.More, nil } -// watchKeyspaceGroupsMetaChange watches any change in keyspace group membership/distribution +// startKeyspaceGroupsMetaWatchLoop watches any change in keyspace group membership/distribution // and apply the change dynamically. -func (kgm *KeyspaceGroupManager) watchKeyspaceGroupsMetaChange(startRevision int64) { +func (kgm *KeyspaceGroupManager) startKeyspaceGroupsMetaWatchLoop(revision int64) { defer logutil.LogPanic() defer kgm.wg.Done() + // Repeatedly watch/apply keyspace group membership/distribution changes until the context is canceled. + for { + select { + case <-kgm.ctx.Done(): + return + default: + } + + nextRevision, err := kgm.watchKeyspaceGroupsMetaChange(revision) + if err != nil { + log.Error("watcher canceled unexpectedly. 
Will start a new watcher after a while", + zap.Int64("next-revision", nextRevision), + zap.Time("retry-at", time.Now().Add(watchKEtcdChangeRetryInterval)), + zap.Error(err)) + time.Sleep(watchKEtcdChangeRetryInterval) + } + } +} + +// watchKeyspaceGroupsMetaChange watches any change in keyspace group membership/distribution +// and applies the change dynamically. +func (kgm *KeyspaceGroupManager) watchKeyspaceGroupsMetaChange(revision int64) (int64, error) { watcher := clientv3.NewWatcher(kgm.etcdClient) defer watcher.Close() ksgPrefix := strings.Join([]string{kgm.legacySvcRootPath, endpoint.KeyspaceGroupIDPrefix()}, "/") - // The revision is the revision of last modification on this key. - // If the revision is compacted, will meet required revision has been compacted error. - // In this case, use the compact revision to re-watch the key. for { - var ( - resp clientv3.WatchResponse - ok bool - ) - ksgWatchCH := watcher.Watch(kgm.ctx, ksgPrefix, clientv3.WithPrefix(), clientv3.WithRev(startRevision)) - ksgWatchLoop: - for { - select { - case <-kgm.ctx.Done(): - // Manager shutdown. 
- log.Info("exit keyspace groups meta change watcher") - return - case resp, ok = <-ksgWatchCH: - if !ok || resp.Err() != nil { - // If chan is closed or canceled, exit watch loop - // Ref https://etcd.io/docs/v3.4/learning/api/#watch-streams - break ksgWatchLoop + watchChan := watcher.Watch(kgm.ctx, ksgPrefix, clientv3.WithPrefix(), clientv3.WithRev(revision)) + for wresp := range watchChan { + if wresp.CompactRevision != 0 { + log.Warn("Required revision has been compacted, the watcher will watch again with the compact revision", + zap.Int64("required-revision", revision), + zap.Int64("compact-revision", wresp.CompactRevision)) + revision = wresp.CompactRevision + break + } + if wresp.Canceled { + log.Error("watcher is canceled", + zap.Int64("required-revision", revision), + errs.ZapError(errs.ErrEtcdWatcherCancel, wresp.Err())) + return revision, wresp.Err() + } + for _, event := range wresp.Events { + id, err := endpoint.ExtractKeyspaceGroupIDFromPath(string(event.Kv.Key)) + if err != nil { + log.Warn("failed to extract keyspace group ID from the key path", + zap.String("key-path", string(event.Kv.Key)), zap.Error(err)) + continue } - for _, event := range resp.Events { - id, err := endpoint.ExtractKeyspaceGroupIDFromPath(string(event.Kv.Key)) - if err != nil { - log.Warn("failed to extract keyspace group ID from the key path", - zap.String("key-path", string(event.Kv.Key)), zap.Error(err)) - continue - } - if event.Type == clientv3.EventTypePut { - group := &endpoint.KeyspaceGroup{} - if err := json.Unmarshal(event.Kv.Value, group); err != nil { - log.Warn("failed to unmarshal keyspace group", - zap.Uint32("keysapce-group-id", id), - zap.Error(errs.ErrJSONUnmarshal.Wrap(err).FastGenWithCause())) - continue - } - if kgm.isAssignedToMe(group) { - kgm.updateKeyspaceGroup(kgm.ctx, group) - } - } else if event.Type == clientv3.EventTypeDelete { - kgm.deleteKeyspaceGroup(id) - } else { - log.Fatal("unexpected event type", - zap.Uint32("keyspace-group-id", id), - 
zap.String("event-type", event.Type.String())) + switch event.Type { + case clientv3.EventTypePut: + group := &endpoint.KeyspaceGroup{} + if err := json.Unmarshal(event.Kv.Value, group); err != nil { + log.Warn("failed to unmarshal keyspace group", + zap.Uint32("keysapce-group-id", id), + zap.Error(errs.ErrJSONUnmarshal.Wrap(err).FastGenWithCause())) + } else if kgm.isAssignedToMe(group) { + kgm.updateKeyspaceGroup(kgm.ctx, group) } + case clientv3.EventTypeDelete: + kgm.deleteKeyspaceGroup(id) } - startRevision = resp.Header.Revision } + revision = wresp.Header.Revision } select { case <-kgm.ctx.Done(): - // Manager shutdown. - log.Info("exit keyspace groups meta change watcher") - return + return revision, nil default: } - - if resp.CompactRevision != 0 { - // meet compacted error - log.Warn("revision has been compacted, the watcher will watch again", - zap.Int64("start-revision", startRevision), - zap.Int64("compact-revision", resp.CompactRevision)) - startRevision = resp.CompactRevision - } else { - // other errors - log.Error("keyspace groups meta change watcher canceled. The watcher will watch again after a while", - zap.Int64("start-revision", startRevision), - zap.Time("retry-at", time.Now().Add(watchKEtcdChangeRetryInterval)), - errs.ZapError(errs.ErrEncryptionKeysWatcher, resp.Err())) - time.Sleep(watchKEtcdChangeRetryInterval) - } } }