From eeb8b08b812537cab8c68ab7e240220f4d61362e Mon Sep 17 00:00:00 2001 From: JimhHan <50871214+JimhHan@users.noreply.github.com> Date: Sat, 13 Mar 2021 19:46:34 +0800 Subject: [PATCH] Add: 0-copy reader --- common/buf/buffer.go | 16 ++++++++++ transport/internet/grpc/encoding/conn.go | 30 ++++++++++++++----- transport/internet/grpc/encoding/stream.pb.go | 2 +- 3 files changed, 40 insertions(+), 8 deletions(-) diff --git a/common/buf/buffer.go b/common/buf/buffer.go index 9b32c6c7197a..ec0c32bd77e4 100644 --- a/common/buf/buffer.go +++ b/common/buf/buffer.go @@ -31,6 +31,22 @@ func New() *Buffer { } } +func NewExisted(b []byte) *Buffer { + if cap(b) < Size { + panic("Invalid buffer") + } + + oLen := len(b) + if oLen < Size { + b = append(b, make([]byte, Size-oLen)...) + } + + return &Buffer{ + v: b, + end: int32(oLen), + } +} + // StackNew creates a new Buffer object on stack. // This method is for buffers that is released in the same function. func StackNew() Buffer { diff --git a/transport/internet/grpc/encoding/conn.go b/transport/internet/grpc/encoding/conn.go index 008d30e0ac04..f255b5ffbe74 100644 --- a/transport/internet/grpc/encoding/conn.go +++ b/transport/internet/grpc/encoding/conn.go @@ -42,18 +42,13 @@ func NewHunkConn(hc HunkConn, cancel context.CancelFunc) net.Conn { } func (h *HunkReaderWriter) forceFetch() error { - // clean up buffer, safety first! - h.buf = h.buf[:0] - h.index = 0 - - hunk := new(Hunk) - hunk.Data = h.buf - err := h.hc.RecvMsg(hunk) + hunk, err := h.hc.Recv() if err != nil { return newError("failed to fetch hunk from gRPC tunnel").Base(err) } h.buf = hunk.Data + h.index = 0 return nil } @@ -70,6 +65,27 @@ func (h *HunkReaderWriter) Read(buf []byte) (int, error) { return n, nil } +func (h *HunkReaderWriter) ReadMultiBuffer() (buf.MultiBuffer, error) { + if h.index >= len(h.buf) { + if err := h.forceFetch(); err != nil { + return nil, err + } + } + + if cap(h.buf) == buf.Size { + b := h.buf + h.index = len(h.buf) + return buf.MultiBuffer{buf.NewExisted(b)}, nil + } + + b := buf.New() + _, err := b.ReadFrom(h) + if err != nil { + return nil, err + } + return buf.MultiBuffer{b}, nil +} + func (h *HunkReaderWriter) Write(buf []byte) (int, error) { err := h.hc.Send(&Hunk{Data: buf[:]}) if err != nil { diff --git a/transport/internet/grpc/encoding/stream.pb.go b/transport/internet/grpc/encoding/stream.pb.go index a914bfd1ae1e..a1a2a400b724 100644 --- a/transport/internet/grpc/encoding/stream.pb.go +++ b/transport/internet/grpc/encoding/stream.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.25.0 -// protoc v3.15.5 +// protoc v3.15.6 // source: transport/internet/grpc/encoding/stream.proto package encoding