Add BBR Flow-Control (#294)
* Some preliminary work on adding BBR flow control support.

* Improve comments in clock package.

* Add a link to the BBR paper.

* More comments.

* Clean up filter.go

Better separation of concerns.

* More work on bbr.

- All of the needed fields of state are present and documented.
  I think I have my head wrapped around the whole thing now.
- onAck is "fully" implemented.

* Add some necessary type conversions

* bbr: add an un-bounded queue implementation.

The rtprop filter's window is time based.

* queue: add a Fold operator.

Will be useful for computing estimates; doing this in a loop is awkward
because of the ringbuffer nature.

* Rework filters.

...to use queue, and in the case of rtProp to avoid unbounded space.

* Remove now-unused ringBuffer type

* Add tests for filter and fix a bug.

The zero value for nextSample gets treated as an actual sample. So strip
things out of the queue after updating it, and make sure its timestamp
makes it immediately obsolete.

* Rename state -> Manager.

This is going to be the public interface for this package. I want to use
`state` to refer to something else.

* Start on some state tracking logic.

Running out of steam, putting this down for a bit.

* Write most of the logic for send.

This looks different than in the paper due to the need to actually block
things. Needs testing, and bits need docs.

* Implement FlowLimiter for Manager.

* Stub out Timer with an interface.

Needs testing. Then we need to use it for testing.

* Add a NewManual function.

* Add tests for manual clock/timer.

* Make sure timers go off if the duration isn't positive.

* Manager: start the timer.

* A test, and various fixes.

* Avoid divide by zeros, and better comments.

In particular, in doSend we divide by pacingGain * btlBw, so the latter
mustn't be zero.

* Rename readme.go -> doc.go

...which is more idiomatic for this kind of thing.

* Clarify comments re: coalescing rtt samples.

* Document more fields in Manager.

* Get rid of stale XXX comment.

When I wrote this, the value we were supplying was 0, which could
take precedence over an actual sample if the clock was set to within
1 second of MinInt64. But now that the value is MaxInt64, it will be
superseded even then.

* Rename Manager -> Limiter.

Manager is a huge code smell that usually means "I couldn't figure out
what to call it," and this corresponds to the FlowLimiter interface in
the flowcontrol package.

* One more pass at clarifying comment.

Thanks to Louis for suggesting this.

* Better comment on limiter.

* Use floats for bandwidth, define a wrapper type.

The wrapper type makes this a bit more type-safe; it's harder to
accidentally mix bandwidths and non-bandwidths.

In the course of doing this I realized our units were GB/s, which is way
too fast to use if it's an integer.

* Rename variable mgr -> lim

Missed this when I renamed the type.

* Add a test.

* Better name for TestStuff, use context.Background()

* Use the system clock for one test.

...otherwise we'd need to advance the clock manually, and for this
test we don't care.

* Fix a bug where trySend() would not always re-arm the timer.

If there was no message ready to send, trySend() would not re-arm the
timer -- which makes sense in isolation, but could cause deadlocks if
we later *do* need to re-arm the timer. This reworks things so that
the Limiter doesn't store the timer at all, and it doesn't need
resetting. Instead, we just create a new timer at the top of the main
loop each time we need it.

* Some more testing work.

- We now have a way of pausing the limiter so we can inspect it
  without data races.
- We check a few more things.

* Explicitly set nextSendTime on startup.

Unlikely to come up, but if someone's clock is incorrectly
set to before the zero value for time.Time, then this could
otherwise incorrectly not fire.

* Fix typos in comments.

* startupState: make sure to always update prevBtlBwEstimate

Note, we don't actually need the `if s.prevBtlBwEstimate == 0 { ... }`,
because in that case we'll always go down the plateuRounds = 0 case,
which changes no state relative to the starting point.

* Add a test for the startup state.

This is poorly thought out and I'm running out of steam; calling it a
night.

* Add some utilities for simulating a network path.

Untested.

* Add some simple tests for the test code.

* Factor out magic start time constant

* Remove need for packetMeta in testPacket

...we really only need the size.

* Set a reasonable initial value for deliveredTime.

* Minor code cleanup

no functional change.

* Work on probeBW state.

Implemented except for switching into probeRTT.

* Fuss with and ultimately disable state_test.

I need to think through what this is actually trying to check.

* Add a "test" for tracing purposes.

* First pass at probeRTT.

I vaguely suspected this would happen: turns out we don't need preAck()
for any state, so that's gone. Some bits to fill in still.

* Add a comment.

* probeRTT: change how we detect round trips.

...rather than just assuming they take (our possibly stale estimate of)
`rtProp` time.

* switch from probeRTT back to probeBW, and add comments.

Much of this is me trying to wrap my head around all the state
transitions.

* Move logging out of the hot path.

* Factor out specific data from new runTrace function.

This will let us get a trace for various inputs, and then offline check
properties of the samples.

* More faithful snapshots.

* snapshot.report(): dump contents of rtPropFilter.

* Expose BBR limiter from flowcontrol package.

* Fix import path for mpsc

...to match the change from main.

* testLink.run: take receiver by value.

...the tests were spawning multiple copies of the same link. D'oh. Also
includes a test that catches the issue.

* drainState: change state even if we overshoot the target.

I'm like 98% sure this is what we want.

* TraceTest: don't test when -short is passed.

* BBR: int64 -> uint64

There's really no reason for these to be signed. I think originally this
had something to do with the way this interacted with time.Duration, but
just changing it everywhere seems to work fine, so...

* Add a missing word in comment.

* Add a function for returning the true rtProp and btlBw values for a path

* Shape TestTrace into a proper test.

* Better error messages on test failure

* Include minPacketBytes field in TestTrace cases.

...so this doesn't always have to be 1. We could improve this by just
finding the correct value from the list of packets.

* Test estimates after the startup state.

* isAppLimited: check if inflight() is zero.

See the comment; this is important and was the cause of some apparent
hangs.

* Separately test that values are correct *at any point*

This will only fail when at least one of the others does, obviously, but
it's useful for diagnostics.

* More test suite stuff.

- Slightly nicer output for snapshots
- relax tolerances for rtProp; this was causing failures, but 5% seems
  reasonable. We'll do more stringent testing (esp since probeRTT is
  only half implemented) to validate this later on.
