Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove buf copy when the compressor exist #1427

Merged
merged 10 commits into from
Aug 25, 2017

Conversation

ZhouyihaiDing
Copy link
Contributor

Re-use the buffer used by compressor. Doesn't need to create new "buf" and copy the old data.

rpc_util.go Outdated
length uint
b []byte
length uint
payloadLen = 1
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These two should be const instead.

rpc_util.go Outdated
}

if length > math.MaxUint32 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is unlikely to even trigger if there is a 4GB message. length should be a uint64 if we're serious about this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the size can be larger than 4GB, we need to add one more byte (or 2 to 3 bits?) in the header for every message sent to save the length of the message. It's that worth?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's not an option -- we must follow the protocol, which is 4 bytes to encode the length.

My point is about the type of the variables. Checking whether a uint is > MaxUint32 will only work on platforms where a uint is 64 bits.

tl;dr: please make length a uint64.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

rpc_util.go Outdated
if cp != nil {
// pre-alloc place
cbuf.Write(make([]byte, 5))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Be consistent about your use of constants -- use "payloadLen+sizeLen" where you have 5.

rpc_util.go Outdated
// Write payload format
b[0] = byte(compressionMade)
// Write length of b into buf
binary.BigEndian.PutUint32(b[1:], uint32(length))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1->payloadLen

rpc_util.go Outdated

var buf []byte
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about inverting things to share the length-writing and wirelength code?

var buf []byte
if cp == nil {
  buf = make([]byte, payloadLen+sizeLen+len(b))
  buf[0] = byte(compressionNone)
} else {
  buf = b
  buf[0] = byte(compressionMade)
}
binary.BigEndian.PutUint32(buf[1:], uint32(length))
if outPayload != nil {
  outPayload.WireLength = len(buf)
}
return buf, nil

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right. I thought buf = b will assign new alloc for buf, but it turns out it doesn't.

@dfawley
Copy link
Member

dfawley commented Aug 8, 2017 via email

@ZhouyihaiDing ZhouyihaiDing force-pushed the pr_remove_cbuf branch 17 times, most recently from 3c0da79 to 394a7de Compare August 10, 2017 18:44
@dfawley dfawley self-assigned this Aug 10, 2017
call.go Outdated
@@ -102,18 +102,21 @@ func sendRequest(ctx context.Context, dopts dialOptions, compressor Compressor,
Client: true,
}
}
outBuf, err := encode(dopts.codec, args, compressor, cbuf, outPayload)
outBuf, outData, err := encode(dopts.codec, args, compressor, cbuf, outPayload)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename "outBuf" to "outHdr" please, to represent what it has. Or just "hdr" and "data" would be fine, too -- it might look a bit nicer that way.

Please make a similar change everywhere encode is called.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

