Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
hperl committed Aug 17, 2022
1 parent 2cd5de6 commit 1674f14
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 39 deletions.
30 changes: 14 additions & 16 deletions internal/check/checkgroup/checkgroup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@ func TestCheckgroup_cancels(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
g := checkgroup.New(ctx)
g.Add(neverFinishesCheckFunc)
g.Add(neverFinishesCheckFunc)
g.Add(neverFinishesCheckFunc)
g.Add(neverFinishesCheckFunc)
g.Add(neverFinishesCheckFunc)
go g.Add(neverFinishesCheckFunc)
go g.Add(neverFinishesCheckFunc)
go g.Add(neverFinishesCheckFunc)
go g.Add(neverFinishesCheckFunc)
cancel()
assert.Equal(t, checkgroup.Result{Err: context.Canceled}, g.Result())
}
Expand All @@ -60,31 +60,35 @@ func TestCheckgroup_reports_first_result(t *testing.T) {
defer cancel()

g := checkgroup.New(ctx)
g.Add(neverFinishesCheckFunc)
g.Add(notMemberAfterDelayFunc(1 * time.Microsecond))
g.Add(checkgroup.IsMemberFunc)
assert.Equal(t, checkgroup.Result{Membership: checkgroup.IsMember}, g.Result())
}

func TestCheckgroup_cancels_all_other_subchecks(t *testing.T) {
t.Parallel()

wasCancelled := make(chan bool)
wasCalled := false
wasCancelled := false
var mockCheckFn = func(ctx context.Context, resultCh chan<- checkgroup.Result) {
wasCalled = true
<-ctx.Done()
wasCancelled <- true
wasCancelled = true
resultCh <- checkgroup.Result{Err: ctx.Err()}
}

ctx := context.Background()

g := checkgroup.New(ctx)
g.Add(mockCheckFn)
g.Add(neverFinishesCheckFunc)
g.Add(notMemberAfterDelayFunc(1 * time.Microsecond))
g.Add(checkgroup.IsMemberFunc)
go g.Add(mockCheckFn)
result := g.Result()

assert.Equal(t, checkgroup.ResultIsMember, result)
assert.True(t, <-wasCancelled)
if wasCalled {
assert.True(t, wasCancelled)
}
assert.True(t, g.Done())
}

Expand Down Expand Up @@ -143,9 +147,6 @@ func TestCheckgroup_has_no_leaks(t *testing.T) {
checkgroup.UnknownMemberFunc,
isMemberAfterDelayFunc(5 * time.Millisecond),
notMemberAfterDelayFunc(1 * time.Millisecond),
neverFinishesCheckFunc,
neverFinishesCheckFunc,
neverFinishesCheckFunc,
},
expected: checkgroup.ResultIsMember,
},
Expand All @@ -158,9 +159,6 @@ func TestCheckgroup_has_no_leaks(t *testing.T) {
checkgroup.UnknownMemberFunc,
isMemberAfterDelayFunc(5 * time.Millisecond),
notMemberAfterDelayFunc(1 * time.Millisecond),
neverFinishesCheckFunc,
neverFinishesCheckFunc,
neverFinishesCheckFunc,
},
expected: checkgroup.ResultIsMember,
},
Expand Down
46 changes: 28 additions & 18 deletions internal/check/checkgroup/concurrent_checkgroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,6 @@ type concurrentCheckgroup struct {
// ctx.Err()}.
ctx context.Context

// pool is the worker pool (or nil if we want unbounded parallel checks),
// derived from the context.
pool Pool

singleWorkerPool Pool

// subcheckCtx is the context used for the subchecks.
subcheckCtx context.Context

Expand All @@ -41,18 +35,21 @@ type concurrentCheckgroup struct {
// result is only written once by the consumer, and can only be read after
// the doneCh channel is closed.
result Result

// reading from reserveCheckCh reserves the right to create a concurrent
// check.
reserveCheckCh chan struct{}
}

func NewConcurrent(ctx context.Context) Checkgroup {
g := &concurrentCheckgroup{
ctx: ctx,
pool: PoolFromContext(ctx),
finalizeCh: make(chan struct{}),
doneCh: make(chan struct{}),
addCheckCh: make(chan CheckFunc),
ctx: ctx,
finalizeCh: make(chan struct{}),
doneCh: make(chan struct{}),
addCheckCh: make(chan CheckFunc),
reserveCheckCh: make(chan struct{}, 1),
}
g.subcheckCtx, g.cancel = context.WithCancel(g.ctx)
g.singleWorkerPool = NewPool(WithWorkers(1), WithContext(g.subcheckCtx))
g.startConsumer()
return g
}
Expand All @@ -67,7 +64,7 @@ func (g *concurrentCheckgroup) startConsumer() {
g.startConsumerOnce.Do(func() {
go func() {
var (
subcheckCh = make(chan Result, 1)
resultCh = make(chan Result, 1)
totalChecks = 0
finishedChecks = 0
finalizing = false
Expand All @@ -82,17 +79,20 @@ func (g *concurrentCheckgroup) startConsumer() {
// `context.Canceled`), but we still want to receive these results
// so that there are no dangling goroutines.
defer func() {
go receiveRemaining(subcheckCh, totalChecks-finishedChecks)
go receiveRemaining(resultCh, totalChecks-finishedChecks)
}()

// Start with one reservation available.
g.reserveCheckCh <- struct{}{}

for {
select {
case f := <-g.addCheckCh:
case check := <-g.addCheckCh:
if finalizing {
continue
}
totalChecks++
go f(g.subcheckCtx, subcheckCh)
go check(g.subcheckCtx, resultCh)

case <-g.finalizeCh:
if finalizing {
Expand All @@ -107,7 +107,7 @@ func (g *concurrentCheckgroup) startConsumer() {
return
}

case result := <-subcheckCh:
case result := <-resultCh:
finishedChecks++
if result.Err != nil || result.Membership == IsMember {
g.result = result
Expand All @@ -119,6 +119,12 @@ func (g *concurrentCheckgroup) startConsumer() {
return
}

// ready for a new check
select {
case g.reserveCheckCh <- struct{}{}:
default:
}

case <-g.subcheckCtx.Done():
g.result = Result{Err: g.ctx.Err()}
return
Expand All @@ -140,7 +146,11 @@ func (g *concurrentCheckgroup) Done() bool {
// Add adds the CheckFunc to the checkgroup and starts running it.
func (g *concurrentCheckgroup) Add(check CheckFunc) {
select {
case g.addCheckCh <- check:
case <-g.reserveCheckCh:
select {
case g.addCheckCh <- check:
case <-g.subcheckCtx.Done():
}
case <-g.subcheckCtx.Done():
}
}
Expand Down
5 changes: 0 additions & 5 deletions internal/check/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,6 @@ func NewEngine(d EngineDependencies, opts ...EngineOpt) *Engine {
for _, opt := range opts {
opt(e)
}
if e.pool == nil {
e.pool = checkgroup.NewPool(
checkgroup.WithWorkers(e.d.Config(context.Background()).MaxParallelChecks()),
)
}

return e
}
Expand Down

0 comments on commit 1674f14

Please sign in to comment.