Skip to content

Commit

Permalink
update after review, MaxMsgSizeTest fail in travis, remain hrd len
Browse files Browse the repository at this point in the history
  • Loading branch information
ZhouyihaiDing committed Aug 14, 2017
1 parent 394a7de commit 388cab6
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 46 deletions.
13 changes: 7 additions & 6 deletions call.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,20 +102,21 @@ func sendRequest(ctx context.Context, dopts dialOptions, compressor Compressor,
Client: true,
}
}
outBuf, outData, err := encode(dopts.codec, args, compressor, cbuf, outPayload)
hdr, data, err := encode(dopts.codec, args, compressor, cbuf, outPayload)
if err != nil {
return err
}
if c.maxSendMessageSize == nil {
return Errorf(codes.Internal, "callInfo maxSendMessageSize field uninitialized(nil)")
}
if len(outBuf)+len(outData) > *c.maxSendMessageSize {
return Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(outBuf), *c.maxSendMessageSize)
if len(data) > *c.maxSendMessageSize {
return Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(data), *c.maxSendMessageSize)
}
prevLast := opts.Last
opts.Last = false
errHeader := t.Write(stream, outBuf, opts)
opts.Last = true
err = t.Write(stream, outData, opts)
errHeader := t.Write(stream, hdr, opts)
opts.Last = prevLast
err = t.Write(stream, data, opts)
if err == nil && errHeader == nil && outPayload != nil {
outPayload.SentTime = time.Now()
dopts.copts.StatsHandler.HandleRPC(ctx, outPayload)
Expand Down
7 changes: 4 additions & 3 deletions call_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,14 @@ func (h *testStreamHandler) handleStream(t *testing.T, s *transport.Stream) {
}
}
// send a response back to end the stream.
reply, replyData, err := encode(testCodec{}, &expectedResponse, nil, nil, nil)
hdr, data, err := encode(testCodec{}, &expectedResponse, nil, nil, nil)
if err != nil {
t.Errorf("Failed to encode the response: %v", err)
return
}
h.t.Write(s, reply, &transport.Options{})
h.t.Write(s, replyData, &transport.Options{})
o := &transport.Options{}
h.t.Write(s, hdr, o)
h.t.Write(s, data, o)
h.t.WriteStatus(s, status.New(codes.OK, ""))
}

Expand Down
18 changes: 7 additions & 11 deletions rpc_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,13 +287,10 @@ func (p *parser) recvMsg(maxReceiveMessageSize int) (pf payloadFormat, msg []byt
return pf, msg, nil
}

// encode serializes msg and prepends the message header. If msg is nil, it
// generates the message header of 0 message length.
// encode serializes msg and returns a header buffer and a message buffer. If msg is nil, it
// generates the message header and an empty message buffer.
func encode(c Codec, msg interface{}, cp Compressor, cbuf *bytes.Buffer, outPayload *stats.OutPayload) ([]byte, []byte, error) {
var (
b []byte
length int
)
var b []byte
const payloadLen = 1
const sizeLen = 4

Expand All @@ -319,9 +316,8 @@ func encode(c Codec, msg interface{}, cp Compressor, cbuf *bytes.Buffer, outPayl
}
}

