-
Notifications
You must be signed in to change notification settings - Fork 4.5k
grpc: Move some stats handler callouts from transport to gRPC layer server side #6624
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -287,8 +287,9 @@ type Stream struct { | |
// On server-side it is unused. | ||
status *status.Status | ||
|
||
bytesReceived uint32 // indicates whether any bytes have been received on this stream | ||
unprocessed uint32 // set if the server sends a refused stream or GOAWAY including this stream | ||
bytesReceived uint32 // indicates whether any bytes have been received on this stream | ||
unprocessed uint32 // set if the server sends a refused stream or GOAWAY including this stream | ||
headerWireLength int | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This should be grouped with other header stuff probably. And it needs a comment that it's only set by the server. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
|
||
// contentSubtype is the content-subtype for requests. | ||
// this must be lowercase or the behavior is undefined. | ||
|
@@ -425,6 +426,12 @@ func (s *Stream) Context() context.Context { | |
return s.ctx | ||
} | ||
|
||
// SetContext sets the context of the stream. This will be deleted once the | ||
// stats handler callouts all move to gRPC layer. | ||
func (s *Stream) SetContext(ctx context.Context) { | ||
s.ctx = ctx | ||
} | ||
|
||
// Method returns the method for the stream. | ||
func (s *Stream) Method() string { | ||
return s.method | ||
|
@@ -437,6 +444,12 @@ func (s *Stream) Status() *status.Status { | |
return s.status | ||
} | ||
|
||
// HeaderWireLength returns the size of theheaders of the stream as received | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Addaspace pls There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Whoops, done :). |
||
// from the wire. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. .... "Valid only on the server"? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure. Added. Does it not know/need to know the client side eventually? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe, but at that time we can fix the comment. Honestly one of the next cleanups I'd love to do is to split the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmmm ok sounds good. |
||
func (s *Stream) HeaderWireLength() int { | ||
return s.headerWireLength | ||
} | ||
|
||
// SetHeader sets the header metadata. This can be called multiple times. | ||
// Server side only. | ||
// This should not be called in parallel to other data writes. | ||
|
@@ -720,6 +733,9 @@ type ServerTransport interface { | |
// RemoteAddr returns the remote network address. | ||
RemoteAddr() net.Addr | ||
|
||
// LocalAddr returns the local network address. | ||
LocalAddr() net.Addr | ||
|
||
// Drain notifies the client this ServerTransport stops accepting new RPCs. | ||
Drain(debugData string) | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -912,9 +912,17 @@ func (s *Server) handleRawConn(lisAddr string, rawConn net.Conn) { | |
return | ||
} | ||
rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout)) | ||
ctx := context.Background() | ||
for _, sh := range s.opts.statsHandlers { | ||
ctx = sh.TagConn(ctx, &stats.ConnTagInfo{ | ||
RemoteAddr: rawConn.RemoteAddr(), | ||
LocalAddr: rawConn.LocalAddr(), | ||
}) | ||
sh.HandleConn(ctx, &stats.ConnBegin{}) | ||
Comment on lines
+917
to
+921
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a change in behavior, but it's subtle. If the handshaker replaces the More importantly it doesn't match what we do later for remote/local addr, which is use the server transports addresses. We probably want to move this after (Also this change means we call the stats handler for every connection even if it doesn't finish handshaking correctly, which is probably also not what we want.) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmmmm ok I was wondering whether I should do this after handshaking, I noticed it changed to log unconditionally. I remember you mentioned either passing conn around or just using the wrapper of conn to get addr info, and we chose latter. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. https://github.com/grpc/grpc-go/blob/master/internal/transport/http2_client.go#L372 moving it after transport will slightly change behavior as well, as previously TagConn was still called if there's a connection error in say writing intial http/2 preface/settings frame. For me, this is dependent on semantically what does TagConn represent? Accepting a TCP connection? Getting an HTTP/2 connection to a point where it finishes the initial frames? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As discussed offline, waiting for (HTTP/2) handshaking before calling There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Did this after newHTTP2Transport in new PR (still comes before first stats handler call after TagConn, so as we discussed offline, this is fine). |
||
} | ||
|
||
// Finish handshaking (HTTP2) | ||
st := s.newHTTP2Transport(rawConn) | ||
st := s.newHTTP2Transport(ctx, rawConn) | ||
rawConn.SetDeadline(time.Time{}) | ||
if st == nil { | ||
return | ||
|
@@ -940,7 +948,7 @@ func (s *Server) drainServerTransports(addr string) { | |
|
||
// newHTTP2Transport sets up a http/2 transport (using the | ||
// gRPC http2 server transport in transport/http2_server.go). | ||
func (s *Server) newHTTP2Transport(c net.Conn) transport.ServerTransport { | ||
func (s *Server) newHTTP2Transport(ctx context.Context, c net.Conn) transport.ServerTransport { | ||
config := &transport.ServerConfig{ | ||
MaxStreams: s.opts.maxConcurrentStreams, | ||
ConnectionTimeout: s.opts.connectionTimeout, | ||
|
@@ -958,7 +966,7 @@ func (s *Server) newHTTP2Transport(c net.Conn) transport.ServerTransport { | |
MaxHeaderListSize: s.opts.maxHeaderListSize, | ||
HeaderTableSize: s.opts.headerTableSize, | ||
} | ||
st, err := transport.NewServerTransport(c, config) | ||
st, err := transport.NewServerTransport(ctx, c, config) | ||
if err != nil { | ||
s.mu.Lock() | ||
s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err) | ||
|
@@ -1707,6 +1715,20 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str | |
} | ||
} | ||
|
||
md, _ := metadata.FromIncomingContext(ctx) | ||
for _, sh := range s.opts.statsHandlers { | ||
ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: stream.Method()}) | ||
stream.SetContext(ctx) // To have calls in stream callouts work. Will delete once all stats handler calls come from the gRPC layer. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You can move this to after the loop so it only happens once. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good point. Done. |
||
sh.HandleRPC(ctx, &stats.InHeader{ | ||
FullMethod: stream.Method(), | ||
RemoteAddr: t.RemoteAddr(), | ||
LocalAddr: t.LocalAddr(), | ||
Compression: stream.RecvCompress(), | ||
WireLength: stream.HeaderWireLength(), | ||
Header: md, | ||
}) | ||
} | ||
|
||
sm := stream.Method() | ||
if sm != "" && sm[0] == '/' { | ||
sm = sm[1:] | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you fix this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This server handler transport has no access to local addr unfortunately. It was actually calling into stats handler with a nil local addr set in InHeader since it has no access to this. I read LocalAddr() (method I added to server transport) in my new PR, so this will keep the same functionality as in master and not log LocalAddr().