Hardened thread pool #1913
Conversation
```go
	return p, nil
}

func (p *pool) Start() {
```
1. Do we expect `Start` to be called multiple times? I see we're using `sync.Once` to guard against it.
2. We can also get rid of this edge case by leaving the goroutine initialization in `NewPool`. Instead of having `Shutdown` code, we can pass a channel/context into `NewPool` so the caller can shut down the pool via the context/closing the channel. I think this will simplify a lot of the `Start`/`Shutdown` code.

If we go with (2) I would recommend renaming `NewPool` to `StartWorkers` or something to make it clear that some goroutines are spinning up.
Ignore this comment in favor of using `errgroup`
snow/networking/worker/pool.go
Outdated
```go
	workersCount int
	requests     chan Request

	shutdown bool
```
used to make `Send` a no-op after `Shutdown` is called
snow/networking/worker/pool.go
Outdated
```go
	if p.noMoreSends.Get() {
		return
	}
```
I don't follow what this is needed for. The request queue isn't buffered... so I don't really see what we are gaining out of having this flag at all.
The idea is to avoid requests being executed after shutdown is called. Indeed, it is guaranteed that once the quit channel is closed and selected, there won't be any workers listening anymore. Dropped the flag.
snow/networking/worker/pool_test.go
Outdated
```go
	"github.com/stretchr/testify/require"
)

func TestPoolHandlesRequests(_ *testing.T) {
```
What is this testing?
Inserted booleans to check that each request has been executed, and asserted on them.
snow/networking/worker/pool_test.go
Outdated
```go
	// late requests, after Shutdown, are no-ops that won't panic
	lateRequest := func() {
		time.Sleep(time.Minute)
```
Why is this sleeping for a minute?
The idea is to show that the request is not executed at all. If it were executed, the one-minute sleep would be enough to make the test time out. In hindsight it's better to signal job completion with a boolean and check that.
I chatted w/ Alberto offline but I think we can replace most of the code in the existing

@joshua-kim, thanks for the input! I tried it out and I think there is a difference in the way `worker.Pool` and `errgroup.Group` terminate that makes them non-equivalent for our use.

Yeah we can use an atomic bool to indicate the worker pool is closed
Co-authored-by: Stephen Buttolph <stephen@avalabs.org> Signed-off-by: Alberto Benegiamo <alberto.benegiamo@gmail.com>
snow/networking/worker/pool.go
Outdated
```go
// Send the request to the worker pool.
//
// Send should never be called after [Shutdown] is called.
// Send can be safely called after [Shutdown] and it'll be no-op.
```
```diff
- // Send can be safely called after [Shutdown] and it'll be no-op.
+ // Send is a no-op if [Shutdown] has been called.
```
Done
snow/networking/worker/pool.go
Outdated
```go
	// [shutdownOnce] ensures Shutdown idempotency
	shutdownOnce sync.Once
	shutdownWG   sync.WaitGroup

	// [shutdownWG] makes sure all workers have stopped before Shutdown returns
	shutdownWG sync.WaitGroup

	// closing [quit] tells the workers to stop working
```
nit: leaving this to your preference but I personally don't write comments on variables that are self-documenting. I think these are all being used in pretty idiomatic ways so I don't think these are necessary to have. Feel free to ignore this comment.
dropped
snow/networking/worker/pool.go
Outdated
```go
		p.shutdownWG.Add(1)
		go p.runWorker()
	}
```
nit: undo diff
done
snow/networking/worker/pool.go
Outdated
```go
	for {
		select {
		case <-p.quit:
			return // stop worker
```
nit: this comment isn't necessary
indeed, dropped
snow/networking/worker/pool.go
Outdated
```go
		case <-p.quit:
			return // stop worker
		case request := <-p.requests:
			if request != nil {
```
If someone hands us a nil request I feel like it's okay to just fail obviously and die instead of silently dropping a nil request somewhere. I think we can remove this check?
Removed it. Indeed the check wasn't there before, so it should be fine.
snow/networking/worker/pool.go
Outdated
```go
	// We don't close requests channel to avoid panics
	// upon sending request over a closed channel.
```
nit: move this comment above the `close` line. I personally would probably not comment this because I feel like the line is self-documenting, but I'm leaving the inclusion of this to your preference.
We also no longer close `p.requests` here, is that okay?
If we close `p.requests` we risk a panic if another goroutine invokes `Send`. I tried to express this in the comment.
Should have caught this on the first pass... sorry
snow/networking/handler/handler.go
Outdated
```go
	// we check the value of [h.closing] after the call to [Signal].
	h.syncMessageQueue.Shutdown()
	h.asyncMessageQueue.Shutdown()
	h.asyncMessagePool.Shutdown()
```
This can cause the chain router's shutdown to hang indefinitely on a chain that isn't shutting down correctly... (`Stop` should never block. Can we add that to the comment on `Stop`?)

Actually, looking at this with a fresh set of eyes... I'm not sure the existing code is incorrect at all. It seems like the goroutines are shut down correctly, and the handler exposes the ability to block on the goroutine shutdown by calling `AwaitStopped`.
fwiw - I still think the pool changes are good... But I don't think we should be calling shutdown here.
```go
	defer h.ctx.Lock.Unlock()

	// h.asyncMessagePool must be started before async dispatch
	h.asyncMessagePool.Start()
```
@joshua-kim, @StephenButtolph, I ended up re-introducing the `Start` method. I believe I need it to avoid some goroutine leaks in some UTs.

In production code we always call `pool.Start` after its creation in `snow/networking/handler/handler.go`. However, there are some UTs where we don't want to call `handler.Start`, since we want to inspect handler queues with `Len()` (see `TestRouterCrossChainMessages`). `TestRouterCrossChainMessages` would leak the pool goroutines if those were started upon `NewPool()`.

Launching goroutines in `handler.Start` solves the issue: the goroutines are never started, since `handler.Start` is not called, hence no leak. You can see the fully fixed `TestRouterCrossChainMessages` in my next PR.
Closing this PR in favor of #1940
Why this should be merged
@darioush introduced goleak to ava-labs/coreth#273.
I applied it to some avalanchego packages and realized we can't currently shutdown the thread pool we use in avalanchego.
This PR fixes the issue and unlocks fixing leaking goroutines (tbd in a subsequent PR).
How this works
Hardened thread pool to:
How this was tested
New UTs (100% package coverage) + CI