Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

transport: Discard the buffer when empty after http connect handshake #7424

Merged
merged 4 commits into from
Jul 30, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions internal/transport/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,14 @@ func doHTTPConnectHandshake(ctx context.Context, conn net.Conn, backendAddr stri
}
return nil, fmt.Errorf("failed to do connect handshake, response: %q", dump)
}

return &bufConn{Conn: conn, r: r}, nil
// The buffer could contain extra bytes from the target server, so we can't
// discard it. However, in many cases where the server waits for the client
// to send the first message (e.g. when TLS is being used), the buffer will
// be empty, so we can avoid the overhead of reading through this buffer.
if r.Buffered() != 0 {
return &bufConn{Conn: conn, r: r}, nil
}
return conn, nil
}

// proxyDial dials, connecting to a proxy first if necessary. Checks if a proxy
Expand Down
52 changes: 50 additions & 2 deletions internal/transport/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package transport

import (
"bufio"
"bytes"
"context"
"encoding/base64"
"fmt"
Expand Down Expand Up @@ -84,7 +85,24 @@ func (p *proxyServer) run() {
return
}
resp := http.Response{StatusCode: http.StatusOK, Proto: "HTTP/1.0"}
resp.Write(p.in)
var buf bytes.Buffer
resp.Write(&buf)
// Batch the first message from the server with the http connect
// response. This is done to test the cases in which the grpc client has
// the response to the connect request and proxied packets from the
// destination server when it reads the transport.
out.SetReadDeadline(time.Now().Add(20 * time.Millisecond))
b := make([]byte, 50)
bytesRead, err := out.Read(b)
// The read is expected to fail with deadline exceeded if the server doesn't
// have a message to send.
if err != nil {
p.t.Logf("Got error while reading message from server: %v", err)
}
dfawley marked this conversation as resolved.
Show resolved Hide resolved
// reset the deadline.
out.SetReadDeadline(time.Time{})
buf.Write(b[0:bytesRead])
p.in.Write(buf.Bytes())
p.out = out
go io.Copy(p.in, p.out)
go io.Copy(p.out, p.in)
Expand All @@ -100,7 +118,7 @@ func (p *proxyServer) stop() {
}
}

func testHTTPConnect(t *testing.T, proxyURLModify func(*url.URL) *url.URL, proxyReqCheck func(*http.Request) error) {
func testHTTPConnect(t *testing.T, proxyURLModify func(*url.URL) *url.URL, proxyReqCheck func(*http.Request) error, serverMessage []byte) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We probably should add a struct about now to hold all the parameters (unless we decide to test this code differently per the above comment).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

plis, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("failed to listen: %v", err)
Expand Down Expand Up @@ -128,6 +146,7 @@ func testHTTPConnect(t *testing.T, proxyURLModify func(*url.URL) *url.URL, proxy
return
}
defer in.Close()
in.Write(serverMessage)
in.Read(recvBuf)
done <- nil
}()
Expand Down Expand Up @@ -157,6 +176,18 @@ func testHTTPConnect(t *testing.T, proxyURLModify func(*url.URL) *url.URL, proxy
if string(recvBuf) != string(msg) {
t.Fatalf("received msg: %v, want %v", recvBuf, msg)
}

c.SetReadDeadline(time.Now().Add(20 * time.Millisecond))

gotServerMessage := make([]byte, len(serverMessage))
// This call will return a deadline exceeded error if the server has nothing
// to send. This is expected.
if _, err := c.Read(gotServerMessage); err != nil {
t.Logf("Got error while reading message from server: %v", err)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar; I don't really like adding a magic deadline to things that we know will timeout sometimes.

Probably we should only do this read if len(serverMessage) != 0, and then we can apply a longer deadline while not impacting code that doesn't use it?

Copy link
Contributor Author

@arjan-bal arjan-bal Jul 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed, now the proxy is configured to wait for server hello.

if string(gotServerMessage) != string(serverMessage) {
t.Fatalf("message from server: %v, want %v", gotServerMessage, serverMessage)
}
}

func (s) TestHTTPConnect(t *testing.T) {
Expand All @@ -170,6 +201,22 @@ func (s) TestHTTPConnect(t *testing.T) {
}
return nil
},
nil,
)
}

func (s) TestHTTPConnectWithServerHello(t *testing.T) {
testHTTPConnect(t,
func(in *url.URL) *url.URL {
return in
},
func(req *http.Request) error {
if req.Method != http.MethodConnect {
return fmt.Errorf("unexpected Method %q, want %q", req.Method, http.MethodConnect)
}
return nil
},
[]byte("server-hello"),
)
}

Expand All @@ -195,6 +242,7 @@ func (s) TestHTTPConnectBasicAuth(t *testing.T) {
}
return nil
},
nil,
)
}

Expand Down