Skip to content

Commit

Permalink
http2: detect hung client connections by confirming stream resets
Browse files Browse the repository at this point in the history
Consider the case of an unresponsive client connection, where
the server has stopped responding. We send an infinite sequence of
requests to the connection in sequence, each with a timeout.
Each request counts against the concurrency limit for the
connection while active, but when a request times out we send
a RST_STREAM and free up the concurrency slot it was using.

We continue to try to send requests to the connection forever (or
until the kernel closes the underlying TCP connection, or until
ReadIdleTimeout/WriteByteTimeout results in us closing the connection).

Defend against this scenario by counting a canceled request
against the connection concurrency limit until we confirm the
server is responding. Specifically:

Track the number of in-flight request cancellations in cc.pendingResets.
This total counts against the connection concurrency limit.

When sending a RST_STREAM for a canceled request, increment
cc.pendingResets. Send a PING frame to the server, unless a PING
is already in flight.

When receiving a PING response, set cc.pendingResets to 0.

A hung connection will be used for at most
SETTINGS_MAX_CONCURRENT_STREAMS requests.

When StrictMaxConcurrentStreams is false, we will create a
new connection after reaching the concurrency limit for a hung one.

When StrictMaxConcurrentStreams is true, we will continue to
wait for the existing connection until some timeout closes it
or it becomes responsive again.

For golang/go#59690

Change-Id: I0151f9a594af14b32bcb6005a239fa19eb103704
Reviewed-on: https://go-review.googlesource.com/c/net/+/617655
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
Reviewed-by: Brad Fitzpatrick <bradfitz@golang.org>
Reviewed-by: Jonathan Amsterdam <jba@google.com>
Reviewed-by: Carlos Amedee <carlos@golang.org>
  • Loading branch information
neild committed Nov 1, 2024
1 parent e883dae commit f35fec9
Show file tree
Hide file tree
Showing 3 changed files with 198 additions and 7 deletions.
8 changes: 8 additions & 0 deletions http2/http2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,3 +283,11 @@ func TestNoUnicodeStrings(t *testing.T) {
t.Fatal(err)
}
}

