Skip to content

Commit

Permalink
rpc_util: Reuse memory buffer for receiving message
Browse files Browse the repository at this point in the history
  • Loading branch information
hueypark committed Dec 13, 2022
1 parent 3e27f89 commit d198e0a
Showing 1 changed file with 18 additions and 13 deletions.
31 changes: 18 additions & 13 deletions rpc_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,9 @@ type parser struct {
// The header of a gRPC message. Find more detail at
// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
header [5]byte

// buffer is recycled memory used to read the message.
buffer []byte
}

// recvMsg reads a complete gRPC message from the stream.
Expand Down Expand Up @@ -573,9 +576,10 @@ func (p *parser) recvMsg(maxReceiveMessageSize int) (pf payloadFormat, msg []byt
if int(length) > maxReceiveMessageSize {
return 0, nil, status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", length, maxReceiveMessageSize)
}
// TODO(bradfitz,zhaoq): garbage. reuse buffer after proto decoding instead
// of making it for each message:
msg = make([]byte, int(length))
if uint32(cap(p.buffer)) < length {
p.buffer = make([]byte, int(length))
}
msg = p.buffer[:length]
if _, err := p.r.Read(msg); err != nil {
if err == io.EOF {
err = io.ErrUnexpectedEOF
Expand Down Expand Up @@ -688,12 +692,12 @@ type payloadInfo struct {
}

func recvAndDecompress(p *parser, s *transport.Stream, dc Decompressor, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor) ([]byte, error) {
pf, d, err := p.recvMsg(maxReceiveMessageSize)
pf, buf, err := p.recvMsg(maxReceiveMessageSize)
if err != nil {
return nil, err
}
if payInfo != nil {
payInfo.wireLength = len(d)
payInfo.wireLength = len(buf)
}

if st := checkRecvPayload(pf, s.RecvCompress(), compressor != nil || dc != nil); st != nil {
Expand All @@ -705,10 +709,10 @@ func recvAndDecompress(p *parser, s *transport.Stream, dc Decompressor, maxRecei
// To match legacy behavior, if the decompressor is set by WithDecompressor or RPCDecompressor,
// use this decompressor as the default.
if dc != nil {
d, err = dc.Do(bytes.NewReader(d))
size = len(d)
buf, err = dc.Do(bytes.NewReader(buf))
size = len(buf)
} else {
d, size, err = decompress(compressor, d, maxReceiveMessageSize)
buf, size, err = decompress(compressor, buf, maxReceiveMessageSize)
}
if err != nil {
return nil, status.Errorf(codes.Internal, "grpc: failed to decompress the received message %v", err)
Expand All @@ -719,7 +723,7 @@ func recvAndDecompress(p *parser, s *transport.Stream, dc Decompressor, maxRecei
return nil, status.Errorf(codes.ResourceExhausted, "grpc: received message after decompression larger than max (%d vs. %d)", size, maxReceiveMessageSize)
}
}
return d, nil
return buf, nil
}

// Using compressor, decompress d, returning data and size.
Expand Down Expand Up @@ -754,15 +758,16 @@ func decompress(compressor encoding.Compressor, d []byte, maxReceiveMessageSize
// dc takes precedence over compressor.
// TODO(dfawley): wrap the old compressor/decompressor using the new API?
func recv(p *parser, c baseCodec, s *transport.Stream, dc Decompressor, m interface{}, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor) error {
d, err := recvAndDecompress(p, s, dc, maxReceiveMessageSize, payInfo, compressor)
buf, err := recvAndDecompress(p, s, dc, maxReceiveMessageSize, payInfo, compressor)
if err != nil {
return err
}
if err := c.Unmarshal(d, m); err != nil {
if err := c.Unmarshal(buf, m); err != nil {
return status.Errorf(codes.Internal, "grpc: failed to unmarshal the received message %v", err)
}
if payInfo != nil {
payInfo.uncompressedBytes = d
if payInfo != nil && buf != nil {
payInfo.uncompressedBytes = make([]byte, len(buf))
copy(payInfo.uncompressedBytes, buf)
}
return nil
}
Expand Down

0 comments on commit d198e0a

Please sign in to comment.