Skip to content

Commit

Permalink
http2: client side SETTINGS_ENABLE_CONNECT_PROTOCOL support
Browse files Browse the repository at this point in the history
  • Loading branch information
WeidiDeng committed Sep 27, 2024
1 parent f238a07 commit 4e0170f
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 22 deletions.
4 changes: 4 additions & 0 deletions http2/http2.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,10 @@ func (s Setting) Valid() error {
if s.Val < 16384 || s.Val > 1<<24-1 {
return ConnectionError(ErrCodeProtocol)
}
case SettingEnableConnectProtocol:
if s.Val != 1 && s.Val != 0 {
return ConnectionError(ErrCodeProtocol)
}
}
return nil
}
Expand Down
79 changes: 57 additions & 22 deletions http2/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,25 +335,26 @@ type ClientConn struct {
idleTimeout time.Duration // or 0 for never
idleTimer timer

mu sync.Mutex // guards following
cond *sync.Cond // hold mu; broadcast on flow/closed changes
flow outflow // our conn-level flow control quota (cs.outflow is per stream)
inflow inflow // peer's conn-level flow control
doNotReuse bool // whether conn is marked to not be reused for any future requests
closing bool
closed bool
seenSettings bool // true if we've seen a settings frame, false otherwise
wantSettingsAck bool // we sent a SETTINGS frame and haven't heard back
goAway *GoAwayFrame // if non-nil, the GoAwayFrame we received
goAwayDebug string // goAway frame's debug data, retained as a string
streams map[uint32]*clientStream // client-initiated
streamsReserved int // incr by ReserveNewRequest; decr on RoundTrip
nextStreamID uint32
pendingRequests int // requests blocked and waiting to be sent because len(streams) == maxConcurrentStreams
pings map[[8]byte]chan struct{} // in flight ping data to notification channel
br *bufio.Reader
lastActive time.Time
lastIdle time.Time // time last idle
mu sync.Mutex // guards following
cond *sync.Cond // hold mu; broadcast on flow/closed changes
flow outflow // our conn-level flow control quota (cs.outflow is per stream)
inflow inflow // peer's conn-level flow control
doNotReuse bool // whether conn is marked to not be reused for any future requests
closing bool
closed bool
seenSettings bool // true if we've seen a settings frame, false otherwise
seenSettingsChan chan struct{} // closed when seenSettings is true or frame reading fails
wantSettingsAck bool // we sent a SETTINGS frame and haven't heard back
goAway *GoAwayFrame // if non-nil, the GoAwayFrame we received
goAwayDebug string // goAway frame's debug data, retained as a string
streams map[uint32]*clientStream // client-initiated
streamsReserved int // incr by ReserveNewRequest; decr on RoundTrip
nextStreamID uint32
pendingRequests int // requests blocked and waiting to be sent because len(streams) == maxConcurrentStreams
pings map[[8]byte]chan struct{} // in flight ping data to notification channel
br *bufio.Reader
lastActive time.Time
lastIdle time.Time // time last idle
// Settings from peer: (also guarded by wmu)
maxFrameSize uint32
maxConcurrentStreams uint32
Expand All @@ -363,6 +364,7 @@ type ClientConn struct {
initialStreamRecvWindowSize int32
readIdleTimeout time.Duration
pingTimeout time.Duration
extendedConnecAllowed bool

// reqHeaderMu is a 1-element semaphore channel controlling access to sending new requests.
// Write to reqHeaderMu to lock it, read from it to unlock.
Expand Down Expand Up @@ -752,6 +754,7 @@ func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, erro
peerMaxHeaderListSize: 0xffffffffffffffff, // "infinite", per spec. Use 2^64-1 instead.
streams: make(map[uint32]*clientStream),
singleUse: singleUse,
seenSettingsChan: make(chan struct{}),
wantSettingsAck: true,
readIdleTimeout: conf.SendPingTimeout,
pingTimeout: conf.PingTimeout,
Expand Down Expand Up @@ -1376,6 +1379,8 @@ func (cs *clientStream) doRequest(req *http.Request, streamf func(*clientStream)
cs.cleanupWriteRequest(err)
}

var errExtendedConnectNotSupported = errors.New("net/http: extended connect not supported by peer")

// writeRequest sends a request.
//
// It returns nil after the request is written, the response read,
Expand Down Expand Up @@ -1405,7 +1410,20 @@ func (cs *clientStream) writeRequest(req *http.Request, streamf func(*clientStre
return ctx.Err()
}

// wait for setting frames to be received, a server can change this value later,
// but we just wait for the first settings frame
var isExtendedConnect bool
if req.Method == "CONNECT" && req.Header.Get(":protocol") != "" {
isExtendedConnect = true
<-cc.seenSettingsChan
}

cc.mu.Lock()
if isExtendedConnect && !cc.extendedConnecAllowed {
cc.mu.Unlock()
<-cc.reqHeaderMu
return errExtendedConnectNotSupported
}
if cc.idleTimer != nil {
cc.idleTimer.Stop()
}
Expand Down Expand Up @@ -1910,7 +1928,7 @@ func (cs *clientStream) awaitFlowControl(maxBytes int) (taken int32, err error)

func validateHeaders(hdrs http.Header) string {
for k, vv := range hdrs {
if !httpguts.ValidHeaderFieldName(k) {
if !httpguts.ValidHeaderFieldName(k) && k != ":protocol" {
return fmt.Sprintf("name %q", k)
}
for _, v := range vv {
Expand All @@ -1926,6 +1944,10 @@ func validateHeaders(hdrs http.Header) string {

var errNilRequestURL = errors.New("http2: Request.URI is nil")

func isNormalConnect(req *http.Request) bool {
return req.Method == "CONNECT" && req.Header.Get(":protocol") == ""
}

// requires cc.wmu be held.
func (cc *ClientConn) encodeHeaders(req *http.Request, addGzipHeader bool, trailers string, contentLength int64) ([]byte, error) {
cc.hbuf.Reset()
Expand All @@ -1946,7 +1968,7 @@ func (cc *ClientConn) encodeHeaders(req *http.Request, addGzipHeader bool, trail
}

var path string
if req.Method != "CONNECT" {
if !isNormalConnect(req) {
path = req.URL.RequestURI()
if !validPseudoPath(path) {
orig := path
Expand Down Expand Up @@ -1983,7 +2005,7 @@ func (cc *ClientConn) encodeHeaders(req *http.Request, addGzipHeader bool, trail
m = http.MethodGet
}
f(":method", m)
if req.Method != "CONNECT" {
if !isNormalConnect(req) {
f(":path", path)
f(":scheme", req.URL.Scheme)
}
Expand Down Expand Up @@ -2370,6 +2392,9 @@ func (rl *clientConnReadLoop) run() error {
if VerboseLogs {
cc.vlogf("http2: Transport conn %p received error from processing frame %v: %v", cc, summarizeFrame(f), err)
}
if !cc.seenSettings {
close(cc.seenSettingsChan)
}
return err
}
}
Expand Down Expand Up @@ -2917,6 +2942,15 @@ func (rl *clientConnReadLoop) processSettingsNoWrite(f *SettingsFrame) error {
case SettingHeaderTableSize:
cc.henc.SetMaxDynamicTableSize(s.Val)
cc.peerMaxHeaderTableSize = s.Val
case SettingEnableConnectProtocol:
if err := s.Valid(); err != nil {
return err
}
// RFC 8441 section, https://datatracker.ietf.org/doc/html/rfc8441#section-3
if s.Val == 0 && cc.extendedConnecAllowed {
return ConnectionError(ErrCodeProtocol)
}
cc.extendedConnecAllowed = s.Val == 1
default:
cc.vlogf("Unhandled Setting: %v", s)
}
Expand All @@ -2934,6 +2968,7 @@ func (rl *clientConnReadLoop) processSettingsNoWrite(f *SettingsFrame) error {
// connection can establish to our default.
cc.maxConcurrentStreams = defaultMaxConcurrentStreams
}
close(cc.seenSettingsChan)
cc.seenSettings = true
}

Expand Down
59 changes: 59 additions & 0 deletions http2/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5421,3 +5421,62 @@ func TestIssue67671(t *testing.T) {
res.Body.Close()
}
}

func TestExtendedConnectClientWithServerSupport(t *testing.T) {
disableExtendedConnectProtocol = false
ts := newTestServer(t, func(w http.ResponseWriter, r *http.Request) {
t.Log(io.Copy(w, r.Body))
})
tr := &Transport{
TLSClientConfig: tlsConfigInsecure,
AllowHTTP: true,
}
defer tr.CloseIdleConnections()
pr, pw := io.Pipe()
pwDone := make(chan struct{})
req, _ := http.NewRequest("CONNECT", ts.URL, pr)
req.Header.Set(":protocol", "extended-connect")
go func() {
pw.Write([]byte("hello, extended connect"))
pw.Close()
close(pwDone)
}()

res, err := tr.RoundTrip(req)
if err != nil {
t.Fatal(err)
}
body, err := io.ReadAll(res.Body)
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(body, []byte("hello, extended connect")) {
t.Fatal("unexpected body received")
}
}

func TestExtendedConnectClientWithoutServerSupport(t *testing.T) {
disableExtendedConnectProtocol = true
ts := newTestServer(t, func(w http.ResponseWriter, r *http.Request) {
io.Copy(w, r.Body)
})
tr := &Transport{
TLSClientConfig: tlsConfigInsecure,
AllowHTTP: true,
}
defer tr.CloseIdleConnections()
pr, pw := io.Pipe()
pwDone := make(chan struct{})
req, _ := http.NewRequest("CONNECT", ts.URL, pr)
req.Header.Set(":protocol", "extended-connect")
go func() {
pw.Write([]byte("hello, extended connect"))
pw.Close()
close(pwDone)
}()

_, err := tr.RoundTrip(req)
if !errors.Is(err, errExtendedConnectNotSupported) {
t.Fatalf("expected error errExtendedConnectNotSupported, got: %v", err)
}
}

0 comments on commit 4e0170f

Please sign in to comment.