// must returns v if err is nil, or panics otherwise.
func must[T any](v T, err error) T {
if err != nil {
panic(err)
}
return v
}
71 changes: 64 additions & 7 deletions http2/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,14 @@ type ClientConn struct {
readIdleTimeout time.Duration
pingTimeout time.Duration

// pendingResets is the number of RST_STREAM frames we have sent to the peer,
// without confirming that the peer has received them. When we send a RST_STREAM,
// we bundle it with a PING frame, unless a PING is already in flight. We count
// the reset stream against the connection's concurrency limit until we get
// a PING response. This limits the number of requests we'll try to send to a
// completely unresponsive connection.
pendingResets int

// reqHeaderMu is a 1-element semaphore channel controlling access to sending new requests.
// Write to reqHeaderMu to lock it, read from it to unlock.
// Lock reqmu BEFORE mu or wmu.
Expand Down Expand Up @@ -960,7 +968,7 @@ func (cc *ClientConn) State() ClientConnState {
return ClientConnState{
Closed: cc.closed,
Closing: cc.closing || cc.singleUse || cc.doNotReuse || cc.goAway != nil,
StreamsActive: len(cc.streams),
StreamsActive: len(cc.streams) + cc.pendingResets,
StreamsReserved: cc.streamsReserved,
StreamsPending: cc.pendingRequests,
LastIdle: cc.lastIdle,
Expand Down Expand Up @@ -992,7 +1000,13 @@ func (cc *ClientConn) idleStateLocked() (st clientConnIdleState) {
// writing it.
maxConcurrentOkay = true
} else {
maxConcurrentOkay = int64(len(cc.streams)+cc.streamsReserved+1) <= int64(cc.maxConcurrentStreams)
// We can take a new request if the total of
// - active streams;
// - reservation slots for new streams; and
// - streams for which we have sent a RST_STREAM and a PING,
// but received no subsequent frame
// is less than the concurrency limit.
maxConcurrentOkay = cc.currentRequestCountLocked() < int(cc.maxConcurrentStreams)
}

st.canTakeNewRequest = cc.goAway == nil && !cc.closed && !cc.closing && maxConcurrentOkay &&
Expand All @@ -1002,6 +1016,12 @@ func (cc *ClientConn) idleStateLocked() (st clientConnIdleState) {
return
}

// currentRequestCountLocked reports the number of concurrency slots currently in use,
// including active streams, reserved slots, and reset streams waiting for acknowledgement.
func (cc *ClientConn) currentRequestCountLocked() int {
return len(cc.streams) + cc.streamsReserved + cc.pendingResets
}

func (cc *ClientConn) canTakeNewRequestLocked() bool {
st := cc.idleStateLocked()
return st.canTakeNewRequest
Expand Down Expand Up @@ -1578,6 +1598,7 @@ func (cs *clientStream) cleanupWriteRequest(err error) {
cs.reqBodyClosed = make(chan struct{})
}
bodyClosed := cs.reqBodyClosed
closeOnIdle := cc.singleUse || cc.doNotReuse || cc.t.disableKeepAlives() || cc.goAway != nil
cc.mu.Unlock()
if mustCloseBody {
cs.reqBody.Close()
Expand All @@ -1602,16 +1623,40 @@ func (cs *clientStream) cleanupWriteRequest(err error) {
if cs.sentHeaders {
if se, ok := err.(StreamError); ok {
if se.Cause != errFromPeer {
cc.writeStreamReset(cs.ID, se.Code, err)
cc.writeStreamReset(cs.ID, se.Code, false, err)
}
} else {
cc.writeStreamReset(cs.ID, ErrCodeCancel, err)
// We're cancelling an in-flight request.
//
// This could be due to the server becoming unresponsive.
// To avoid sending too many requests on a dead connection,
// we let the request continue to consume a concurrency slot
// until we can confirm the server is still responding.
// We do this by sending a PING frame along with the RST_STREAM
// (unless a ping is already in flight).
//
// For simplicity, we don't bother tracking the PING payload:
// We reset cc.pendingResets any time we receive a PING ACK.
//
// We skip this if the conn is going to be closed on idle,
// because it's short lived and will probably be closed before
// we get the ping response.
ping := false
if !closeOnIdle {
cc.mu.Lock()
if cc.pendingResets == 0 {
ping = true
}
cc.pendingResets++
cc.mu.Unlock()
}
cc.writeStreamReset(cs.ID, ErrCodeCancel, ping, err)
}
}
cs.bufPipe.CloseWithError(err) // no-op if already closed
} else {
if cs.sentHeaders && !cs.sentEndStream {
cc.writeStreamReset(cs.ID, ErrCodeNo, nil)
cc.writeStreamReset(cs.ID, ErrCodeNo, false, nil)
}
cs.bufPipe.CloseWithError(errRequestCanceled)
}
Expand All @@ -1638,7 +1683,7 @@ func (cc *ClientConn) awaitOpenSlotForStreamLocked(cs *clientStream) error {
return errClientConnUnusable
}
cc.lastIdle = time.Time{}
if int64(len(cc.streams)) < int64(cc.maxConcurrentStreams) {
if cc.currentRequestCountLocked() < int(cc.maxConcurrentStreams) {
return nil
}
cc.pendingRequests++
Expand Down Expand Up @@ -3065,6 +3110,11 @@ func (rl *clientConnReadLoop) processPing(f *PingFrame) error {
close(c)
delete(cc.pings, f.Data)
}
if cc.pendingResets > 0 {
// See clientStream.cleanupWriteRequest.
cc.pendingResets = 0
cc.cond.Broadcast()
}
return nil
}
cc := rl.cc
Expand All @@ -3087,13 +3137,20 @@ func (rl *clientConnReadLoop) processPushPromise(f *PushPromiseFrame) error {
return ConnectionError(ErrCodeProtocol)
}

func (cc *ClientConn) writeStreamReset(streamID uint32, code ErrCode, err error) {
// writeStreamReset sends a RST_STREAM frame.
// When ping is true, it also sends a PING frame with a random payload.
func (cc *ClientConn) writeStreamReset(streamID uint32, code ErrCode, ping bool, err error) {
// TODO: map err to more interesting error codes, once the
// HTTP community comes up with some. But currently for
// RST_STREAM there's no equivalent to GOAWAY frame's debug
// data, and the error codes are all pretty vague ("cancel").
cc.wmu.Lock()
cc.fr.WriteRSTStream(streamID, code)
if ping {
var payload [8]byte
rand.Read(payload[:])
cc.fr.WritePing(false, payload)
}
cc.bw.Flush()
cc.wmu.Unlock()
}
Expand Down
126 changes: 126 additions & 0 deletions http2/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2559,6 +2559,9 @@ func testTransportReturnsUnusedFlowControl(t *testing.T, oneDataFrame bool) {
}
return true
},
func(f *PingFrame) bool {
return true
},
func(f *WindowUpdateFrame) bool {
if !oneDataFrame && !sentAdditionalData {
t.Fatalf("Got WindowUpdateFrame, don't expect one yet")
Expand Down Expand Up @@ -5512,3 +5515,126 @@ func TestTransport1xxLimits(t *testing.T) {
})
}
}

func TestTransportSendPingWithReset(t *testing.T) {
tc := newTestClientConn(t, func(tr *Transport) {
tr.StrictMaxConcurrentStreams = true
})

const maxConcurrent = 3
tc.greet(Setting{SettingMaxConcurrentStreams, maxConcurrent})

// Start several requests.
var rts []*testRoundTrip
for i := 0; i < maxConcurrent+1; i++ {
req := must(http.NewRequest("GET", "https://dummy.tld/", nil))
rt := tc.roundTrip(req)
if i >= maxConcurrent {
tc.wantIdle()
continue
}
tc.wantFrameType(FrameHeaders)
tc.writeHeaders(HeadersFrameParam{
StreamID: rt.streamID(),
EndHeaders: true,
BlockFragment: tc.makeHeaderBlockFragment(
":status", "200",
),
})
rt.wantStatus(200)
rts = append(rts, rt)
}

// Cancel one request. We send a PING frame along with the RST_STREAM.
rts[0].response().Body.Close()
tc.wantRSTStream(rts[0].streamID(), ErrCodeCancel)
pf := readFrame[*PingFrame](t, tc)
tc.wantIdle()

// Cancel another request. No PING frame, since one is in flight.
rts[1].response().Body.Close()
tc.wantRSTStream(rts[1].streamID(), ErrCodeCancel)
tc.wantIdle()

// Respond to the PING.
// This finalizes the previous resets, and allows the pending request to be sent.
tc.writePing(true, pf.Data)
tc.wantFrameType(FrameHeaders)
tc.wantIdle()

// Cancel the last request. We send another PING, since none are in flight.
rts[2].response().Body.Close()
tc.wantRSTStream(rts[2].streamID(), ErrCodeCancel)
tc.wantFrameType(FramePing)
tc.wantIdle()
}

func TestTransportConnBecomesUnresponsive(t *testing.T) {
// We send a number of requests in series to an unresponsive connection.
// Each request is canceled or times out without a response.
// Eventually, we open a new connection rather than trying to use the old one.
tt := newTestTransport(t)

const maxConcurrent = 3

t.Logf("first request opens a new connection and succeeds")
req1 := must(http.NewRequest("GET", "https://dummy.tld/", nil))
rt1 := tt.roundTrip(req1)
tc1 := tt.getConn()
tc1.wantFrameType(FrameSettings)
tc1.wantFrameType(FrameWindowUpdate)
hf1 := readFrame[*HeadersFrame](t, tc1)
tc1.writeSettings(Setting{SettingMaxConcurrentStreams, maxConcurrent})
tc1.wantFrameType(FrameSettings) // ack
tc1.writeHeaders(HeadersFrameParam{
StreamID: hf1.StreamID,
EndHeaders: true,
EndStream: true,
BlockFragment: tc1.makeHeaderBlockFragment(
":status", "200",
),
})
rt1.wantStatus(200)
rt1.response().Body.Close()

// Send more requests.
// None receive a response.
// Each is canceled.
for i := 0; i < maxConcurrent; i++ {
t.Logf("request %v receives no response and is canceled", i)
ctx, cancel := context.WithCancel(context.Background())
req := must(http.NewRequestWithContext(ctx, "GET", "https://dummy.tld/", nil))
tt.roundTrip(req)
if tt.hasConn() {
t.Fatalf("new connection created; expect existing conn to be reused")
}
tc1.wantFrameType(FrameHeaders)
cancel()
tc1.wantFrameType(FrameRSTStream)
if i == 0 {
tc1.wantFrameType(FramePing)
}
tc1.wantIdle()
}

// The conn has hit its concurrency limit.
// The next request is sent on a new conn.
req2 := must(http.NewRequest("GET", "https://dummy.tld/", nil))
rt2 := tt.roundTrip(req2)
tc2 := tt.getConn()
tc2.wantFrameType(FrameSettings)
tc2.wantFrameType(FrameWindowUpdate)
hf := readFrame[*HeadersFrame](t, tc2)
tc2.writeSettings(Setting{SettingMaxConcurrentStreams, maxConcurrent})
tc2.wantFrameType(FrameSettings) // ack
tc2.writeHeaders(HeadersFrameParam{
StreamID: hf.StreamID,
EndHeaders: true,
EndStream: true,
BlockFragment: tc2.makeHeaderBlockFragment(
":status", "200",
),
})
rt2.wantStatus(200)
rt2.response().Body.Close()
}

0 comments on commit f35fec9

Please sign in to comment.