Skip to content

Commit

Permalink
Refine keyspace groups meta change watcher loop.
Browse files Browse the repository at this point in the history
Signed-off-by: Bin Shi <binshi.bing@gmail.com>
  • Loading branch information
binshi-bing committed Apr 3, 2023
1 parent 7922205 commit 9cffa70
Showing 1 changed file with 61 additions and 68 deletions.
129 changes: 61 additions & 68 deletions pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ const (
defaultLoadKeyspaceGroupsBatchSize = int64(400)
loadFromEtcdMaxRetryTimes = 6
loadFromEtcdRetryInterval = 500 * time.Millisecond
watchKEtcdChangeRetryInterval = 500 * time.Millisecond
watchKEtcdChangeRetryInterval = 1 * time.Second
)

// KeyspaceGroupManager manages the members of the keyspace groups assigned to this host.
Expand Down Expand Up @@ -188,9 +188,9 @@ func (kgm *KeyspaceGroupManager) Initialize(loadFromStorage bool) error {
return err
}

// Watch keyspace group membership/distribution meta changes and apply dynamically.
// Watch/apply keyspace group membership/distribution meta changes dynamically.
kgm.wg.Add(1)
go kgm.watchKeyspaceGroupsMetaChange(watchStartRevision)
go kgm.startKeyspaceGroupsMetaWatchLoop(watchStartRevision)

return nil
}
Expand Down Expand Up @@ -331,92 +331,85 @@ func (kgm *KeyspaceGroupManager) loadKeyspaceGroups(
return revison, kgs, resp.More, nil
}

