Skip to content

grpc: Move some stats handler callouts from transport to gRPC layer server side #6624

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions internal/transport/handler_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ func (ht *serverHandlerTransport) Close(err error) {

func (ht *serverHandlerTransport) RemoteAddr() net.Addr { return strAddr(ht.req.RemoteAddr) }

func (ht *serverHandlerTransport) LocalAddr() net.Addr { return nil }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you fix this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This server handler transport has no access to local addr unfortunately. It was actually calling into stats handler with a nil local addr set in InHeader since it has no access to this. I read LocalAddr() (method I added to server transport) in my new PR, so this will keep the same functionality as in master and not log LocalAddr().


// strAddr is a net.Addr backed by either a TCP "ip:port" string, or
// the empty string if unknown.
type strAddr string
Expand Down
37 changes: 11 additions & 26 deletions internal/transport/http2_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ type http2Server struct {
// returns a nil transport and a non-nil error. For a special case where the
// underlying conn gets closed before the client preface could be read, it
// returns a nil transport and a nil error.
func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
func NewServerTransport(ctx context.Context, conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
var authInfo credentials.AuthInfo
rawConn := conn
if config.Credentials != nil {
Expand Down Expand Up @@ -249,7 +249,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,

done := make(chan struct{})
t := &http2Server{
ctx: setConnection(context.Background(), rawConn),
ctx: setConnection(ctx, rawConn),
done: done,
conn: conn,
remoteAddr: conn.RemoteAddr(),
Expand Down Expand Up @@ -282,14 +282,6 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
updateFlowControl: t.updateFlowControl,
}
}
for _, sh := range t.stats {
t.ctx = sh.TagConn(t.ctx, &stats.ConnTagInfo{
RemoteAddr: t.remoteAddr,
LocalAddr: t.localAddr,
})
connBegin := &stats.ConnBegin{}
sh.HandleConn(t.ctx, connBegin)
}
t.channelzID, err = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.remoteAddr, t.localAddr))
if err != nil {
return nil, err
Expand Down Expand Up @@ -374,10 +366,11 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(

buf := newRecvBuffer()
s := &Stream{
id: streamID,
st: t,
buf: buf,
fc: &inFlow{limit: uint32(t.initialWindowSize)},
id: streamID,
st: t,
buf: buf,
fc: &inFlow{limit: uint32(t.initialWindowSize)},
headerWireLength: int(frame.Header().Length),
}
var (
// if false, content-type was missing or invalid
Expand Down Expand Up @@ -597,18 +590,6 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
s.requestRead = func(n int) {
t.adjustWindow(s, uint32(n))
}
for _, sh := range t.stats {
s.ctx = sh.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
inHeader := &stats.InHeader{
FullMethod: s.method,
RemoteAddr: t.remoteAddr,
LocalAddr: t.localAddr,
Compression: s.recvCompress,
WireLength: int(frame.Header().Length),
Header: mdata.Copy(),
}
sh.HandleRPC(s.ctx, inHeader)
}
s.ctxDone = s.ctx.Done()
s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone)
s.trReader = &transportReader{
Expand Down Expand Up @@ -1317,6 +1298,10 @@ func (t *http2Server) RemoteAddr() net.Addr {
return t.remoteAddr
}

func (t *http2Server) LocalAddr() net.Addr {
return t.localAddr
}

func (t *http2Server) Drain(debugData string) {
t.mu.Lock()
defer t.mu.Unlock()
Expand Down
20 changes: 18 additions & 2 deletions internal/transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,8 +287,9 @@ type Stream struct {
// On server-side it is unused.
status *status.Status

bytesReceived uint32 // indicates whether any bytes have been received on this stream
unprocessed uint32 // set if the server sends a refused stream or GOAWAY including this stream
bytesReceived uint32 // indicates whether any bytes have been received on this stream
unprocessed uint32 // set if the server sends a refused stream or GOAWAY including this stream
headerWireLength int
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be grouped with other header stuff probably. And it needs a comment that it's only set by the server.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.


// contentSubtype is the content-subtype for requests.
// this must be lowercase or the behavior is undefined.
Expand Down Expand Up @@ -425,6 +426,12 @@ func (s *Stream) Context() context.Context {
return s.ctx
}

// SetContext sets the context of the stream. This will be deleted once the
// stats handler callouts all move to gRPC layer.
func (s *Stream) SetContext(ctx context.Context) {
s.ctx = ctx
}

// Method returns the method for the stream.
func (s *Stream) Method() string {
return s.method
Expand All @@ -437,6 +444,12 @@ func (s *Stream) Status() *status.Status {
return s.status
}

// HeaderWireLength returns the size of theheaders of the stream as received
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addaspace pls

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whoops, done :).

// from the wire.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.... "Valid only on the server"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. Added. Does it not know/need to know the client side eventually?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe, but at that time we can fix the comment.

Honestly one of the next cleanups I'd love to do is to split the transport.Stream type into "client" and "server" versions instead of sharing code awkwardly where most of it needs to check a boolean to figure out whether it's a client or server stream and do radically different behavior depending on which.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm ok sounds good.

func (s *Stream) HeaderWireLength() int {
return s.headerWireLength
}

// SetHeader sets the header metadata. This can be called multiple times.
// Server side only.
// This should not be called in parallel to other data writes.
Expand Down Expand Up @@ -720,6 +733,9 @@ type ServerTransport interface {
// RemoteAddr returns the remote network address.
RemoteAddr() net.Addr

// LocalAddr returns the local network address.
LocalAddr() net.Addr

// Drain notifies the client this ServerTransport stops accepting new RPCs.
Drain(debugData string)

Expand Down
2 changes: 1 addition & 1 deletion internal/transport/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ func (s *server) start(t *testing.T, port int, serverConfig *ServerConfig, ht hT
return
}
rawConn := conn
transport, err := NewServerTransport(conn, serverConfig)
transport, err := NewServerTransport(context.Background(), conn, serverConfig)
if err != nil {
return
}
Expand Down
28 changes: 25 additions & 3 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -912,9 +912,17 @@ func (s *Server) handleRawConn(lisAddr string, rawConn net.Conn) {
return
}
rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))
ctx := context.Background()
for _, sh := range s.opts.statsHandlers {
ctx = sh.TagConn(ctx, &stats.ConnTagInfo{
RemoteAddr: rawConn.RemoteAddr(),
LocalAddr: rawConn.LocalAddr(),
})
sh.HandleConn(ctx, &stats.ConnBegin{})
Comment on lines +917 to +921
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a change in behavior, but it's subtle.

If the handshaker replaces the conn with something connected to another remote/local address, then that's not reflected here.

More importantly it doesn't match what we do later for remote/local addr, which is use the server transports addresses. We probably want to move this after NewServerTransport and use st.LocalAddr() and st.RemoteAddr() instead.

(Also this change means we call the stats handler for every connection even if it doesn't finish handshaking correctly, which is probably also not what we want.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmmm ok I was wondering whether I should do this after handshaking, I noticed it changed to log unconditionally. I remember you mentioned either passing conn around or just using the wrapper of conn to get addr info, and we chose latter.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://github.com/grpc/grpc-go/blob/master/internal/transport/http2_client.go#L372 moving it after transport will slightly change behavior as well, as previously TagConn was still called if there's a connection error in say writing intial http/2 preface/settings frame. For me, this is dependent on semantically what does TagConn represent? Accepting a TCP connection? Getting an HTTP/2 connection to a point where it finishes the initial frames?

Copy link
Member

@dfawley dfawley Sep 15, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed offline, waiting for (HTTP/2) handshaking before calling TagConn seems fine.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did this after newHTTP2Transport in new PR (still comes before first stats handler call after TagConn, so as we discussed offline, this is fine).

}

// Finish handshaking (HTTP2)
st := s.newHTTP2Transport(rawConn)
st := s.newHTTP2Transport(ctx, rawConn)
rawConn.SetDeadline(time.Time{})
if st == nil {
return
Expand All @@ -940,7 +948,7 @@ func (s *Server) drainServerTransports(addr string) {

// newHTTP2Transport sets up a http/2 transport (using the
// gRPC http2 server transport in transport/http2_server.go).
func (s *Server) newHTTP2Transport(c net.Conn) transport.ServerTransport {
func (s *Server) newHTTP2Transport(ctx context.Context, c net.Conn) transport.ServerTransport {
config := &transport.ServerConfig{
MaxStreams: s.opts.maxConcurrentStreams,
ConnectionTimeout: s.opts.connectionTimeout,
Expand All @@ -958,7 +966,7 @@ func (s *Server) newHTTP2Transport(c net.Conn) transport.ServerTransport {
MaxHeaderListSize: s.opts.maxHeaderListSize,
HeaderTableSize: s.opts.headerTableSize,
}
st, err := transport.NewServerTransport(c, config)
st, err := transport.NewServerTransport(ctx, c, config)
if err != nil {
s.mu.Lock()
s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err)
Expand Down Expand Up @@ -1707,6 +1715,20 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str
}
}

md, _ := metadata.FromIncomingContext(ctx)
for _, sh := range s.opts.statsHandlers {
ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: stream.Method()})
stream.SetContext(ctx) // To have calls in stream callouts work. Will delete once all stats handler calls come from the gRPC layer.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can move this to after the loop so it only happens once.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Done.

sh.HandleRPC(ctx, &stats.InHeader{
FullMethod: stream.Method(),
RemoteAddr: t.RemoteAddr(),
LocalAddr: t.LocalAddr(),
Compression: stream.RecvCompress(),
WireLength: stream.HeaderWireLength(),
Header: md,
})
}

sm := stream.Method()
if sm != "" && sm[0] == '/' {
sm = sm[1:]
Expand Down