Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

transport: add timeout for writing GOAWAY on http2Client.Close() #7371

Merged
merged 19 commits into from
Aug 16, 2024
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions internal/transport/http2_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@
// atomically.
var clientConnectionCounter uint64

// goAwayLoopyWriterTimeout bounds how long http2Client.Close waits for the
// loopy writer to finish after the GOAWAY frame has been queued on the control
// buffer. Once it elapses, Close logs the failure and proceeds to close the
// transport anyway. It is a variable rather than a constant so that tests can
// override it.
var goAwayLoopyWriterTimeout = 5 * time.Second

var metadataFromOutgoingContextRaw = internal.FromOutgoingContextRaw.(func(context.Context) (metadata.MD, [][]string, bool))

// http2Client implements the ClientTransport interface with HTTP2.
Expand Down Expand Up @@ -983,6 +985,7 @@
// only once on a transport. Once it is called, the transport should not be
// accessed anymore.
func (t *http2Client) Close(err error) {
t.conn.SetWriteDeadline(time.Now().Add(time.Second * 10))
t.mu.Lock()
// Make sure we only close once.
if t.state == closing {
Expand All @@ -1006,10 +1009,20 @@
t.kpDormancyCond.Signal()
}
t.mu.Unlock()

// Per HTTP/2 spec, a GOAWAY frame must be sent before closing the
aranjans marked this conversation as resolved.
Show resolved Hide resolved
// connection. See https://httpwg.org/specs/rfc7540.html#GOAWAY.
// connection. See https://httpwg.org/specs/rfc7540.html#GOAWAY. It
// also waits for loopyWriter to be closed with a timer to avoid the
// long blocking in case the connection is blackholed, i.e. TCP is
// just stuck.
t.controlBuf.put(&goAway{code: http2.ErrCodeNo, debugData: []byte("client transport shutdown"), closeConn: err})
<-t.writerDone
timer := time.NewTimer(goAwayLoopyWriterTimeout)
defer timer.Stop()
select {
case <-t.writerDone: // success
case <-timer.C:
t.logger.Infof("Failed to write a GOAWAY frame as part of connection close after %s. Giving up and closing the transport.", goAwayLoopyWriterTimeout)

Check warning on line 1024 in internal/transport/http2_client.go

View check run for this annotation

Codecov / codecov/patch

internal/transport/http2_client.go#L1023-L1024

Added lines #L1023 - L1024 were not covered by tests
}
t.cancel()
t.conn.Close()
channelz.RemoveEntry(t.channelz.ID)
Expand Down
59 changes: 56 additions & 3 deletions internal/transport/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ func Test(t *testing.T) {
grpctest.RunSubTests(t, s{})
}

// goAwayFrameSize is the number of bytes hangingConn.Write looks for in a
// single write to decide that the client's GOAWAY frame has been sent.
// NOTE(review): presumably 9 bytes of HTTP/2 frame header + 8 bytes of
// last-stream-ID and error code + the 25-byte "client transport shutdown"
// debug data — confirm against the framer output.
const goAwayFrameSize = 42

var (
expectedRequest = []byte("ping")
expectedResponse = []byte("pong")
Expand Down Expand Up @@ -2424,7 +2426,7 @@ func (s) TestClientHandshakeInfo(t *testing.T) {
TransportCredentials: creds,
ChannelzParent: channelzSubChannel(t),
}
tr, err := NewClientTransport(ctx, context.Background(), addr, copts, func(GoAwayReason) {})
tr, err := NewClientTransport(ctx, ctx, addr, copts, func(GoAwayReason) {})
if err != nil {
t.Fatalf("NewClientTransport(): %v", err)
}
Expand Down Expand Up @@ -2465,7 +2467,7 @@ func (s) TestClientHandshakeInfoDialer(t *testing.T) {
Dialer: dialer,
ChannelzParent: channelzSubChannel(t),
}
tr, err := NewClientTransport(ctx, context.Background(), addr, copts, func(GoAwayReason) {})
tr, err := NewClientTransport(ctx, ctx, addr, copts, func(GoAwayReason) {})
if err != nil {
t.Fatalf("NewClientTransport(): %v", err)
}
Expand Down Expand Up @@ -2725,7 +2727,7 @@ func (s) TestClientSendsAGoAwayFrame(t *testing.T) {
}
}()

ct, err := NewClientTransport(ctx, context.Background(), resolver.Address{Addr: lis.Addr().String()}, ConnectOptions{}, func(GoAwayReason) {})
ct, err := NewClientTransport(ctx, ctx, resolver.Address{Addr: lis.Addr().String()}, ConnectOptions{}, func(GoAwayReason) {})
if err != nil {
t.Fatalf("Error while creating client transport: %v", err)
}
Expand All @@ -2746,3 +2748,54 @@ func (s) TestClientSendsAGoAwayFrame(t *testing.T) {
t.Errorf("Context timed out")
}
}