// watchKeyspaceGroupsMetaChange watches any change in keyspace group membership/distribution
// startKeyspaceGroupsMetaWatchLoop repeatedly watches any change in keyspace group
// membership/distribution and applies the change dynamically.
func (kgm *KeyspaceGroupManager) watchKeyspaceGroupsMetaChange(startRevision int64) {
// startKeyspaceGroupsMetaWatchLoop runs watchKeyspaceGroupsMetaChange in a loop, starting at
// the given revision, until kgm.ctx is canceled. Whenever the underlying watcher returns an
// error, a new watcher is started after watchKEtcdChangeRetryInterval, resuming from the
// revision reported by the previous watcher. It signals kgm.wg when it exits.
func (kgm *KeyspaceGroupManager) startKeyspaceGroupsMetaWatchLoop(revision int64) {
	defer logutil.LogPanic()
	defer kgm.wg.Done()

	// Repeatedly watch/apply keyspace group membership/distribution changes until the context is canceled.
	for {
		select {
		case <-kgm.ctx.Done():
			return
		default:
		}

		nextRevision, err := kgm.watchKeyspaceGroupsMetaChange(revision)
		// Resume from where the previous watcher stopped; otherwise every restart would
		// re-watch from the initial revision and replay changes that were already applied.
		revision = nextRevision
		if err != nil {
			log.Error("watcher canceled unexpectedly. Will start a new watcher after a while",
				zap.Int64("next-revision", nextRevision),
				zap.Time("retry-at", time.Now().Add(watchKEtcdChangeRetryInterval)),
				zap.Error(err))

			// Wait for the retry interval, but wake up immediately on shutdown so that
			// the goroutine does not hold up kgm.wg for a full interval.
			retryTimer := time.NewTimer(watchKEtcdChangeRetryInterval)
			select {
			case <-kgm.ctx.Done():
				retryTimer.Stop()
				return
			case <-retryTimer.C:
			}
		}
	}
}

// watchKeyspaceGroupsMetaChange watches any change in keyspace group membership/distribution
// and applies the change dynamically.
func (kgm *KeyspaceGroupManager) watchKeyspaceGroupsMetaChange(revision int64) (int64, error) {
watcher := clientv3.NewWatcher(kgm.etcdClient)
defer watcher.Close()

ksgPrefix := strings.Join([]string{kgm.legacySvcRootPath, endpoint.KeyspaceGroupIDPrefix()}, "/")

// The revision is the revision of the last modification on this key.
// If that revision has been compacted, the watch fails with a "required revision has been compacted" error.
// In that case, use the compact revision to re-watch the key.
for {
var (
resp clientv3.WatchResponse
ok bool
)
ksgWatchCH := watcher.Watch(kgm.ctx, ksgPrefix, clientv3.WithPrefix(), clientv3.WithRev(startRevision))
ksgWatchLoop:
for {
select {
case <-kgm.ctx.Done():
// Manager shutdown.
log.Info("exit keyspace groups meta change watcher")
return
case resp, ok = <-ksgWatchCH:
if !ok || resp.Err() != nil {
// If chan is closed or canceled, exit watch loop
// Ref https://etcd.io/docs/v3.4/learning/api/#watch-streams
break ksgWatchLoop
watchChan := watcher.Watch(kgm.ctx, ksgPrefix, clientv3.WithPrefix(), clientv3.WithRev(revision))
for wresp := range watchChan {
if wresp.CompactRevision != 0 {
log.Warn("Required revision has been compacted, the watcher will watch again with the compact revision",
zap.Int64("required-revision", revision),
zap.Int64("compact-revision", wresp.CompactRevision))
revision = wresp.CompactRevision
break
}
if wresp.Canceled {
log.Error("watcher is canceled",
zap.Int64("required-revision", revision),
errs.ZapError(errs.ErrEtcdWatcherCancel, wresp.Err()))
return revision, wresp.Err()
}
for _, event := range wresp.Events {
id, err := endpoint.ExtractKeyspaceGroupIDFromPath(string(event.Kv.Key))
if err != nil {
log.Warn("failed to extract keyspace group ID from the key path",
zap.String("key-path", string(event.Kv.Key)), zap.Error(err))
continue
}
for _, event := range resp.Events {
id, err := endpoint.ExtractKeyspaceGroupIDFromPath(string(event.Kv.Key))
if err != nil {
log.Warn("failed to extract keyspace group ID from the key path",
zap.String("key-path", string(event.Kv.Key)), zap.Error(err))
continue
}

if event.Type == clientv3.EventTypePut {
group := &endpoint.KeyspaceGroup{}
if err := json.Unmarshal(event.Kv.Value, group); err != nil {
log.Warn("failed to unmarshal keyspace group",
zap.Uint32("keysapce-group-id", id),
zap.Error(errs.ErrJSONUnmarshal.Wrap(err).FastGenWithCause()))
continue
}
if kgm.isAssignedToMe(group) {
kgm.updateKeyspaceGroup(kgm.ctx, group)
}
} else if event.Type == clientv3.EventTypeDelete {
kgm.deleteKeyspaceGroup(id)
} else {
log.Fatal("unexpected event type",
zap.Uint32("keyspace-group-id", id),
zap.String("event-type", event.Type.String()))
switch event.Type {
case clientv3.EventTypePut:
group := &endpoint.KeyspaceGroup{}
if err := json.Unmarshal(event.Kv.Value, group); err != nil {
log.Warn("failed to unmarshal keyspace group",
zap.Uint32("keysapce-group-id", id),
zap.Error(errs.ErrJSONUnmarshal.Wrap(err).FastGenWithCause()))
} else if kgm.isAssignedToMe(group) {
kgm.updateKeyspaceGroup(kgm.ctx, group)
}
case clientv3.EventTypeDelete:
kgm.deleteKeyspaceGroup(id)
}
startRevision = resp.Header.Revision
}
revision = wresp.Header.Revision
}

select {
case <-kgm.ctx.Done():
// Manager shutdown.
log.Info("exit keyspace groups meta change watcher")
return
return revision, nil
default:
}

if resp.CompactRevision != 0 {
// meet compacted error
log.Warn("revision has been compacted, the watcher will watch again",
zap.Int64("start-revision", startRevision),
zap.Int64("compact-revision", resp.CompactRevision))
startRevision = resp.CompactRevision
} else {
// other errors
log.Error("keyspace groups meta change watcher canceled. The watcher will watch again after a while",
zap.Int64("start-revision", startRevision),
zap.Time("retry-at", time.Now().Add(watchKEtcdChangeRetryInterval)),
errs.ZapError(errs.ErrEncryptionKeysWatcher, resp.Err()))
time.Sleep(watchKEtcdChangeRetryInterval)
}
}
}

Expand Down

0 comments on commit 9cffa70

Please sign in to comment.