Skip to content

Commit c04b085

Browse files
authored
internal/transport: minor cleanup of controlBuffer code (#7319)
1 parent 07078c4 commit c04b085

File tree

3 files changed

+94
-77
lines changed

3 files changed

+94
-77
lines changed

internal/transport/controlbuf.go

Lines changed: 90 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -289,66 +289,82 @@ func (l *outStreamList) dequeue() *outStream {
289289
}
290290

291291
// controlBuffer is a way to pass information to loopy.
292-
// Information is passed as specific struct types called control frames.
293-
// A control frame not only represents data, messages or headers to be sent out
294-
// but can also be used to instruct loopy to update its internal state.
295-
// It shouldn't be confused with an HTTP2 frame, although some of the control frames
296-
// like dataFrame and headerFrame do go out on wire as HTTP2 frames.
292+
//
293+
// Information is passed as specific struct types called control frames. A
294+
// control frame not only represents data, messages or headers to be sent out
295+
// but can also be used to instruct loopy to update its internal state. It
296+
// shouldn't be confused with an HTTP2 frame, although some of the control
297+
// frames like dataFrame and headerFrame do go out on wire as HTTP2 frames.
297298
type controlBuffer struct {
298-
ch chan struct{}
299-
done <-chan struct{}
299+
wakeupCh chan struct{} // Unblocks readers waiting for something to read.
300+
done <-chan struct{} // Closed when the transport is done.
301+
302+
// Mutex guards all the fields below, except trfChan which can be read
303+
// atomically without holding mu.
300304
mu sync.Mutex
301-
consumerWaiting bool
302-
list *itemList
303-
err error
305+
consumerWaiting bool // True when readers are blocked waiting for new data.
306+
closed bool // True when the controlbuf is finished.
307+
list *itemList // List of queued control frames.
304308

305309
// transportResponseFrames counts the number of queued items that represent
306310
// the response of an action initiated by the peer. trfChan is created
307311
// when transportResponseFrames >= maxQueuedTransportResponseFrames and is
308312
// closed and nilled when transportResponseFrames drops below the
309313
// threshold. Both fields are protected by mu.
310314
transportResponseFrames int
311-
trfChan atomic.Value // chan struct{}
315+
trfChan atomic.Pointer[chan struct{}]
312316
}
313317

314318
func newControlBuffer(done <-chan struct{}) *controlBuffer {
315319
return &controlBuffer{
316-
ch: make(chan struct{}, 1),
317-
list: &itemList{},
318-
done: done,
320+
wakeupCh: make(chan struct{}, 1),
321+
list: &itemList{},
322+
done: done,
319323
}
320324
}
321325

322-
// throttle blocks if there are too many incomingSettings/cleanupStreams in the
323-
// controlbuf.
326+
// throttle blocks if there are too many frames in the control buf that
327+
// represent the response of an action initiated by the peer, like
328+
// incomingSettings cleanupStreams etc.
324329
func (c *controlBuffer) throttle() {
325-
ch, _ := c.trfChan.Load().(chan struct{})
326-
if ch != nil {
330+
if ch := c.trfChan.Load(); ch != nil {
327331
select {
328-
case <-ch:
332+
case <-(*ch):
329333
case <-c.done:
330334
}
331335
}
332336
}
333337

338+
// put adds an item to the controlbuf.
334339
func (c *controlBuffer) put(it cbItem) error {
335340
_, err := c.executeAndPut(nil, it)
336341
return err
337342
}
338343

344+
// executeAndPut runs f, and if the return value is true, adds the given item to
345+
// the controlbuf. The item could be nil, in which case, this method simply
346+
// executes f and does not add the item to the controlbuf.
347+
//
348+
// The first return value indicates whether the item was successfully added to
349+
// the control buffer. A non-nil error, specifically ErrConnClosing, is returned
350+
// if the control buffer is already closed.
339351
func (c *controlBuffer) executeAndPut(f func() bool, it cbItem) (bool, error) {
340-
var wakeUp bool
341352
c.mu.Lock()
342-
if c.err != nil {
343-
c.mu.Unlock()
344-
return false, c.err
353+
defer c.mu.Unlock()
354+
355+
if c.closed {
356+
return false, ErrConnClosing
345357
}
346358
if f != nil {
347359
if !f() { // f wasn't successful
348-
c.mu.Unlock()
349360
return false, nil
350361
}
351362
}
363+
if it == nil {
364+
return true, nil
365+
}
366+
367+
var wakeUp bool
352368
if c.consumerWaiting {
353369
wakeUp = true
354370
c.consumerWaiting = false
@@ -359,77 +375,81 @@ func (c *controlBuffer) executeAndPut(f func() bool, it cbItem) (bool, error) {
359375
if c.transportResponseFrames == maxQueuedTransportResponseFrames {
360376
// We are adding the frame that puts us over the threshold; create
361377
// a throttling channel.
362-
c.trfChan.Store(make(chan struct{}))
378+
ch := make(chan struct{})
379+
c.trfChan.Store(&ch)
363380
}
364381
}
365-
c.mu.Unlock()
366382
if wakeUp {
367383
select {
368-
case c.ch <- struct{}{}:
384+
case c.wakeupCh <- struct{}{}:
369385
default:
370386
}
371387
}
372388
return true, nil
373389
}
374390

375-
// Note argument f should never be nil.
376-
func (c *controlBuffer) execute(f func(it any) bool, it any) (bool, error) {
377-
c.mu.Lock()
378-
if c.err != nil {
379-
c.mu.Unlock()
380-
return false, c.err
381-
}
382-
if !f(it) { // f wasn't successful
383-
c.mu.Unlock()
384-
return false, nil
385-
}
386-
c.mu.Unlock()
387-
return true, nil
388-
}
389-
391+
// get returns the next control frame from the control buffer. If block is true
392+
// **and** there are no control frames in the control buffer, the call blocks
393+
// until one of the conditions is met: there is a frame to return or the
394+
// transport is closed.
390395
func (c *controlBuffer) get(block bool) (any, error) {
391396
for {
392397
c.mu.Lock()
393-
if c.err != nil {
394-
c.mu.Unlock()
395-
return nil, c.err
396-
}
397-
if !c.list.isEmpty() {
398-
h := c.list.dequeue().(cbItem)
399-
if h.isTransportResponseFrame() {
400-
if c.transportResponseFrames == maxQueuedTransportResponseFrames {
401-
// We are removing the frame that put us over the
402-
// threshold; close and clear the throttling channel.
403-
ch := c.trfChan.Load().(chan struct{})
404-
close(ch)
405-
c.trfChan.Store((chan struct{})(nil))
406-
}
407-
c.transportResponseFrames--
408-
}
398+
frame, err := c.getOnceLocked()
399+
if frame != nil || err != nil || !block {
400+
// If we read a frame or an error, we can return to the caller. The
401+
// call to getOnceLocked() returns a nil frame and a nil error if
402+
// there is nothing to read, and in that case, if the caller asked
403+
// us not to block, we can return now as well.
409404
c.mu.Unlock()
410-
return h, nil
411-
}
412-
if !block {
413-
c.mu.Unlock()
414-
return nil, nil
405+
return frame, err
415406
}
416407
c.consumerWaiting = true
417408
c.mu.Unlock()
409+
410+
// Release the lock above and wait to be woken up.
418411
select {
419-
case <-c.ch:
412+
case <-c.wakeupCh:
420413
case <-c.done:
421414
return nil, errors.New("transport closed by client")
422415
}
423416
}
424417
}
425418

419+
// Callers must not use this method, but should instead use get().
420+
//
421+
// Caller must hold c.mu.
422+
func (c *controlBuffer) getOnceLocked() (any, error) {
423+
if c.closed {
424+
return false, ErrConnClosing
425+
}
426+
if c.list.isEmpty() {
427+
return nil, nil
428+
}
429+
h := c.list.dequeue().(cbItem)
430+
if h.isTransportResponseFrame() {
431+
if c.transportResponseFrames == maxQueuedTransportResponseFrames {
432+
// We are removing the frame that put us over the
433+
// threshold; close and clear the throttling channel.
434+
ch := c.trfChan.Swap(nil)
435+
close(*ch)
436+
}
437+
c.transportResponseFrames--
438+
}
439+
return h, nil
440+
}
441+
442+
// finish closes the control buffer, cleaning up any streams that have queued
443+
// header frames. Once this method returns, no more frames can be added to the
444+
// control buffer, and attempts to do so will return ErrConnClosing.
426445
func (c *controlBuffer) finish() {
427446
c.mu.Lock()
428-
if c.err != nil {
429-
c.mu.Unlock()
447+
defer c.mu.Unlock()
448+
449+
if c.closed {
430450
return
431451
}
432-
c.err = ErrConnClosing
452+
c.closed = true
433453
// There may be headers for streams in the control buffer.
434454
// These streams need to be cleaned out since the transport
435455
// is still not aware of these yet.
@@ -442,15 +462,14 @@ func (c *controlBuffer) finish() {
442462
hdr.onOrphaned(ErrConnClosing)
443463
}
444464
}
465+
445466
// In case throttle() is currently in flight, it needs to be unblocked.
446467
// Otherwise, the transport may not close, since the transport is closed by
447468
// the reader encountering the connection error.
448-
ch, _ := c.trfChan.Load().(chan struct{})
469+
ch := c.trfChan.Swap(nil)
449470
if ch != nil {
450-
close(ch)
471+
close(*ch)
451472
}
452-
c.trfChan.Store((chan struct{})(nil))
453-
c.mu.Unlock()
454473
}
455474

456475
type side int

internal/transport/http2_server.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1089,7 +1089,9 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
10891089
onWrite: t.setResetPingStrikes,
10901090
}
10911091

1092-
success, err := t.controlBuf.execute(t.checkForHeaderListSize, trailingHeader)
1092+
success, err := t.controlBuf.executeAndPut(func() bool {
1093+
return t.checkForHeaderListSize(trailingHeader)
1094+
}, nil)
10931095
if !success {
10941096
if err != nil {
10951097
return err

internal/transport/transport_test.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2495,11 +2495,7 @@ func (s) TestClientDecodeHeaderStatusErr(t *testing.T) {
24952495
activeStreams: map[uint32]*Stream{
24962496
0: ts,
24972497
},
2498-
controlBuf: &controlBuffer{
2499-
ch: make(chan struct{}),
2500-
done: make(chan struct{}),
2501-
list: &itemList{},
2502-
},
2498+
controlBuf: newControlBuffer(make(<-chan struct{})),
25032499
}
25042500
}
25052501

0 commit comments

Comments
 (0)