- Add a few more (failing) test cases.

* Minor readability improvement.

This was a bit hard to follow because the units were conceptually wrong
-- we were casting to bytesPerNs because that was the type we needed,
but the units were bytes and nanoseconds, separately. Just for clarity's
sake, cast to float64 and then only cast to bytesPerNs when you've got
the final result.

* Json reporting

* Fix logic in trueValues

A "delay only" link actually has a bandwidth of (avg packet size /
delay), whereas the previous calculation assumed bandwidth was infinite.

* Fix some errors in trueValues.

Most glaringly, we were taking the *maximum* bandwidth to be the
bottleneck, not the minimum. As a secondary issue, we were only adding
in delay for the bottleneck link (whether or not its delay was already
accounted for), not all bandwidth-limited links.

With these changes, our TraceTest numbers are only off by a bit under
25% -- is that close enough? But my test program still is getting 20x
less bandwidth than it should...

* More informative reporting of numeric errors.

This should make it easier to understand what's going on.

* Fix typo in comment.

* Fix spelling error in variable name.

* Add some comments

* probeBWState: tweak logic for choosing index.

Specifically: move the undesired state to the end of the array, so we
can just pick a random index before then, rather than having to check if
we got the bad one. This also fixes a probably-inconsequential bug in
that the probability of choosing the index after the low one was
slightly higher than the others; it should be a properly uniform
distribution now.

* Fixes to core logic in run().

The way we were/weren't using appLimitedUntil was... wrong, I think
mostly as a function of me having a shaky understanding of how it worked
when I first wrote this. After fixing this the bbr limiter gets within a
factor of 3 for the bandwidth on my local network.

* Delete state_test.go

The tests here never really made sense.

* Skip gatherData() by default.

...so we can practically use go test with this here.

* Clean up tests a bit, test uniform packet sizes in TraceTest

* Remove some TODOs.

- The fine grained time measurement seems to be working fine, so let's
  leave it.
- cwndGain is actually essential to how startup works at least, so we
  can't just kill it.

* Expose snapshot functionality from bbr package.

...so I can get this info outside the test suite, for debugging.

* Add a TracingLimiter implementation.

...which has been useful for debugging.

* Improvements to snapshot support.

- SnapshotLimiter is now safe to call concurrently with other operations
  on the limiter (uses whilePaused internally).
- There is now a SnapshottingLimiter that takes snapshots at each point
  of interest.

* Commit test tool for gathering data.

* Support snapshotting bbr in the test tool.

* flowcontrol test tool: add -packetsize flag.

* test-tool: add a --no-trace flag.

The intent here was to test if there was significant overhead for this
that could be throwing off results. Answer: no.

* Correctly set cwnd for probeRTT

...four packets, as discussed in the paper.

* probeRTT: go to startup if we're below BDP.

* TraceLimiter: add a constructor function.

...and make `underlying` private.

* Emit trace records via a callback.

...instead of just appending them to a slice. This allows for streaming
collection of data.

* bbr: add comments to exported snapshot stuff.

* Expose the bbr package

...as flowcontrol/bbr

* Expose clock interface, since it's used by bbr.

Also, bbr.NewLimiter now defaults to using the system clock if its
argument is nil. flowcontrol.NewBBR() has been removed; just use
bbr.NewLimiter(nil)

