Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xdsclient: fix LRS stream leaks when errors are encountered #5505

Merged
merged 4 commits into from
Jul 15, 2022

Conversation

dfawley
Copy link
Member

@dfawley dfawley commented Jul 13, 2022

RELEASE NOTES:

  • xdsclient: fix goroutine leaks when load reporting is enabled

@@ -149,5 +152,17 @@ func (v2c *client) SendLoadStatsRequest(s grpc.ClientStream, loads []*load.Data)

req := &lrspb.LoadStatsRequest{ClusterStats: clusterStats}
v2c.logger.Infof("lrs: sending LRS loads: %+v", pretty.ToJSON(req))
return stream.Send(req)
if err := stream.Send(req); err != nil {
return getStreamError(stream)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/getStreamError/getStreamRecvError/?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is trying to achieve?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error returned from Send is only ever io.EOF in the event of the server terminating the stream or a connection loss. This function receives the actual error from the stream.

grpc-go/stream.go

Lines 108 to 110 in 6417495

// the stream. If the error was generated by the client, the status is
// returned directly; otherwise, io.EOF is returned and the status of
// the stream may be discovered using RecvMsg.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is Recv always supposed to contain a more informational error on the client side, even if the error was encountered on the client? I'm thinking about the case where the error for Send happens on the client, but we still end up using the error we get from Recv. Is it possible that the former contained more contextual information?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My question is whether we should do this only when Send returns an error which is no io.EOF.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I see. Yes, that's a good idea: done.

Comment on lines 163 to 164
_, err := stream.Recv()
if err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Combine these two lines.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

}

func getStreamError(stream lrsStream) error {
for {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should there be a way for this for loop to terminate other than when Recv() returns non-nil error?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

stream.Recv will terminate with an error if the context for the RPC is canceled, which is also how sendLoads exits (in transport.go). So I believe this is fine.

stream, err := t.vClient.NewLoadStatsStream(ctx, cc)
// streamCtx is created and canceled in case we terminate the stream
// early for any reason, to avoid gRPC-Go leaking the RPC's monitoring
// goroutine.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RPC's monitoring goroutine? Which goroutine are you taking about here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one:

grpc-go/stream.go

Lines 354 to 361 in 6417495

go func() {
select {
case <-cc.ctx.Done():
cs.finish(ErrClientConnClosing)
case <-ctx.Done():
cs.finish(toRPCErr(ctx.Err()))
}
}()

if err != nil {
t.logger.Warningf("lrs: failed to create stream: %v", err)
cancel()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we do a defer cancel() and put everything that is currently inside of the for loop in an anonymous function?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have mixed feelings about this sort of thing. Done; let me know what you think.

@easwars easwars assigned dfawley and unassigned easwars Jul 14, 2022
@easwars easwars assigned easwars and unassigned dfawley Jul 15, 2022
@easwars easwars assigned dfawley and unassigned easwars Jul 15, 2022
@dfawley dfawley merged commit 3a77d29 into grpc:master Jul 15, 2022
@dfawley dfawley deleted the lrs_fix branch July 15, 2022 18:58
@github-actions github-actions bot locked as resolved and limited conversation to collaborators Jan 14, 2023
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants