Skip to content

Conversation

abi87
Copy link
Contributor

@abi87 abi87 commented Aug 24, 2023

Why this should be merged

@darioush introduced goleak to ava-labs/coreth#273.
I applied it to some avalanchego packages and realized we can't currently shutdown the thread pool we use in avalanchego.
This PR fixes the issue and unlock fixing of leaking goroutines (tbd in subsequent PR).

How this works

Hardened thread pool to:

  1. be able to cleanly shut it down
  2. make it able to behave sanely (no panic) when Send/Shutdown are called out of order (e.g. before Start is called)

How this was tested

New UTs (100% package coverage) + CI

@abi87 abi87 self-assigned this Aug 24, 2023
@abi87 abi87 changed the title hardened thread pool Hardened thread pool Aug 24, 2023
@abi87 abi87 requested review from joshua-kim and ceyonur August 24, 2023 09:24
@abi87 abi87 added the cleanup Code quality improvement label Aug 24, 2023
return p, nil
}

func (p *pool) Start() {
Copy link
Contributor

@joshua-kim joshua-kim Aug 24, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Do we expect Start to be called multiple times? I see we're using sync.Once to guard against it.

  2. We can also get rid of this edge-case by leaving the goroutine initialization in NewPool. Instead of having a Shutdown code we can pass in a channel/context into NewPool so the caller can shutdown the pool via context/close channel. I think this will simplify a lot of the Start/Shutdown code.

If we go with (2) I would recommend renaming NewPool to StartWorkers or something to make it clear that some goroutines are spinning up.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ignore this comment in favor of using errgroup

workersCount int
requests chan Request

shutdown bool
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

used to make Send a no-op after Shutdown is called

@abi87 abi87 requested a review from joshua-kim August 24, 2023 17:44
Comment on lines 78 to 81
if p.noMoreSends.Get() {
return
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't follow what this is needed for. The request queue isn't buffered... so I don't really see what we are gaining out of having this flag at all.

Copy link
Contributor Author

@abi87 abi87 Aug 25, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea is to avoid requests being executed after shutdown is called. Indeed this is guaranteed that once quit channel is closed and selected, there won't be any worker listening anymore. Dropped the flag

"github.com/stretchr/testify/require"
)

func TestPoolHandlesRequests(_ *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is this testing?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inserted booleans to check request has been executed and testing them.


// late requests, after Shutdown, are no-ops that won't panic
lateRequest := func() {
time.Sleep(time.Minute)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this sleeping for a minute?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea is to show that the request is not executed at all. If it was, one minute sleep is enough to make the test be terminated. In hindsight it's better to signal job done with a boolean and check that.

@joshua-kim
Copy link
Contributor

I chatted w/ Alberto offline but I think we can replace most of the code in the existing worker.Pool with an errgroup.Group, since SetLimit caps the amount of goroutines (ref). The semantics of our Send and errgroup.Go are the same, where both block until a worker goroutine is free to accept the task so it should work as a drop-in replacement to the worker loops we manage.

@abi87
Copy link
Contributor Author

abi87 commented Aug 25, 2023

I chatted w/ Alberto offline but I think we can replace most of the code in the existing worker.Pool with an errgroup.Group, since SetLimit caps the amount of goroutines (ref). The semantics of our Send and errgroup.Go are the same, where both block until a worker goroutine is free to accept the task so it should work as a drop-in replacement to the worker loops we manage.

@joshua-kim, thanks for the input! I tried it out and I think there is a difference in the way worker.Pool and errgroup.Group terminate that makes them non equivalent for our use.
We'd like worker.Pool to stop accepting requests after shutdown. IIUC, errgroup.Group has not such a feature built in.
What I think I could do is replace worker.Pool internal goroutines with an errgroup.Group and use the quit channel to ensure Shutdown.
But maybe it's just simpler to keep the goroutines?

@abi87 abi87 requested a review from StephenButtolph August 25, 2023 07:35
@joshua-kim
Copy link
Contributor

We'd like worker.Pool to stop accepting requests after shutdown. IIUC, errgroup.Group has not such a feature built in.

Yeah we can use an atomic bool to indicate the worker pool is closed

Co-authored-by: Stephen Buttolph <stephen@avalabs.org>
Signed-off-by: Alberto Benegiamo <alberto.benegiamo@gmail.com>
@StephenButtolph StephenButtolph added this to the v1.10.10 milestone Aug 25, 2023
// Send the request to the worker pool.
//
// Send should never be called after [Shutdown] is called.
// Send can be safely called after [Shutdown] and it'll be no-op.
Copy link
Contributor

@joshua-kim joshua-kim Aug 25, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Send can be safely called after [Shutdown] and it'll be no-op.
// Send is a no-op if [Shutdown] has been called.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Comment on lines 30 to 36
// [shutdownOnce] ensures Shutdown idempotency
shutdownOnce sync.Once
shutdownWG sync.WaitGroup

// [shutdownWG] makes sure all workers have stopped before Shutdown returns
shutdownWG sync.WaitGroup

// closing [quit] tells the workers to stop working
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: leaving this to your preference but I personally don't write comments on variables that are self-documenting. I think these are all being used in pretty idiomatic ways so I don't think these are necessary to have. Feel free to ignore this comment.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dropped

p.shutdownWG.Add(1)
go p.runWorker()
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: undo diff

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

for {
select {
case <-p.quit:
return // stop worker
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this comment isn't necessary

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indeed, dropped

case <-p.quit:
return // stop worker
case request := <-p.requests:
if request != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If someone handles us a nil request I feel like it's okay to just fail obviously and die instead of silently dropping a nil request somewhere. I think we can remove this check?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed the test. It wasn't there indeed, so it should be fine

Comment on lines 79 to 80
// We don't close requests channel to avoid panics
// upon sending request over a closed channel.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: move this comment above the close line. I personally would probably not comment this because i feel like this line is self-documenting but leaving this inclusion of this to your preference

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We also no longer close p.requests here, is that okay?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we close p.request we risk a panic if another goroutine invoke Send. I tried to express this in the comment

@abi87 abi87 requested a review from joshua-kim August 25, 2023 16:22
Copy link
Contributor

@StephenButtolph StephenButtolph left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should have caught this on the first pass... sorry

// we check the value of [h.closing] after the call to [Signal].
h.syncMessageQueue.Shutdown()
h.asyncMessageQueue.Shutdown()
h.asyncMessagePool.Shutdown()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can cause the chain router's shutdown to hang indefinitely on a chain that isn't shutting down correctly... (Stop should never block. Can we add that to the comment on Stop?)

Actually looking at this which a fresh set of eyes... I'm not sure the existing code is incorrect at all. It seems like the goroutines are shutdown correctly - and the handler exposes the ability to block on the goroutine shutdown by calling AwaitStopped.

fwiw - I still think the pool changes are good... But I don't think we should be calling shutdown here.

defer h.ctx.Lock.Unlock()

// h.asyncMessagePool must be started before async dispatch
h.asyncMessagePool.Start()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@joshua-kim, @StephenButtolph, I ended up re-introducing the start method.I believe I need it to avoid some goroutines leak in some UTs.
In production code we always call pool.Start after its creaton in snow/networking/handler/handler.go. However there are some UTs where we don't want to call handler.Start since we want to inspect handler queues with Len() (see TestRouterCrossChainMessages).
TestRouterCrossChainMessages woud leak the pool goroutines if those are started upon NewPool().
Launching goroutines in handler.Start solves the issue: goroutines are never started, since handler.Start is not called, hence no leak. You can see the fully fixed TestRouterCrossChainMessages in my next PR

@abi87
Copy link
Contributor Author

abi87 commented Aug 29, 2023

Closing this PR in favor of #1940

@abi87 abi87 closed this Aug 29, 2023
@StephenButtolph StephenButtolph deleted the thread_pool_rework branch July 24, 2024 20:47
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
cleanup Code quality improvement
Projects
No open projects
Archived in project
Development

Successfully merging this pull request may close these issues.

3 participants