Skip to content

Commit 366decf

Browse files
authored
transport/grpchttp2: add http2.Framer bridge (#7453)
1 parent 5c4da09 commit 366decf

File tree

3 files changed

+911
-0
lines changed

3 files changed

+911
-0
lines changed

internal/transport/grpchttp2/framer.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ package grpchttp2
2121

2222
import "golang.org/x/net/http2/hpack"
2323

24+
const initHeaderTableSize = 4096 // Default HTTP/2 header table size.
25+
2426
// FrameType represents the type of an HTTP/2 Frame.
2527
// See [Frame Type].
2628
//
@@ -55,6 +57,12 @@ const (
5557
FlagContinuationEndHeaders Flag = 0x4
5658
)
5759

60+
// IsSet returns a boolean indicating whether the passed flag is set on this
61+
// flag instance.
62+
func (f Flag) IsSet(flag Flag) bool {
63+
return f&flag != 0
64+
}
65+
5866
// Setting represents the id and value pair of an HTTP/2 setting.
5967
// See [Setting Format].
6068
//
@@ -105,6 +113,7 @@ type FrameHeader struct {
105113
//
106114
// Each concrete Frame type defined below implements the Frame interface.
107115
type Frame interface {
116+
// Header returns the HTTP/2 9 byte header from the current Frame.
108117
Header() *FrameHeader
109118
// Free frees the underlying buffer if present so it can be reused by the
110119
// framer.
@@ -258,6 +267,9 @@ func (f *WindowUpdateFrame) Header() *FrameHeader {
258267
return f.hdr
259268
}
260269

270+
// Free is a no-op for WindowUpdateFrame.
271+
func (f *WindowUpdateFrame) Free() {}
272+
261273
// ContinuationFrame is the representation of a [CONTINUATION Frame]. The
262274
// CONTINUATION frame is used to continue a sequence of header block fragments.
263275
//
@@ -302,6 +314,26 @@ func (f *MetaHeadersFrame) Header() *FrameHeader {
302314
// Free is a no-op for MetaHeadersFrame.
303315
func (f *MetaHeadersFrame) Free() {}
304316

317+
// UnknownFrame is a struct that is returned when the framer encounters an
318+
// unsupported frame.
319+
type UnknownFrame struct {
320+
hdr *FrameHeader
321+
Payload []byte
322+
free func()
323+
}
324+
325+
// Header returns the 9 byte HTTP/2 header for this frame.
326+
func (f *UnknownFrame) Header() *FrameHeader {
327+
return f.hdr
328+
}
329+
330+
// Free frees the underlying data in the frame.
331+
func (f *UnknownFrame) Free() {
332+
if f.free != nil {
333+
f.free()
334+
}
335+
}
336+
305337
// Framer encapsulates the functionality to read and write HTTP/2 frames.
306338
type Framer interface {
307339
// ReadFrame returns grpchttp2.Frame. It is the caller's responsibility to
Lines changed: 225 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,225 @@
1+
/*
2+
*
3+
* Copyright 2024 gRPC authors.
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package grpchttp2
20+
21+
import (
22+
"io"
23+
24+
"golang.org/x/net/http2"
25+
"golang.org/x/net/http2/hpack"
26+
"google.golang.org/grpc/mem"
27+
)
28+
29+
// FramerBridge adapts the net/x/http2 Framer to satisfy the grpchttp2.Framer
30+
// interface.
31+
//
32+
// Note: This allows temporary use of the older framer and will be removed in
33+
// a future release after the new framer stabilizes.
34+
type FramerBridge struct {
35+
framer *http2.Framer // the underlying http2.Framer implementation to perform reads and writes.
36+
pool mem.BufferPool // a pool to reuse buffers when reading.
37+
}
38+
39+
// NewFramerBridge creates an adaptor that wraps a http2.Framer in a
40+
// grpchttp2.Framer.
41+
//
42+
// Internally, it creates a http2.Framer that uses the provided io.Reader and
43+
// io.Writer, and is configured with a maximum header list size of
44+
// maxHeaderListSize.
45+
//
46+
// Frames returned by a call to the underlying http2.Framer's ReadFrame() method
47+
// need to be consumed before the next call to it. To overcome this restriction,
48+
// the data in a Frame returned by the http2.Framer's ReadFrame is copied into a
49+
// buffer from the given pool. If no pool is provided, a default pool provided
50+
// by the mem package is used.
51+
func NewFramerBridge(w io.Writer, r io.Reader, maxHeaderListSize uint32, pool mem.BufferPool) *FramerBridge {
52+
fr := http2.NewFramer(w, r)
53+
fr.SetReuseFrames()
54+
fr.MaxHeaderListSize = maxHeaderListSize
55+
fr.ReadMetaHeaders = hpack.NewDecoder(initHeaderTableSize, nil)
56+
57+
if pool == nil {
58+
pool = mem.DefaultBufferPool()
59+
}
60+
61+
return &FramerBridge{
62+
framer: fr,
63+
pool: pool,
64+
}
65+
}
66+
67+
// ReadFrame reads a frame from the underlying http2.Framer and returns a
68+
// Frame defined in the grpchttp2 package. This operation copies the data to a
69+
// buffer from the pool, making it safe to use even after another call to
70+
// ReadFrame.
71+
func (fr *FramerBridge) ReadFrame() (Frame, error) {
72+
f, err := fr.framer.ReadFrame()
73+
if err != nil {
74+
return nil, err
75+
}
76+
77+
h := f.Header()
78+
hdr := &FrameHeader{
79+
Size: h.Length,
80+
Type: FrameType(h.Type),
81+
Flags: Flag(h.Flags),
82+
StreamID: h.StreamID,
83+
}
84+
85+
switch f := f.(type) {
86+
case *http2.DataFrame:
87+
buf := fr.pool.Get(int(hdr.Size))
88+
copy(buf, f.Data())
89+
return &DataFrame{
90+
hdr: hdr,
91+
Data: buf,
92+
free: func() { fr.pool.Put(buf) },
93+
}, nil
94+
case *http2.RSTStreamFrame:
95+
return &RSTStreamFrame{
96+
hdr: hdr,
97+
Code: ErrCode(f.ErrCode),
98+
}, nil
99+
case *http2.SettingsFrame:
100+
buf := make([]Setting, 0, f.NumSettings())
101+
f.ForeachSetting(func(s http2.Setting) error {
102+
buf = append(buf, Setting{
103+
ID: SettingID(s.ID),
104+
Value: s.Val,
105+
})
106+
return nil
107+
})
108+
return &SettingsFrame{
109+
hdr: hdr,
110+
Settings: buf,
111+
}, nil
112+
case *http2.PingFrame:
113+
buf := fr.pool.Get(int(hdr.Size))
114+
copy(buf, f.Data[:])
115+
return &PingFrame{
116+
hdr: hdr,
117+
Data: buf,
118+
free: func() { fr.pool.Put(buf) },
119+
}, nil
120+
case *http2.GoAwayFrame:
121+
// Size of the frame minus the code and lastStreamID
122+
buf := fr.pool.Get(int(hdr.Size) - 8)
123+
copy(buf, f.DebugData())
124+
return &GoAwayFrame{
125+
hdr: hdr,
126+
LastStreamID: f.LastStreamID,
127+
Code: ErrCode(f.ErrCode),
128+
DebugData: buf,
129+
free: func() { fr.pool.Put(buf) },
130+
}, nil
131+
case *http2.WindowUpdateFrame:
132+
return &WindowUpdateFrame{
133+
hdr: hdr,
134+
Inc: f.Increment,
135+
}, nil
136+
case *http2.MetaHeadersFrame:
137+
return &MetaHeadersFrame{
138+
hdr: hdr,
139+
Fields: f.Fields,
140+
}, nil
141+
default:
142+
buf := fr.pool.Get(int(hdr.Size))
143+
uf := f.(*http2.UnknownFrame)
144+
copy(buf, uf.Payload())
145+
return &UnknownFrame{
146+
hdr: hdr,
147+
Payload: buf,
148+
free: func() { fr.pool.Put(buf) },
149+
}, nil
150+
}
151+
}
152+
153+
// WriteData writes a DATA Frame into the underlying writer.
154+
func (fr *FramerBridge) WriteData(streamID uint32, endStream bool, data ...[]byte) error {
155+
if len(data) == 1 {
156+
return fr.framer.WriteData(streamID, endStream, data[0])
157+
}
158+
159+
var buf []byte
160+
tl := 0
161+
for _, s := range data {
162+
tl += len(s)
163+
}
164+
165+
buf = fr.pool.Get(tl)[:0]
166+
defer fr.pool.Put(buf)
167+
for _, s := range data {
168+
buf = append(buf, s...)
169+
}
170+
171+
return fr.framer.WriteData(streamID, endStream, buf)
172+
}
173+
174+
// WriteHeaders writes a Headers Frame into the underlying writer.
175+
func (fr *FramerBridge) WriteHeaders(streamID uint32, endStream, endHeaders bool, headerBlock []byte) error {
176+
return fr.framer.WriteHeaders(http2.HeadersFrameParam{
177+
StreamID: streamID,
178+
EndStream: endStream,
179+
EndHeaders: endHeaders,
180+
BlockFragment: headerBlock,
181+
})
182+
}
183+
184+
// WriteRSTStream writes a RSTStream Frame into the underlying writer.
185+
func (fr *FramerBridge) WriteRSTStream(streamID uint32, code ErrCode) error {
186+
return fr.framer.WriteRSTStream(streamID, http2.ErrCode(code))
187+
}
188+
189+
// WriteSettings writes a Settings Frame into the underlying writer.
190+
func (fr *FramerBridge) WriteSettings(settings ...Setting) error {
191+
ss := make([]http2.Setting, 0, len(settings))
192+
for _, s := range settings {
193+
ss = append(ss, http2.Setting{
194+
ID: http2.SettingID(s.ID),
195+
Val: s.Value,
196+
})
197+
}
198+
199+
return fr.framer.WriteSettings(ss...)
200+
}
201+
202+
// WriteSettingsAck writes a Settings Frame with the Ack flag set.
203+
func (fr *FramerBridge) WriteSettingsAck() error {
204+
return fr.framer.WriteSettingsAck()
205+
}
206+
207+
// WritePing writes a Ping frame to the underlying writer.
208+
func (fr *FramerBridge) WritePing(ack bool, data [8]byte) error {
209+
return fr.framer.WritePing(ack, data)
210+
}
211+
212+
// WriteGoAway writes a GoAway Frame to the unerlying writer.
213+
func (fr *FramerBridge) WriteGoAway(maxStreamID uint32, code ErrCode, debugData []byte) error {
214+
return fr.framer.WriteGoAway(maxStreamID, http2.ErrCode(code), debugData)
215+
}
216+
217+
// WriteWindowUpdate writes a WindowUpdate Frame into the underlying writer.
218+
func (fr *FramerBridge) WriteWindowUpdate(streamID, inc uint32) error {
219+
return fr.framer.WriteWindowUpdate(streamID, inc)
220+
}
221+
222+
// WriteContinuation writes a Continuation Frame into the underlying writer.
223+
func (fr *FramerBridge) WriteContinuation(streamID uint32, endHeaders bool, headerBlock []byte) error {
224+
return fr.framer.WriteContinuation(streamID, endHeaders, headerBlock)
225+
}

0 commit comments

Comments
 (0)