* Mark TestTrace as flaky.
zenhack authored Nov 18, 2022
1 parent 6aa0193 commit bf31894
Showing 20 changed files with 2,550 additions and 0 deletions.
131 changes: 131 additions & 0 deletions exp/clock/clock.go
@@ -0,0 +1,131 @@
// Package clock provides an interface for clocks.
//
// This is useful for testing code that does things based on time;
// it allows advancing the clock as needed by the tests, rather than
// needing to race with real time.
package clock

import (
	"math"
	"sync"
	"time"

	"capnproto.org/go/capnp/v3/internal/syncutil"
)

// A Clock can measure time.
type Clock interface {
	Now() time.Time
	NewTimer(d time.Duration) Timer
}

// A Timer fires on a channel once a duration has elapsed.
type Timer interface {
	Chan() <-chan time.Time
	Reset(d time.Duration)
	Stop() bool
}

// System reads the current time from the system clock.
var System Clock = systemClock{}

type systemClock struct{}

func (systemClock) Now() time.Time {
	return time.Now()
}

func (systemClock) NewTimer(d time.Duration) Timer {
	return (*systemTimer)(time.NewTimer(d))
}

type systemTimer time.Timer

func (t *systemTimer) Chan() <-chan time.Time {
	return t.C
}

func (t *systemTimer) Reset(d time.Duration) {
	(*time.Timer)(t).Reset(d)
}

func (t *systemTimer) Stop() bool {
	return (*time.Timer)(t).Stop()
}

// A Manual is a clock which is stopped, and only advances when its Advance
// method is called.
type Manual struct {
	mu     sync.Mutex
	now    time.Time
	timers []*manualTimer
}

// NewManual returns a new Manual clock, with the given initial time.
func NewManual(now time.Time) *Manual {
	return &Manual{now: now}
}

// Now returns the current time.
func (m *Manual) Now() (now time.Time) {
	syncutil.With(&m.mu, func() {
		now = m.now
	})
	return now
}

func (m *Manual) NewTimer(d time.Duration) Timer {
	var ret *manualTimer
	syncutil.With(&m.mu, func() {
		ret = &manualTimer{
			ch:       make(chan time.Time, 1),
			deadline: m.now.Add(d),
			clock:    m,
		}
		m.timers = append(m.timers, ret)
		if d <= 0 {
			ret.ch <- m.now
		}
	})
	return ret
}

// Advance advances the clock forward by the given duration.
func (m *Manual) Advance(d time.Duration) {
	syncutil.With(&m.mu, func() {
		before := m.now
		m.now = before.Add(d)

		for i := range m.timers {
			t := m.timers[i]

			if before.Before(t.deadline) && !m.now.Before(t.deadline) {
				t.ch <- m.now
			}
		}
	})
}

type manualTimer struct {
	ch       chan time.Time
	deadline time.Time
	clock    *Manual
}

func (t *manualTimer) Chan() <-chan time.Time {
	return t.ch
}

func (t *manualTimer) Reset(d time.Duration) {
	syncutil.With(&t.clock.mu, func() {
		t.deadline = t.clock.now.Add(d)
	})
}

func (t *manualTimer) Stop() bool {
	var wasActive bool
	syncutil.With(&t.clock.mu, func() {
		wasActive = t.clock.now.Before(t.deadline)
		t.deadline = time.Unix(math.MinInt64, 0)
	})
	return wasActive
}
77 changes: 77 additions & 0 deletions exp/clock/clock_test.go
@@ -0,0 +1,77 @@
package clock

import (
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
)

func TestTimers(t *testing.T) {
	// arbitrary initial now:
	init := time.Unix(1e9, 0)

	clock := NewManual(init)

	timer := clock.NewTimer(5 * time.Second)

	select {
	case <-timer.Chan():
		t.Fatal("Timer went off too early.")
	default:
	}

	clock.Advance(1 * time.Second)

	select {
	case <-timer.Chan():
		t.Fatal("Timer went off too early.")
	default:
	}

	clock.Advance(10 * time.Second)

	select {
	case <-timer.Chan():
	default:
		t.Fatal("Timer didn't go off.")
	}

	assert.False(t, timer.Stop(), "Timer should have already been stopped.")

	assert.Equal(t, init.Add(11*time.Second), clock.Now(), "Time should be 11 seconds later.")

	timer.Reset(1 * time.Second)

	select {
	case <-timer.Chan():
		t.Fatal("Timer went off again too early.")
	default:
	}

	clock.Advance(2 * time.Second)

	select {
	case <-timer.Chan():
	default:
		t.Fatal("Timer didn't go off.")
	}
}

