From b24131b5181fb9ba6faf526c854eff88de53e547 Mon Sep 17 00:00:00 2001 From: ZhouyihaiDing Date: Wed, 23 Aug 2017 14:00:10 -0700 Subject: [PATCH] transport.write receives 2 slices; revert end2end_test --- call.go | 16 ++-------------- call_test.go | 5 +---- rpc_util.go | 8 +------- server.go | 15 ++------------- stream.go | 23 +++++------------------ test/end2end_test.go | 6 +++--- transport/handler_server.go | 3 ++- transport/http2_client.go | 24 ++++++++++++++++++------ transport/http2_server.go | 19 ++++++++++++++++--- transport/transport.go | 4 ++-- 10 files changed, 52 insertions(+), 71 deletions(-) diff --git a/call.go b/call.go index 04a088dfdaae..438758fc3ed1 100644 --- a/call.go +++ b/call.go @@ -106,22 +106,10 @@ func sendRequest(ctx context.Context, dopts dialOptions, compressor Compressor, if c.maxSendMessageSize == nil { return Errorf(codes.Internal, "callInfo maxSendMessageSize field uninitialized(nil)") } - if len(hdr)+len(data) > *c.maxSendMessageSize { + if len(data) > *c.maxSendMessageSize { return Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(data), *c.maxSendMessageSize) } - if len(data) != 0 { - prevOpts := opts.Last - opts.Last = false - err = t.Write(stream, hdr, opts) - if err != nil && err != io.EOF { - return err - } - opts.Last = prevOpts - err = t.Write(stream, data, opts) - } else { - err = t.Write(stream, hdr, opts) - } - + err = t.Write(stream, hdr, data, opts) if err == nil && outPayload != nil { outPayload.SentTime = time.Now() dopts.copts.StatsHandler.HandleRPC(ctx, outPayload) diff --git a/call_test.go b/call_test.go index 377c698a2c72..f3113092948d 100644 --- a/call_test.go +++ b/call_test.go @@ -109,10 +109,7 @@ func (h *testStreamHandler) handleStream(t *testing.T, s *transport.Stream) { t.Errorf("Failed to encode the response: %v", err) return } - o := &transport.Options{} - h.t.Write(s, hdr, o) - h.t.Write(s, data, o) - // h.t.Write(s, replyData, &transport.Options{}) + h.t.Write(s, hdr, data, &transport.Options{}) h.t.WriteStatus(s, status.New(codes.OK, "")) } diff --git a/rpc_util.go b/rpc_util.go index b312b9540879..caded6522098 100644 --- a/rpc_util.go +++ b/rpc_util.go @@ -332,13 +332,7 @@ func encode(c Codec, msg interface{}, cp Compressor, cbuf *bytes.Buffer, outPayl if outPayload != nil { outPayload.WireLength = payloadLen + sizeLen + len(b) } - - secondStart := 16384 - payloadLen - sizeLen - if len(b) < secondStart { - secondStart = len(b) - } - bufHeader = append(bufHeader, b[:secondStart]...) - return bufHeader, b[secondStart:], nil + return bufHeader, b, nil } func checkRecvPayload(pf payloadFormat, recvCompress string, dc Decompressor) error { diff --git a/server.go b/server.go index ee3a9a51cdfd..86fe20a53a19 100644 --- a/server.go +++ b/server.go @@ -682,21 +682,10 @@ func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Str grpclog.Errorln("grpc: server failed to encode response: ", err) return err } - if len(hdr)+len(data) > s.opts.maxSendMessageSize { + if len(data) > s.opts.maxSendMessageSize { return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(data), s.opts.maxSendMessageSize) } - if len(data) != 0 { - prevOpts := opts.Last - opts.Last = false - err = t.Write(stream, hdr, opts) - if err != nil { - return err - } - opts.Last = prevOpts - err = t.Write(stream, data, opts) - } else { - err = t.Write(stream, hdr, opts) - } + err = t.Write(stream, hdr, data, opts) if err == nil && outPayload != nil { outPayload.SentTime = time.Now() s.opts.statsHandler.HandleRPC(stream.Context(), outPayload) diff --git a/stream.go b/stream.go index 147c5cdbcacb..2fcf36873746 100644 --- a/stream.go +++ b/stream.go @@ -374,17 +374,10 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) { if cs.c.maxSendMessageSize == nil { return Errorf(codes.Internal, "callInfo maxSendMessageSize field uninitialized(nil)") } - if len(hdr)+len(data) > *cs.c.maxSendMessageSize { + if len(data) > *cs.c.maxSendMessageSize { return Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(data), *cs.c.maxSendMessageSize) } - o := &transport.Options{Last: false} - err = cs.t.Write(cs.s, hdr, o) - if err != nil { - return err - } - if len(data) != 0 { - err = cs.t.Write(cs.s, data, o) - } + err = cs.t.Write(cs.s, hdr, data, &transport.Options{Last: false}) if err == nil && outPayload != nil { outPayload.SentTime = time.Now() cs.statsHandler.HandleRPC(cs.statsCtx, outPayload) @@ -456,7 +449,7 @@ func (cs *clientStream) RecvMsg(m interface{}) (err error) { } func (cs *clientStream) CloseSend() (err error) { - err = cs.t.Write(cs.s, nil, &transport.Options{Last: true}) + err = cs.t.Write(cs.s, nil, nil, &transport.Options{Last: true}) defer func() { if err != nil { cs.finish(err) @@ -624,18 +617,12 @@ func (ss *serverStream) SendMsg(m interface{}) (err error) { if err != nil { return err } - if len(hdr)+len(data) > ss.maxSendMessageSize { + if len(data) > ss.maxSendMessageSize { return Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(data), ss.maxSendMessageSize) } - o := &transport.Options{Last: false} - if err := ss.t.Write(ss.s, hdr, o); err != nil { + if err := ss.t.Write(ss.s, hdr, data, &transport.Options{Last: false}); err != nil { return toRPCErr(err) } - if len(data) != 0 { - if err := ss.t.Write(ss.s, data, o); err != nil { - return toRPCErr(err) - } - } if outPayload != nil { outPayload.SentTime = time.Now() ss.statsHandler.HandleRPC(ss.s.Context(), outPayload) diff --git a/test/end2end_test.go b/test/end2end_test.go index bec6b5bfc336..1d1b6c52456f 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -1771,7 +1771,7 @@ func testMaxMsgSizeServerAPI(t *testing.T, e env) { if err != nil { t.Fatalf("%v.FullDuplexCall(_) = _, %v, want ", tc, err) } - if err := stream.Send(sreq); err != nil && err != io.EOF { + if err := stream.Send(sreq); err != nil { t.Fatalf("%v.Send(%v) = %v, want ", stream, sreq, err) } if _, err := stream.Recv(); err == nil || grpc.Code(err) != codes.ResourceExhausted { @@ -2164,7 +2164,7 @@ func testExceedMsgLimit(t *testing.T, e env) { ResponseParameters: respParam, Payload: spayload, } - if err := stream.Send(sreq); err != nil && err != io.EOF { + if err := stream.Send(sreq); err != nil { t.Fatalf("%v.Send(%v) = %v, want ", stream, sreq, err) } if _, err := stream.Recv(); err == nil || grpc.Code(err) != codes.ResourceExhausted { @@ -4933,7 +4933,7 @@ func testSvrWriteStatusEarlyWrite(t *testing.T, e env) { if err != nil { t.Fatalf("%v.FullDuplexCall(_) = _, %v, want ", tc, err) } - if err = stream.Send(sreq); err != nil && err != io.EOF { + if err = stream.Send(sreq); err != nil { t.Fatalf("%v.Send() = _, %v, want ", stream, err) } if _, err = stream.Recv(); err == nil || grpc.Code(err) != codes.ResourceExhausted { diff --git a/transport/handler_server.go b/transport/handler_server.go index c2648a3f0592..071023c765b8 100644 --- a/transport/handler_server.go +++ b/transport/handler_server.go @@ -246,9 +246,10 @@ func (ht *serverHandlerTransport) writeCommonHeaders(s *Stream) { } } -func (ht *serverHandlerTransport) Write(s *Stream, data []byte, opts *Options) error { +func (ht *serverHandlerTransport) Write(s *Stream, hdr []byte, data []byte, opts *Options) error { return ht.do(func() { ht.writeCommonHeaders(s) + ht.rw.Write(hdr) ht.rw.Write(data) if !opts.Delay { ht.rw.(http.Flusher).Flush() diff --git a/transport/http2_client.go b/transport/http2_client.go index 344f6010fa37..fa2abf47887b 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -678,8 +678,15 @@ func (t *http2Client) GracefulClose() error { // should proceed only if Write returns nil. // TODO(zhaoq): opts.Delay is ignored in this implementation. Support it later // if it improves the performance. -func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error { - r := bytes.NewBuffer(data) +func (t *http2Client) Write(s *Stream, hdr []byte, data []byte, opts *Options) error { + secondStart := 16379 // 16384 - payloadLen - sizeLen + if len(data) < secondStart { + secondStart = len(data) + } + hdr = append(hdr, data[:secondStart]...) + data = data[secondStart:] + isLastSlice := (len(data) == 0) + r := bytes.NewBuffer(hdr) var ( p []byte oqv uint32 @@ -721,9 +728,6 @@ func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error { endStream bool forceFlush bool ) - if opts.Last && r.Len() == 0 { - endStream = true - } // Indicate there is a writer who is about to write a data frame. t.framer.adjustNumWriters(1) // Got some quota. Try to acquire writing privilege on the transport. @@ -763,11 +767,19 @@ func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error { t.writableChan <- 0 continue } - if r.Len() == 0 && t.framer.adjustNumWriters(0) == 1 { + if r.Len() == 0 && t.framer.adjustNumWriters(0) == 1 && isLastSlice { // Do a force flush iff this is last frame for the entire gRPC message // and the caller is the only writer at this moment. forceFlush = true } + if r.Len() == 0 { + if isLastSlice && opts.Last { + endStream = true + } else if !isLastSlice && len(data) != 0 { + r = bytes.NewBuffer(data) + } + isLastSlice = true + } // If WriteData fails, all the pending streams will be handled // by http2Client.Close(). No explicit CloseStream() needs to be // invoked. diff --git a/transport/http2_server.go b/transport/http2_server.go index b6f93e3c0c9f..4a53c14c3991 100644 --- a/transport/http2_server.go +++ b/transport/http2_server.go @@ -821,8 +821,15 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error { // Write converts the data into HTTP2 data frame and sends it out. Non-nil error // is returns if it fails (e.g., framing error, transport error). -func (t *http2Server) Write(s *Stream, data []byte, opts *Options) (err error) { +func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) (err error) { // TODO(zhaoq): Support multi-writers for a single stream. + secondStart := 16379 // 16384 - payloadLen - sizeLen + if len(data) < secondStart { + secondStart = len(data) + } + hdr = append(hdr, data[:secondStart]...) + data = data[secondStart:] + isLastSlice := (len(data) == 0) var writeHeaderFrame bool s.mu.Lock() if s.state == streamDone { @@ -836,7 +843,7 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) (err error) { if writeHeaderFrame { t.WriteHeader(s, nil) } - r := bytes.NewBuffer(data) + r := bytes.NewBuffer(hdr) var ( p []byte oqv uint32 @@ -915,9 +922,15 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) (err error) { continue } var forceFlush bool - if r.Len() == 0 && t.framer.adjustNumWriters(0) == 1 && !opts.Last { + if r.Len() == 0 && t.framer.adjustNumWriters(0) == 1 && !opts.Last && isLastSlice { forceFlush = true } + if r.Len() == 0 { + if !isLastSlice { + r = bytes.NewBuffer(data) + } + isLastSlice = true + } // Reset ping strikes when sending data since this might cause // the peer to send ping. atomic.StoreUint32(&t.resetPingStrikes, 1) diff --git a/transport/transport.go b/transport/transport.go index ec0fe678dbf4..c5732beec052 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -564,7 +564,7 @@ type ClientTransport interface { // Write sends the data for the given stream. A nil stream indicates // the write is to be performed on the transport as a whole. - Write(s *Stream, data []byte, opts *Options) error + Write(s *Stream, hdr []byte, data []byte, opts *Options) error // NewStream creates a Stream for an RPC. NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, error) @@ -606,7 +606,7 @@ type ServerTransport interface { // Write sends the data for the given stream. // Write may not be called on all streams. - Write(s *Stream, data []byte, opts *Options) error + Write(s *Stream, hdr []byte, data []byte, opts *Options) error // WriteStatus sends the status of a stream to the client. WriteStatus is // the final call made on a stream and always occurs.