Skip to content

Commit

Permalink
fix exit bug
Browse files Browse the repository at this point in the history
Signed-off-by: Weizhen Wang <wangweizhen@pingcap.com>
  • Loading branch information
hawkingrei committed Jan 3, 2023
1 parent 5814339 commit f62832a
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 12 deletions.
5 changes: 5 additions & 0 deletions resourcemanager/pooltask/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,11 @@ func (t *TaskController[T, U, C, CT, TF]) Wait() {
close(t.resultCh)
}

// TaskID is to get the task id.
func (t *TaskController[T, U, C, CT, TF]) TaskID() uint64 {
return t.taskID
}

// Task is a task that can be executed.
type Task[T any] struct {
Task T
Expand Down
1 change: 1 addition & 0 deletions util/gpool/spmc/spmcpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ func (p *Pool[T, U, C, CT, TF]) release() {
// There might be some callers waiting in retrieveWorker(), so we need to wake them up to prevent
// those callers blocking infinitely.
p.cond.Broadcast()
close(p.taskCh)
}

func isClose(exitCh chan struct{}) bool {
Expand Down
31 changes: 19 additions & 12 deletions util/gpool/spmc/spmcpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,18 +191,22 @@ func TestPoolWithoutEnoughCapacityParallel(t *testing.T) {
p.SetConsumerFunc(func(a struct{}, b int, c any) struct{} {
return struct{}{}
})
var twg util.WaitGroupWrapper
var twg sync.WaitGroup
for i := 0; i < 10; i++ {
twg.Run(func() {
twg.Add(1)
go func() {
defer twg.Done()
sema := make(chan struct{}, 10)
var wg util.WaitGroupWrapper
var wg sync.WaitGroup
exitCh := make(chan struct{})
wg.Run(func() {
wg.Add(1)
go func() {
wg.Done()
for j := 0; j < RunTimes; j++ {
sema <- struct{}{}
}
close(exitCh)
})
}()
producerFunc := func() (struct{}, error) {
for {
select {
Expand All @@ -218,14 +222,15 @@ func TestPoolWithoutEnoughCapacityParallel(t *testing.T) {
}
}
resultCh, ctl := p.AddProducer(producerFunc, RunTimes, pooltask.NilContext{}, WithConcurrency(concurrency))

wg.Run(func() {
wg.Add(1)
go func() {
defer wg.Done()
for range resultCh {
}
})
}()
ctl.Wait()
wg.Wait()
})
}()
}
twg.Wait()
}
Expand All @@ -240,14 +245,16 @@ func TestBenchPool(t *testing.T) {

for i := 0; i < 1000; i++ {
sema := make(chan struct{}, 10)
var wg util.WaitGroupWrapper
var wg sync.WaitGroup
exitCh := make(chan struct{})
wg.Run(func() {
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < RunTimes; j++ {
sema <- struct{}{}
}
close(exitCh)
})
}()
producerFunc := func() (struct{}, error) {
for {
select {
Expand Down

0 comments on commit f62832a

Please sign in to comment.