xdsclient: fix LRS stream leaks when errors are encountered #5505
@@ -149,5 +152,17 @@ func (v2c *client) SendLoadStatsRequest(s grpc.ClientStream, loads []*load.Data)

 	req := &lrspb.LoadStatsRequest{ClusterStats: clusterStats}
 	v2c.logger.Infof("lrs: sending LRS loads: %+v", pretty.ToJSON(req))
-	return stream.Send(req)
+	if err := stream.Send(req); err != nil {
+		return getStreamError(stream)
s/getStreamError/getStreamRecvError/?
What is this trying to achieve?
The error returned from Send is only ever io.EOF in the event of the server terminating the stream or a connection loss. This function receives the actual error from the stream.
Lines 108 to 110 in 6417495
    // the stream. If the error was generated by the client, the status is
    // returned directly; otherwise, io.EOF is returned and the status of
    // the stream may be discovered using RecvMsg.
Is Recv always supposed to contain a more informative error on the client side, even if the error was encountered on the client? I'm thinking about the case where the error for Send happens on the client, but we still end up using the error we get from Recv. Is it possible that the former contained more contextual information?
My question is whether we should do this only when Send returns an error which is no io.EOF.
Ah I see. Yes, that's a good idea: done.
    _, err := stream.Recv()
    if err != nil {
Combine these two lines.
Done.
    }

    func getStreamError(stream lrsStream) error {
    	for {
Should there be a way for this for loop to terminate other than when Recv() returns a non-nil error?
stream.Recv will terminate with an error if the context for the RPC is canceled, which is also how sendLoads exits (in transport.go). So I believe this is fine.
    stream, err := t.vClient.NewLoadStatsStream(ctx, cc)
    // streamCtx is created and canceled in case we terminate the stream
    // early for any reason, to avoid gRPC-Go leaking the RPC's monitoring
    // goroutine.
RPC's monitoring goroutine? Which goroutine are you talking about here?
This one:
Lines 354 to 361 in 6417495
    go func() {
    	select {
    	case <-cc.ctx.Done():
    		cs.finish(ErrClientConnClosing)
    	case <-ctx.Done():
    		cs.finish(toRPCErr(ctx.Err()))
    	}
    }()
    if err != nil {
    	t.logger.Warningf("lrs: failed to create stream: %v", err)
    	cancel()
Should we do a defer cancel() and put everything that is currently inside of the for loop in an anonymous function?
I have mixed feelings about this sort of thing. Done; let me know what you think.
RELEASE NOTES: