Skip to content

Commit

Permalink
Documentation update
Browse files Browse the repository at this point in the history
  • Loading branch information
teivah committed Feb 23, 2020
1 parent 090bbb0 commit 14a4579
Show file tree
Hide file tree
Showing 7 changed files with 118 additions and 41 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,7 @@ In this example, we create a pool of 32 goroutines that consume items concurrent
* [Window](doc/window.md) — apply a function to each item emitted by an Observable, sequentially, and emit each successive value

### Filtering Observables
* [Debounce](doc/debounce.md) — only emit an item from an Observable if a particular timespan has passed without it emitting another item
* [Distinct](doc/distinct.md)/[DistinctUntilChanged](doc/distinctuntilchanged.md) — suppress duplicate items emitted by an Observable
* [ElementAt](doc/elementat.md) — emit only item n emitted by an Observable
* [Filter](doc/filter.md) — emit only those items from an Observable that pass a predicate test
Expand Down
41 changes: 41 additions & 0 deletions doc/debounce.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# Debounce Operator

## Overview

Only emit an item from an Observable if a particular timespan has passed without it emitting another item.

![](http://reactivex.io/documentation/operators/images/debounce.png)

## Example

```go
observable.Debounce(rxgo.WithDuration(250 * time.Millisecond))
```

Output: each item emitted by the Observable if not item has been emitted after 250 milliseconds.

## Options

### WithBufferedChannel

[Detail](options.md#withbufferedchannel)

### WithContext

[Detail](options.md#withcontext)

### WithObservationStrategy

[Detail](options.md#withobservationstrategy)

### WithErrorStrategy

[Detail](options.md#witherrorstrategy)

### WithPool

https://github.com/ReactiveX/RxGo/wiki/Options#withpool

### WithCPUPool

https://github.com/ReactiveX/RxGo/wiki/Options#withcpupool
2 changes: 0 additions & 2 deletions duration.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,6 @@ func timeCausality(elems ...interface{}) (context.Context, Observable, Duration)
fs[i] = func() {
ch <- Of(elem)
}
case func():
fs[i] = elem
case error:
fs[i] = func() {
ch <- Error(elem)
Expand Down
23 changes: 23 additions & 0 deletions observable.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type Observable interface {
BufferWithTimeOrCount(timespan Duration, count int, opts ...Option) Observable
Contains(equal Predicate, opts ...Option) Single
Count(opts ...Option) Single
Debounce(timespan Duration, opts ...Option) Observable
DefaultIfEmpty(defaultValue interface{}, opts ...Option) Observable
Distinct(apply Func, opts ...Option) Observable
DistinctUntilChanged(apply Func, opts ...Option) Observable
Expand Down Expand Up @@ -313,3 +314,25 @@ func runPar(ctx context.Context, next chan Item, iterable Iterable, operatorFact
close(gather)
}()
}

func customObservableOperator(f func(ctx context.Context, next chan Item, option Option, opts ...Option), opts ...Option) Observable {
option := parseOptions(opts...)

if option.isEagerObservation() {
next := option.buildChannel()
ctx := option.buildContext()
go f(ctx, next, option, opts...)
return &ObservableImpl{iterable: newChannelIterable(next)}
}

return &ObservableImpl{
iterable: newFactoryIterable(func(propagatedOptions ...Option) <-chan Item {
mergedOptions := append(opts, propagatedOptions...)
option := parseOptions(mergedOptions...)
next := option.buildChannel()
ctx := option.buildContext()
go f(ctx, next, option, mergedOptions...)
return next
}),
}
}
79 changes: 41 additions & 38 deletions observable_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,45 @@ func (op *countOperator) end(_ context.Context, dst chan<- Item) {
func (op *countOperator) gatherNext(_ context.Context, _ Item, _ chan<- Item, _ operatorOptions) {
}

// Debounce only emits an item from an Observable if a particular timespan has passed without it emitting another item.
func (o *ObservableImpl) Debounce(timespan Duration, opts ...Option) Observable {
f := func(ctx context.Context, next chan Item, option Option, opts ...Option) {
defer close(next)
observe := o.Observe(opts...)
var latest interface{}

for {
select {
case <-ctx.Done():
return
case item, ok := <-observe:
if !ok {
return
}
if item.Error() {
if !item.SendWithContext(ctx, next) {
return
}
if option.getErrorStrategy() == Stop {
return
}
} else {
latest = item.V
}
case <-time.After(timespan.duration()):
if latest != nil {
if !Of(latest).SendWithContext(ctx, next) {
return
}
latest = nil
}
}
}
}

return customObservableOperator(f, opts...)
}

// DefaultIfEmpty returns an Observable that emits the items emitted by the source
// Observable or a specified default item if the source Observable is empty.
func (o *ObservableImpl) DefaultIfEmpty(defaultValue interface{}, opts ...Option) Observable {
Expand Down Expand Up @@ -2551,25 +2590,7 @@ func (o *ObservableImpl) WindowWithTime(timespan Duration, opts ...Option) Obser
}
}

option := parseOptions(opts...)

if option.isEagerObservation() {
next := option.buildChannel()
ctx := option.buildContext()
go f(ctx, next, option, opts...)
return &ObservableImpl{iterable: newChannelIterable(next)}
}

return &ObservableImpl{
iterable: newFactoryIterable(func(propagatedOptions ...Option) <-chan Item {
mergedOptions := append(opts, propagatedOptions...)
option := parseOptions(mergedOptions...)
next := option.buildChannel()
ctx := option.buildContext()
go f(ctx, next, option, mergedOptions...)
return next
}),
}
return customObservableOperator(f, opts...)
}

// WindowWithTimeOrCount periodically subdivides items from an Observable into Observables based on timed windows or a specific size
Expand Down Expand Up @@ -2636,25 +2657,7 @@ func (o *ObservableImpl) WindowWithTimeOrCount(timespan Duration, count int, opt
}
}

option := parseOptions(opts...)

if option.isEagerObservation() {
next := option.buildChannel()
ctx := option.buildContext()
go f(ctx, next, option, opts...)
return &ObservableImpl{iterable: newChannelIterable(next)}
}

return &ObservableImpl{
iterable: newFactoryIterable(func(propagatedOptions ...Option) <-chan Item {
mergedOptions := append(opts, propagatedOptions...)
option := parseOptions(mergedOptions...)
next := option.buildChannel()
ctx := option.buildContext()
go f(ctx, next, option, mergedOptions...)
return next
}),
}
return customObservableOperator(f, opts...)
}

// ZipFromIterable merges the emissions of an Iterable via a specified function
Expand Down
1 change: 0 additions & 1 deletion observable_operator_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (

const (
benchChannelCap = 1000
benchNumberOfElementsLarge = 1000000
benchNumberOfElementsSmall = 1000
ioPool = 32
)
Expand Down
12 changes: 12 additions & 0 deletions observable_operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,18 @@ func Test_Observable_Count_Parallel(t *testing.T) {
HasItem(int64(10001)))
}

func Test_Observable_Debounce(t *testing.T) {
ctx, obs, d := timeCausality(1, tick, 2, tick, 3, 4, 5, tick, 6, tick)
Assert(context.Background(), t, obs.Debounce(d, WithBufferedChannel(10), WithContext(ctx)),
HasItems(1, 2, 5, 6))
}

func Test_Observable_Debounce_Error(t *testing.T) {
ctx, obs, d := timeCausality(1, tick, 2, tick, 3, errFoo, 5, tick, 6, tick)
Assert(context.Background(), t, obs.Debounce(d, WithBufferedChannel(10), WithContext(ctx)),
HasItems(1, 2), HasError(errFoo))
}

func Test_Observable_DefaultIfEmpty_Empty(t *testing.T) {
obs := Empty().DefaultIfEmpty(3)
Assert(context.Background(), t, obs, HasItems(3))
Expand Down

0 comments on commit 14a4579

Please sign in to comment.