Skip to content

Commit 026fed9

Browse files
committed
Move conn begin conn end and in header to grpc layer
1 parent 59f57b1 commit 026fed9

File tree

9 files changed

+125
-151
lines changed

9 files changed

+125
-151
lines changed

internal/transport/handler_server.go

Lines changed: 18 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,14 @@ func (ht *serverHandlerTransport) Close(err error) {
167167

168168
func (ht *serverHandlerTransport) RemoteAddr() net.Addr { return strAddr(ht.req.RemoteAddr) }
169169

170+
func (ht *serverHandlerTransport) LocalAddr() net.Addr { return nil } // Server Handler transport has no access to local addr (was simply not calling sh with local addr).
171+
172+
func (ht *serverHandlerTransport) Peer() *peer.Peer {
173+
return &peer.Peer{
174+
Addr: ht.RemoteAddr(),
175+
}
176+
}
177+
170178
// strAddr is a net.Addr backed by either a TCP "ip:port" string, or
171179
// the empty string if unknown.
172180
type strAddr string
@@ -347,7 +355,7 @@ func (ht *serverHandlerTransport) WriteHeader(s *Stream, md metadata.MD) error {
347355
return err
348356
}
349357

350-
func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream)) {
358+
func (ht *serverHandlerTransport) HandleStreams(_ context.Context, startStream func(*Stream)) {
351359
// With this transport type there will be exactly 1 stream: this HTTP request.
352360

353361
ctx := ht.req.Context()
@@ -371,16 +379,16 @@ func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream)) {
371379
}()
372380

