Skip to content

Commit

Permalink
transport.write receives 2 slices; revert end2end_test
Browse files Browse the repository at this point in the history
  • Loading branch information
ZhouyihaiDing committed Aug 23, 2017
1 parent 01dd4b2 commit b24131b
Show file tree
Hide file tree
Showing 10 changed files with 52 additions and 71 deletions.
16 changes: 2 additions & 14 deletions call.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,22 +106,10 @@ func sendRequest(ctx context.Context, dopts dialOptions, compressor Compressor,
if c.maxSendMessageSize == nil {
return Errorf(codes.Internal, "callInfo maxSendMessageSize field uninitialized(nil)")
}
if len(hdr)+len(data) > *c.maxSendMessageSize {
if len(data) > *c.maxSendMessageSize {
return Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(data), *c.maxSendMessageSize)
}
if len(data) != 0 {
prevOpts := opts.Last
opts.Last = false
err = t.Write(stream, hdr, opts)
if err != nil && err != io.EOF {
return err
}
opts.Last = prevOpts
err = t.Write(stream, data, opts)
} else {
err = t.Write(stream, hdr, opts)
}

err = t.Write(stream, hdr, data, opts)
if err == nil && outPayload != nil {
outPayload.SentTime = time.Now()
dopts.copts.StatsHandler.HandleRPC(ctx, outPayload)
Expand Down
5 changes: 1 addition & 4 deletions call_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,7 @@ func (h *testStreamHandler) handleStream(t *testing.T, s *transport.Stream) {
t.Errorf("Failed to encode the response: %v", err)
return
}
o := &transport.Options{}
h.t.Write(s, hdr, o)
h.t.Write(s, data, o)
// h.t.Write(s, replyData, &transport.Options{})
h.t.Write(s, hdr, data, &transport.Options{})
h.t.WriteStatus(s, status.New(codes.OK, ""))
}

Expand Down
8 changes: 1 addition & 7 deletions rpc_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,13 +332,7 @@ func encode(c Codec, msg interface{}, cp Compressor, cbuf *bytes.Buffer, outPayl
if outPayload != nil {
outPayload.WireLength = payloadLen + sizeLen + len(b)
}

secondStart := 16384 - payloadLen - sizeLen
if len(b) < secondStart {
secondStart = len(b)
}
bufHeader = append(bufHeader, b[:secondStart]...)
return bufHeader, b[secondStart:], nil
return bufHeader, b, nil
}

func checkRecvPayload(pf payloadFormat, recvCompress string, dc Decompressor) error {
Expand Down
15 changes: 2 additions & 13 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -682,21 +682,10 @@ func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Str
grpclog.Errorln("grpc: server failed to encode response: ", err)
return err
}
if len(hdr)+len(data) > s.opts.maxSendMessageSize {
if len(data) > s.opts.maxSendMessageSize {
return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(data), s.opts.maxSendMessageSize)
}
if len(data) != 0 {
prevOpts := opts.Last
opts.Last = false
err = t.Write(stream, hdr, opts)
if err != nil {
return err
}
opts.Last = prevOpts
err = t.Write(stream, data, opts)
} else {
err = t.Write(stream, hdr, opts)
}
err = t.Write(stream, hdr, data, opts)
if err == nil && outPayload != nil {
outPayload.SentTime = time.Now()
s.opts.statsHandler.HandleRPC(stream.Context(), outPayload)
Expand Down
23 changes: 5 additions & 18 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,17 +374,10 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) {
if cs.c.maxSendMessageSize == nil {
return Errorf(codes.Internal, "callInfo maxSendMessageSize field uninitialized(nil)")
}
if len(hdr)+len(data) > *cs.c.maxSendMessageSize {
if len(data) > *cs.c.maxSendMessageSize {
return Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(data), *cs.c.maxSendMessageSize)
}
o := &transport.Options{Last: false}
err = cs.t.Write(cs.s, hdr, o)
if err != nil {
return err
}
if len(data) != 0 {
err = cs.t.Write(cs.s, data, o)
}
err = cs.t.Write(cs.s, hdr, data, &transport.Options{Last: false})
if err == nil && outPayload != nil {
outPayload.SentTime = time.Now()
cs.statsHandler.HandleRPC(cs.statsCtx, outPayload)
Expand Down Expand Up @@ -456,7 +449,7 @@ func (cs *clientStream) RecvMsg(m interface{}) (err error) {
}

func (cs *clientStream) CloseSend() (err error) {
err = cs.t.Write(cs.s, nil, &transport.Options{Last: true})
err = cs.t.Write(cs.s, nil, nil, &transport.Options{Last: true})
defer func() {
if err != nil {
cs.finish(err)
Expand Down Expand Up @@ -624,18 +617,12 @@ func (ss *serverStream) SendMsg(m interface{}) (err error) {
if err != nil {
return err
}
if len(hdr)+len(data) > ss.maxSendMessageSize {
if len(data) > ss.maxSendMessageSize {
return Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(data), ss.maxSendMessageSize)
}
o := &transport.Options{Last: false}
if err := ss.t.Write(ss.s, hdr, o); err != nil {
if err := ss.t.Write(ss.s, hdr, data, &transport.Options{Last: false}); err != nil {
return toRPCErr(err)
}
if len(data) != 0 {
if err := ss.t.Write(ss.s, data, o); err != nil {
return toRPCErr(err)
}
}
if outPayload != nil {
outPayload.SentTime = time.Now()
ss.statsHandler.HandleRPC(ss.s.Context(), outPayload)
Expand Down
6 changes: 3 additions & 3 deletions test/end2end_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1771,7 +1771,7 @@ func testMaxMsgSizeServerAPI(t *testing.T, e env) {
if err != nil {
t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
}
if err := stream.Send(sreq); err != nil && err != io.EOF {
if err := stream.Send(sreq); err != nil {
t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
}
if _, err := stream.Recv(); err == nil || grpc.Code(err) != codes.ResourceExhausted {
Expand Down Expand Up @@ -2164,7 +2164,7 @@ func testExceedMsgLimit(t *testing.T, e env) {
ResponseParameters: respParam,
Payload: spayload,
}
if err := stream.Send(sreq); err != nil && err != io.EOF {
if err := stream.Send(sreq); err != nil {
t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
}
if _, err := stream.Recv(); err == nil || grpc.Code(err) != codes.ResourceExhausted {
Expand Down Expand Up @@ -4933,7 +4933,7 @@ func testSvrWriteStatusEarlyWrite(t *testing.T, e env) {
if err != nil {
t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
}
if err = stream.Send(sreq); err != nil && err != io.EOF {
if err = stream.Send(sreq); err != nil {
t.Fatalf("%v.Send() = _, %v, want <nil>", stream, err)
}
if _, err = stream.Recv(); err == nil || grpc.Code(err) != codes.ResourceExhausted {
Expand Down
3 changes: 2 additions & 1 deletion transport/handler_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,9 +246,10 @@ func (ht *serverHandlerTransport) writeCommonHeaders(s *Stream) {
}
}

func (ht *serverHandlerTransport) Write(s *Stream, data []byte, opts *Options) error {
func (ht *serverHandlerTransport) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
return ht.do(func() {
ht.writeCommonHeaders(s)
ht.rw.Write(hdr)
ht.rw.Write(data)
if !opts.Delay {
ht.rw.(http.Flusher).Flush()
Expand Down
24 changes: 18 additions & 6 deletions transport/http2_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -678,8 +678,15 @@ func (t *http2Client) GracefulClose() error {
// should proceed only if Write returns nil.
// TODO(zhaoq): opts.Delay is ignored in this implementation. Support it later
// if it improves the performance.
func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error {
r := bytes.NewBuffer(data)
func (t *http2Client) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
secondStart := 16379 // 16384 - payloadLen - sizeLen
if len(data) < secondStart {
secondStart = len(data)
}
hdr = append(hdr, data[:secondStart]...)
data = data[secondStart:]
isLastSlice := (len(data) == 0)
r := bytes.NewBuffer(hdr)
var (
p []byte
oqv uint32
Expand Down Expand Up @@ -721,9 +728,6 @@ func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error {
endStream bool
forceFlush bool
)
if opts.Last && r.Len() == 0 {
endStream = true
}
// Indicate there is a writer who is about to write a data frame.
t.framer.adjustNumWriters(1)
// Got some quota. Try to acquire writing privilege on the transport.
Expand Down Expand Up @@ -763,11 +767,19 @@ func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error {
t.writableChan <- 0
continue
}
if r.Len() == 0 && t.framer.adjustNumWriters(0) == 1 {
if r.Len() == 0 && t.framer.adjustNumWriters(0) == 1 && isLastSlice {
// Do a force flush iff this is last frame for the entire gRPC message
// and the caller is the only writer at this moment.
forceFlush = true
}
if r.Len() == 0 {
if isLastSlice && opts.Last {
endStream = true
} else if !isLastSlice && len(data) != 0 {
r = bytes.NewBuffer(data)
}
isLastSlice = true
}
// If WriteData fails, all the pending streams will be handled
// by http2Client.Close(). No explicit CloseStream() needs to be
// invoked.
Expand Down
19 changes: 16 additions & 3 deletions transport/http2_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -821,8 +821,15 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {

// Write converts the data into HTTP2 data frame and sends it out. Non-nil error
// is returns if it fails (e.g., framing error, transport error).
func (t *http2Server) Write(s *Stream, data []byte, opts *Options) (err error) {
func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) (err error) {
// TODO(zhaoq): Support multi-writers for a single stream.
secondStart := 16379 // 16384 - payloadLen - sizeLen
if len(data) < secondStart {
secondStart = len(data)
}
hdr = append(hdr, data[:secondStart]...)
data = data[secondStart:]
isLastSlice := (len(data) == 0)
var writeHeaderFrame bool
s.mu.Lock()
if s.state == streamDone {
Expand All @@ -836,7 +843,7 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) (err error) {
if writeHeaderFrame {
t.WriteHeader(s, nil)
}
r := bytes.NewBuffer(data)
r := bytes.NewBuffer(hdr)
var (
p []byte
oqv uint32
Expand Down Expand Up @@ -915,9 +922,15 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) (err error) {
continue
}
var forceFlush bool
if r.Len() == 0 && t.framer.adjustNumWriters(0) == 1 && !opts.Last {
if r.Len() == 0 && t.framer.adjustNumWriters(0) == 1 && !opts.Last && isLastSlice {
forceFlush = true
}
if r.Len() == 0 {
if !isLastSlice {
r = bytes.NewBuffer(data)
}
isLastSlice = true
}
// Reset ping strikes when sending data since this might cause
// the peer to send ping.
atomic.StoreUint32(&t.resetPingStrikes, 1)
Expand Down
4 changes: 2 additions & 2 deletions transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,7 @@ type ClientTransport interface {

// Write sends the data for the given stream. A nil stream indicates
// the write is to be performed on the transport as a whole.
Write(s *Stream, data []byte, opts *Options) error
Write(s *Stream, hdr []byte, data []byte, opts *Options) error

// NewStream creates a Stream for an RPC.
NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, error)
Expand Down Expand Up @@ -606,7 +606,7 @@ type ServerTransport interface {

// Write sends the data for the given stream.
// Write may not be called on all streams.
Write(s *Stream, data []byte, opts *Options) error
Write(s *Stream, hdr []byte, data []byte, opts *Options) error

// WriteStatus sends the status of a stream to the client. WriteStatus is
// the final call made on a stream and always occurs.
Expand Down

0 comments on commit b24131b

Please sign in to comment.