func TestImmediateTimer(t *testing.T) {
	clock := NewManual(time.Unix(1e9, 0))

	timer := clock.NewTimer(0)
	select {
	case <-timer.Chan():
	default:
		t.Fatal("Timer with duration of zero should go off immediately.")
	}

	timer = clock.NewTimer(-1)
	select {
	case <-timer.Chan():
	default:
		t.Fatal("Timer with negative duration should go off immediately.")
	}
}
5 changes: 5 additions & 0 deletions flowcontrol/bbr/doc.go
@@ -0,0 +1,5 @@
// Package bbr implements the BBR congestion control algorithm, as
// described in:
//
// https://queue.acm.org/detail.cfm?id=3022184
package bbr
143 changes: 143 additions & 0 deletions flowcontrol/bbr/filter.go
@@ -0,0 +1,143 @@
package bbr

import (
	"fmt"
	"math"
	"time"
)

// Size of the bottleneck bandwidth filter's window. The paper suggests
// 6-10; other than that, this is arbitrary.
const btlBwFilterSize = 6

// Units in which we measure the bottleneck bandwidth. Note that bytes
// per nanosecond is numerically equivalent to GB/s.
type bytesPerNs float64

// Filter that estimates the bottleneck bandwidth.
type btlBwFilter struct {
	q        queue[bytesPerNs]
	Estimate bytesPerNs
}

func newBtlBwFilter() btlBwFilter {
	return btlBwFilter{
		q: *newQueue[bytesPerNs](btlBwFilterSize),

		// We set this to something that is only barely
		// non-zero, so it won't result in divide by
		// zero errors but also won't take precedence
		// over any actual data we receive.
		Estimate: 1e-10, // 1 byte per 10s
	}
}

func (f *btlBwFilter) AddSample(deliveryRate bytesPerNs) {
	if f.q.Len() == btlBwFilterSize {
		f.q.Pop()
	}
	f.q.Push(deliveryRate)
	f.Estimate = f.q.Fold(0, max[bytesPerNs])
}

// Filter that estimates the round-trip propagation time.
type rtPropFilter struct {
	q          queue[rtPropSample]
	nextSample rtPropSample
	Estimate   time.Duration
}

func newRtPropFilter() rtPropFilter {
	return rtPropFilter{
		nextSample: rtPropSample{
			// Set this to a value that will immediately be superseded
			// as soon as we get a real sample.
			now: time.Unix(math.MinInt64, 0),
			rtt: math.MaxInt64,
		},
	}
}

type rtPropSample struct {
	rtt time.Duration
	now time.Time
}

func (s rtPropSample) String() string {
	return fmt.Sprintf("rtPropSample{rtt: %v, now: %v}", s.rtt, s.now)
}

func (f *rtPropFilter) AddSample(sample rtPropSample) {
	// We want to avoid an un-bounded growing queue for two reasons:
	//
	// 1. Space usage
	// 2. Recomputing the estimate is an O(n) operation, so we want to
	//    keep n small if we're doing that on each ack.
	//
	// We manage this as follows: rather than adding each sample to the
	// queue individually, we compute the minimum RTT for each 1-second
	// window, and add that aggregate value to the queue, which in turn
	// drops samples more than 30 seconds old. This gives the benefits of
	// the sliding window without needing to store each and every sample.

	if sample.now.Sub(f.nextSample.now) > time.Second {
		f.q.Push(f.nextSample)
		f.nextSample = sample
	} else {
		f.nextSample = rtPropSample{
			// We keep the old `now`, since it's used to determine if we
			// should shift to the next sample:
			now: f.nextSample.now,
			rtt: min(f.nextSample.rtt, sample.rtt),
		}
	}

	// Clear out any samples older than 30 seconds:
	for !f.q.Empty() && sample.now.Sub(f.q.Peek().now) > 30*time.Second {
		f.q.Pop()
	}

	f.Estimate = foldQueue(
		&f.q,
		f.nextSample.rtt,
		func(rtProp time.Duration, sample rtPropSample) time.Duration {
			return min(rtProp, sample.rtt)
		},
	)
}

func (f btlBwFilter) snapshot() btlBwFilter {
	ret := f
	ret.q = f.q.snapshot()
	return ret
}

func (f rtPropFilter) snapshot() rtPropFilter {
	ret := f
	ret.q = f.q.snapshot()
	return ret
}

// min and max compute the minimum and maximum of two numbers, respectively.
// Presumably, as Go generics become more widely used, these will be dropped
// in favor of some standard library function.
//
// We could use a broader constraint here (constraints.Ordered), but we'd have
// to either define the alias ourselves or import the exp package, and we
// only actually use these at more specific types anyway.

func min[T ~int64](x, y T) T {
	if x < y {
		return x
	} else {
		return y
	}
}

func max[T ~float64](x, y T) T {
	if x > y {
		return x
	} else {
		return y
	}
}