From 67fa7970c50f2a5b48fbdd38f8dff9b6d935131a Mon Sep 17 00:00:00 2001 From: Brad Fitzpatrick Date: Thu, 7 Jan 2016 00:51:13 +0000 Subject: [PATCH] net/http: update bundled http2; fixes TestTransportAndServerSharedBodyRace_h2 Update bundled http2 to git rev d1ba260648 (https://golang.org/cl/18288). Fixes the flaky TestTransportAndServerSharedBodyRace_h2. Also adds some debugging to TestTransportAndServerSharedBodyRace_h2 which I hope won't ever be necessary again, but I know will be. Fixes #13556 Change-Id: Ibcf2fc23ec0122dcac8891fdc3bd7f8acddd880e Reviewed-on: https://go-review.googlesource.com/18289 Reviewed-by: Andrew Gerrand Run-TryBot: Brad Fitzpatrick TryBot-Result: Gobot Gobot --- src/net/http/h2_bundle.go | 53 +++++++++++++++++++++++++++++++++----- src/net/http/serve_test.go | 53 ++++++++++++++++++++++++++++++++++---- 2 files changed, 95 insertions(+), 11 deletions(-) diff --git a/src/net/http/h2_bundle.go b/src/net/http/h2_bundle.go index c7bf2ab84d800..030ca207295cf 100644 --- a/src/net/http/h2_bundle.go +++ b/src/net/http/h2_bundle.go @@ -4219,6 +4219,8 @@ type http2clientStream struct { peerReset chan struct{} // closed on peer reset resetErr error // populated before peerReset is closed + done chan struct{} // closed when stream remove from cc.streams map; close calls guarded by cc.mu + // owned by clientConnReadLoop: headersDone bool // got HEADERS w/ END_HEADERS trailersDone bool // got second HEADERS frame w/ END_HEADERS @@ -4227,7 +4229,11 @@ type http2clientStream struct { resTrailer Header // client's Response.Trailer } -// awaitRequestCancel runs in its own goroutine and waits for the user's +// awaitRequestCancel runs in its own goroutine and waits for the user +// to either cancel a RoundTrip request (using the provided +// Request.Cancel channel), or for the request to be done (any way it +// might be removed from the cc.streams map: peer reset, successful +// completion, TCP connection breakage, etc) func (cs *http2clientStream) awaitRequestCancel(cancel <-chan struct{}) { if cancel == nil { return @@ -4235,7 +4241,8 @@ func (cs *http2clientStream) awaitRequestCancel(cancel <-chan struct{}) { select { case <-cancel: cs.bufPipe.CloseWithError(http2errRequestCanceled) - case <-cs.bufPipe.Done(): + cs.cc.writeStreamReset(cs.ID, http2ErrCodeCancel, nil) + case <-cs.done: } } @@ -4594,6 +4601,11 @@ func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) { cc.mu.Unlock() if werr != nil { + if hasBody { + req.Body.Close() + } + cc.forgetStreamID(cs.ID) + return nil, werr } @@ -4605,26 +4617,47 @@ func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) { }() } + readLoopResCh := cs.resc + requestCanceledCh := http2requestCancel(req) + requestCanceled := false for { select { - case re := <-cs.resc: + case re := <-readLoopResCh: res := re.res if re.err != nil || res.StatusCode > 299 { cs.abortRequestBodyWrite() } if re.err != nil { + cc.forgetStreamID(cs.ID) return nil, re.err } res.Request = req res.TLS = cc.tlsState return res, nil - case <-http2requestCancel(req): + case <-requestCanceledCh: + cc.forgetStreamID(cs.ID) cs.abortRequestBodyWrite() - return nil, http2errRequestCanceled + if !hasBody { + cc.writeStreamReset(cs.ID, http2ErrCodeCancel, nil) + return nil, http2errRequestCanceled + } + + requestCanceled = true + requestCanceledCh = nil + readLoopResCh = nil case <-cs.peerReset: + if requestCanceled { + + return nil, http2errRequestCanceled + } + return nil, cs.resetErr case err := <-bodyCopyErrc: + if requestCanceled { + cc.writeStreamReset(cs.ID, http2ErrCodeCancel, nil) + return nil, http2errRequestCanceled + } if err != nil { return nil, err } @@ -4856,6 +4889,7 @@ func (cc *http2ClientConn) newStream() *http2clientStream { ID: cc.nextStreamID, resc: make(chan http2resAndError, 1), peerReset: make(chan struct{}), + done: make(chan struct{}), } cs.flow.add(int32(cc.initialWindowSize)) cs.flow.setConnFlow(&cc.flow) @@ -4866,12 +4900,17 @@ func (cc *http2ClientConn) newStream() *http2clientStream { return cs } +func (cc *http2ClientConn) forgetStreamID(id uint32) { + cc.streamByID(id, true) +} + func (cc *http2ClientConn) streamByID(id uint32, andRemove bool) *http2clientStream { cc.mu.Lock() defer cc.mu.Unlock() cs := cc.streams[id] - if andRemove { + if andRemove && cs != nil { delete(cc.streams, id) + close(cs.done) } return cs } @@ -4926,6 +4965,7 @@ func (rl *http2clientConnReadLoop) cleanup() { case cs.resc <- http2resAndError{err: err}: default: } + close(cs.done) } cc.closed = true cc.cond.Broadcast() @@ -5291,6 +5331,7 @@ func (cc *http2ClientConn) writeStreamReset(streamID uint32, code http2ErrCode, cc.wmu.Lock() cc.fr.WriteRSTStream(streamID, code) + cc.bw.Flush() cc.wmu.Unlock() } diff --git a/src/net/http/serve_test.go b/src/net/http/serve_test.go index cbe85d255bb4d..4a006fb369bf5 100644 --- a/src/net/http/serve_test.go +++ b/src/net/http/serve_test.go @@ -27,6 +27,7 @@ import ( "os/exec" "reflect" "runtime" + "runtime/debug" "sort" "strconv" "strings" @@ -3038,7 +3039,6 @@ func TestTransportAndServerSharedBodyRace_h1(t *testing.T) { testTransportAndServerSharedBodyRace(t, h1Mode) } func TestTransportAndServerSharedBodyRace_h2(t *testing.T) { - t.Skip("failing in http2 mode; golang.org/issue/13556") testTransportAndServerSharedBodyRace(t, h2Mode) } func testTransportAndServerSharedBodyRace(t *testing.T, h2 bool) { @@ -3046,11 +3046,40 @@ func testTransportAndServerSharedBodyRace(t *testing.T, h2 bool) { const bodySize = 1 << 20 + // errorf is like t.Errorf, but also writes to println. When + // this test fails, it hangs. This helps debugging and I've + // added this enough times "temporarily". It now gets added + // full time. + errorf := func(format string, args ...interface{}) { + v := fmt.Sprintf(format, args...) + println(v) + t.Error(v) + } + unblockBackend := make(chan bool) backend := newClientServerTest(t, h2, HandlerFunc(func(rw ResponseWriter, req *Request) { - io.CopyN(rw, req.Body, bodySize) + gone := rw.(CloseNotifier).CloseNotify() + didCopy := make(chan interface{}) + go func() { + n, err := io.CopyN(rw, req.Body, bodySize) + didCopy <- []interface{}{n, err} + }() + isGone := false + Loop: + for { + select { + case <-didCopy: + break Loop + case <-gone: + isGone = true + case <-time.After(time.Second): + println("1 second passes in backend, proxygone=", isGone) + } + } <-unblockBackend })) + var quitTimer *time.Timer + defer func() { quitTimer.Stop() }() defer backend.close() backendRespc := make(chan *Response, 1) @@ -3063,17 +3092,17 @@ func testTransportAndServerSharedBodyRace(t *testing.T, h2 bool) { bresp, err := proxy.c.Do(req2) if err != nil { - t.Errorf("Proxy outbound request: %v", err) + errorf("Proxy outbound request: %v", err) return } _, err = io.CopyN(ioutil.Discard, bresp.Body, bodySize/2) if err != nil { - t.Errorf("Proxy copy error: %v", err) + errorf("Proxy copy error: %v", err) return } backendRespc <- bresp // to close later - // Try to cause a race: Both the DefaultTransport and the proxy handler's Server + // Try to cause a race: Both the Transport and the proxy handler's Server // will try to read/close req.Body (aka req2.Body) if h2 { close(cancel) @@ -3083,6 +3112,20 @@ func testTransportAndServerSharedBodyRace(t *testing.T, h2 bool) { rw.Write([]byte("OK")) })) defer proxy.close() + defer func() { + // Before we shut down our two httptest.Servers, start a timer. + // We choose 7 seconds because httptest.Server starts logging + // warnings to stderr at 5 seconds. If we don't disarm this bomb + // in 7 seconds (after the two httptest.Server.Close calls above), + // then we explode with stacks. + quitTimer = time.AfterFunc(7*time.Second, func() { + debug.SetTraceback("ALL") + stacks := make([]byte, 1<<20) + stacks = stacks[:runtime.Stack(stacks, true)] + fmt.Fprintf(os.Stderr, "%s", stacks) + log.Fatalf("Timeout.") + }) + }() defer close(unblockBackend) req, _ := NewRequest("POST", proxy.ts.URL, io.LimitReader(neverEnding('a'), bodySize))