diff --git a/stream/stream.go b/stream/stream.go index fb620dd..0ceea3b 100644 --- a/stream/stream.go +++ b/stream/stream.go @@ -777,6 +777,37 @@ func (s *flattenStream[T]) Close() { s.inner.Close() } +func FlattenSlices[T any](s Stream[[]T]) Stream[T] { + return &flattenSlicesStream[T]{ + inner: s, + } +} + +type flattenSlicesStream[T any] struct { + inner Stream[[]T] + buffer []T +} + +func (s *flattenSlicesStream[T]) Next(ctx context.Context) (T, error) { + var zero T + for { + if len(s.buffer) > 0 { + item := s.buffer[0] + s.buffer[0] = zero + s.buffer = s.buffer[1:] + return item, nil + } + + var err error + s.buffer, err = s.inner.Next(ctx) + if err != nil { + return zero, err + } + } +} + +func (s *flattenSlicesStream[T]) Close() { s.inner.Close() } + // Join returns a Stream that yields all elements from streams[0], then all elements from // streams[1], and so on. func Join[T any](streams ...Stream[T]) Stream[T] { diff --git a/xsync/xsync.go b/xsync/xsync.go index 88ace36..345b743 100644 --- a/xsync/xsync.go +++ b/xsync/xsync.go @@ -5,7 +5,6 @@ import ( "context" "math/rand" "sync" - "sync/atomic" "time" ) @@ -321,66 +320,3 @@ func (f *Future[T]) WaitContext(ctx context.Context) (T, error) { } return f.x, nil } - -// Watchable contains a value. It is similar to an atomic.Pointer[T] but allows notifying callers -// that a new value has been set. -type Watchable[T any] struct { - p atomic.Pointer[watchableInner[T]] -} - -type watchableInner[T any] struct { - t T - c chan struct{} -} - -// Set sets the value in w and notifies callers of Value() that there is a new value. -func (w *Watchable[T]) Set(t T) { - newInner := &watchableInner[T]{ - t: t, - c: make(chan struct{}), - } - oldInner := w.p.Swap(newInner) - if oldInner != nil { - close(oldInner.c) - } -} - -// Value returns the current value inside w and a channel that will be closed when w is Set() to a -// newer value than the returned one. -// -// If called before the first Set(), returns the zero value of T. -// -// Normal usage has an observer waiting for new values in a loop: -// -// for { -// v, changed := w.Value() -// -// // do something with v -// -// <-changed -// } -// -// Note that the value in w may have changed multiple times between successive calls to Value(), -// Value() only ever returns the last-set value. This is by design so that slow observers cannot -// block Set(), unlike sending values on a channel. -func (w *Watchable[T]) Value() (T, chan struct{}) { - inner := w.p.Load() - if inner == nil { - // There's no inner, meaning w has not been Set() yet. Try filling it with an empty inner, - // so that we have a channel to listen on. - c := make(chan struct{}) - emptyInner := &watchableInner[T]{ - c: c, - } - // CompareAndSwap so we don't accidentally smash a real value that got put between our Load - // and here. - if w.p.CompareAndSwap(nil, emptyInner) { - var zero T - return zero, c - } - // If we fell through to here somebody Set() while we were trying to do this, so there's - // definitely an inner now. - inner = w.p.Load() - } - return inner.t, inner.c -} diff --git a/xsync/xsync_go1.19.go b/xsync/xsync_go1.19.go new file mode 100644 index 0000000..9d104e3 --- /dev/null +++ b/xsync/xsync_go1.19.go @@ -0,0 +1,70 @@ +//go:build go1.19 + +package xsync + +import ( + "sync/atomic" +) + +// Watchable contains a value. It is similar to an atomic.Pointer[T] but allows notifying callers +// that a new value has been set. +type Watchable[T any] struct { + p atomic.Pointer[watchableInner[T]] +} + +type watchableInner[T any] struct { + t T + c chan struct{} +} + +// Set sets the value in w and notifies callers of Value() that there is a new value. +func (w *Watchable[T]) Set(t T) { + newInner := &watchableInner[T]{ + t: t, + c: make(chan struct{}), + } + oldInner := w.p.Swap(newInner) + if oldInner != nil { + close(oldInner.c) + } +} + +// Value returns the current value inside w and a channel that will be closed when w is Set() to a +// newer value than the returned one. +// +// If called before the first Set(), returns the zero value of T. +// +// Normal usage has an observer waiting for new values in a loop: +// +// for { +// v, changed := w.Value() +// +// // do something with v +// +// <-changed +// } +// +// Note that the value in w may have changed multiple times between successive calls to Value(), +// Value() only ever returns the last-set value. This is by design so that slow observers cannot +// block Set(), unlike sending values on a channel. +func (w *Watchable[T]) Value() (T, chan struct{}) { + inner := w.p.Load() + if inner == nil { + // There's no inner, meaning w has not been Set() yet. Try filling it with an empty inner, + // so that we have a channel to listen on. + c := make(chan struct{}) + emptyInner := &watchableInner[T]{ + c: c, + } + // CompareAndSwap so we don't accidentally smash a real value that got put between our Load + // and here. + if w.p.CompareAndSwap(nil, emptyInner) { + var zero T + return zero, c + } + // If we fell through to here somebody Set() while we were trying to do this, so there's + // definitely an inner now. + inner = w.p.Load() + } + return inner.t, inner.c +} diff --git a/xsync/xsync_go1.19_test.go b/xsync/xsync_go1.19_test.go new file mode 100644 index 0000000..ef699c8 --- /dev/null +++ b/xsync/xsync_go1.19_test.go @@ -0,0 +1,36 @@ +//go:build go1.19 + +package xsync + +import ( + "fmt" + "time" +) + +func ExampleWatchable() { + start := time.Now() + + var w Watchable[int] + w.Set(0) + go func() { + for i := 1; i < 20; i++ { + w.Set(i) + fmt.Printf("set %d at %s\n", i, time.Since(start).Round(time.Millisecond)) + time.Sleep(5 * time.Millisecond) + } + }() + + for { + v, changed := w.Value() + if v == 19 { + return + } + + fmt.Printf("observed %d at %s\n", v, time.Since(start).Round(time.Millisecond)) + + // Sleep for longer between iterations to show that we don't slow down the setter. + time.Sleep(17 * time.Millisecond) + + <-changed + } +} diff --git a/xsync/xsync_test.go b/xsync/xsync_test.go index 9cc1b5d..e3cbdb2 100644 --- a/xsync/xsync_test.go +++ b/xsync/xsync_test.go @@ -97,31 +97,3 @@ func TestGroup(t *testing.T) { // Jank, but just in case we'd be safe from the above panic just because the test is over. time.Sleep(200 * time.Millisecond) } - -func ExampleWatchable() { - start := time.Now() - - var w Watchable[int] - w.Set(0) - go func() { - for i := 1; i < 20; i++ { - w.Set(i) - fmt.Printf("set %d at %s\n", i, time.Since(start).Round(time.Millisecond)) - time.Sleep(5 * time.Millisecond) - } - }() - - for { - v, changed := w.Value() - if v == 19 { - return - } - - fmt.Printf("observed %d at %s\n", v, time.Since(start).Round(time.Millisecond)) - - // Sleep for longer between iterations to show that we don't slow down the setter. - time.Sleep(17 * time.Millisecond) - - <-changed - } -}