Skip to content

Commit

Permalink
Defer operator
Browse files Browse the repository at this point in the history
  • Loading branch information
teivah committed Feb 17, 2020
1 parent 2640fee commit e2d3fe9
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 67 deletions.
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -185,10 +185,10 @@ The result of this execution is:

It means, the first Observer consumed already all the items.

On the other hand, let's create a hot Observable using `FromFuncs` operator:
On the other hand, let's create a hot Observable using `Defer` operator:

```go
observable := rxgo.FromFuncs(func(_ context.Context, ch chan<- rxgo.Item, done func()) {
observable := rxgo.Defer(func(_ context.Context, ch chan<- rxgo.Item, done func()) {
for i := 0; i < 3; i++ {
ch <- rxgo.Of(i)
}
Expand Down Expand Up @@ -217,7 +217,7 @@ Now, the result is:
2
```

In the case of a hot observable created with `FromFuncs`, the stream is reproducible. Depending on our use case, we may favour one or the other approach.
In the case of a hot observable created with `Defer`, the stream is reproducible. Depending on our use case, we may favour one or the other approach.

### Backpressure

Expand Down Expand Up @@ -267,7 +267,7 @@ In this example, we create a pool of 32 goroutines that consume items concurrent
* [Empty/Never](http://reactivex.io/documentation/operators/empty-never-throw.html) — create Observables that have very precise and limited behaviour
* FromChannel — create an Observable based on a lazy channel
* FromEventSource — create an Observable based on an eager channel
* FromFuncs - combine scatter functions emitting items into one Observable
* Defer - combine scatter functions emitting items into one Observable
* FromSlice — create an Observable from a slice
* [Interval](http://reactivex.io/documentation/operators/interval.html) — create an Observable that emits a sequence of integers spaced by a particular time interval
* [Just](http://reactivex.io/documentation/operators/just.html) — convert a set of objects into an Observable that emits that or those objects
Expand Down
14 changes: 7 additions & 7 deletions factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,13 @@ func Concat(observables []Observable, opts ...Option) Observable {
}
}

// Defer creates an observable from multiple functions.
func Defer(f ...ProducerFunc) Observable {
return &observable{
iterable: newFuncsIterable(f...),
}
}

// Empty creates an Observable with no item and terminate immediately.
func Empty() Observable {
next := make(chan Item)
Expand Down Expand Up @@ -300,13 +307,6 @@ func Range(start, count int) Observable {
}
}

// FromFuncs creates an observable from multiple functions.
func FromFuncs(f ...ProducerFunc) Observable {
return &observable{
iterable: newFuncsIterable(f...),
}
}

// Start creates an Observable from one or more directive-like Supplier
// and emits the result of each operation asynchronously on a new Observable.
func Start(fs []Supplier, opts ...Option) Observable {
Expand Down
108 changes: 54 additions & 54 deletions factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,43 +80,8 @@ func Test_Concat_OneEmptyObservable(t *testing.T) {
Assert(context.Background(), t, obs, HasItems(1, 2, 3))
}

func Test_Empty(t *testing.T) {
obs := Empty()
Assert(context.Background(), t, obs, HasNoItems())
}

func Test_FromChannel(t *testing.T) {
ch := make(chan Item)
go func() {
ch <- Of(1)
ch <- Of(2)
ch <- Of(3)
close(ch)
}()
obs := FromChannel(ch)
Assert(context.Background(), t, obs, HasItems(1, 2, 3), HasNotRaisedError())
}

func Test_FromChannel_SimpleCapacity(t *testing.T) {
ch := FromChannel(make(chan Item, 10)).Observe(WithBufferedChannel(11))
assert.Equal(t, 10, cap(ch))
}

func Test_FromChannel_ComposedCapacity(t *testing.T) {
obs1 := FromChannel(make(chan Item, 10)).
Map(func(_ interface{}) (interface{}, error) {
return 1, nil
}, WithBufferedChannel(11))
assert.Equal(t, 11, cap(obs1.Observe(WithBufferedChannel(13))))

obs2 := obs1.Map(func(_ interface{}) (interface{}, error) {
return 1, nil
}, WithBufferedChannel(12))
assert.Equal(t, 12, cap(obs2.Observe(WithBufferedChannel(13))))
}

func Test_FromFuncs(t *testing.T) {
obs := FromFuncs(func(ctx context.Context, next chan<- Item, done func()) {
func Test_Defer(t *testing.T) {
obs := Defer(func(ctx context.Context, next chan<- Item, done func()) {
next <- Of(1)
next <- Of(2)
next <- Of(3)
Expand All @@ -125,8 +90,8 @@ func Test_FromFuncs(t *testing.T) {
Assert(context.Background(), t, obs, HasItems(1, 2, 3), HasNotRaisedError())
}

func Test_FromFuncs_Multiple(t *testing.T) {
obs := FromFuncs(func(ctx context.Context, next chan<- Item, done func()) {
func Test_Defer_Multiple(t *testing.T) {
obs := Defer(func(ctx context.Context, next chan<- Item, done func()) {
next <- Of(1)
next <- Of(2)
done()
Expand All @@ -138,8 +103,8 @@ func Test_FromFuncs_Multiple(t *testing.T) {
Assert(context.Background(), t, obs, HasItemsNoParticularOrder(1, 2, 10, 20), HasNotRaisedError())
}

func Test_FromFuncs_Close(t *testing.T) {
obs := FromFuncs(func(ctx context.Context, next chan<- Item, done func()) {
func Test_Defer_Close(t *testing.T) {
obs := Defer(func(ctx context.Context, next chan<- Item, done func()) {
next <- Of(1)
next <- Of(2)
next <- Of(3)
Expand All @@ -148,8 +113,8 @@ func Test_FromFuncs_Close(t *testing.T) {
Assert(context.Background(), t, obs, HasItems(1, 2, 3), HasNotRaisedError())
}

func Test_FromFuncs_SingleDup(t *testing.T) {
obs := FromFuncs(func(ctx context.Context, next chan<- Item, done func()) {
func Test_Defer_SingleDup(t *testing.T) {
obs := Defer(func(ctx context.Context, next chan<- Item, done func()) {
next <- Of(1)
next <- Of(2)
next <- Of(3)
Expand All @@ -159,8 +124,8 @@ func Test_FromFuncs_SingleDup(t *testing.T) {
Assert(context.Background(), t, obs, HasItems(1, 2, 3), HasNotRaisedError())
}

func Test_FromFuncs_ComposedDup(t *testing.T) {
obs := FromFuncs(func(ctx context.Context, next chan<- Item, done func()) {
func Test_Defer_ComposedDup(t *testing.T) {
obs := Defer(func(ctx context.Context, next chan<- Item, done func()) {
next <- Of(1)
next <- Of(2)
next <- Of(3)
Expand All @@ -174,8 +139,8 @@ func Test_FromFuncs_ComposedDup(t *testing.T) {
Assert(context.Background(), t, obs, HasItems(3, 4, 5), HasNotRaisedError())
}

func Test_FromFuncs_ComposedDup_EagerObservation(t *testing.T) {
obs := FromFuncs(func(ctx context.Context, next chan<- Item, done func()) {
func Test_Defer_ComposedDup_EagerObservation(t *testing.T) {
obs := Defer(func(ctx context.Context, next chan<- Item, done func()) {
next <- Of(1)
next <- Of(2)
next <- Of(3)
Expand All @@ -186,13 +151,13 @@ func Test_FromFuncs_ComposedDup_EagerObservation(t *testing.T) {
return i.(int) + 1, nil
})
Assert(context.Background(), t, obs, HasItems(3, 4, 5), HasNotRaisedError())
// In the case of an eager observation, we already consumed the items produced by FromFuncs
// In the case of an eager observation, we already consumed the items produced by Defer
// So if we create another subscription, it will be empty
Assert(context.Background(), t, obs, HasNoItem(), HasNotRaisedError())
}

func Test_FromFuncs_Error(t *testing.T) {
obs := FromFuncs(func(ctx context.Context, next chan<- Item, done func()) {
func Test_Defer_Error(t *testing.T) {
obs := Defer(func(ctx context.Context, next chan<- Item, done func()) {
next <- Of(1)
next <- Of(2)
next <- Error(errFoo)
Expand All @@ -201,15 +166,15 @@ func Test_FromFuncs_Error(t *testing.T) {
Assert(context.Background(), t, obs, HasItems(1, 2), HasRaisedError(errFoo))
}

func Test_FromFuncs_SimpleCapacity(t *testing.T) {
ch := FromFuncs(func(_ context.Context, _ chan<- Item, done func()) {
func Test_Defer_SimpleCapacity(t *testing.T) {
ch := Defer(func(_ context.Context, _ chan<- Item, done func()) {
done()
}).Observe(WithBufferedChannel(5))
assert.Equal(t, 5, cap(ch))
}

func Test_FromFuncs_ComposedCapacity(t *testing.T) {
obs1 := FromFuncs(func(_ context.Context, _ chan<- Item, done func()) {
func Test_Defer_ComposedCapacity(t *testing.T) {
obs1 := Defer(func(_ context.Context, _ chan<- Item, done func()) {
done()
}).Map(func(_ interface{}) (interface{}, error) {
return 1, nil
Expand All @@ -222,6 +187,41 @@ func Test_FromFuncs_ComposedCapacity(t *testing.T) {
assert.Equal(t, 12, cap(obs2.Observe(WithBufferedChannel(13))))
}

func Test_Empty(t *testing.T) {
obs := Empty()
Assert(context.Background(), t, obs, HasNoItems())
}

func Test_FromChannel(t *testing.T) {
ch := make(chan Item)
go func() {
ch <- Of(1)
ch <- Of(2)
ch <- Of(3)
close(ch)
}()
obs := FromChannel(ch)
Assert(context.Background(), t, obs, HasItems(1, 2, 3), HasNotRaisedError())
}

func Test_FromChannel_SimpleCapacity(t *testing.T) {
ch := FromChannel(make(chan Item, 10)).Observe(WithBufferedChannel(11))
assert.Equal(t, 10, cap(ch))
}

func Test_FromChannel_ComposedCapacity(t *testing.T) {
obs1 := FromChannel(make(chan Item, 10)).
Map(func(_ interface{}) (interface{}, error) {
return 1, nil
}, WithBufferedChannel(11))
assert.Equal(t, 11, cap(obs1.Observe(WithBufferedChannel(13))))

obs2 := obs1.Map(func(_ interface{}) (interface{}, error) {
return 1, nil
}, WithBufferedChannel(12))
assert.Equal(t, 12, cap(obs2.Observe(WithBufferedChannel(13))))
}

func Test_FromItem(t *testing.T) {
single := JustItem(Of(1))
Assert(context.Background(), t, single, HasItem(1), HasNotRaisedError())
Expand Down
4 changes: 2 additions & 2 deletions observable_operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,7 @@ func Test_Observable_ReturnError(t *testing.T) {

func Test_Observable_Retry(t *testing.T) {
i := 0
obs := FromFuncs(func(ctx context.Context, next chan<- Item, done func()) {
obs := Defer(func(ctx context.Context, next chan<- Item, done func()) {
next <- Of(1)
next <- Of(2)
if i == 2 {
Expand All @@ -609,7 +609,7 @@ func Test_Observable_Retry(t *testing.T) {
}

func Test_Observable_Retry_Error(t *testing.T) {
obs := FromFuncs(func(ctx context.Context, next chan<- Item, done func()) {
obs := Defer(func(ctx context.Context, next chan<- Item, done func()) {
next <- Of(1)
next <- Of(2)
next <- Error(errFoo)
Expand Down

0 comments on commit e2d3fe9

Please sign in to comment.