Skip to content

Commit

Permalink
add xsync.Watchable
Browse files Browse the repository at this point in the history
  • Loading branch information
bradenaw committed Nov 25, 2023
1 parent 1b49b27 commit c413889
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 0 deletions.
64 changes: 64 additions & 0 deletions xsync/xsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"math/rand"
"sync"
"sync/atomic"
"time"
)

Expand Down Expand Up @@ -320,3 +321,66 @@ func (f *Future[T]) WaitContext(ctx context.Context) (T, error) {
}
return f.x, nil
}

// Watchable contains a value. It is similar to an atomic.Pointer[T] but allows notifying callers
// that a new value has been set.
type Watchable[T any] struct {
p atomic.Pointer[watchableInner[T]]

Check failure on line 328 in xsync/xsync.go

View workflow job for this annotation

GitHub Actions / build

undefined: atomic.Pointer
}

type watchableInner[T any] struct {
t T
c chan struct{}
}

// Set sets the value in w and notifies callers of Value() that there is a new value.
func (w *Watchable[T]) Set(t T) {
newInner := &watchableInner[T]{
t: t,
c: make(chan struct{}),
}
oldInner := w.p.Swap(newInner)
if oldInner != nil {
close(oldInner.c)
}
}

// Value returns the current value inside w and a channel that will be closed when w is Set() to a
// newer value than the returned one.
//
// If called before the first Set(), returns the zero value of T.
//
// Normal usage has an observer waiting for new values in a loop:
//
// for {
// v, changed := w.Value()
//
// // do something with v
//
// <-changed
// }
//
// Note that the value in w may have changed multiple times between successive calls to Value(),
// Value() only ever returns the last-set value. This is by design so that slow observers cannot
// block Set(), unlike sending values on a channel.
func (w *Watchable[T]) Value() (T, chan struct{}) {
inner := w.p.Load()
if inner == nil {
// There's no inner, meaning w has not been Set() yet. Try filling it with an empty inner,
// so that we have a channel to listen on.
c := make(chan struct{})
emptyInner := &watchableInner[T]{
c: c,
}
// CompareAndSwap so we don't accidentally smash a real value that got put between our Load
// and here.
if w.p.CompareAndSwap(nil, emptyInner) {
var zero T
return zero, c
}
// If we fell through to here somebody Set() while we were trying to do this, so there's
// definitely an inner now.
inner = w.p.Load()
}
return inner.t, inner.c
}
28 changes: 28 additions & 0 deletions xsync/xsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,3 +97,31 @@ func TestGroup(t *testing.T) {
// Jank, but just in case we'd be safe from the above panic just because the test is over.
time.Sleep(200 * time.Millisecond)
}

func ExampleWatchable() {
start := time.Now()

var w Watchable[int]
w.Set(0)
go func() {
for i := 1; i < 20; i++ {
w.Set(i)
fmt.Printf("set %d at %s\n", i, time.Since(start).Round(time.Millisecond))
time.Sleep(5 * time.Millisecond)
}
}()

for {
v, changed := w.Value()
if v == 19 {
return
}

fmt.Printf("observed %d at %s\n", v, time.Since(start).Round(time.Millisecond))

// Sleep for longer between iterations to show that we don't slow down the setter.
time.Sleep(17 * time.Millisecond)

<-changed
}
}

0 comments on commit c413889

Please sign in to comment.