// hangingConn is a net.Conn wrapper used in tests to simulate a connection
// that hangs once the GOAWAY frame has been written. All operations are
// delegated to the embedded net.Conn; Write additionally blocks on hangConn
// after a write of goAwayFrameSize bytes, until the channel is signaled or
// closed.
type hangingConn struct {
net.Conn
// hangConn is the channel Write receives from in order to block the writer.
hangConn chan struct{}
}

func (hc *hangingConn) Write(b []byte) (n int, err error) {
n, err = hc.Conn.Write(b)
if n == goAwayFrameSize { // hang the conn after the goAway is received
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did you remove the atomic?

This is worse, since you're just assuming that: 1. Nothing else written will be the size of the GOAWAY frame, and 2. the GOAWAY will be written to the connection, by itself, in a single operation.

<-hc.hangConn
}
return n, err
}

// hangingDialer is a dialer for use in ConnectOptions.Dialer. It opens a TCP
// connection to addr and wraps it in a hangingConn so that writes can be made
// to hang after the GOAWAY frame is sent.
//
// The dial honors ctx: if ctx is cancelled or its deadline expires before the
// connection is established, the dial is aborted and the error is returned.
func hangingDialer(ctx context.Context, addr string) (net.Conn, error) {
	// Use DialContext rather than net.Dial so that a stuck dial cannot outlive
	// the caller's context.
	var d net.Dialer
	conn, err := d.DialContext(ctx, "tcp", addr)
	if err != nil {
		return nil, err
	}
	return &hangingConn{Conn: conn, hangConn: make(chan struct{})}, nil
}

// Tests the scenario where a client transport is closed and writing of the
// GOAWAY frame as part of the close does not complete because of a network
// hang. The test verifies that the client transport is closed without waiting
// for too long.
func (s) TestClientCloseReturnsEarlyWhenGoAwayWriteHangs(t *testing.T) {
// Override the timeout for writing GOAWAY to a very short duration (1ms) so
// that waiting for the connection write always times out. This is equivalent
// to a real network hang, where the conn write for the GOAWAY doesn't finish
// within the specified deadline.
dfawley marked this conversation as resolved.
Show resolved Hide resolved
origGoAwayLoopyTimeout := goAwayLoopyWriterTimeout
goAwayLoopyWriterTimeout = time.Millisecond
defer func() {
goAwayLoopyWriterTimeout = origGoAwayLoopyTimeout
}()

// Create the server set up.
server, ct, cancel := setUpWithOptions(t, 0, &ServerConfig{}, normal, ConnectOptions{Dialer: hangingDialer})
defer cancel()
defer server.stop()
dfawley marked this conversation as resolved.
Show resolved Hide resolved

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if _, err := ct.NewStream(ctx, &CallHdr{}); err != nil {
t.Fatalf("Failed to open stream: %v", err)
}
ct.Close(errors.New("manually closed by client"))
easwars marked this conversation as resolved.
Show resolved Hide resolved
easwars marked this conversation as resolved.
Show resolved Hide resolved
}
Loading