From 961af87472fc351de4ad586db5dfd7d0179dcd53 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Fri, 10 Dec 2021 23:05:57 +0800 Subject: [PATCH] workerpool: limit the rate to output deadlock warning (#3775) (#3795) --- pkg/workerpool/pool_impl.go | 20 +++++++++++++++++--- pkg/workerpool/pool_test.go | 33 +++++++++++++++++++++++++++++++++ 2 files changed, 50 insertions(+), 3 deletions(-) diff --git a/pkg/workerpool/pool_impl.go b/pkg/workerpool/pool_impl.go index e3f754450b1..5927c12069d 100644 --- a/pkg/workerpool/pool_impl.go +++ b/pkg/workerpool/pool_impl.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/ticdc/pkg/notify" "go.uber.org/zap" "golang.org/x/sync/errgroup" + "golang.org/x/time/rate" ) const ( @@ -298,6 +299,9 @@ type worker struct { isRunning int32 // notifies exits of run() stopNotifier notify.Notifier + + slowSynchronizeThreshold time.Duration + slowSynchronizeLimiter *rate.Limiter } func newWorker() *worker { @@ -305,6 +309,9 @@ func newWorker() *worker { taskCh: make(chan task, 128), handles: make(map[*defaultEventHandle]struct{}), handleCancelCh: make(chan struct{}), // this channel must be unbuffered, i.e. blocking + + slowSynchronizeThreshold: 10 * time.Second, + slowSynchronizeLimiter: rate.NewLimiter(rate.Every(time.Second*5), 1), } } @@ -398,13 +405,20 @@ func (w *worker) synchronize() { break } - if time.Since(startTime) > time.Second*10 { - // likely the workerpool has deadlocked, or there is a bug in the event handlers. - log.Warn("synchronize is taking too long, report a bug", zap.Duration("elapsed", time.Since(startTime))) + if time.Since(startTime) > w.slowSynchronizeThreshold && + w.slowSynchronizeLimiter.Allow() { + // likely the workerpool has deadlocked, or there is a bug + // in the event handlers. + logWarn("synchronize is taking too long, report a bug", + zap.Duration("elapsed", time.Since(startTime)), + zap.Stack("stacktrace")) } } } +// A delegate to log.Warn. It exists only for testing. +var logWarn = log.Warn + func (w *worker) addHandle(handle *defaultEventHandle) { w.handleRWLock.Lock() defer w.handleRWLock.Unlock() diff --git a/pkg/workerpool/pool_test.go b/pkg/workerpool/pool_test.go index 76cf1701ec5..3aad1b0b1f7 100644 --- a/pkg/workerpool/pool_test.go +++ b/pkg/workerpool/pool_test.go @@ -27,6 +27,7 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/zap" "golang.org/x/sync/errgroup" + "golang.org/x/time/rate" ) func TestTaskError(t *testing.T) { @@ -507,6 +508,38 @@ func TestGracefulUnregisterTimeout(t *testing.T) { require.Truef(t, cerror.ErrWorkerPoolGracefulUnregisterTimedOut.Equal(err), "%s", err.Error()) } +func TestSynchronizeLog(t *testing.T) { + w := newWorker() + w.isRunning = 1 + // Always report "synchronize is taking too long". + w.slowSynchronizeThreshold = time.Duration(0) + w.slowSynchronizeLimiter = rate.NewLimiter(rate.Every(100*time.Minute), 1) + + counter := int32(0) + logWarn = func(msg string, fields ...zap.Field) { + atomic.AddInt32(&counter, 1) + } + defer func() { logWarn = log.Warn }() + + doneCh := make(chan struct{}) + go func() { + w.synchronize() + close(doneCh) + }() + + time.Sleep(300 * time.Millisecond) + w.stopNotifier.Notify() + time.Sleep(300 * time.Millisecond) + w.stopNotifier.Notify() + + // Close worker. + atomic.StoreInt32(&w.isRunning, 0) + w.stopNotifier.Close() + <-doneCh + + require.EqualValues(t, 1, atomic.LoadInt32(&counter)) +} + // Benchmark workerpool with ping-pong workflow. // go test -benchmem -run='^$' -bench '^(BenchmarkWorkerpool)$' github.com/pingcap/ticdc/pkg/workerpool func BenchmarkWorkerpool(b *testing.B) {