From 14a45793da5638135ff9e4d9dfe41cb1f80c29a8 Mon Sep 17 00:00:00 2001 From: teivah Date: Sun, 23 Feb 2020 14:54:46 +0000 Subject: [PATCH] Documentation update --- README.md | 1 + doc/debounce.md | 41 ++++++++++++++++ duration.go | 2 - observable.go | 23 +++++++++ observable_operator.go | 79 ++++++++++++++++--------------- observable_operator_bench_test.go | 1 - observable_operator_test.go | 12 +++++ 7 files changed, 118 insertions(+), 41 deletions(-) create mode 100644 doc/debounce.md diff --git a/README.md b/README.md index d9b76ecb..63401798 100644 --- a/README.md +++ b/README.md @@ -290,6 +290,7 @@ In this example, we create a pool of 32 goroutines that consume items concurrent * [Window](doc/window.md) — apply a function to each item emitted by an Observable, sequentially, and emit each successive value ### Filtering Observables +* [Debounce](doc/debounce.md) — only emit an item from an Observable if a particular timespan has passed without it emitting another item * [Distinct](doc/distinct.md)/[DistinctUntilChanged](doc/distinctuntilchanged.md) — suppress duplicate items emitted by an Observable * [ElementAt](doc/elementat.md) — emit only item n emitted by an Observable * [Filter](doc/filter.md) — emit only those items from an Observable that pass a predicate test diff --git a/doc/debounce.md b/doc/debounce.md new file mode 100644 index 00000000..18fc533e --- /dev/null +++ b/doc/debounce.md @@ -0,0 +1,41 @@ +# Debounce Operator + +## Overview + +Only emit an item from an Observable if a particular timespan has passed without it emitting another item. + +![](http://reactivex.io/documentation/operators/images/debounce.png) + +## Example + +```go +observable.Debounce(rxgo.WithDuration(250 * time.Millisecond)) +``` + +Output: each item emitted by the Observable if not item has been emitted after 250 milliseconds. + +## Options + +### WithBufferedChannel + +[Detail](options.md#withbufferedchannel) + +### WithContext + +[Detail](options.md#withcontext) + +### WithObservationStrategy + +[Detail](options.md#withobservationstrategy) + +### WithErrorStrategy + +[Detail](options.md#witherrorstrategy) + +### WithPool + +https://github.com/ReactiveX/RxGo/wiki/Options#withpool + +### WithCPUPool + +https://github.com/ReactiveX/RxGo/wiki/Options#withcpupool \ No newline at end of file diff --git a/duration.go b/duration.go index 89650ee5..61d26fa9 100644 --- a/duration.go +++ b/duration.go @@ -51,8 +51,6 @@ func timeCausality(elems ...interface{}) (context.Context, Observable, Duration) fs[i] = func() { ch <- Of(elem) } - case func(): - fs[i] = elem case error: fs[i] = func() { ch <- Error(elem) diff --git a/observable.go b/observable.go index ba07e62a..3be1a677 100644 --- a/observable.go +++ b/observable.go @@ -24,6 +24,7 @@ type Observable interface { BufferWithTimeOrCount(timespan Duration, count int, opts ...Option) Observable Contains(equal Predicate, opts ...Option) Single Count(opts ...Option) Single + Debounce(timespan Duration, opts ...Option) Observable DefaultIfEmpty(defaultValue interface{}, opts ...Option) Observable Distinct(apply Func, opts ...Option) Observable DistinctUntilChanged(apply Func, opts ...Option) Observable @@ -313,3 +314,25 @@ func runPar(ctx context.Context, next chan Item, iterable Iterable, operatorFact close(gather) }() } + +func customObservableOperator(f func(ctx context.Context, next chan Item, option Option, opts ...Option), opts ...Option) Observable { + option := parseOptions(opts...) + + if option.isEagerObservation() { + next := option.buildChannel() + ctx := option.buildContext() + go f(ctx, next, option, opts...) + return &ObservableImpl{iterable: newChannelIterable(next)} + } + + return &ObservableImpl{ + iterable: newFactoryIterable(func(propagatedOptions ...Option) <-chan Item { + mergedOptions := append(opts, propagatedOptions...) + option := parseOptions(mergedOptions...) + next := option.buildChannel() + ctx := option.buildContext() + go f(ctx, next, option, mergedOptions...) + return next + }), + } +} diff --git a/observable_operator.go b/observable_operator.go index 90ba5d79..6d8a3e96 100644 --- a/observable_operator.go +++ b/observable_operator.go @@ -637,6 +637,45 @@ func (op *countOperator) end(_ context.Context, dst chan<- Item) { func (op *countOperator) gatherNext(_ context.Context, _ Item, _ chan<- Item, _ operatorOptions) { } +// Debounce only emits an item from an Observable if a particular timespan has passed without it emitting another item. +func (o *ObservableImpl) Debounce(timespan Duration, opts ...Option) Observable { + f := func(ctx context.Context, next chan Item, option Option, opts ...Option) { + defer close(next) + observe := o.Observe(opts...) + var latest interface{} + + for { + select { + case <-ctx.Done(): + return + case item, ok := <-observe: + if !ok { + return + } + if item.Error() { + if !item.SendWithContext(ctx, next) { + return + } + if option.getErrorStrategy() == Stop { + return + } + } else { + latest = item.V + } + case <-time.After(timespan.duration()): + if latest != nil { + if !Of(latest).SendWithContext(ctx, next) { + return + } + latest = nil + } + } + } + } + + return customObservableOperator(f, opts...) +} + // DefaultIfEmpty returns an Observable that emits the items emitted by the source // Observable or a specified default item if the source Observable is empty. func (o *ObservableImpl) DefaultIfEmpty(defaultValue interface{}, opts ...Option) Observable { @@ -2551,25 +2590,7 @@ func (o *ObservableImpl) WindowWithTime(timespan Duration, opts ...Option) Obser } } - option := parseOptions(opts...) - - if option.isEagerObservation() { - next := option.buildChannel() - ctx := option.buildContext() - go f(ctx, next, option, opts...) - return &ObservableImpl{iterable: newChannelIterable(next)} - } - - return &ObservableImpl{ - iterable: newFactoryIterable(func(propagatedOptions ...Option) <-chan Item { - mergedOptions := append(opts, propagatedOptions...) - option := parseOptions(mergedOptions...) - next := option.buildChannel() - ctx := option.buildContext() - go f(ctx, next, option, mergedOptions...) - return next - }), - } + return customObservableOperator(f, opts...) } // WindowWithTimeOrCount periodically subdivides items from an Observable into Observables based on timed windows or a specific size @@ -2636,25 +2657,7 @@ func (o *ObservableImpl) WindowWithTimeOrCount(timespan Duration, count int, opt } } - option := parseOptions(opts...) - - if option.isEagerObservation() { - next := option.buildChannel() - ctx := option.buildContext() - go f(ctx, next, option, opts...) - return &ObservableImpl{iterable: newChannelIterable(next)} - } - - return &ObservableImpl{ - iterable: newFactoryIterable(func(propagatedOptions ...Option) <-chan Item { - mergedOptions := append(opts, propagatedOptions...) - option := parseOptions(mergedOptions...) - next := option.buildChannel() - ctx := option.buildContext() - go f(ctx, next, option, mergedOptions...) - return next - }), - } + return customObservableOperator(f, opts...) } // ZipFromIterable merges the emissions of an Iterable via a specified function diff --git a/observable_operator_bench_test.go b/observable_operator_bench_test.go index 43007f73..d0ed67e0 100644 --- a/observable_operator_bench_test.go +++ b/observable_operator_bench_test.go @@ -8,7 +8,6 @@ import ( const ( benchChannelCap = 1000 - benchNumberOfElementsLarge = 1000000 benchNumberOfElementsSmall = 1000 ioPool = 32 ) diff --git a/observable_operator_test.go b/observable_operator_test.go index 467af9c0..b5b338e5 100644 --- a/observable_operator_test.go +++ b/observable_operator_test.go @@ -254,6 +254,18 @@ func Test_Observable_Count_Parallel(t *testing.T) { HasItem(int64(10001))) } +func Test_Observable_Debounce(t *testing.T) { + ctx, obs, d := timeCausality(1, tick, 2, tick, 3, 4, 5, tick, 6, tick) + Assert(context.Background(), t, obs.Debounce(d, WithBufferedChannel(10), WithContext(ctx)), + HasItems(1, 2, 5, 6)) +} + +func Test_Observable_Debounce_Error(t *testing.T) { + ctx, obs, d := timeCausality(1, tick, 2, tick, 3, errFoo, 5, tick, 6, tick) + Assert(context.Background(), t, obs.Debounce(d, WithBufferedChannel(10), WithContext(ctx)), + HasItems(1, 2), HasError(errFoo)) +} + func Test_Observable_DefaultIfEmpty_Empty(t *testing.T) { obs := Empty().DefaultIfEmpty(3) Assert(context.Background(), t, obs, HasItems(3))