From 1674f14c68ebcc5ff5b3a1fcb0aac03c4c862f7e Mon Sep 17 00:00:00 2001 From: hperl <34397+hperl@users.noreply.github.com> Date: Wed, 17 Aug 2022 17:04:47 +0200 Subject: [PATCH] WIP --- internal/check/checkgroup/checkgroup_test.go | 30 ++++++------ .../check/checkgroup/concurrent_checkgroup.go | 46 +++++++++++-------- internal/check/engine.go | 5 -- 3 files changed, 42 insertions(+), 39 deletions(-) diff --git a/internal/check/checkgroup/checkgroup_test.go b/internal/check/checkgroup/checkgroup_test.go index 538b081bf..d90bb641b 100644 --- a/internal/check/checkgroup/checkgroup_test.go +++ b/internal/check/checkgroup/checkgroup_test.go @@ -45,10 +45,10 @@ func TestCheckgroup_cancels(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) g := checkgroup.New(ctx) g.Add(neverFinishesCheckFunc) - g.Add(neverFinishesCheckFunc) - g.Add(neverFinishesCheckFunc) - g.Add(neverFinishesCheckFunc) - g.Add(neverFinishesCheckFunc) + go g.Add(neverFinishesCheckFunc) + go g.Add(neverFinishesCheckFunc) + go g.Add(neverFinishesCheckFunc) + go g.Add(neverFinishesCheckFunc) cancel() assert.Equal(t, checkgroup.Result{Err: context.Canceled}, g.Result()) } @@ -60,7 +60,7 @@ func TestCheckgroup_reports_first_result(t *testing.T) { defer cancel() g := checkgroup.New(ctx) - g.Add(neverFinishesCheckFunc) + g.Add(notMemberAfterDelayFunc(1 * time.Microsecond)) g.Add(checkgroup.IsMemberFunc) assert.Equal(t, checkgroup.Result{Membership: checkgroup.IsMember}, g.Result()) } @@ -68,23 +68,27 @@ func TestCheckgroup_reports_first_result(t *testing.T) { func TestCheckgroup_cancels_all_other_subchecks(t *testing.T) { t.Parallel() - wasCancelled := make(chan bool) + wasCalled := false + wasCancelled := false var mockCheckFn = func(ctx context.Context, resultCh chan<- checkgroup.Result) { + wasCalled = true <-ctx.Done() - wasCancelled <- true + wasCancelled = true resultCh <- checkgroup.Result{Err: ctx.Err()} } ctx := context.Background() g := checkgroup.New(ctx) - g.Add(mockCheckFn) - g.Add(neverFinishesCheckFunc) + g.Add(notMemberAfterDelayFunc(1 * time.Microsecond)) g.Add(checkgroup.IsMemberFunc) + go g.Add(mockCheckFn) result := g.Result() assert.Equal(t, checkgroup.ResultIsMember, result) - assert.True(t, <-wasCancelled) + if wasCalled { + assert.True(t, wasCancelled) + } assert.True(t, g.Done()) } @@ -143,9 +147,6 @@ func TestCheckgroup_has_no_leaks(t *testing.T) { checkgroup.UnknownMemberFunc, isMemberAfterDelayFunc(5 * time.Millisecond), notMemberAfterDelayFunc(1 * time.Millisecond), - neverFinishesCheckFunc, - neverFinishesCheckFunc, - neverFinishesCheckFunc, }, expected: checkgroup.ResultIsMember, }, @@ -158,9 +159,6 @@ func TestCheckgroup_has_no_leaks(t *testing.T) { checkgroup.UnknownMemberFunc, isMemberAfterDelayFunc(5 * time.Millisecond), notMemberAfterDelayFunc(1 * time.Millisecond), - neverFinishesCheckFunc, - neverFinishesCheckFunc, - neverFinishesCheckFunc, }, expected: checkgroup.ResultIsMember, }, diff --git a/internal/check/checkgroup/concurrent_checkgroup.go b/internal/check/checkgroup/concurrent_checkgroup.go index 88bb7af08..1dabe8ccb 100644 --- a/internal/check/checkgroup/concurrent_checkgroup.go +++ b/internal/check/checkgroup/concurrent_checkgroup.go @@ -12,12 +12,6 @@ type concurrentCheckgroup struct { // ctx.Err()}. ctx context.Context - // pool is the worker pool (or nil if we want unbounded parallel checks), - // derived from the context. - pool Pool - - singleWorkerPool Pool - // subcheckCtx is the context used for the subchecks. subcheckCtx context.Context @@ -41,18 +35,21 @@ type concurrentCheckgroup struct { // result is only written once by the consumer, and can only be read after // the doneCh channel is closed. result Result + + // reading from reserveCheckCh reserves the right to create a concurrent + // check. + reserveCheckCh chan struct{} } func NewConcurrent(ctx context.Context) Checkgroup { g := &concurrentCheckgroup{ - ctx: ctx, - pool: PoolFromContext(ctx), - finalizeCh: make(chan struct{}), - doneCh: make(chan struct{}), - addCheckCh: make(chan CheckFunc), + ctx: ctx, + finalizeCh: make(chan struct{}), + doneCh: make(chan struct{}), + addCheckCh: make(chan CheckFunc), + reserveCheckCh: make(chan struct{}, 1), } g.subcheckCtx, g.cancel = context.WithCancel(g.ctx) - g.singleWorkerPool = NewPool(WithWorkers(1), WithContext(g.subcheckCtx)) g.startConsumer() return g } @@ -67,7 +64,7 @@ func (g *concurrentCheckgroup) startConsumer() { g.startConsumerOnce.Do(func() { go func() { var ( - subcheckCh = make(chan Result, 1) + resultCh = make(chan Result, 1) totalChecks = 0 finishedChecks = 0 finalizing = false @@ -82,17 +79,20 @@ func (g *concurrentCheckgroup) startConsumer() { // `context.Canceled`), but we still want to receive these results // so that there are no dangling goroutines. defer func() { - go receiveRemaining(subcheckCh, totalChecks-finishedChecks) + go receiveRemaining(resultCh, totalChecks-finishedChecks) }() + // Start with one reservation available. + g.reserveCheckCh <- struct{}{} + for { select { - case f := <-g.addCheckCh: + case check := <-g.addCheckCh: if finalizing { continue } totalChecks++ - go f(g.subcheckCtx, subcheckCh) + go check(g.subcheckCtx, resultCh) case <-g.finalizeCh: if finalizing { @@ -107,7 +107,7 @@ func (g *concurrentCheckgroup) startConsumer() { return } - case result := <-subcheckCh: + case result := <-resultCh: finishedChecks++ if result.Err != nil || result.Membership == IsMember { g.result = result @@ -119,6 +119,12 @@ func (g *concurrentCheckgroup) startConsumer() { return } + // ready for a new check + select { + case g.reserveCheckCh <- struct{}{}: + default: + } + case <-g.subcheckCtx.Done(): g.result = Result{Err: g.ctx.Err()} return @@ -140,7 +146,11 @@ func (g *concurrentCheckgroup) Done() bool { // Add adds the CheckFunc to the checkgroup and starts running it. func (g *concurrentCheckgroup) Add(check CheckFunc) { select { - case g.addCheckCh <- check: + case <-g.reserveCheckCh: + select { + case g.addCheckCh <- check: + case <-g.subcheckCtx.Done(): + } case <-g.subcheckCtx.Done(): } } diff --git a/internal/check/engine.go b/internal/check/engine.go index e3d7cfd6b..74fca484b 100644 --- a/internal/check/engine.go +++ b/internal/check/engine.go @@ -49,11 +49,6 @@ func NewEngine(d EngineDependencies, opts ...EngineOpt) *Engine { for _, opt := range opts { opt(e) } - if e.pool == nil { - e.pool = checkgroup.NewPool( - checkgroup.WithWorkers(e.d.Config(context.Background()).MaxParallelChecks()), - ) - } return e }