Skip to content

Commit

Permalink
util: support enhance waitGroupper with exited checking (pingcap#40557)
Browse files Browse the repository at this point in the history
  • Loading branch information
Yisaer authored Jan 16, 2023
1 parent ee6d291 commit a21098f
Show file tree
Hide file tree
Showing 2 changed files with 165 additions and 0 deletions.
118 changes: 118 additions & 0 deletions util/wait_group_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,126 @@ package util

import (
"sync"
"time"

"github.com/pingcap/tidb/util/logutil"
"go.uber.org/atomic"
"go.uber.org/zap"
)

// WaitGroupEnhancedWrapper wrapper wg, it provides the basic ability of WaitGroupWrapper with checking unexited process
// if the `exited` signal is true by print them on log.
type WaitGroupEnhancedWrapper struct {
sync.WaitGroup
exited *atomic.Bool
source string
registerProcess sync.Map
}

// NewWaitGroupEnhancedWrapper returns WaitGroupEnhancedWrapper, the empty source indicates the unit test, then
// the `checkUnExitedProcess` won't be executed.
func NewWaitGroupEnhancedWrapper(source string, exited *atomic.Bool) *WaitGroupEnhancedWrapper {
wgew := &WaitGroupEnhancedWrapper{
exited: exited,
source: source,
registerProcess: sync.Map{},
}
if len(source) > 0 {
go wgew.checkUnExitedProcess()
}
return wgew
}

func (w *WaitGroupEnhancedWrapper) checkUnExitedProcess() {
logutil.BgLogger().Info("waitGroupWrapper ready to check unexited process", zap.String("source", w.source))
ticker := time.NewTimer(10 * time.Second)
defer ticker.Stop()
for {
<-ticker.C
continueCheck := w.check()
if !continueCheck {
return
}
}
}

func (w *WaitGroupEnhancedWrapper) check() bool {
if !w.exited.Load() {
return true
}
unexitedProcess := make([]string, 0)
w.registerProcess.Range(func(key, value any) bool {
unexitedProcess = append(unexitedProcess, key.(string))
return true
})
if len(unexitedProcess) > 0 {
logutil.BgLogger().Warn("background process unexited while received exited signal",
zap.Strings("process", unexitedProcess),
zap.String("source", w.source))
return true
}
logutil.BgLogger().Info("waitGroupWrapper finish checking unexited process", zap.String("source", w.source))
return false
}

// Run runs a function in a goroutine, adds 1 to WaitGroup
// and calls done when function returns. Please DO NOT use panic
// in the cb function.
// Note that the registered label shouldn't be duplicated.
func (w *WaitGroupEnhancedWrapper) Run(exec func(), label string) {
w.onStart(label)
w.Add(1)
go func() {
defer func() {
w.onExit(label)
w.Done()
}()
exec()
}()
}

// RunWithRecover wraps goroutine startup call with force recovery, add 1 to WaitGroup
// and call done when function return.
// exec is that execute logic function. recoverFn is that handler will be called after recover and before dump stack,
// passing `nil` means noop.
// Note that the registered label shouldn't be duplicated.
func (w *WaitGroupEnhancedWrapper) RunWithRecover(exec func(), recoverFn func(r interface{}), label string) {
w.onStart(label)
w.Add(1)
go func() {
defer func() {
r := recover()
if r != nil && recoverFn != nil {
logutil.BgLogger().Info("WaitGroupEnhancedWrapper exec panic recovered", zap.String("process", label))
recoverFn(r)
}
w.onExit(label)
w.Done()
}()
exec()
}()
}

func (w *WaitGroupEnhancedWrapper) onStart(label string) {
_, ok := w.registerProcess.Load(label)
if ok {
logutil.BgLogger().Panic("WaitGroupEnhancedWrapper received duplicated source process",
zap.String("source", w.source),
zap.String("process", label))
}
w.registerProcess.Store(label, struct{}{})
logutil.BgLogger().Info("background process started",
zap.String("source", w.source),
zap.String("process", label))
}

func (w *WaitGroupEnhancedWrapper) onExit(label string) {
w.registerProcess.Delete(label)
logutil.BgLogger().Info("background process exited",
zap.String("source", w.source),
zap.String("process", label))
}

// WaitGroupWrapper is a wrapper for sync.WaitGroup
type WaitGroupWrapper struct {
sync.WaitGroup
Expand Down
47 changes: 47 additions & 0 deletions util/wait_group_wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
package util

import (
"fmt"
"testing"
"time"

"github.com/stretchr/testify/require"
"go.uber.org/atomic"
Expand All @@ -32,6 +34,17 @@ func TestWaitGroupWrapperRun(t *testing.T) {
}
wg.Wait()
require.Equal(t, expect, val.Load())

val.Store(0)
exited := atomic.NewBool(false)
wg2 := NewWaitGroupEnhancedWrapper("", exited)
for i := int32(0); i < expect; i++ {
wg2.Run(func() {
val.Inc()
}, fmt.Sprintf("test_%v", i))
}
wg2.Wait()
require.Equal(t, expect, val.Load())
}

func TestWaitGroupWrapperRunWithRecover(t *testing.T) {
Expand All @@ -47,4 +60,38 @@ func TestWaitGroupWrapperRunWithRecover(t *testing.T) {
}
wg.Wait()
require.Equal(t, expect, val.Load())

val.Store(0)
exited := atomic.NewBool(false)
wg2 := NewWaitGroupEnhancedWrapper("", exited)
for i := int32(0); i < expect; i++ {
wg2.RunWithRecover(func() {
panic("test1")
}, func(r interface{}) {
val.Inc()
}, fmt.Sprintf("test_%v", i))
}
wg2.Wait()
require.Equal(t, expect, val.Load())
}

func TestWaitGroupWrapperCheck(t *testing.T) {
exited := atomic.NewBool(false)
wg := NewWaitGroupEnhancedWrapper("", exited)
quit := make(chan struct{})
wg.Run(func() {
<-quit
}, "test")

// directly skip check if exited is false
require.True(t, wg.check())

// need continue check as existed unexited process
exited.Store(true)
require.True(t, wg.check())

// no need to continue check as all process exited
quit <- struct{}{}
time.Sleep(1 * time.Second)
require.False(t, wg.check())
}

0 comments on commit a21098f

Please sign in to comment.