diff --git a/plugin/plugin.go b/plugin/plugin.go index b3aac9694193f..6aec16183cc6a 100644 --- a/plugin/plugin.go +++ b/plugin/plugin.go @@ -20,6 +20,7 @@ import ( gplugin "plugin" "strconv" "sync/atomic" + "time" "unsafe" "github.com/pingcap/errors" @@ -246,12 +247,32 @@ type flushWatcher struct { } func (w *flushWatcher) watchLoop() { - watchChan := w.etcd.Watch(w.ctx, w.path) + const reWatchInterval = time.Second * 5 + logutil.BgLogger().Info("plugin flushWatcher loop started", zap.String("plugin", w.manifest.Name)) + for w.ctx.Err() == nil { + ch := w.etcd.Watch(w.ctx, w.path) + if exit := w.watchLoopWithChan(ch); exit { + break + } + + logutil.BgLogger().Info( + "plugin flushWatcher old chan closed, restart loop later", + zap.String("plugin", w.manifest.Name), + zap.Duration("after", reWatchInterval)) + time.Sleep(reWatchInterval) + } +} + +func (w *flushWatcher) watchLoopWithChan(ch clientv3.WatchChan) (exit bool) { for { select { case <-w.ctx.Done(): - return - case <-watchChan: + return true + case _, ok := <-ch: + if !ok { + return false + } + logutil.BgLogger().Info("plugin flushWatcher detected event to reload plugin config", zap.String("plugin", w.manifest.Name)) disabled, err := w.getPluginDisabledFlag() if err != nil { logutil.BgLogger().Error("get plugin disabled flag failure", zap.String("plugin", w.manifest.Name), zap.Error(err)) diff --git a/plugin/plugin_test.go b/plugin/plugin_test.go index cfcc85ef310f3..088da35e59bfa 100644 --- a/plugin/plugin_test.go +++ b/plugin/plugin_test.go @@ -18,10 +18,13 @@ import ( "context" "io" "strconv" + "sync/atomic" "testing" + "time" "github.com/pingcap/tidb/sessionctx/variable" "github.com/stretchr/testify/require" + clientv3 "go.etcd.io/etcd/client/v3" ) func TestLoadPluginSuccess(t *testing.T) { @@ -242,3 +245,43 @@ func TestPluginsClone(t *testing.T) { require.Equal(t, uint16(1), cps.versions["whitelist"]) require.Len(t, cps.dyingPlugins, 1) } + +func TestPluginWatcherLoop(t *testing.T) { + // exit when ctx done + ctx, cancel := context.WithCancel(context.Background()) + watcher := &flushWatcher{ + ctx: ctx, + manifest: &Manifest{ + Name: "test", + }, + } + ch := make(chan clientv3.WatchResponse) + var cancelled atomic.Bool + go func() { + time.Sleep(10 * time.Millisecond) + cancelled.Store(true) + cancel() + }() + exit := watcher.watchLoopWithChan(ch) + require.True(t, exit) + require.True(t, cancelled.Load()) + + // exit when ch closed + watcher = &flushWatcher{ + ctx: context.Background(), + manifest: &Manifest{ + Name: "test", + }, + } + + var closed atomic.Bool + ch = make(chan clientv3.WatchResponse) + go func() { + time.Sleep(10 * time.Millisecond) + closed.Store(true) + close(ch) + }() + exit = watcher.watchLoopWithChan(ch) + require.False(t, exit) + require.True(t, cancelled.Load()) +}