Skip to content

Commit

Permalink
quic: avoid panic when PTO expires and implicitly-created streams exist
Browse files Browse the repository at this point in the history
The streams map contains nil entries for implicitly-created streams.
(Receiving a packet for stream N implicitly creates all streams of the
same type <N.)

We weren't checking for nil entries when iterating the map on PTO,
resulting in a panic.

Change the map value to be a wrapper type to make it more explicit that
nil entries exist.

Change-Id: I070c6d60631744018a6e6f2645c95a2f3d3d24b6
Reviewed-on: https://go-review.googlesource.com/c/net/+/550798
Reviewed-by: Jonathan Amsterdam <jba@google.com>
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
  • Loading branch information
neild committed Dec 19, 2023
1 parent f9726a9 commit c136d0c
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 14 deletions.
45 changes: 31 additions & 14 deletions internal/quic/conn_streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,16 @@ import (
)

type streamsState struct {
queue queue[*Stream] // new, peer-created streams
streams map[streamID]*Stream
queue queue[*Stream] // new, peer-created streams

// All peer-created streams.
//
// Implicitly created streams are included as an empty entry in the map.
// (For example, if we receive a frame for stream 4, we implicitly create stream 0 and
// insert an empty entry for it to the map.)
//
// The map value is maybeStream rather than *Stream as a reminder that values can be nil.
streams map[streamID]maybeStream

// Limits on the number of streams, indexed by streamType.
localLimit [streamTypeCount]localStreamLimits
Expand All @@ -37,8 +45,13 @@ type streamsState struct {
queueData streamRing // streams with only flow-controlled frames
}

// maybeStream is a possibly nil *Stream. See streamsState.streams.
type maybeStream struct {
s *Stream
}

func (c *Conn) streamsInit() {
c.streams.streams = make(map[streamID]*Stream)
c.streams.streams = make(map[streamID]maybeStream)
c.streams.queue = newQueue[*Stream]()
c.streams.localLimit[bidiStream].init()
c.streams.localLimit[uniStream].init()
Expand All @@ -52,8 +65,8 @@ func (c *Conn) streamsCleanup() {
c.streams.localLimit[bidiStream].connHasClosed()
c.streams.localLimit[uniStream].connHasClosed()
for _, s := range c.streams.streams {
if s != nil {
s.connHasClosed()
if s.s != nil {
s.s.connHasClosed()
}
}
}
Expand Down Expand Up @@ -97,7 +110,7 @@ func (c *Conn) newLocalStream(ctx context.Context, styp streamType) (*Stream, er

// Modify c.streams on the conn's loop.
if err := c.runOnLoop(ctx, func(now time.Time, c *Conn) {
c.streams.streams[s.id] = s
c.streams.streams[s.id] = maybeStream{s}
}); err != nil {
return nil, err
}
Expand All @@ -119,7 +132,7 @@ const (
// streamForID returns the stream with the given id.
// If the stream does not exist, it returns nil.
func (c *Conn) streamForID(id streamID) *Stream {
return c.streams.streams[id]
return c.streams.streams[id].s
}

// streamForFrame returns the stream with the given id.
Expand All @@ -144,9 +157,9 @@ func (c *Conn) streamForFrame(now time.Time, id streamID, ftype streamFrameType)
}
}

s, isOpen := c.streams.streams[id]
if s != nil {
return s
ms, isOpen := c.streams.streams[id]
if ms.s != nil {
return ms.s
}

num := id.num()
Expand Down Expand Up @@ -183,10 +196,10 @@ func (c *Conn) streamForFrame(now time.Time, id streamID, ftype streamFrameType)
// with the same initiator and type and a lower number.
// Add a nil entry to the streams map for each implicitly created stream.
for n := newStreamID(id.initiator(), id.streamType(), prevOpened); n < id; n += 4 {
c.streams.streams[n] = nil
c.streams.streams[n] = maybeStream{}
}

s = newStream(c, id)
s := newStream(c, id)
s.inmaxbuf = c.config.maxStreamReadBufferSize()
s.inwin = c.config.maxStreamReadBufferSize()
if id.streamType() == bidiStream {
Expand All @@ -196,7 +209,7 @@ func (c *Conn) streamForFrame(now time.Time, id streamID, ftype streamFrameType)
s.inUnlock()
s.outUnlock()

c.streams.streams[id] = s
c.streams.streams[id] = maybeStream{s}
c.streams.queue.put(s)
return s
}
Expand Down Expand Up @@ -400,7 +413,11 @@ func (c *Conn) appendStreamFramesPTO(w *packetWriter, pnum packetNumber) bool {
c.streams.sendMu.Lock()
defer c.streams.sendMu.Unlock()
const pto = true
for _, s := range c.streams.streams {
for _, ms := range c.streams.streams {
s := ms.s
if s == nil {
continue
}
const pto = true
s.ingate.lock()
inOK := s.appendInFramesLocked(w, pnum, pto)
Expand Down
35 changes: 35 additions & 0 deletions internal/quic/conn_streams_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -522,3 +522,38 @@ func TestStreamsCreateConcurrency(t *testing.T) {
t.Errorf("accepted %v streams, want %v", got, want)
}
}

func TestStreamsPTOWithImplicitStream(t *testing.T) {
ctx := canceledContext()
tc := newTestConn(t, serverSide, permissiveTransportParameters)
tc.handshake()
tc.ignoreFrame(frameTypeAck)

// Peer creates stream 1, and implicitly creates stream 0.
tc.writeFrames(packetType1RTT, debugFrameStream{
id: newStreamID(clientSide, bidiStream, 1),
})

// We accept stream 1 and write data to it.
data := []byte("data")
s, err := tc.conn.AcceptStream(ctx)
if err != nil {
t.Fatalf("conn.AcceptStream() = %v, want stream", err)
}
s.Write(data)
s.Flush()
tc.wantFrame("data written to stream",
packetType1RTT, debugFrameStream{
id: newStreamID(clientSide, bidiStream, 1),
data: data,
})

// PTO expires, and the data is resent.
const pto = true
tc.triggerLossOrPTO(packetType1RTT, true)
tc.wantFrame("data resent after PTO expires",
packetType1RTT, debugFrameStream{
id: newStreamID(clientSide, bidiStream, 1),
data: data,
})
}

0 comments on commit c136d0c

Please sign in to comment.