call.go Outdated
if err != nil {
return err
}
if c.maxSendMessageSize == nil {
return Errorf(codes.Internal, "callInfo maxSendMessageSize field uninitialized(nil)")
}
if len(outBuf) > *c.maxSendMessageSize {
if len(outBuf)+len(outData) > *c.maxSendMessageSize {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it's well-specified, but I would say actually this check should not include the header. So len(outData) alone is better.

Please make a similar change everywhere encode is called.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

call.go Outdated
if err == nil && outPayload != nil {
opts.Last = false
errHeader := t.Write(stream, outBuf, opts)
opts.Last = true
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be saved before setting it to false (on line 115) and restored to its previous value here instead of set to true.

There is the same problem in another spot, too, please fix all of them.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

call_test.go Outdated
if err != nil {
t.Errorf("Failed to encode the response: %v", err)
return
}
h.t.Write(s, reply, &transport.Options{})
h.t.Write(s, replyData, &transport.Options{})
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This creates another allocation. This would be better:

o := &transport.Options{}
h.t.Write(..., o)
h.t.Write(..., o)

This occurs below, too.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

rpc_util.go Outdated
@@ -289,59 +289,53 @@ func (p *parser) recvMsg(maxReceiveMessageSize int) (pf payloadFormat, msg []byt

// encode serializes msg and prepends the message header. If msg is nil, it
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment needs updating.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

rpc_util.go Outdated
}

length = len(b)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's just remove "length" now. len(b) is fine.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

rpc_util_test.go Outdated
@@ -109,7 +109,8 @@ func TestEncode(t *testing.T) {
}{
{nil, nil, []byte{0, 0, 0, 0, 0}, nil},
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This hardly tests anything!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Assert both header and data.

@ZhouyihaiDing ZhouyihaiDing force-pushed the pr_remove_cbuf branch 5 times, most recently from a48e9e6 to 46a54bc Compare August 11, 2017 21:52
@ZhouyihaiDing ZhouyihaiDing force-pushed the pr_remove_cbuf branch 3 times, most recently from 388cab6 to 3cb43d6 Compare August 14, 2017 23:33
@ZhouyihaiDing ZhouyihaiDing force-pushed the pr_remove_cbuf branch 2 times, most recently from e3d0ddd to 03d3d52 Compare August 16, 2017 22:31
@ZhouyihaiDing
Copy link
Contributor Author

Update end2end_test.go to make it pass travis. Can you help me review it again?
Thank you very much!

@ZhouyihaiDing ZhouyihaiDing removed their assignment Aug 17, 2017
@dfawley dfawley self-assigned this Aug 17, 2017
@dfawley dfawley added the Type: Performance Performance improvements (CPU, network, memory, etc) label Aug 22, 2017
call.go Outdated
if err != nil {
return err
}
if c.maxSendMessageSize == nil {
return Errorf(codes.Internal, "callInfo maxSendMessageSize field uninitialized(nil)")
}
if len(outBuf) > *c.maxSendMessageSize {
return Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(outBuf), *c.maxSendMessageSize)
if len(hdr)+len(data) > *c.maxSendMessageSize {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we said we wouldn't include the header as the message for this check. (Also the error message would be out of sync.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

rpc_util.go Outdated
if len(b) < secondStart {
secondStart = len(b)
}
bufHeader = append(bufHeader, b[:secondStart]...)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I see what happened here.

Please keep outputting this the way it was before: only the header + all of the message data.

Then pass them into the transport.Write function as two slices (instead of making two separate calls), then combine within there. This way, the knowledge that a frame is 16kb is contained within the transport, which is desirable as it's really a function of the transport not gRPC.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Transport.Write will append data to the header slice.

func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error {
r := bytes.NewBuffer(data)
func (t *http2Client) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
secondStart := 16379 // 16384 - payloadLen - sizeLen
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should be http2MaxFrameLen - len(hdr), to avoid presuming the length of the header.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

if isLastSlice && opts.Last {
endStream = true
} else if !isLastSlice && len(data) != 0 {
r = bytes.NewBuffer(data)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

r.Reset(data)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not Done. I need some time to check the overhead if using slice directly.

@@ -763,11 +767,19 @@ func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error {
t.writableChan <- 0
continue
}
if r.Len() == 0 && t.framer.adjustNumWriters(0) == 1 {
if r.Len() == 0 && t.framer.adjustNumWriters(0) == 1 && isLastSlice {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about moving this after the new if r.Len() == 0 that you added so you don't need to check isLastSlice?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@@ -821,8 +821,15 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {

// Write converts the data into HTTP2 data frame and sends it out. Non-nil error
// is returns if it fails (e.g., framing error, transport error).
func (t *http2Server) Write(s *Stream, data []byte, opts *Options) (err error) {
func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) (err error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

....soooo much duplication :/

All of the comments in http2Client.Write apply here as well.

(Don't try to fix the duplication in this PR.)

@dfawley dfawley merged commit 01089b2 into grpc:master Aug 25, 2017
irfansharif added a commit to irfansharif/grpc-go-fork that referenced this pull request Aug 27, 2017
irfansharif added a commit to irfansharif/grpc-go-fork that referenced this pull request Aug 27, 2017
grpc#1427 avoided the allocation of an
extra buffer by re-using the buffer generated by the Compressor. It did
so by separating out the header and data buffers and delegating to the
transport layer the chunking of the two into appropriately sized frames.

At the transport layer however, given the affinity to generate 16KB
frames, we re-concatenated the header and data buffers and subsequently
flushed out contiguous chunks. Given that the header buffer was always
5KB in size, merging the data bytes into the header buffer necessarily
involved allocations and copying. In practice for every
{Client,Server}Transport.Write we would allocate & copy from 0 to ~16KB
bytes of the data buffer into the header buffer.

By reverting grpc#1427 and re-working
the implementation of rpc_util.go:encode, we can stage a single buffer
comprised of both the serialized{,compressed} message with the
pre-pended message header. The transport layer is again agnostic to the
exact delineation and can simply write out 16KB chunks as needed without
having to copy/allocate.
irfansharif added a commit to irfansharif/grpc-go-fork that referenced this pull request Aug 27, 2017
irfansharif added a commit to irfansharif/grpc-go-fork that referenced this pull request Aug 27, 2017
grpc#1427 avoided the allocation of an
extra buffer by re-using the buffer generated by the Compressor. It did
so by separating out the header and data buffers and delegating to the
transport layer the chunking of the two into appropriately sized frames.

At the transport layer however, given the affinity to generate 16KB
frames, we re-concatenated the header and data buffers and subsequently
flushed out contiguous chunks. Given that the header buffer was always
5KB in size, merging the data bytes into the header buffer necessarily
involved allocations and copying. In practice for every
{Client,Server}Transport.Write we would allocate & copy from 0 to ~16KB
bytes of the data buffer into the header buffer.

By reverting grpc#1427 and re-working
the implementation of rpc_util.go:encode, we can stage a single buffer
comprised of both the serialized{,compressed} message with the
pre-pended message header. The transport layer is again agnostic to the
exact delineation and can simply write out 16KB chunks as needed without
having to copy/allocate.
menghanl pushed a commit to menghanl/grpc-go that referenced this pull request Aug 30, 2017
@lock lock bot locked as resolved and limited conversation to collaborators Jan 18, 2019
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
Type: Performance Performance improvements (CPU, network, memory, etc)
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants