Skip to content

Commit

Permalink
Refine create and close operators
Browse files Browse the repository at this point in the history
  • Loading branch information
teivah committed Feb 24, 2020
1 parent 24db51b commit 49fc114
Show file tree
Hide file tree
Showing 13 changed files with 194 additions and 191 deletions.
4 changes: 3 additions & 1 deletion assert.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,9 @@ loop:
for _, v := range got {
delete(m, v)
}
assert.Equal(t, 0, len(m))
if len(m) != 0 {
assert.Fail(t, "missing elements", "%v", got)
}
}
if checkHasItem, value := ass.itemToBeChecked(); checkHasItem {
length := len(got)
Expand Down
9 changes: 1 addition & 8 deletions doc/create.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,10 @@ Create an Observable from scratch by calling observer methods programmatically.
## Example

```go
observable := rxgo.Create([]rxgo.Producer{func(ctx context.Context, next chan<- rxgo.Item, done func()) {
observable := rxgo.Create([]rxgo.Producer{func(ctx context.Context, next chan<- rxgo.Item) {
next <- rxgo.Of(1)
next <- rxgo.Of(2)
next <- rxgo.Of(3)
done()
}})
```

Expand All @@ -25,12 +24,6 @@ Output:
3
```

There are two ways to close the Observable:
* Closing the `next` channel.
* Calling the `done()` function.

Yet, as we can pass multiple producers, using the `done()` function is the recommended approach.

## Options

### WithBufferedChannel
Expand Down
9 changes: 1 addition & 8 deletions doc/defer.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,10 @@ do not create the Observable until the observer subscribes, and create a fresh O
## Example

```go
observable := rxgo.Defer([]rxgo.Producer{func(ctx context.Context, next chan<- rxgo.Item, done func()) {
observable := rxgo.Defer([]rxgo.Producer{func(ctx context.Context, next chan<- rxgo.Item) {
next <- rxgo.Of(1)
next <- rxgo.Of(2)
next <- rxgo.Of(3)
done()
}})
```

Expand All @@ -25,12 +24,6 @@ Output:
3
```

There are two ways to close the Observable:
* Closing the `next` channel.
* Calling the `done()` function.

Yet, as we can pass multiple producers, using the `done()` function is the recommended approach.

## Options

### WithBufferedChannel
Expand Down
71 changes: 45 additions & 26 deletions factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,76 +84,97 @@ func Test_Concat_OneEmptyObservable(t *testing.T) {
}

func Test_Create(t *testing.T) {
obs := Create([]Producer{func(ctx context.Context, next chan<- Item, done func()) {
obs := Create([]Producer{func(ctx context.Context, next chan<- Item) {
next <- Of(1)
next <- Of(2)
next <- Of(3)
done()
}})
Assert(context.Background(), t, obs, HasItems(1, 2, 3), HasNoError())
}

func Test_Create_SingleDup(t *testing.T) {
obs := Create([]Producer{func(ctx context.Context, next chan<- Item, done func()) {
obs := Create([]Producer{func(ctx context.Context, next chan<- Item) {
next <- Of(1)
next <- Of(2)
next <- Of(3)
done()
}})
Assert(context.Background(), t, obs, HasItems(1, 2, 3), HasNoError())
Assert(context.Background(), t, obs, IsEmpty(), HasNoError())
}

func Test_Create_ContextCancelled(t *testing.T) {
closed1 := make(chan struct{})
ctx, cancel := context.WithCancel(context.Background())
Create([]Producer{
func(ctx context.Context, next chan<- Item) {
cancel()
}, func(ctx context.Context, next chan<- Item) {
<-ctx.Done()
closed1 <- struct{}{}
},
}, WithContext(ctx)).Run()

select {
case <-time.Tick(time.Second):
assert.FailNow(t, "producer not closed")
case <-closed1:
}
}

func Test_Defer(t *testing.T) {
obs := Defer([]Producer{func(ctx context.Context, next chan<- Item, done func()) {
obs := Defer([]Producer{func(ctx context.Context, next chan<- Item) {
next <- Of(1)
next <- Of(2)
next <- Of(3)
done()
}})
Assert(context.Background(), t, obs, HasItems(1, 2, 3), HasNoError())
}

func Test_Defer_Multiple(t *testing.T) {
obs := Defer([]Producer{func(ctx context.Context, next chan<- Item, done func()) {
obs := Defer([]Producer{func(ctx context.Context, next chan<- Item) {
next <- Of(1)
next <- Of(2)
done()
}, func(ctx context.Context, next chan<- Item, done func()) {
}, func(ctx context.Context, next chan<- Item) {
next <- Of(10)
next <- Of(20)
done()
}})
Assert(context.Background(), t, obs, HasItemsNoOrder(1, 2, 10, 20), HasNoError())
}

func Test_Defer_Close(t *testing.T) {
obs := Defer([]Producer{func(ctx context.Context, next chan<- Item, done func()) {
next <- Of(1)
next <- Of(2)
next <- Of(3)
done()
}})
Assert(context.Background(), t, obs, HasItems(1, 2, 3), HasNoError())
func Test_Defer_ContextCancelled(t *testing.T) {
closed1 := make(chan struct{})
ctx, cancel := context.WithCancel(context.Background())
Defer([]Producer{
func(ctx context.Context, next chan<- Item) {
cancel()
}, func(ctx context.Context, next chan<- Item) {
<-ctx.Done()
closed1 <- struct{}{}
},
}, WithContext(ctx)).Run()

select {
case <-time.Tick(time.Second):
assert.FailNow(t, "producer not closed")
case <-closed1:
}
}

func Test_Defer_SingleDup(t *testing.T) {
obs := Defer([]Producer{func(ctx context.Context, next chan<- Item, done func()) {
obs := Defer([]Producer{func(ctx context.Context, next chan<- Item) {
next <- Of(1)
next <- Of(2)
next <- Of(3)
done()
}})
Assert(context.Background(), t, obs, HasItems(1, 2, 3), HasNoError())
Assert(context.Background(), t, obs, HasItems(1, 2, 3), HasNoError())
}

func Test_Defer_ComposedDup(t *testing.T) {
obs := Defer([]Producer{func(ctx context.Context, next chan<- Item, done func()) {
obs := Defer([]Producer{func(ctx context.Context, next chan<- Item) {
next <- Of(1)
next <- Of(2)
next <- Of(3)
done()
}}).Map(func(_ context.Context, i interface{}) (_ interface{}, _ error) {
return i.(int) + 1, nil
}).Map(func(_ context.Context, i interface{}) (_ interface{}, _ error) {
Expand All @@ -164,11 +185,10 @@ func Test_Defer_ComposedDup(t *testing.T) {
}

func Test_Defer_ComposedDup_EagerObservation(t *testing.T) {
obs := Defer([]Producer{func(ctx context.Context, next chan<- Item, done func()) {
obs := Defer([]Producer{func(ctx context.Context, next chan<- Item) {
next <- Of(1)
next <- Of(2)
next <- Of(3)
done()
}}).Map(func(_ context.Context, i interface{}) (_ interface{}, _ error) {
return i.(int) + 1, nil
}, WithObservationStrategy(Eager)).Map(func(_ context.Context, i interface{}) (_ interface{}, _ error) {
Expand All @@ -181,11 +201,10 @@ func Test_Defer_ComposedDup_EagerObservation(t *testing.T) {
}

func Test_Defer_Error(t *testing.T) {
obs := Defer([]Producer{func(ctx context.Context, next chan<- Item, done func()) {
obs := Defer([]Producer{func(ctx context.Context, next chan<- Item) {
next <- Of(1)
next <- Of(2)
next <- Error(errFoo)
done()
}})
Assert(context.Background(), t, obs, HasItems(1, 2), HasError(errFoo))
}
Expand Down
4 changes: 2 additions & 2 deletions item.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,9 @@ func (i Item) SendBlocking(ch chan<- Item) {
ch <- i
}

// SendCtx sends an item and blocks until it is sent or a context canceled.
// SendContext sends an item and blocks until it is sent or a context canceled.
// It returns a boolean to indicate whether the item was sent.
func (i Item) SendCtx(ctx context.Context, ch chan<- Item) bool {
func (i Item) SendContext(ctx context.Context, ch chan<- Item) bool {
select {
case <-ctx.Done():
return false
Expand Down
2 changes: 1 addition & 1 deletion item_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func Test_Item_SendContext_True(t *testing.T) {
defer close(ch)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
assert.True(t, Of(5).SendCtx(ctx, ch))
assert.True(t, Of(5).SendContext(ctx, ch))
}

func Test_Item_SendNonBlocking(t *testing.T) {
Expand Down
9 changes: 5 additions & 4 deletions iterable_create.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@ func newCreateIterable(fs []Producer, opts ...Option) Iterable {
ctx := option.buildContext()

wg := sync.WaitGroup{}
done := func() {
wg.Done()
}
for _, f := range fs {
f := f
wg.Add(1)
go f(ctx, next, done)
go func() {
defer wg.Done()
f(ctx, next)
}()
}
go func() {
wg.Wait()
Expand Down
9 changes: 5 additions & 4 deletions iterable_defer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@ func (i *deferIterable) Observe(opts ...Option) <-chan Item {
ctx := option.buildContext()

wg := sync.WaitGroup{}
done := func() {
wg.Done()
}
for _, f := range i.f {
f := f
wg.Add(1)
go f(ctx, next, done)
go func() {
defer wg.Done()
f(ctx, next)
}()
}
go func() {
wg.Wait()
Expand Down
4 changes: 2 additions & 2 deletions observable.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ type ObservableImpl struct {
}

func defaultErrorFuncOperator(ctx context.Context, item Item, dst chan<- Item, operatorOptions operatorOptions) {
item.SendCtx(ctx, dst)
item.SendContext(ctx, dst)
operatorOptions.stop()
}

Expand Down Expand Up @@ -297,7 +297,7 @@ func runPar(ctx context.Context, next chan Item, iterable Iterable, operatorFact
case item, ok := <-observe:
if !ok {
if !bypassGather {
Of(op).SendCtx(ctx, gather)
Of(op).SendContext(ctx, gather)
}
return
}
Expand Down
Loading

0 comments on commit 49fc114

Please sign in to comment.