373381
req := ht.req
374-
375382
s := &Stream{
376-
id: 0, // irrelevant
377-
requestRead: func(int) {},
378-
cancel: cancel,
379-
buf: newRecvBuffer(),
380-
st: ht,
381-
method: req.URL.Path,
382-
recvCompress: req.Header.Get("grpc-encoding"),
383-
contentSubtype: ht.contentSubtype,
383+
id: 0, // irrelevant
384+
requestRead: func(int) {},
385+
cancel: cancel,
386+
buf: newRecvBuffer(),
387+
st: ht,
388+
method: req.URL.Path,
389+
recvCompress: req.Header.Get("grpc-encoding"),
390+
contentSubtype: ht.contentSubtype,
391+
headerWireLength: 0, // doesn't know header wire length, will call into stats handler as 0.
384392
}
385393
pr := &peer.Peer{
386394
Addr: ht.RemoteAddr(),
@@ -390,15 +398,6 @@ func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream)) {
390398
}
391399
ctx = metadata.NewIncomingContext(ctx, ht.headerMD)
392400
s.ctx = peer.NewContext(ctx, pr)
393-
for _, sh := range ht.stats {
394-
s.ctx = sh.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
395-
inHeader := &stats.InHeader{
396-
FullMethod: s.method,
397-
RemoteAddr: ht.RemoteAddr(),
398-
Compression: s.recvCompress,
399-
}
400-
sh.HandleRPC(s.ctx, inHeader)
401-
}
402401
s.trReader = &transportReader{
403402
reader: &recvBufferReader{ctx: s.ctx, ctxDone: s.ctx.Done(), recv: s.buf, freeBuffer: func(*bytes.Buffer) {}},
404403
windowHandler: func(int) {},

internal/transport/handler_server_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -314,7 +314,7 @@ func (s) TestHandlerTransport_HandleStreams(t *testing.T) {
314314
st.ht.WriteStatus(s, status.New(codes.OK, ""))
315315
}
316316
st.ht.HandleStreams(
317-
func(s *Stream) { go handleStream(s) },
317+
context.Background(), func(s *Stream) { go handleStream(s) },
318318
)
319319
wantHeader := http.Header{
320320
"Date": nil,
@@ -347,7 +347,7 @@ func handleStreamCloseBodyTest(t *testing.T, statusCode codes.Code, msg string)
347347
st.ht.WriteStatus(s, status.New(statusCode, msg))
348348
}
349349
st.ht.HandleStreams(
350-
func(s *Stream) { go handleStream(s) },
350+
context.Background(), func(s *Stream) { go handleStream(s) },
351351
)
352352
wantHeader := http.Header{
353353
"Date": nil,
@@ -396,7 +396,7 @@ func (s) TestHandlerTransport_HandleStreams_Timeout(t *testing.T) {
396396
ht.WriteStatus(s, status.New(codes.DeadlineExceeded, "too slow"))
397397
}
398398
ht.HandleStreams(
399-
func(s *Stream) { go runStream(s) },
399+
context.Background(), func(s *Stream) { go runStream(s) },
400400
)
401401
wantHeader := http.Header{
402402
"Date": nil,
@@ -448,7 +448,7 @@ func (s) TestHandlerTransport_HandleStreams_WriteStatusWrite(t *testing.T) {
448448
func testHandlerTransportHandleStreams(t *testing.T, handleStream func(st *handleStreamTest, s *Stream)) {
449449
st := newHandleStreamTest(t)
450450
st.ht.HandleStreams(
451-
func(s *Stream) { go handleStream(st, s) },
451+
context.Background(), func(s *Stream) { go handleStream(st, s) },
452452
)
453453
}
454454

@@ -481,7 +481,7 @@ func (s) TestHandlerTransport_HandleStreams_ErrDetails(t *testing.T) {
481481
hst.ht.WriteStatus(s, st)
482482
}
483483
hst.ht.HandleStreams(
484-
func(s *Stream) { go handleStream(s) },
484+
context.Background(), func(s *Stream) { go handleStream(s) },
485485
)
486486
wantHeader := http.Header{
487487
"Date": nil,

internal/transport/http2_server.go

Lines changed: 16 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,6 @@ var serverConnectionCounter uint64
6969
// http2Server implements the ServerTransport interface with HTTP2.
7070
type http2Server struct {
7171
lastRead int64 // Keep this field 64-bit aligned. Accessed atomically.
72-
ctx context.Context
7372
done chan struct{}
7473
conn net.Conn
7574
loopy *loopyWriter
@@ -249,7 +248,6 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
249248

250249
done := make(chan struct{})
251250
t := &http2Server{
252-
ctx: setConnection(context.Background(), rawConn),
253251
done: done,
254252
conn: conn,
255253
remoteAddr: conn.RemoteAddr(),
@@ -272,8 +270,6 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
272270
bufferPool: newBufferPool(),
273271
}
274272
t.logger = prefixLoggerForServerTransport(t)
275-
// Add peer information to the http2server context.
276-
t.ctx = peer.NewContext(t.ctx, t.getPeer())
277273

278274
t.controlBuf = newControlBuffer(t.done)
279275
if dynamicWindow {
@@ -282,14 +278,6 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
282278
updateFlowControl: t.updateFlowControl,
283279
}
284280
}
285-
for _, sh := range t.stats {
286-
t.ctx = sh.TagConn(t.ctx, &stats.ConnTagInfo{
287-
RemoteAddr: t.remoteAddr,
288-
LocalAddr: t.localAddr,
289-
})
290-
connBegin := &stats.ConnBegin{}
291-
sh.HandleConn(t.ctx, connBegin)
292-
}
293281
t.channelzID, err = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.remoteAddr, t.localAddr))
294282
if err != nil {
295283
return nil, err
@@ -347,7 +335,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
347335

348336
// operateHeaders takes action on the decoded headers. Returns an error if fatal
349337
// error encountered and transport needs to close, otherwise returns nil.
350-
func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream)) error {
338+
func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeadersFrame, handle func(*Stream)) error {
351339
// Acquire max stream ID lock for entire duration
352340
t.maxStreamMu.Lock()
353341
defer t.maxStreamMu.Unlock()
@@ -374,10 +362,11 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
374362

375363
buf := newRecvBuffer()
376364
s := &Stream{
377-
id: streamID,
378-
st: t,
379-
buf: buf,
380-
fc: &inFlow{limit: uint32(t.initialWindowSize)},
365+
id: streamID,
366+
st: t,
367+
buf: buf,
368+
fc: &inFlow{limit: uint32(t.initialWindowSize)},
369+
headerWireLength: int(frame.Header().Length),
381370
}
382371
var (
383372
// if false, content-type was missing or invalid
@@ -516,9 +505,9 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
516505
s.state = streamReadDone
517506
}
518507
if timeoutSet {
519-
s.ctx, s.cancel = context.WithTimeout(t.ctx, timeout)
508+
s.ctx, s.cancel = context.WithTimeout(ctx, timeout)
520509
} else {
521-
s.ctx, s.cancel = context.WithCancel(t.ctx)
510+
s.ctx, s.cancel = context.WithCancel(ctx)
522511
}
523512

524513
// Attach the received metadata to the context.
@@ -597,18 +586,6 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
597586
s.requestRead = func(n int) {
598587
t.adjustWindow(s, uint32(n))
599588
}
600-
for _, sh := range t.stats {
601-
s.ctx = sh.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
602-
inHeader := &stats.InHeader{
603-
FullMethod: s.method,
604-
RemoteAddr: t.remoteAddr,
605-
LocalAddr: t.localAddr,
606-
Compression: s.recvCompress,
607-
WireLength: int(frame.Header().Length),
608-
Header: mdata.Copy(),
609-
}
610-
sh.HandleRPC(s.ctx, inHeader)
611-
}
612589
s.ctxDone = s.ctx.Done()
613590
s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone)
614591
s.trReader = &transportReader{
@@ -634,7 +611,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
634611
// HandleStreams receives incoming streams using the given handler. This is
635612
// typically run in a separate goroutine.
636613
// traceCtx attaches trace to ctx and returns the new context.
637-
func (t *http2Server) HandleStreams(handle func(*Stream)) {
614+
func (t *http2Server) HandleStreams(ctx context.Context, handle func(*Stream)) {
638615
defer close(t.readerDone)
639616
for {
640617
t.controlBuf.throttle()
@@ -669,7 +646,7 @@ func (t *http2Server) HandleStreams(handle func(*Stream)) {
669646
}
670647
switch frame := frame.(type) {
671648
case *http2.MetaHeadersFrame:
672-
if err := t.operateHeaders(frame, handle); err != nil {
649+
if err := t.operateHeaders(ctx, frame, handle); err != nil {
673650
t.Close(err)
674651
break
675652
}
@@ -1247,10 +1224,6 @@ func (t *http2Server) Close(err error) {
12471224
for _, s := range streams {
12481225
s.cancel()
12491226
}
1250-
for _, sh := range t.stats {
1251-
connEnd := &stats.ConnEnd{}
1252-
sh.HandleConn(t.ctx, connEnd)
1253-
}
12541227
}
12551228

12561229
// deleteStream deletes the stream s from transport's active streams.
@@ -1316,6 +1289,10 @@ func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, eo
13161289
})
13171290
}
13181291

1292+
func (t *http2Server) LocalAddr() net.Addr {
1293+
return t.localAddr
1294+
}
1295+
13191296
func (t *http2Server) RemoteAddr() net.Addr {
13201297
return t.remoteAddr
13211298
}
@@ -1438,7 +1415,8 @@ func (t *http2Server) getOutFlowWindow() int64 {
14381415
}
14391416
}
14401417

1441-
func (t *http2Server) getPeer() *peer.Peer {
1418+
// Peer returns the peer of the transport.
1419+
func (t *http2Server) Peer() *peer.Peer {
14421420
return &peer.Peer{
14431421
Addr: t.remoteAddr,
14441422
AuthInfo: t.authInfo, // Can be nil
@@ -1454,18 +1432,3 @@ func getJitter(v time.Duration) time.Duration {
14541432
j := grpcrand.Int63n(2*r) - r
14551433
return time.Duration(j)
14561434
}
1457-
1458-
type connectionKey struct{}
1459-
1460-
// GetConnection gets the connection from the context.
1461-
func GetConnection(ctx context.Context) net.Conn {
1462-
conn, _ := ctx.Value(connectionKey{}).(net.Conn)
1463-
return conn
1464-
}
1465-
1466-
// SetConnection adds the connection to the context to be able to get
1467-
// information about the destination ip and port for an incoming RPC. This also
1468-
// allows any unary or streaming interceptors to see the connection.
1469-
func setConnection(ctx context.Context, conn net.Conn) context.Context {
1470-
return context.WithValue(ctx, connectionKey{}, conn)
1471-
}

internal/transport/transport.go

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import (
3737
"google.golang.org/grpc/internal/channelz"
3838
"google.golang.org/grpc/keepalive"
3939
"google.golang.org/grpc/metadata"
40+
"google.golang.org/grpc/peer"
4041
"google.golang.org/grpc/resolver"
4142
"google.golang.org/grpc/stats"
4243
"google.golang.org/grpc/status"
@@ -265,7 +266,8 @@ type Stream struct {
265266
// headerValid indicates whether a valid header was received. Only
266267
// meaningful after headerChan is closed (always call waitOnHeader() before
267268
// reading its value). Not valid on server side.
268-
headerValid bool
269+
headerValid bool
270+
headerWireLength int // Only set on server side.
269271

270272
// hdrMu protects header and trailer metadata on the server-side.
271273
hdrMu sync.Mutex
@@ -425,6 +427,12 @@ func (s *Stream) Context() context.Context {
425427
return s.ctx
426428
}
427429

430+
// SetContext sets the context of the stream. This will be deleted once the
431+
// stats handler callouts all move to gRPC layer.
432+
func (s *Stream) SetContext(ctx context.Context) {
433+
s.ctx = ctx
434+
}
435+
428436
// Method returns the method for the stream.
429437
func (s *Stream) Method() string {
430438
return s.method
@@ -437,6 +445,12 @@ func (s *Stream) Status() *status.Status {
437445
return s.status
438446
}
439447

448+
// HeaderWireLength returns the size of the headers of the stream as received
449+
// from the wire. Valid only on the server.
450+
func (s *Stream) HeaderWireLength() int {
451+
return s.headerWireLength
452+
}
453+
440454
// SetHeader sets the header metadata. This can be called multiple times.
441455
// Server side only.
442456
// This should not be called in parallel to other data writes.
@@ -698,7 +712,7 @@ type ClientTransport interface {
698712
// Write methods for a given Stream will be called serially.
699713
type ServerTransport interface {
700714
// HandleStreams receives incoming streams using the given handler.
701-
HandleStreams(func(*Stream))
715+
HandleStreams(context.Context, func(*Stream))
702716

703717
// WriteHeader sends the header metadata for the given stream.
704718
// WriteHeader may not be called on all streams.
@@ -717,9 +731,15 @@ type ServerTransport interface {
717731
// handlers will be terminated asynchronously.
718732
Close(err error)
719733

734+
// LocalAddr returns the local network address.
735+
LocalAddr() net.Addr
736+
720737
// RemoteAddr returns the remote network address.
721738
RemoteAddr() net.Addr
722739

740+
// Peer returns the peer of the server transport.
741+
Peer() *peer.Peer
742+
723743
// Drain notifies the client this ServerTransport stops accepting new RPCs.
724744
Drain(debugData string)
725745

0 commit comments

Comments
 (0)