Commit dc355bc

transport: Remove buffer copies while writing HTTP/2 Data frames (#8667)
This PR removes 2 buffer copies while writing data frames to the underlying net.Conn: one [within gRPC](https://github.com/grpc/grpc-go/blob/58d4b2b1492dbcfdf26daa7ed93830ebb871faf1/internal/transport/controlbuf.go#L1009-L1022) and the other [in the framer](https://cs.opensource.google/go/x/net/+/master:http2/frame.go;l=743;drc=6e243da531559f8c99439dabc7647dec07191f9b). Care is taken to avoid any extra heap allocations, which can affect performance for smaller payloads.

A [CL](https://go-review.git.corp.google.com/c/net/+/711620) is out for review that allows using the framer to write frame headers. This PR duplicates the header-writing code as a temporary workaround and will be merged only after the CL is merged.

## Results

### Small payloads

Performance for small payloads increases slightly due to the removal of a `defer` statement.

```
$ go run benchmark/benchmain/main.go -benchtime=60s -workloads=unary \
    -compression=off -maxConcurrentCalls=120 -trace=off \
    -reqSizeBytes=100 -respSizeBytes=100 -networkMode=Local -resultFile="${RUN_NAME}"
$ go run benchmark/benchresult/main.go unary-before unary-after

       Title        Before         After  Percentage
    TotalOps       7600878       7653522       0.69%
     SendOps             0             0        NaN%
     RecvOps             0             0        NaN%
    Bytes/op      10007.07      10000.89      -0.07%
   Allocs/op        146.93        146.91       0.00%
     ReqT/op  101345040.00  102046960.00       0.69%
    RespT/op  101345040.00  102046960.00       0.69%
    50th-Lat     833.724µs     830.041µs      -0.44%
    90th-Lat    1.281969ms    1.275336ms      -0.52%
    99th-Lat    2.403961ms    2.360606ms      -1.80%
     Avg-Lat     946.123µs     939.734µs      -0.68%
   GoVersion      go1.24.8      go1.24.8
 GrpcVersion    1.77.0-dev    1.77.0-dev
```

### Large payloads

Local benchmarks show a ~5-10% regression with 1 MB payloads on my dev machine. The profiles show increased time spent in the copy operation [inside the buffered writer](https://github.com/grpc/grpc-go/blob/58d4b2b1492dbcfdf26daa7ed93830ebb871faf1/internal/transport/http_util.go#L334). Counterintuitively, copying the gRPC header and message data into a larger buffer increased performance by 4% (compared to master).

To validate this behaviour (an extra copy increasing performance), I ran [the k8s benchmark for 1 MB payloads](https://github.com/grpc/grpc/blob/65c9be86830b0e423dd970c066c69a06a9240298/tools/run_tests/performance/scenario_config.py#L291-L305) with 100 concurrent streams, which showed a ~5% increase in QPS without the copies across multiple runs. Adding a copy reduced the performance.

Load test config file: [loadtest.yaml](https://github.com/user-attachments/files/23055312/loadtest.yaml)

```
# 30 core client and server
Before
QPS: 498.284 (16.6095/server core)
Latencies (50/90/95/99/99.9%-ile): 233256/275972/281250/291803/298533 us
Server system time: 93.0164
Server user time: 142.533
Client system time: 97.2688
Client user time: 144.542

After
QPS: 526.776 (17.5592/server core)
Latencies (50/90/95/99/99.9%-ile): 211010/263189/270969/280656/288828 us
Server system time: 96.5959
Server user time: 147.668
Client system time: 101.973
Client user time: 150.234

# 8 core client and server
Before
QPS: 291.049 (36.3811/server core)
Latencies (50/90/95/99/99.9%-ile): 294552/685822/903554/1.48399e+06/1.50757e+06 us
Server system time: 49.0355
Server user time: 87.1783
Client system time: 60.1945
Client user time: 103.633

After
QPS: 334.119 (41.7649/server core)
Latencies (50/90/95/99/99.9%-ile): 279395/518849/706327/1.09273e+06/1.11629e+06 us
Server system time: 69.3136
Server user time: 102.549
Client system time: 80.9804
Client user time: 107.103
```

RELEASE NOTES:

* transport: Avoid two buffer copies when writing Data frames.
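
To illustrate the approach, here is a minimal standalone sketch using only the standard library; `writeSegments` and the hard-coded header bytes are hypothetical and not code from this PR. Rather than materializing the frame into one contiguous buffer before handing it to the framer, the 9-byte frame header and each payload buffer are written to the buffered connection writer in sequence, leaving the copy into the write buffer as the only remaining copy.

```go
package main

import (
	"bufio"
	"bytes"
	"fmt"
)

// writeSegments writes a frame header followed by payload segments without
// first coalescing them into a single contiguous buffer; the only copy left
// is the one performed by the bufio.Writer into its internal buffer.
func writeSegments(w *bufio.Writer, header []byte, segments [][]byte) error {
	if _, err := w.Write(header); err != nil {
		return err
	}
	for _, s := range segments {
		if _, err := w.Write(s); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	var conn bytes.Buffer // stands in for the net.Conn
	bw := bufio.NewWriter(&conn)
	// 9-byte HTTP/2 frame header: length=11, type=DATA(0x0), flags=0, stream=1.
	header := []byte{0x00, 0x00, 0x0b, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01}
	payload := [][]byte{[]byte("hello "), []byte("world")}
	if err := writeSegments(bw, header, payload); err != nil {
		panic(err)
	}
	if err := bw.Flush(); err != nil {
		panic(err)
	}
	fmt.Printf("%d bytes written\n", conn.Len()) // 20 bytes written
}
```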
1 parent 363018c commit dc355bc

File tree

4 files changed (+322, -24 lines)


internal/transport/controlbuf.go

Lines changed: 36 additions & 22 deletions
```diff
@@ -496,6 +496,16 @@ const (
 	serverSide
 )
 
+// maxWriteBufSize is the maximum length (number of elements) the cached
+// writeBuf can grow to. The length depends on the number of buffers
+// contained within the BufferSlice produced by the codec, which is
+// generally small.
+//
+// If a writeBuf larger than this limit is required, it will be allocated
+// and freed after use, rather than being cached. This avoids holding
+// on to large amounts of memory.
+const maxWriteBufSize = 64
+
 // Loopy receives frames from the control buffer.
 // Each frame is handled individually; most of the work done by loopy goes
 // into handling data frames. Loopy maintains a queue of active streams, and each
@@ -530,6 +540,8 @@ type loopyWriter struct {
 
 	// Side-specific handlers
 	ssGoAwayHandler func(*goAway) (bool, error)
+
+	writeBuf [][]byte // cached slice to avoid heap allocations for calls to mem.Reader.Peek.
 }
 
 func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimator, conn net.Conn, logger *grpclog.PrefixLogger, goAwayHandler func(*goAway) (bool, error), bufferPool mem.BufferPool) *loopyWriter {
@@ -962,11 +974,11 @@ func (l *loopyWriter) processData() (bool, error) {
 
 	if len(dataItem.h) == 0 && reader.Remaining() == 0 { // Empty data frame
 		// Client sends out empty data frame with endStream = true
-		if err := l.framer.fr.WriteData(dataItem.streamID, dataItem.endStream, nil); err != nil {
+		if err := l.framer.writeData(dataItem.streamID, dataItem.endStream, nil); err != nil {
 			return false, err
 		}
 		str.itl.dequeue() // remove the empty data item from stream
-		_ = reader.Close()
+		reader.Close()
 		if str.itl.isEmpty() {
 			str.state = empty
 		} else if trailer, ok := str.itl.peek().(*headerFrame); ok { // the next item is trailers.
@@ -999,25 +1011,20 @@ func (l *loopyWriter) processData() (bool, error) {
 	remainingBytes := len(dataItem.h) + reader.Remaining() - hSize - dSize
 	size := hSize + dSize
 
-	var buf *[]byte
-
-	if hSize != 0 && dSize == 0 {
-		buf = &dataItem.h
-	} else {
-		// Note: this is only necessary because the http2.Framer does not support
-		// partially writing a frame, so the sequence must be materialized into a buffer.
-		// TODO: Revisit once https://github.com/golang/go/issues/66655 is addressed.
-		pool := l.bufferPool
-		if pool == nil {
-			// Note that this is only supposed to be nil in tests. Otherwise, stream is
-			// always initialized with a BufferPool.
-			pool = mem.DefaultBufferPool()
+	l.writeBuf = l.writeBuf[:0]
+	if hSize > 0 {
+		l.writeBuf = append(l.writeBuf, dataItem.h[:hSize])
+	}
+	if dSize > 0 {
+		var err error
+		l.writeBuf, err = reader.Peek(dSize, l.writeBuf)
+		if err != nil {
+			// This must never happen since the reader must have at least dSize
+			// bytes.
+			// Log an error to fail tests.
+			l.logger.Errorf("unexpected error while reading Data frame payload: %v", err)
+			return false, err
 		}
-		buf = pool.Get(size)
-		defer pool.Put(buf)
-
-		copy((*buf)[:hSize], dataItem.h)
-		_, _ = reader.Read((*buf)[hSize:])
 	}
 
 	// Now that outgoing flow controls are checked we can replenish str's write quota
@@ -1030,15 +1037,22 @@ func (l *loopyWriter) processData() (bool, error) {
 	if dataItem.onEachWrite != nil {
 		dataItem.onEachWrite()
 	}
-	if err := l.framer.fr.WriteData(dataItem.streamID, endStream, (*buf)[:size]); err != nil {
+	err := l.framer.writeData(dataItem.streamID, endStream, l.writeBuf)
+	reader.Discard(dSize)
+	if cap(l.writeBuf) > maxWriteBufSize {
+		l.writeBuf = nil
+	} else {
+		clear(l.writeBuf)
+	}
+	if err != nil {
 		return false, err
 	}
 	str.bytesOutStanding += size
 	l.sendQuota -= uint32(size)
 	dataItem.h = dataItem.h[hSize:]
 
 	if remainingBytes == 0 { // All the data from that message was written out.
-		_ = reader.Close()
+		reader.Close()
 		str.itl.dequeue()
 	}
 	if str.itl.isEmpty() {
```
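
A note on the `writeBuf` caching shown above: a rough sketch of the recycle policy, written as a hypothetical standalone helper rather than loopy's actual method, is below. Slices that grew past `maxWriteBufSize` are dropped so large backing arrays are not pinned; smaller ones are cleared, releasing the buffer views they reference, and kept for the next frame.

```go
// recycleWriteBuf mirrors the cleanup performed after each writeData call:
// large scratch slices are released entirely, small ones are cleared and
// reused. Requires Go 1.21+ for the clear builtin.
func recycleWriteBuf(buf [][]byte, maxLen int) [][]byte {
	if cap(buf) > maxLen {
		// Too large to cache; let the GC reclaim the whole slice.
		return nil
	}
	// Drop references to the peeked payload buffers so they can be freed,
	// but keep the backing array for the next frame.
	clear(buf)
	return buf[:0]
}
```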

internal/transport/http_util.go

Lines changed: 36 additions & 0 deletions
```diff
@@ -400,6 +400,7 @@ func (df *parsedDataFrame) StreamEnded() bool {
 type framer struct {
 	writer    *bufWriter
 	fr        *http2.Framer
+	headerBuf []byte // cached slice for framer headers to reduce heap allocs.
 	reader    io.Reader
 	dataFrame parsedDataFrame // Cached data frame to avoid heap allocations.
 	pool      mem.BufferPool
@@ -443,6 +444,41 @@ func newFramer(conn io.ReadWriter, writeBufferSize, readBufferSize int, sharedWr
 	return f
 }
 
+// writeData writes a DATA frame.
+//
+// It is the caller's responsibility not to violate the maximum frame size.
+func (f *framer) writeData(streamID uint32, endStream bool, data [][]byte) error {
+	var flags http2.Flags
+	if endStream {
+		flags = http2.FlagDataEndStream
+	}
+	length := uint32(0)
+	for _, d := range data {
+		length += uint32(len(d))
+	}
+	// TODO: Replace the header write with the framer API being added in
+	// https://github.com/golang/go/issues/66655.
+	f.headerBuf = append(f.headerBuf[:0],
+		byte(length>>16),
+		byte(length>>8),
+		byte(length),
+		byte(http2.FrameData),
+		byte(flags),
+		byte(streamID>>24),
+		byte(streamID>>16),
+		byte(streamID>>8),
+		byte(streamID))
+	if _, err := f.writer.Write(f.headerBuf); err != nil {
+		return err
+	}
+	for _, d := range data {
+		if _, err := f.writer.Write(d); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
 // readFrame reads a single frame. The returned Frame is only valid
 // until the next call to readFrame.
 func (f *framer) readFrame() (any, error) {
```
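
The hand-rolled header in `writeData` follows the fixed 9-octet HTTP/2 frame header layout from RFC 7540 §4.1: a 24-bit payload length, an 8-bit type, an 8-bit flags field, and a 31-bit stream identifier with the reserved high bit left clear. A small self-contained encoder for reference (hypothetical names, not part of this change):

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// encodeFrameHeader packs the fixed 9-octet HTTP/2 frame header:
// 3 bytes payload length, 1 byte type, 1 byte flags, 4 bytes stream ID
// (the reserved high bit is left as zero).
func encodeFrameHeader(length uint32, typ, flags byte, streamID uint32) [9]byte {
	var h [9]byte
	h[0] = byte(length >> 16)
	h[1] = byte(length >> 8)
	h[2] = byte(length)
	h[3] = typ
	h[4] = flags
	binary.BigEndian.PutUint32(h[5:], streamID&0x7fffffff)
	return h
}

func main() {
	// DATA frame (type 0x0) with END_STREAM (0x1), 1024-byte payload, stream 5.
	h := encodeFrameHeader(1024, 0x0, 0x1, 5)
	fmt.Printf("% x\n", h) // 00 04 00 00 01 00 00 00 05
}
```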

mem/buffer_slice.go

Lines changed: 60 additions & 2 deletions
```diff
@@ -19,6 +19,7 @@
 package mem
 
 import (
+	"fmt"
 	"io"
 )
 
@@ -126,9 +127,10 @@ func (s BufferSlice) Reader() *Reader {
 }
 
 // Reader exposes a BufferSlice's data as an io.Reader, allowing it to interface
-// with other parts systems. It also provides an additional convenience method
-// Remaining(), which returns the number of unread bytes remaining in the slice.
+// with other systems.
+//
 // Buffers will be freed as they are read.
+//
 // A Reader can be constructed from a BufferSlice; alternatively the zero value
 // of a Reader may be used after calling Reset on it.
 type Reader struct {
@@ -285,3 +287,59 @@ nextBuffer:
 		}
 	}
 }
+
+// Discard skips the next n bytes, returning the number of bytes discarded.
+//
+// It frees buffers as they are fully consumed.
+//
+// If Discard skips fewer than n bytes, it also returns an error.
+func (r *Reader) Discard(n int) (discarded int, err error) {
+	total := n
+	for n > 0 && r.len > 0 {
+		curData := r.data[0].ReadOnlyData()
+		curSize := min(n, len(curData)-r.bufferIdx)
+		n -= curSize
+		r.len -= curSize
+		r.bufferIdx += curSize
+		if r.bufferIdx >= len(curData) {
+			r.data[0].Free()
+			r.data = r.data[1:]
+			r.bufferIdx = 0
+		}
+	}
+	discarded = total - n
+	if n > 0 {
+		return discarded, fmt.Errorf("insufficient bytes in reader")
+	}
+	return discarded, nil
+}
+
+// Peek returns the next n bytes without advancing the reader.
+//
+// Peek appends results to the provided res slice and returns the updated slice.
+// This pattern allows re-using the storage of res if it has sufficient
+// capacity.
+//
+// The returned subslices are views into the underlying buffers and are only
+// valid until the reader is advanced past the corresponding buffer.
+//
+// If Peek returns fewer than n bytes, it also returns an error.
+func (r *Reader) Peek(n int, res [][]byte) ([][]byte, error) {
+	for i := 0; n > 0 && i < len(r.data); i++ {
+		curData := r.data[i].ReadOnlyData()
+		start := 0
+		if i == 0 {
+			start = r.bufferIdx
+		}
+		curSize := min(n, len(curData)-start)
+		if curSize == 0 {
+			continue
+		}
+		res = append(res, curData[start:start+curSize])
+		n -= curSize
+	}
+	if n > 0 {
+		return nil, fmt.Errorf("insufficient bytes in reader")
+	}
+	return res, nil
+}
```
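
For completeness, a usage sketch of the new `Reader.Peek` and `Reader.Discard` methods (assuming `mem.SliceBuffer` as a simple `Buffer` implementation for the example; illustrative, not a snippet from the PR): `Peek` returns views into the underlying buffers without consuming them, and `Discard` advances past the bytes once they have been written out, freeing fully consumed buffers.

```go
package main

import (
	"fmt"

	"google.golang.org/grpc/mem"
)

func main() {
	// A BufferSlice backed by two buffers; SliceBuffer wraps a plain []byte.
	data := mem.BufferSlice{
		mem.SliceBuffer("hello "),
		mem.SliceBuffer("world"),
	}
	r := data.Reader()

	// Borrow views of the first 8 bytes without consuming them; the result
	// spans both underlying buffers.
	views, err := r.Peek(8, nil)
	if err != nil {
		panic(err)
	}
	for _, v := range views {
		fmt.Printf("%q ", v) // "hello " "wo"
	}
	fmt.Println()

	// After the views have been written out, advance past them. Fully
	// consumed buffers are freed by Discard.
	n, _ := r.Discard(8)
	fmt.Println("discarded:", n, "remaining:", r.Remaining()) // discarded: 8 remaining: 3
}
```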
