Skip to content

Commit b115b69

Browse files
committed
Optimize the recvAndDecompress method and fix a performance regression when compression is used
Signed-off-by: alanprot <alanprot@gmail.com>
1 parent cb32937 commit b115b69

File tree

2 files changed

+89
-23
lines changed

2 files changed

+89
-23
lines changed

rpc_util.go

Lines changed: 19 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -876,31 +876,27 @@ func decompress(compressor encoding.Compressor, d mem.BufferSlice, maxReceiveMes
876876
if err != nil {
877877
return nil, 0, err
878878
}
879-
880-
// TODO: Can/should this still be preserved with the new BufferSlice API? Are
881-
// there any actual benefits to allocating a single large buffer instead of
882-
// multiple smaller ones?
883-
//if sizer, ok := compressor.(interface {
884-
// DecompressedSize(compressedBytes []byte) int
885-
//}); ok {
886-
// if size := sizer.DecompressedSize(d); size >= 0 {
887-
// if size > maxReceiveMessageSize {
888-
// return nil, size, nil
889-
// }
890-
// // size is used as an estimate to size the buffer, but we
891-
// // will read more data if available.
892-
// // +MinRead so ReadFrom will not reallocate if size is correct.
893-
// //
894-
// // TODO: If we ensure that the buffer size is the same as the DecompressedSize,
895-
// // we can also utilize the recv buffer pool here.
896-
// buf := bytes.NewBuffer(make([]byte, 0, size+bytes.MinRead))
897-
// bytesRead, err := buf.ReadFrom(io.LimitReader(dcReader, int64(maxReceiveMessageSize)+1))
898-
// return buf.Bytes(), int(bytesRead), err
899-
// }
900-
//}
879+
bufferSize := 32 * 1024
880+
881+
if sizer, ok := compressor.(interface {
882+
DecompressedSize(compressedBytes []byte) int
883+
}); ok {
884+
if size := sizer.DecompressedSize(d.Materialize()); size >= 0 {
885+
if size > maxReceiveMessageSize {
886+
return nil, size, nil
887+
}
888+
// size is used as an estimate to size the buffer, but we
889+
// will read more data if available
890+
// io.CopyBuffer will not reallocate if size is correct.
891+
bufferSize = size
892+
}
893+
}
901894

902895
var out mem.BufferSlice
903-
_, err = io.Copy(mem.NewWriter(&out, pool), io.LimitReader(dcReader, int64(maxReceiveMessageSize)+1))
896+
buff := pool.Get(bufferSize)
897+
b := mem.NewBuffer(buff, pool)
898+
_, err = io.CopyBuffer(mem.NewWriter(&out, pool), io.LimitReader(dcReader, int64(maxReceiveMessageSize)+1), *buff)
899+
b.Free()
904900
if err != nil {
905901
out.Free()
906902
return nil, 0, err

server_ext_test.go

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@ package grpc_test
2020

2121
import (
2222
"context"
23+
"fmt"
24+
"google.golang.org/grpc/encoding"
25+
"google.golang.org/grpc/encoding/gzip"
2326
"io"
2427
"runtime"
2528
"sync"
@@ -330,3 +333,70 @@ func (s) TestServer_GracefulStopWaits(t *testing.T) {
330333
t.Fatalf("Timed out waiting for second RPC to start on server.")
331334
}
332335
}
336+
337+
type nopCloserWriter struct{ io.Writer }
338+
339+
func (n nopCloserWriter) Close() error { return nil }
340+
341+
type noopCompressor struct {
342+
}
343+
344+
func (i *noopCompressor) Compress(w io.Writer) (io.WriteCloser, error) {
345+
return &nopCloserWriter{
346+
Writer: w,
347+
}, nil
348+
}
349+
350+
func (i *noopCompressor) Decompress(r io.Reader) (io.Reader, error) {
351+
return r, nil
352+
}
353+
354+
func (i *noopCompressor) Name() string {
355+
return "noop"
356+
}
357+
358+
func BenchmarkRPCCompressor(b *testing.B) {
359+
encoding.RegisterCompressor(&noopCompressor{})
360+
361+
for _, comp := range []string{gzip.Name, "noop"} {
362+
for _, payloadSize := range []int{1024, 10 * 1024, 500 * 1024} {
363+
b.Run(fmt.Sprintf("comp=%v,payloadSize=%v", comp, payloadSize), func(b *testing.B) {
364+
ss := stubserver.StubServer{
365+
UnaryCallF: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
366+
return &testpb.SimpleResponse{
367+
Username: "test",
368+
}, nil
369+
},
370+
}
371+
372+
// Start one RPC to the server.
373+
if err := ss.Start(nil, grpc.WithDefaultCallOptions(grpc.UseCompressor(comp))); err != nil {
374+
b.Fatal("Error starting server:", err)
375+
}
376+
defer ss.Stop()
377+
paylaod := make([]byte, payloadSize)
378+
for i := 0; i < payloadSize; i++ {
379+
paylaod[i] = byte(i)
380+
}
381+
382+
for i := 0; i < b.N; i++ {
383+
b.ReportAllocs()
384+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
385+
defer cancel()
386+
_, err := ss.Client.UnaryCall(ctx, &testpb.SimpleRequest{
387+
ResponseType: testpb.PayloadType_COMPRESSABLE,
388+
ResponseSize: 1024,
389+
Payload: &testpb.Payload{
390+
Type: testpb.PayloadType_COMPRESSABLE,
391+
Body: paylaod,
392+
},
393+
})
394+
395+
if err != nil {
396+
b.Fatal("Error staring call:", err)
397+
}
398+
}
399+
})
400+
}
401+
}
402+
}

0 commit comments

Comments
 (0)