Skip to content

Commit

Permalink
plugin: keep plugin enable/disable state when tidb restart (pingcap#3…
Browse files Browse the repository at this point in the history
  • Loading branch information
lcwangchao authored Sep 19, 2022
1 parent 413a091 commit bc0f951
Showing 1 changed file with 29 additions and 13 deletions.
42 changes: 29 additions & 13 deletions plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,15 @@ func Init(ctx context.Context, cfg Config) (err error) {
plugin: &tiPlugins.plugins[kind][i],
}
tiPlugins.plugins[kind][i].flushWatcher = watcher
if err = watcher.refreshPluginState(); err != nil {
if cfg.SkipWhenFail {
tiPlugins.plugins[kind][i].State = Disable
err = nil
go util.WithRecovery(watcher.watchLoop, nil)
continue
}
return
}
go util.WithRecovery(watcher.watchLoop, nil)
}
tiPlugins.plugins[kind][i].State = Ready
Expand All @@ -245,26 +254,33 @@ type flushWatcher struct {
plugin *Plugin
}

func (w *flushWatcher) refreshPluginState() error {
disabled, err := w.getPluginDisabledFlag()
if err != nil {
logutil.BgLogger().Error("get plugin disabled flag failure", zap.String("plugin", w.manifest.Name), zap.Error(err))
return err
}
if disabled {
atomic.StoreUint32(&w.manifest.flushWatcher.plugin.Disabled, 1)
} else {
atomic.StoreUint32(&w.manifest.flushWatcher.plugin.Disabled, 0)
}
err = w.manifest.OnFlush(w.ctx, w.manifest)
if err != nil {
logutil.BgLogger().Error("plugin flush event failed", zap.String("plugin", w.manifest.Name), zap.Error(err))
return err
}
return nil
}

func (w *flushWatcher) watchLoop() {
watchChan := w.etcd.Watch(w.ctx, w.path)
for {
select {
case <-w.ctx.Done():
return
case <-watchChan:
disabled, err := w.getPluginDisabledFlag()
if err != nil {
logutil.BgLogger().Error("get plugin disabled flag failure", zap.String("plugin", w.manifest.Name), zap.Error(err))
}
if disabled {
atomic.StoreUint32(&w.manifest.flushWatcher.plugin.Disabled, 1)
} else {
atomic.StoreUint32(&w.manifest.flushWatcher.plugin.Disabled, 0)
}
err = w.manifest.OnFlush(w.ctx, w.manifest)
if err != nil {
logutil.BgLogger().Error("notify plugin flush event failed", zap.String("plugin", w.manifest.Name), zap.Error(err))
}
_ = w.refreshPluginState()
}
}
}
Expand Down

0 comments on commit bc0f951

Please sign in to comment.