Skip to content

Commit

Permalink
Revert "http2: Send WindowUpdates when remaining bytes are below a th…
Browse files Browse the repository at this point in the history
…reshold"

This reverts commit 2e0b12c.

The calculation for when to return flow control doesn't properly take
data in server read buffers into account, resulting in flow control
credit being returned too quickly without backpressure.

Fixes golang/go#56315
For golang/go#28732

Change-Id: I573afd1a37d8a711da47f05f38f4de04183fb941
Reviewed-on: https://go-review.googlesource.com/c/net/+/448055
TryBot-Result: Gopher Robot <gobot@golang.org>
Run-TryBot: Damien Neil <dneil@google.com>
Reviewed-by: Roland Shoemaker <roland@golang.org>
  • Loading branch information
neild committed Nov 4, 2022
1 parent 702349b commit d7f77dc
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 111 deletions.
40 changes: 13 additions & 27 deletions http2/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -869,7 +869,9 @@ func (sc *serverConn) serve() {

// Each connection starts with initialWindowSize inflow tokens.
// If a higher value is configured, we add more tokens.
sc.sendWindowUpdate(nil)
if diff := sc.srv.initialConnRecvWindowSize() - initialWindowSize; diff > 0 {
sc.sendWindowUpdate(nil, int(diff))
}

if err := sc.readPreface(); err != nil {
sc.condlogf(err, "http2: server: error reading preface from client %v: %v", sc.conn.RemoteAddr(), err)
Expand Down Expand Up @@ -1469,8 +1471,7 @@ func (sc *serverConn) processFrame(f Frame) error {
if sc.inflow.available() < int32(f.Length) {
return sc.countError("data_flow", streamError(f.Header().StreamID, ErrCodeFlowControl))
}
sc.inflow.take(int32(f.Length))
sc.sendWindowUpdate(nil) // conn-level
sc.sendWindowUpdate(nil, int(f.Length)) // conn-level
}
return nil
}
Expand Down Expand Up @@ -1599,7 +1600,7 @@ func (sc *serverConn) closeStream(st *stream, err error) {
if p := st.body; p != nil {
// Return any buffered unread bytes worth of conn-level flow control.
// See golang.org/issue/16481
sc.sendWindowUpdate(nil)
sc.sendWindowUpdate(nil, p.Len())

p.CloseWithError(err)
}
Expand Down Expand Up @@ -1737,7 +1738,7 @@ func (sc *serverConn) processData(f *DataFrame) error {
// sendWindowUpdate, which also schedules sending the
// frames.
sc.inflow.take(int32(f.Length))
sc.sendWindowUpdate(nil) // conn-level
sc.sendWindowUpdate(nil, int(f.Length)) // conn-level

if st != nil && st.resetQueued {
// Already have a stream error in flight. Don't send another.
Expand All @@ -1755,7 +1756,7 @@ func (sc *serverConn) processData(f *DataFrame) error {
return sc.countError("data_flow", streamError(id, ErrCodeFlowControl))
}
sc.inflow.take(int32(f.Length))
sc.sendWindowUpdate(nil) // conn-level
sc.sendWindowUpdate(nil, int(f.Length)) // conn-level

st.body.CloseWithError(fmt.Errorf("sender tried to send more than declared Content-Length of %d bytes", st.declBodyBytes))
// RFC 7540, sec 8.1.2.6: A request or response is also malformed if the
Expand All @@ -1773,7 +1774,7 @@ func (sc *serverConn) processData(f *DataFrame) error {
if len(data) > 0 {
wrote, err := st.body.Write(data)
if err != nil {
sc.sendWindowUpdate32(nil, int32(f.Length)-int32(wrote))
sc.sendWindowUpdate(nil, int(f.Length)-wrote)
return sc.countError("body_write_err", streamError(id, ErrCodeStreamClosed))
}
if wrote != len(data) {
Expand Down Expand Up @@ -2318,43 +2319,28 @@ func (sc *serverConn) noteBodyReadFromHandler(st *stream, n int, err error) {

func (sc *serverConn) noteBodyRead(st *stream, n int) {
sc.serveG.check()
sc.sendWindowUpdate(nil) // conn-level
sc.sendWindowUpdate(nil, n) // conn-level
if st.state != stateHalfClosedRemote && st.state != stateClosed {
// Don't send this WINDOW_UPDATE if the stream is closed
// remotely.
sc.sendWindowUpdate(st)
sc.sendWindowUpdate(st, n)
}
}

// st may be nil for conn-level
func (sc *serverConn) sendWindowUpdate(st *stream) {
func (sc *serverConn) sendWindowUpdate(st *stream, n int) {
sc.serveG.check()

var n int32
if st == nil {
if avail, windowSize := sc.inflow.n, sc.srv.initialConnRecvWindowSize(); avail > windowSize/2 {
return
} else {
n = windowSize - avail
}
} else {
if avail, windowSize := st.inflow.n, sc.srv.initialStreamRecvWindowSize(); avail > windowSize/2 {
return
} else {
n = windowSize - avail
}
}
// "The legal range for the increment to the flow control
// window is 1 to 2^31-1 (2,147,483,647) octets."
// A Go Read call on 64-bit machines could in theory read
// a larger Read than this. Very unlikely, but we handle it here
// rather than elsewhere for now.
const maxUint31 = 1<<31 - 1
for n >= maxUint31 {
for n > maxUint31 {
sc.sendWindowUpdate32(st, maxUint31)
n -= maxUint31
}
sc.sendWindowUpdate32(st, n)
sc.sendWindowUpdate32(st, int32(n))
}

// st may be nil for conn-level
Expand Down
112 changes: 28 additions & 84 deletions http2/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -809,6 +809,9 @@ func TestServer_Request_Post_Body_ContentLength_TooSmall(t *testing.T) {
EndHeaders: true,
})
st.writeData(1, true, []byte("12345"))
// Return flow control bytes back, since the data handler closed
// the stream.
st.wantWindowUpdate(0, 5)
})
}

Expand Down Expand Up @@ -1244,41 +1247,6 @@ func TestServer_Handler_Sends_WindowUpdate(t *testing.T) {

st.greet()

st.writeHeaders(HeadersFrameParam{
StreamID: 1, // clients send odd numbers
BlockFragment: st.encodeHeader(":method", "POST"),
EndStream: false, // data coming
EndHeaders: true,
})
updateSize := 1 << 20 / 2 // the conn & stream size before a WindowUpdate
st.writeData(1, false, bytes.Repeat([]byte("a"), updateSize-10))
st.writeData(1, false, bytes.Repeat([]byte("b"), 10))
puppet.do(readBodyHandler(t, strings.Repeat("a", updateSize-10)))
puppet.do(readBodyHandler(t, strings.Repeat("b", 10)))

st.wantWindowUpdate(0, uint32(updateSize))
st.wantWindowUpdate(1, uint32(updateSize))

st.writeData(1, false, bytes.Repeat([]byte("a"), updateSize-10))
st.writeData(1, true, bytes.Repeat([]byte("c"), 15)) // END_STREAM here
puppet.do(readBodyHandler(t, strings.Repeat("a", updateSize-10)))
puppet.do(readBodyHandler(t, strings.Repeat("c", 15)))

st.wantWindowUpdate(0, uint32(updateSize+5))
}

func TestServer_Handler_Sends_WindowUpdate_SmallStream(t *testing.T) {
puppet := newHandlerPuppet()
st := newServerTester(t, func(w http.ResponseWriter, r *http.Request) {
puppet.act(w, r)
}, func(s *Server) {
s.MaxUploadBufferPerStream = 6
})
defer st.Close()
defer puppet.done()

st.greet()

st.writeHeaders(HeadersFrameParam{
StreamID: 1, // clients send odd numbers
BlockFragment: st.encodeHeader(":method", "POST"),
Expand All @@ -1287,14 +1255,18 @@ func TestServer_Handler_Sends_WindowUpdate_SmallStream(t *testing.T) {
})
st.writeData(1, false, []byte("abcdef"))
puppet.do(readBodyHandler(t, "abc"))
puppet.do(readBodyHandler(t, "d"))
puppet.do(readBodyHandler(t, "ef"))
st.wantWindowUpdate(0, 3)
st.wantWindowUpdate(1, 3)

st.wantWindowUpdate(1, 6)
puppet.do(readBodyHandler(t, "def"))
st.wantWindowUpdate(0, 3)
st.wantWindowUpdate(1, 3)

st.writeData(1, true, []byte("ghijkl")) // END_STREAM here
puppet.do(readBodyHandler(t, "ghi"))
puppet.do(readBodyHandler(t, "jkl"))
st.wantWindowUpdate(0, 3)
st.wantWindowUpdate(0, 3) // no more stream-level, since END_STREAM
}

// the version of the TestServer_Handler_Sends_WindowUpdate with padding.
Expand Down Expand Up @@ -1323,45 +1295,12 @@ func TestServer_Handler_Sends_WindowUpdate_Padding(t *testing.T) {
st.wantWindowUpdate(1, 5)

puppet.do(readBodyHandler(t, "abc"))
puppet.do(readBodyHandler(t, "def"))
}

// This is a regression test to make sure the correct window increment size is
// calculated for a stream.
// See https://go.dev/issue/56315#issuecomment-1287642591.
func TestServer_Handler_Sends_WindowUpdate_IncrementSize(t *testing.T) {
maxSizePerConn := initialWindowSize * 2
maxSizePerStream := maxSizePerConn*2 + 100
st.wantWindowUpdate(0, 3)
st.wantWindowUpdate(1, 3)

puppet := newHandlerPuppet()
st := newServerTester(t, func(w http.ResponseWriter, r *http.Request) {
puppet.act(w, r)
}, func(s *Server) {
s.MaxUploadBufferPerConnection = int32(maxSizePerConn)
s.MaxUploadBufferPerStream = int32(maxSizePerStream)
})
defer st.Close()
defer puppet.done()

st.greet()

st.writeHeaders(HeadersFrameParam{
StreamID: 1,
BlockFragment: st.encodeHeader(":method", "POST"),
EndStream: false,
EndHeaders: true,
})

st.writeData(1, false, bytes.Repeat([]byte("a"), maxSizePerConn/2))
puppet.do(readBodyHandler(t, strings.Repeat("a", maxSizePerConn/2)))
st.wantWindowUpdate(0, uint32(maxSizePerConn/2))

st.writeData(1, false, bytes.Repeat([]byte("b"), maxSizePerConn/2+100))
puppet.do(readBodyHandler(t, strings.Repeat("b", maxSizePerConn/2+100)))
st.wantWindowUpdate(0, uint32(maxSizePerConn/2+100))
st.wantWindowUpdate(1, uint32(maxSizePerConn+100))

st.writeData(1, true, nil) // END_STREAM here
puppet.do(readBodyHandler(t, "def"))
st.wantWindowUpdate(0, 3)
st.wantWindowUpdate(1, 3)
}

func TestServer_Send_GoAway_After_Bogus_WindowUpdate(t *testing.T) {
Expand Down Expand Up @@ -2357,6 +2296,8 @@ func TestServer_Response_Automatic100Continue(t *testing.T) {
// gigantic and/or sensitive "foo" payload now.
st.writeData(1, true, []byte(msg))

st.wantWindowUpdate(0, uint32(len(msg)))

hf = st.wantHeaders()
if hf.StreamEnded() {
t.Fatal("expected data to follow")
Expand Down Expand Up @@ -2544,6 +2485,9 @@ func TestServer_NoCrash_HandlerClose_Then_ClientClose(t *testing.T) {
// it did before.
st.writeData(1, true, []byte("foo"))

// Get our flow control bytes back, since the handler didn't get them.
st.wantWindowUpdate(0, uint32(len("foo")))

// Sent after a peer sends data anyway (admittedly the
// previous RST_STREAM might've still been in-flight),
// but they'll get the more friendly 'cancel' code
Expand Down Expand Up @@ -3986,6 +3930,7 @@ func TestServer_Rejects_TooSmall(t *testing.T) {
EndHeaders: true,
})
st.writeData(1, true, []byte("12345"))
st.wantWindowUpdate(0, 5)
st.wantRSTStream(1, ErrCodeProtocol)
})
}
Expand Down Expand Up @@ -4312,6 +4257,7 @@ func TestServerWindowUpdateOnBodyClose(t *testing.T) {
st.writeData(1, false, []byte(content[5:]))
blockCh <- true

increments := len(content)
for {
f, err := st.readFrame()
if err == io.EOF {
Expand All @@ -4320,12 +4266,10 @@ func TestServerWindowUpdateOnBodyClose(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if rs, ok := f.(*RSTStreamFrame); ok && rs.StreamID == 1 {
break
}
if wu, ok := f.(*WindowUpdateFrame); ok && wu.StreamID == 0 {
if e, a := uint32(3), wu.Increment; e != a {
t.Errorf("Increment=%d, want %d", a, e)
increments -= int(wu.Increment)
if increments == 0 {
break
}
}
}
Expand Down Expand Up @@ -4468,22 +4412,22 @@ func TestServerSendsEarlyHints(t *testing.T) {

func TestProtocolErrorAfterGoAway(t *testing.T) {
st := newServerTester(t, func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(200)
w.(http.Flusher).Flush()
io.Copy(io.Discard, r.Body)
})
defer st.Close()

st.greet()
content := "some content"
st.writeHeaders(HeadersFrameParam{
StreamID: 1,
BlockFragment: st.encodeHeader(
":method", "POST",
"content-length", "1",
"content-length", strconv.Itoa(len(content)),
),
EndStream: false,
EndHeaders: true,
})
st.writeData(1, false, []byte(content[:5]))

_, err := st.readFrame()
if err != nil {
Expand Down

0 comments on commit d7f77dc

Please sign in to comment.