length = len(b)
if length > math.MaxUint32 {
return nil, nil, Errorf(codes.ResourceExhausted, "grpc: message too large (%d bytes)", length)
if len(b) > math.MaxUint32 {
return nil, nil, Errorf(codes.ResourceExhausted, "grpc: message too large (%d bytes)", len(b))
}

bufHeader := make([]byte, payloadLen+sizeLen)
Expand All @@ -331,9 +327,9 @@ func encode(c Codec, msg interface{}, cp Compressor, cbuf *bytes.Buffer, outPayl
bufHeader[0] = byte(compressionNone)
}
// Write length of b into buf
binary.BigEndian.PutUint32(bufHeader[payloadLen:], uint32(length))
binary.BigEndian.PutUint32(bufHeader[payloadLen:], uint32(len(b)))
if outPayload != nil {
outPayload.WireLength = payloadLen + sizeLen + length
outPayload.WireLength = payloadLen + sizeLen + len(b)
}
return bufHeader, b, nil
}
Expand Down
20 changes: 10 additions & 10 deletions rpc_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,15 +104,15 @@ func TestEncode(t *testing.T) {
msg proto.Message
cp Compressor
// outputs
b []byte
err error
bHeader []byte
bData []byte
err error
}{
{nil, nil, []byte{0, 0, 0, 0, 0}, nil},
{nil, nil, []byte{0, 0, 0, 0, 0}, []byte{}, nil},
} {
b, bData, err := encode(protoCodec{}, test.msg, nil, nil, nil)
b = append(b, bData...)
if err != test.err || !bytes.Equal(b, test.b) {
t.Fatalf("encode(_, _, %v, _) = %v, %v\nwant %v, %v", test.cp, b, err, test.b, test.err)
hdr, data, err := encode(protoCodec{}, test.msg, nil, nil, nil)
if err != test.err || !bytes.Equal(hdr, test.bHeader) || !bytes.Equal(data, test.bData) {
t.Fatalf("encode(_, _, %v, _) = %v, %v, %v\nwant %v, %v, %v", test.cp, hdr, data, err, test.bHeader, test.bData, test.err)
}
}
}
Expand Down Expand Up @@ -165,9 +165,9 @@ func TestToRPCErr(t *testing.T) {
// bytes.
func bmEncode(b *testing.B, mSize int) {
msg := &perfpb.Buffer{Body: make([]byte, mSize)}
encoded, encodeData, _ := encode(protoCodec{}, msg, nil, nil, nil)
encoded = append(encoded, encodeData...)
encodedSz := int64(len(encoded))
hdr, data, _ := encode(protoCodec{}, msg, nil, nil, nil)
hdr = append(hdr, data...)
encodedSz := int64(len(hdr))
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
Expand Down
13 changes: 7 additions & 6 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -652,18 +652,19 @@ func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Str
if s.opts.statsHandler != nil {
outPayload = &stats.OutPayload{}
}
p, pData, err := encode(s.opts.codec, msg, cp, cbuf, outPayload)
hdr, data, err := encode(s.opts.codec, msg, cp, cbuf, outPayload)
if err != nil {
grpclog.Errorln("grpc: server failed to encode response: ", err)
return err
}
if len(p)+len(pData) > s.opts.maxSendMessageSize {
return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(p), s.opts.maxSendMessageSize)
if len(data) > s.opts.maxSendMessageSize {
return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(data), s.opts.maxSendMessageSize)
}
prevLast := opts.Last
opts.Last = false
errHeader := t.Write(stream, p, opts)
opts.Last = true
err = t.Write(stream, pData, opts)
errHeader := t.Write(stream, hdr, opts)
opts.Last = prevLast
err = t.Write(stream, data, opts)
if err == nil && errHeader == nil && outPayload != nil {
outPayload.SentTime = time.Now()
s.opts.statsHandler.HandleRPC(stream.Context(), outPayload)
Expand Down
20 changes: 10 additions & 10 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) {
Client: true,
}
}
out, outData, err := encode(cs.codec, m, cs.cp, cs.cbuf, outPayload)
hdr, data, err := encode(cs.codec, m, cs.cp, cs.cbuf, outPayload)
defer func() {
if cs.cbuf != nil {
cs.cbuf.Reset()
Expand All @@ -374,11 +374,11 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) {
if cs.c.maxSendMessageSize == nil {
return Errorf(codes.Internal, "callInfo maxSendMessageSize field uninitialized(nil)")
}
if len(out)+len(outData) > *cs.c.maxSendMessageSize {
return Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(out), *cs.c.maxSendMessageSize)
if len(data) > *cs.c.maxSendMessageSize {
return Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(data), *cs.c.maxSendMessageSize)
}
errHeader := cs.t.Write(cs.s, out, &transport.Options{Last: false})
err = cs.t.Write(cs.s, outData, &transport.Options{Last: false})
errHeader := cs.t.Write(cs.s, hdr, &transport.Options{Last: false})
err = cs.t.Write(cs.s, data, &transport.Options{Last: false})
if err == nil && errHeader == nil && outPayload != nil {
outPayload.SentTime = time.Now()
cs.statsHandler.HandleRPC(cs.statsCtx, outPayload)
Expand Down Expand Up @@ -605,7 +605,7 @@ func (ss *serverStream) SendMsg(m interface{}) (err error) {
if ss.statsHandler != nil {
outPayload = &stats.OutPayload{}
}
out, outData, err := encode(ss.codec, m, ss.cp, ss.cbuf, outPayload)
hdr, data, err := encode(ss.codec, m, ss.cp, ss.cbuf, outPayload)
defer func() {
if ss.cbuf != nil {
ss.cbuf.Reset()
Expand All @@ -614,13 +614,13 @@ func (ss *serverStream) SendMsg(m interface{}) (err error) {
if err != nil {
return err
}
if len(out)+len(outData) > ss.maxSendMessageSize {
return Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(out), ss.maxSendMessageSize)
if len(data) > ss.maxSendMessageSize {
return Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(data), ss.maxSendMessageSize)
}
if err := ss.t.Write(ss.s, out, &transport.Options{Last: false}); err != nil {
if err := ss.t.Write(ss.s, hdr, &transport.Options{Last: false}); err != nil {
return toRPCErr(err)
}
if err := ss.t.Write(ss.s, outData, &transport.Options{Last: false}); err != nil {
if err := ss.t.Write(ss.s, data, &transport.Options{Last: false}); err != nil {
return toRPCErr(err)
}
if outPayload != nil {
Expand Down

0 comments on commit 388cab6

Please sign in to comment.