From 49fc114ded8a90135b9e787ee2b0d95ecf010b7b Mon Sep 17 00:00:00 2001 From: teivah Date: Mon, 24 Feb 2020 20:35:29 +0000 Subject: [PATCH] Refine create and close operators --- assert.go | 4 +- doc/create.md | 9 +- doc/defer.md | 9 +- factory_test.go | 71 +++++++---- item.go | 4 +- item_test.go | 2 +- iterable_create.go | 9 +- iterable_defer.go | 9 +- observable.go | 4 +- observable_operator.go | 240 ++++++++++++++++++------------------ observable_operator_test.go | 14 +-- single.go | 8 +- types.go | 2 +- 13 files changed, 194 insertions(+), 191 deletions(-) diff --git a/assert.go b/assert.go index c2542d14..4628dbb6 100644 --- a/assert.go +++ b/assert.go @@ -233,7 +233,9 @@ loop: for _, v := range got { delete(m, v) } - assert.Equal(t, 0, len(m)) + if len(m) != 0 { + assert.Fail(t, "missing elements", "%v", got) + } } if checkHasItem, value := ass.itemToBeChecked(); checkHasItem { length := len(got) diff --git a/doc/create.md b/doc/create.md index f2e5e793..998dd325 100644 --- a/doc/create.md +++ b/doc/create.md @@ -9,11 +9,10 @@ Create an Observable from scratch by calling observer methods programmatically. ## Example ```go -observable := rxgo.Create([]rxgo.Producer{func(ctx context.Context, next chan<- rxgo.Item, done func()) { +observable := rxgo.Create([]rxgo.Producer{func(ctx context.Context, next chan<- rxgo.Item) { next <- rxgo.Of(1) next <- rxgo.Of(2) next <- rxgo.Of(3) - done() }}) ``` @@ -25,12 +24,6 @@ Output: 3 ``` -There are two ways to close the Observable: -* Closing the `next` channel. -* Calling the `done()` function. - -Yet, as we can pass multiple producers, using the `done()` function is the recommended approach. - ## Options ### WithBufferedChannel diff --git a/doc/defer.md b/doc/defer.md index 0bd7ecea..b39e0422 100644 --- a/doc/defer.md +++ b/doc/defer.md @@ -9,11 +9,10 @@ do not create the Observable until the observer subscribes, and create a fresh O ## Example ```go -observable := rxgo.Defer([]rxgo.Producer{func(ctx context.Context, next chan<- rxgo.Item, done func()) { +observable := rxgo.Defer([]rxgo.Producer{func(ctx context.Context, next chan<- rxgo.Item) { next <- rxgo.Of(1) next <- rxgo.Of(2) next <- rxgo.Of(3) - done() }}) ``` @@ -25,12 +24,6 @@ Output: 3 ``` -There are two ways to close the Observable: -* Closing the `next` channel. -* Calling the `done()` function. - -Yet, as we can pass multiple producers, using the `done()` function is the recommended approach. - ## Options ### WithBufferedChannel diff --git a/factory_test.go b/factory_test.go index 1d616849..50445398 100644 --- a/factory_test.go +++ b/factory_test.go @@ -84,76 +84,97 @@ func Test_Concat_OneEmptyObservable(t *testing.T) { } func Test_Create(t *testing.T) { - obs := Create([]Producer{func(ctx context.Context, next chan<- Item, done func()) { + obs := Create([]Producer{func(ctx context.Context, next chan<- Item) { next <- Of(1) next <- Of(2) next <- Of(3) - done() }}) Assert(context.Background(), t, obs, HasItems(1, 2, 3), HasNoError()) } func Test_Create_SingleDup(t *testing.T) { - obs := Create([]Producer{func(ctx context.Context, next chan<- Item, done func()) { + obs := Create([]Producer{func(ctx context.Context, next chan<- Item) { next <- Of(1) next <- Of(2) next <- Of(3) - done() }}) Assert(context.Background(), t, obs, HasItems(1, 2, 3), HasNoError()) Assert(context.Background(), t, obs, IsEmpty(), HasNoError()) } +func Test_Create_ContextCancelled(t *testing.T) { + closed1 := make(chan struct{}) + ctx, cancel := context.WithCancel(context.Background()) + Create([]Producer{ + func(ctx context.Context, next chan<- Item) { + cancel() + }, func(ctx context.Context, next chan<- Item) { + <-ctx.Done() + closed1 <- struct{}{} + }, + }, WithContext(ctx)).Run() + + select { + case <-time.Tick(time.Second): + assert.FailNow(t, "producer not closed") + case <-closed1: + } +} + func Test_Defer(t *testing.T) { - obs := Defer([]Producer{func(ctx context.Context, next chan<- Item, done func()) { + obs := Defer([]Producer{func(ctx context.Context, next chan<- Item) { next <- Of(1) next <- Of(2) next <- Of(3) - done() }}) Assert(context.Background(), t, obs, HasItems(1, 2, 3), HasNoError()) } func Test_Defer_Multiple(t *testing.T) { - obs := Defer([]Producer{func(ctx context.Context, next chan<- Item, done func()) { + obs := Defer([]Producer{func(ctx context.Context, next chan<- Item) { next <- Of(1) next <- Of(2) - done() - }, func(ctx context.Context, next chan<- Item, done func()) { + }, func(ctx context.Context, next chan<- Item) { next <- Of(10) next <- Of(20) - done() }}) Assert(context.Background(), t, obs, HasItemsNoOrder(1, 2, 10, 20), HasNoError()) } -func Test_Defer_Close(t *testing.T) { - obs := Defer([]Producer{func(ctx context.Context, next chan<- Item, done func()) { - next <- Of(1) - next <- Of(2) - next <- Of(3) - done() - }}) - Assert(context.Background(), t, obs, HasItems(1, 2, 3), HasNoError()) +func Test_Defer_ContextCancelled(t *testing.T) { + closed1 := make(chan struct{}) + ctx, cancel := context.WithCancel(context.Background()) + Defer([]Producer{ + func(ctx context.Context, next chan<- Item) { + cancel() + }, func(ctx context.Context, next chan<- Item) { + <-ctx.Done() + closed1 <- struct{}{} + }, + }, WithContext(ctx)).Run() + + select { + case <-time.Tick(time.Second): + assert.FailNow(t, "producer not closed") + case <-closed1: + } } func Test_Defer_SingleDup(t *testing.T) { - obs := Defer([]Producer{func(ctx context.Context, next chan<- Item, done func()) { + obs := Defer([]Producer{func(ctx context.Context, next chan<- Item) { next <- Of(1) next <- Of(2) next <- Of(3) - done() }}) Assert(context.Background(), t, obs, HasItems(1, 2, 3), HasNoError()) Assert(context.Background(), t, obs, HasItems(1, 2, 3), HasNoError()) } func Test_Defer_ComposedDup(t *testing.T) { - obs := Defer([]Producer{func(ctx context.Context, next chan<- Item, done func()) { + obs := Defer([]Producer{func(ctx context.Context, next chan<- Item) { next <- Of(1) next <- Of(2) next <- Of(3) - done() }}).Map(func(_ context.Context, i interface{}) (_ interface{}, _ error) { return i.(int) + 1, nil }).Map(func(_ context.Context, i interface{}) (_ interface{}, _ error) { @@ -164,11 +185,10 @@ func Test_Defer_ComposedDup(t *testing.T) { } func Test_Defer_ComposedDup_EagerObservation(t *testing.T) { - obs := Defer([]Producer{func(ctx context.Context, next chan<- Item, done func()) { + obs := Defer([]Producer{func(ctx context.Context, next chan<- Item) { next <- Of(1) next <- Of(2) next <- Of(3) - done() }}).Map(func(_ context.Context, i interface{}) (_ interface{}, _ error) { return i.(int) + 1, nil }, WithObservationStrategy(Eager)).Map(func(_ context.Context, i interface{}) (_ interface{}, _ error) { @@ -181,11 +201,10 @@ func Test_Defer_ComposedDup_EagerObservation(t *testing.T) { } func Test_Defer_Error(t *testing.T) { - obs := Defer([]Producer{func(ctx context.Context, next chan<- Item, done func()) { + obs := Defer([]Producer{func(ctx context.Context, next chan<- Item) { next <- Of(1) next <- Of(2) next <- Error(errFoo) - done() }}) Assert(context.Background(), t, obs, HasItems(1, 2), HasError(errFoo)) } diff --git a/item.go b/item.go index 0b94b25e..7a3d0d09 100644 --- a/item.go +++ b/item.go @@ -81,9 +81,9 @@ func (i Item) SendBlocking(ch chan<- Item) { ch <- i } -// SendCtx sends an item and blocks until it is sent or a context canceled. +// SendContext sends an item and blocks until it is sent or a context canceled. // It returns a boolean to indicate whether the item was sent. -func (i Item) SendCtx(ctx context.Context, ch chan<- Item) bool { +func (i Item) SendContext(ctx context.Context, ch chan<- Item) bool { select { case <-ctx.Done(): return false diff --git a/item_test.go b/item_test.go index 5f12119d..8d40192f 100644 --- a/item_test.go +++ b/item_test.go @@ -43,7 +43,7 @@ func Test_Item_SendContext_True(t *testing.T) { defer close(ch) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - assert.True(t, Of(5).SendCtx(ctx, ch)) + assert.True(t, Of(5).SendContext(ctx, ch)) } func Test_Item_SendNonBlocking(t *testing.T) { diff --git a/iterable_create.go b/iterable_create.go index b762d92b..2b643aa9 100644 --- a/iterable_create.go +++ b/iterable_create.go @@ -15,12 +15,13 @@ func newCreateIterable(fs []Producer, opts ...Option) Iterable { ctx := option.buildContext() wg := sync.WaitGroup{} - done := func() { - wg.Done() - } for _, f := range fs { + f := f wg.Add(1) - go f(ctx, next, done) + go func() { + defer wg.Done() + f(ctx, next) + }() } go func() { wg.Wait() diff --git a/iterable_defer.go b/iterable_defer.go index 51274c15..06aa2e32 100644 --- a/iterable_defer.go +++ b/iterable_defer.go @@ -22,12 +22,13 @@ func (i *deferIterable) Observe(opts ...Option) <-chan Item { ctx := option.buildContext() wg := sync.WaitGroup{} - done := func() { - wg.Done() - } for _, f := range i.f { + f := f wg.Add(1) - go f(ctx, next, done) + go func() { + defer wg.Done() + f(ctx, next) + }() } go func() { wg.Wait() diff --git a/observable.go b/observable.go index 017ccbd0..cd2feb7e 100644 --- a/observable.go +++ b/observable.go @@ -88,7 +88,7 @@ type ObservableImpl struct { } func defaultErrorFuncOperator(ctx context.Context, item Item, dst chan<- Item, operatorOptions operatorOptions) { - item.SendCtx(ctx, dst) + item.SendContext(ctx, dst) operatorOptions.stop() } @@ -297,7 +297,7 @@ func runPar(ctx context.Context, next chan Item, iterable Iterable, operatorFact case item, ok := <-observe: if !ok { if !bypassGather { - Of(op).SendCtx(ctx, gather) + Of(op).SendContext(ctx, gather) } return } diff --git a/observable_operator.go b/observable_operator.go index 05677266..49822cd5 100644 --- a/observable_operator.go +++ b/observable_operator.go @@ -30,7 +30,7 @@ type allOperator struct { func (op *allOperator) next(ctx context.Context, item Item, dst chan<- Item, operatorOptions operatorOptions) { if !op.predicate(item.V) { - Of(false).SendCtx(ctx, dst) + Of(false).SendContext(ctx, dst) op.all = false operatorOptions.stop() } @@ -42,13 +42,13 @@ func (op *allOperator) err(ctx context.Context, item Item, dst chan<- Item, oper func (op *allOperator) end(ctx context.Context, dst chan<- Item) { if op.all { - Of(true).SendCtx(ctx, dst) + Of(true).SendContext(ctx, dst) } } func (op *allOperator) gatherNext(ctx context.Context, item Item, dst chan<- Item, operatorOptions operatorOptions) { if item.V == false { - Of(false).SendCtx(ctx, dst) + Of(false).SendContext(ctx, dst) op.all = false operatorOptions.stop() } @@ -69,7 +69,7 @@ type averageFloat32Operator struct { func (op *averageFloat32Operator) next(ctx context.Context, item Item, dst chan<- Item, operatorOptions operatorOptions) { switch v := item.V.(type) { default: - Error(IllegalInputError{error: fmt.Sprintf("expected type: float or int, got: %t", item)}).SendCtx(ctx, dst) + Error(IllegalInputError{error: fmt.Sprintf("expected type: float or int, got: %t", item)}).SendContext(ctx, dst) operatorOptions.stop() case int: op.sum += float32(v) @@ -89,9 +89,9 @@ func (op *averageFloat32Operator) err(ctx context.Context, item Item, dst chan<- func (op *averageFloat32Operator) end(ctx context.Context, dst chan<- Item) { if op.count == 0 { - Of(0).SendCtx(ctx, dst) + Of(0).SendContext(ctx, dst) } else { - Of(op.sum/op.count).SendCtx(ctx, dst) + Of(op.sum/op.count).SendContext(ctx, dst) } } @@ -116,7 +116,7 @@ type averageFloat64Operator struct { func (op *averageFloat64Operator) next(ctx context.Context, item Item, dst chan<- Item, operatorOptions operatorOptions) { switch v := item.V.(type) { default: - Error(IllegalInputError{error: fmt.Sprintf("expected type: float or int, got: %t", item)}).SendCtx(ctx, dst) + Error(IllegalInputError{error: fmt.Sprintf("expected type: float or int, got: %t", item)}).SendContext(ctx, dst) operatorOptions.stop() case int: op.sum += float64(v) @@ -136,9 +136,9 @@ func (op *averageFloat64Operator) err(ctx context.Context, item Item, dst chan<- func (op *averageFloat64Operator) end(ctx context.Context, dst chan<- Item) { if op.count == 0 { - Of(0).SendCtx(ctx, dst) + Of(0).SendContext(ctx, dst) } else { - Of(op.sum/op.count).SendCtx(ctx, dst) + Of(op.sum/op.count).SendContext(ctx, dst) } } @@ -163,7 +163,7 @@ type averageIntOperator struct { func (op *averageIntOperator) next(ctx context.Context, item Item, dst chan<- Item, operatorOptions operatorOptions) { switch v := item.V.(type) { default: - Error(IllegalInputError{error: fmt.Sprintf("expected type: int, got: %t", item)}).SendCtx(ctx, dst) + Error(IllegalInputError{error: fmt.Sprintf("expected type: int, got: %t", item)}).SendContext(ctx, dst) operatorOptions.stop() case int: op.sum += v @@ -177,9 +177,9 @@ func (op *averageIntOperator) err(ctx context.Context, item Item, dst chan<- Ite func (op *averageIntOperator) end(ctx context.Context, dst chan<- Item) { if op.count == 0 { - Of(0).SendCtx(ctx, dst) + Of(0).SendContext(ctx, dst) } else { - Of(op.sum/op.count).SendCtx(ctx, dst) + Of(op.sum/op.count).SendContext(ctx, dst) } } @@ -204,7 +204,7 @@ type averageInt8Operator struct { func (op *averageInt8Operator) next(ctx context.Context, item Item, dst chan<- Item, operatorOptions operatorOptions) { switch v := item.V.(type) { default: - Error(IllegalInputError{error: fmt.Sprintf("expected type: int8, got: %t", item)}).SendCtx(ctx, dst) + Error(IllegalInputError{error: fmt.Sprintf("expected type: int8, got: %t", item)}).SendContext(ctx, dst) operatorOptions.stop() case int8: op.sum += v @@ -218,9 +218,9 @@ func (op *averageInt8Operator) err(ctx context.Context, item Item, dst chan<- It func (op *averageInt8Operator) end(ctx context.Context, dst chan<- Item) { if op.count == 0 { - Of(0).SendCtx(ctx, dst) + Of(0).SendContext(ctx, dst) } else { - Of(op.sum/op.count).SendCtx(ctx, dst) + Of(op.sum/op.count).SendContext(ctx, dst) } } @@ -245,7 +245,7 @@ type averageInt16Operator struct { func (op *averageInt16Operator) next(ctx context.Context, item Item, dst chan<- Item, operatorOptions operatorOptions) { switch v := item.V.(type) { default: - Error(IllegalInputError{error: fmt.Sprintf("expected type: int16, got: %t", item)}).SendCtx(ctx, dst) + Error(IllegalInputError{error: fmt.Sprintf("expected type: int16, got: %t", item)}).SendContext(ctx, dst) operatorOptions.stop() case int16: op.sum += v @@ -259,9 +259,9 @@ func (op *averageInt16Operator) err(ctx context.Context, item Item, dst chan<- I func (op *averageInt16Operator) end(ctx context.Context, dst chan<- Item) { if op.count == 0 { - Of(0).SendCtx(ctx, dst) + Of(0).SendContext(ctx, dst) } else { - Of(op.sum/op.count).SendCtx(ctx, dst) + Of(op.sum/op.count).SendContext(ctx, dst) } } @@ -286,7 +286,7 @@ type averageInt32Operator struct { func (op *averageInt32Operator) next(ctx context.Context, item Item, dst chan<- Item, operatorOptions operatorOptions) { switch v := item.V.(type) { default: - Error(IllegalInputError{error: fmt.Sprintf("expected type: int32, got: %t", item)}).SendCtx(ctx, dst) + Error(IllegalInputError{error: fmt.Sprintf("expected type: int32, got: %t", item)}).SendContext(ctx, dst) operatorOptions.stop() case int32: op.sum += v @@ -300,9 +300,9 @@ func (op *averageInt32Operator) err(ctx context.Context, item Item, dst chan<- I func (op *averageInt32Operator) end(ctx context.Context, dst chan<- Item) { if op.count == 0 { - Of(0).SendCtx(ctx, dst) + Of(0).SendContext(ctx, dst) } else { - Of(op.sum/op.count).SendCtx(ctx, dst) + Of(op.sum/op.count).SendContext(ctx, dst) } } @@ -327,7 +327,7 @@ type averageInt64Operator struct { func (op *averageInt64Operator) next(ctx context.Context, item Item, dst chan<- Item, operatorOptions operatorOptions) { switch v := item.V.(type) { default: - Error(IllegalInputError{error: fmt.Sprintf("expected type: int64, got: %t", item)}).SendCtx(ctx, dst) + Error(IllegalInputError{error: fmt.Sprintf("expected type: int64, got: %t", item)}).SendContext(ctx, dst) operatorOptions.stop() case int64: op.sum += v @@ -341,9 +341,9 @@ func (op *averageInt64Operator) err(ctx context.Context, item Item, dst chan<- I func (op *averageInt64Operator) end(ctx context.Context, dst chan<- Item) { if op.count == 0 { - Of(0).SendCtx(ctx, dst) + Of(0).SendContext(ctx, dst) } else { - Of(op.sum/op.count).SendCtx(ctx, dst) + Of(op.sum/op.count).SendContext(ctx, dst) } } @@ -374,13 +374,13 @@ func (o *ObservableImpl) BackOffRetry(backOffCfg backoff.BackOff, opts ...Option if i.Error() { return i.E } - i.SendCtx(ctx, next) + i.SendContext(ctx, next) } } } go func() { if err := backoff.Retry(f, backOffCfg); err != nil { - Error(err).SendCtx(ctx, next) + Error(err).SendContext(ctx, next) close(next) return } @@ -421,7 +421,7 @@ func (op *bufferWithCountOperator) next(ctx context.Context, item Item, dst chan op.buffer[op.iCount] = item.V op.iCount++ if op.iCount == op.count { - Of(op.buffer).SendCtx(ctx, dst) + Of(op.buffer).SendContext(ctx, dst) op.iCount = 0 op.buffer = make([]interface{}, op.count) } @@ -433,7 +433,7 @@ func (op *bufferWithCountOperator) err(ctx context.Context, item Item, dst chan< func (op *bufferWithCountOperator) end(ctx context.Context, dst chan<- Item) { if op.iCount != 0 { - Of(op.buffer[:op.iCount]).SendCtx(ctx, dst) + Of(op.buffer[:op.iCount]).SendContext(ctx, dst) } } @@ -462,12 +462,12 @@ func (o *ObservableImpl) BufferWithTime(timespan Duration, opts ...Option) Obser case item, ok := <-observe: if !ok { if len(buffer) != 0 { - Of(buffer).SendCtx(ctx, next) + Of(buffer).SendContext(ctx, next) } return } if item.Error() { - item.SendCtx(ctx, next) + item.SendContext(ctx, next) if option.getErrorStrategy() == StopOnError { return } @@ -476,7 +476,7 @@ func (o *ObservableImpl) BufferWithTime(timespan Duration, opts ...Option) Obser } case <-time.After(timespan.duration()): if len(buffer) != 0 { - if !Of(buffer).SendCtx(ctx, next) { + if !Of(buffer).SendContext(ctx, next) { return } buffer = make([]interface{}, 0) @@ -509,19 +509,19 @@ func (o *ObservableImpl) BufferWithTimeOrCount(timespan Duration, count int, opt case item, ok := <-observe: if !ok { if len(buffer) != 0 { - Of(buffer).SendCtx(ctx, next) + Of(buffer).SendContext(ctx, next) } return } if item.Error() { - item.SendCtx(ctx, next) + item.SendContext(ctx, next) if option.getErrorStrategy() == StopOnError { return } } else { buffer = append(buffer, item.V) if len(buffer) == count { - if !Of(buffer).SendCtx(ctx, next) { + if !Of(buffer).SendContext(ctx, next) { return } buffer = make([]interface{}, 0) @@ -529,7 +529,7 @@ func (o *ObservableImpl) BufferWithTimeOrCount(timespan Duration, count int, opt } case <-time.After(timespan.duration()): if len(buffer) != 0 { - if !Of(buffer).SendCtx(ctx, next) { + if !Of(buffer).SendContext(ctx, next) { return } buffer = make([]interface{}, 0) @@ -558,7 +558,7 @@ type containsOperator struct { func (op *containsOperator) next(ctx context.Context, item Item, dst chan<- Item, operatorOptions operatorOptions) { if op.equal(item.V) { - Of(true).SendCtx(ctx, dst) + Of(true).SendContext(ctx, dst) op.contains = true operatorOptions.stop() } @@ -570,13 +570,13 @@ func (op *containsOperator) err(ctx context.Context, item Item, dst chan<- Item, func (op *containsOperator) end(ctx context.Context, dst chan<- Item) { if !op.contains { - Of(false).SendCtx(ctx, dst) + Of(false).SendContext(ctx, dst) } } func (op *containsOperator) gatherNext(ctx context.Context, item Item, dst chan<- Item, operatorOptions operatorOptions) { if item.V == true { - Of(true).SendCtx(ctx, dst) + Of(true).SendContext(ctx, dst) operatorOptions.stop() op.contains = true } @@ -603,7 +603,7 @@ func (op *countOperator) err(_ context.Context, _ Item, _ chan<- Item, operatorO } func (op *countOperator) end(ctx context.Context, dst chan<- Item) { - Of(op.count).SendCtx(ctx, dst) + Of(op.count).SendContext(ctx, dst) } func (op *countOperator) gatherNext(_ context.Context, _ Item, _ chan<- Item, _ operatorOptions) { @@ -625,7 +625,7 @@ func (o *ObservableImpl) Debounce(timespan Duration, opts ...Option) Observable return } if item.Error() { - if !item.SendCtx(ctx, next) { + if !item.SendContext(ctx, next) { return } if option.getErrorStrategy() == StopOnError { @@ -636,7 +636,7 @@ func (o *ObservableImpl) Debounce(timespan Duration, opts ...Option) Observable } case <-time.After(timespan.duration()): if latest != nil { - if !Of(latest).SendCtx(ctx, next) { + if !Of(latest).SendContext(ctx, next) { return } latest = nil @@ -666,7 +666,7 @@ type defaultIfEmptyOperator struct { func (op *defaultIfEmptyOperator) next(ctx context.Context, item Item, dst chan<- Item, _ operatorOptions) { op.empty = false - item.SendCtx(ctx, dst) + item.SendContext(ctx, dst) } func (op *defaultIfEmptyOperator) err(ctx context.Context, item Item, dst chan<- Item, operatorOptions operatorOptions) { @@ -675,7 +675,7 @@ func (op *defaultIfEmptyOperator) err(ctx context.Context, item Item, dst chan<- func (op *defaultIfEmptyOperator) end(ctx context.Context, dst chan<- Item) { if op.empty { - Of(op.defaultValue).SendCtx(ctx, dst) + Of(op.defaultValue).SendContext(ctx, dst) } } @@ -701,13 +701,13 @@ type distinctOperator struct { func (op *distinctOperator) next(ctx context.Context, item Item, dst chan<- Item, operatorOptions operatorOptions) { key, err := op.apply(ctx, item.V) if err != nil { - Error(err).SendCtx(ctx, dst) + Error(err).SendContext(ctx, dst) operatorOptions.stop() return } _, ok := op.keyset[key] if !ok { - item.SendCtx(ctx, dst) + item.SendContext(ctx, dst) } op.keyset[key] = nil } @@ -726,7 +726,7 @@ func (op *distinctOperator) gatherNext(ctx context.Context, item Item, dst chan< } if _, contains := op.keyset[item.V]; !contains { - Of(item.V).SendCtx(ctx, dst) + Of(item.V).SendContext(ctx, dst) op.keyset[item.V] = nil } } @@ -749,12 +749,12 @@ type distinctUntilChangedOperator struct { func (op *distinctUntilChangedOperator) next(ctx context.Context, item Item, dst chan<- Item, operatorOptions operatorOptions) { key, err := op.apply(ctx, item.V) if err != nil { - Error(err).SendCtx(ctx, dst) + Error(err).SendContext(ctx, dst) operatorOptions.stop() return } if op.current != key { - item.SendCtx(ctx, dst) + item.SendContext(ctx, dst) op.current = key } } @@ -868,7 +868,7 @@ type elementAtOperator struct { func (op *elementAtOperator) next(ctx context.Context, item Item, dst chan<- Item, operatorOptions operatorOptions) { if op.takeCount == int(op.index) { - item.SendCtx(ctx, dst) + item.SendContext(ctx, dst) op.sent = true operatorOptions.stop() return @@ -882,7 +882,7 @@ func (op *elementAtOperator) err(ctx context.Context, item Item, dst chan<- Item func (op *elementAtOperator) end(ctx context.Context, dst chan<- Item) { if !op.sent { - Error(&IllegalInputError{}).SendCtx(ctx, dst) + Error(&IllegalInputError{}).SendContext(ctx, dst) } } @@ -947,7 +947,7 @@ type filterOperator struct { func (op *filterOperator) next(ctx context.Context, item Item, dst chan<- Item, _ operatorOptions) { if op.apply(item.V) { - item.SendCtx(ctx, dst) + item.SendContext(ctx, dst) } } @@ -972,7 +972,7 @@ func (o *ObservableImpl) First(opts ...Option) OptionalSingle { type firstOperator struct{} func (op *firstOperator) next(ctx context.Context, item Item, dst chan<- Item, operatorOptions operatorOptions) { - item.SendCtx(ctx, dst) + item.SendContext(ctx, dst) operatorOptions.stop() } @@ -1003,7 +1003,7 @@ type firstOrDefaultOperator struct { } func (op *firstOrDefaultOperator) next(ctx context.Context, item Item, dst chan<- Item, operatorOptions operatorOptions) { - item.SendCtx(ctx, dst) + item.SendContext(ctx, dst) op.sent = true operatorOptions.stop() } @@ -1014,7 +1014,7 @@ func (op *firstOrDefaultOperator) err(ctx context.Context, item Item, dst chan<- func (op *firstOrDefaultOperator) end(ctx context.Context, dst chan<- Item) { if !op.sent { - Of(op.defaultValue).SendCtx(ctx, dst) + Of(op.defaultValue).SendContext(ctx, dst) } } @@ -1045,12 +1045,12 @@ func (o *ObservableImpl) FlatMap(apply ItemToObservable, opts ...Option) Observa break loop2 } if item.Error() { - item.SendCtx(ctx, next) + item.SendContext(ctx, next) if option.getErrorStrategy() == StopOnError { return } } else { - if !item.SendCtx(ctx, next) { + if !item.SendContext(ctx, next) { return } } @@ -1151,11 +1151,11 @@ func (o *ObservableImpl) GroupBy(length int, distribution func(Item) int, opts . if idx >= length { err := Error(IndexOutOfBoundError{error: fmt.Sprintf("index %d, length %d", idx, length)}) for i := 0; i < length; i++ { - err.SendCtx(ctx, chs[i]) + err.SendContext(ctx, chs[i]) } return } - item.SendCtx(ctx, chs[idx]) + item.SendContext(ctx, chs[idx]) } } }() @@ -1191,7 +1191,7 @@ func (op *lastOperator) err(ctx context.Context, item Item, dst chan<- Item, ope func (op *lastOperator) end(ctx context.Context, dst chan<- Item) { if !op.empty { - op.last.SendCtx(ctx, dst) + op.last.SendContext(ctx, dst) } } @@ -1227,9 +1227,9 @@ func (op *lastOrDefaultOperator) err(ctx context.Context, item Item, dst chan<- func (op *lastOrDefaultOperator) end(ctx context.Context, dst chan<- Item) { if !op.empty { - op.last.SendCtx(ctx, dst) + op.last.SendContext(ctx, dst) } else { - Of(op.defaultValue).SendCtx(ctx, dst) + Of(op.defaultValue).SendContext(ctx, dst) } } @@ -1250,11 +1250,11 @@ type mapOperator struct { func (op *mapOperator) next(ctx context.Context, item Item, dst chan<- Item, operatorOptions operatorOptions) { res, err := op.apply(ctx, item.V) if err != nil { - Error(err).SendCtx(ctx, dst) + Error(err).SendContext(ctx, dst) operatorOptions.stop() return } - Of(res).SendCtx(ctx, dst) + Of(res).SendContext(ctx, dst) } func (op *mapOperator) err(ctx context.Context, item Item, dst chan<- Item, operatorOptions operatorOptions) { @@ -1269,7 +1269,7 @@ func (op *mapOperator) gatherNext(ctx context.Context, item Item, dst chan<- Ite case *mapOperator: return } - item.SendCtx(ctx, dst) + item.SendContext(ctx, dst) } // Marshal transforms the items emitted by an Observable by applying a marshalling to each item. @@ -1313,7 +1313,7 @@ func (op *maxOperator) err(ctx context.Context, item Item, dst chan<- Item, oper func (op *maxOperator) end(ctx context.Context, dst chan<- Item) { if !op.empty { - Of(op.max).SendCtx(ctx, dst) + Of(op.max).SendContext(ctx, dst) } } @@ -1355,7 +1355,7 @@ func (op *minOperator) err(ctx context.Context, item Item, dst chan<- Item, oper func (op *minOperator) end(ctx context.Context, dst chan<- Item) { if !op.empty { - Of(op.max).SendCtx(ctx, dst) + Of(op.max).SendContext(ctx, dst) } } @@ -1381,7 +1381,7 @@ type onErrorResumeNextOperator struct { } func (op *onErrorResumeNextOperator) next(ctx context.Context, item Item, dst chan<- Item, _ operatorOptions) { - item.SendCtx(ctx, dst) + item.SendContext(ctx, dst) } func (op *onErrorResumeNextOperator) err(_ context.Context, item Item, _ chan<- Item, operatorOptions operatorOptions) { @@ -1407,11 +1407,11 @@ type onErrorReturnOperator struct { } func (op *onErrorReturnOperator) next(ctx context.Context, item Item, dst chan<- Item, _ operatorOptions) { - item.SendCtx(ctx, dst) + item.SendContext(ctx, dst) } func (op *onErrorReturnOperator) err(ctx context.Context, item Item, dst chan<- Item, _ operatorOptions) { - Of(op.resumeFunc(item.E)).SendCtx(ctx, dst) + Of(op.resumeFunc(item.E)).SendContext(ctx, dst) } func (op *onErrorReturnOperator) end(_ context.Context, _ chan<- Item) { @@ -1432,11 +1432,11 @@ type onErrorReturnItemOperator struct { } func (op *onErrorReturnItemOperator) next(ctx context.Context, item Item, dst chan<- Item, _ operatorOptions) { - item.SendCtx(ctx, dst) + item.SendContext(ctx, dst) } func (op *onErrorReturnItemOperator) err(ctx context.Context, _ Item, dst chan<- Item, _ operatorOptions) { - Of(op.resume).SendCtx(ctx, dst) + Of(op.resume).SendContext(ctx, dst) } func (op *onErrorReturnItemOperator) end(_ context.Context, _ chan<- Item) { @@ -1465,7 +1465,7 @@ func (op *reduceOperator) next(ctx context.Context, item Item, dst chan<- Item, op.empty = false v, err := op.apply(ctx, op.acc, item.V) if err != nil { - Error(err).SendCtx(ctx, dst) + Error(err).SendContext(ctx, dst) operatorOptions.stop() op.empty = true return @@ -1481,7 +1481,7 @@ func (op *reduceOperator) err(_ context.Context, item Item, dst chan<- Item, ope func (op *reduceOperator) end(ctx context.Context, dst chan<- Item) { if !op.empty { - Of(op.acc).SendCtx(ctx, dst) + Of(op.acc).SendContext(ctx, dst) } } @@ -1515,7 +1515,7 @@ type repeatOperator struct { } func (op *repeatOperator) next(ctx context.Context, item Item, dst chan<- Item, _ operatorOptions) { - item.SendCtx(ctx, dst) + item.SendContext(ctx, dst) op.seq = append(op.seq, item) } @@ -1540,7 +1540,7 @@ func (op *repeatOperator) end(ctx context.Context, dst chan<- Item) { time.Sleep(op.frequency.duration()) } for _, v := range op.seq { - v.SendCtx(ctx, dst) + v.SendContext(ctx, dst) } op.count = op.count - 1 } @@ -1570,12 +1570,12 @@ func (o *ObservableImpl) Retry(count int, opts ...Option) Observable { if i.Error() { count-- if count < 0 { - i.SendCtx(ctx, next) + i.SendContext(ctx, next) break loop } observe = o.Observe(opts...) } else { - i.SendCtx(ctx, next) + i.SendContext(ctx, next) } } } @@ -1631,7 +1631,7 @@ func (o *ObservableImpl) Sample(iterable Iterable, opts ...Option) Observable { if !ok { return } - i.SendCtx(ctx, obsCh) + i.SendContext(ctx, obsCh) } } }() @@ -1647,7 +1647,7 @@ func (o *ObservableImpl) Sample(iterable Iterable, opts ...Option) Observable { if !ok { return } - i.SendCtx(ctx, itCh) + i.SendContext(ctx, itCh) } } }() @@ -1702,11 +1702,11 @@ type scanOperator struct { func (op *scanOperator) next(ctx context.Context, item Item, dst chan<- Item, operatorOptions operatorOptions) { v, err := op.apply(ctx, op.current, item.V) if err != nil { - Error(err).SendCtx(ctx, dst) + Error(err).SendContext(ctx, dst) operatorOptions.stop() return } - Of(v).SendCtx(ctx, dst) + Of(v).SendContext(ctx, dst) op.current = v } @@ -1752,7 +1752,7 @@ func (o *ObservableImpl) Send(output chan<- Item, opts ...Option) { output <- i break loop } - i.SendCtx(ctx, output) + i.SendContext(ctx, output) } } close(output) @@ -1779,7 +1779,7 @@ func (o *ObservableImpl) SequenceEqual(iterable Iterable, opts ...Option) Single if !ok { return } - i.SendCtx(ctx, obsCh) + i.SendContext(ctx, obsCh) } } }() @@ -1795,7 +1795,7 @@ func (o *ObservableImpl) SequenceEqual(iterable Iterable, opts ...Option) Single if !ok { return } - i.SendCtx(ctx, itCh) + i.SendContext(ctx, itCh) } } }() @@ -1831,7 +1831,7 @@ func (o *ObservableImpl) SequenceEqual(iterable Iterable, opts ...Option) Single } } - Of(areCorrect && len(mainSequence) == 0 && len(obsSequence) == 0).SendCtx(ctx, next) + Of(areCorrect && len(mainSequence) == 0 && len(obsSequence) == 0).SendContext(ctx, next) close(next) }() @@ -1911,7 +1911,7 @@ func (o *ObservableImpl) Serialize(from int, identifier func(interface{}) int, o minHeap.Pop() delete(status, id) mutex.Unlock() - Of(itemValue).SendCtx(ctx, next) + Of(itemValue).SendContext(ctx, next) mutex.Lock() atomic.AddInt64(&counter, 1) continue @@ -1950,7 +1950,7 @@ func (op *skipOperator) next(ctx context.Context, item Item, dst chan<- Item, _ op.skipCount++ return } - item.SendCtx(ctx, dst) + item.SendContext(ctx, dst) } func (op *skipOperator) err(ctx context.Context, item Item, dst chan<- Item, operatorOptions operatorOptions) { @@ -1985,7 +1985,7 @@ func (op *skipLastOperator) next(ctx context.Context, item Item, dst chan<- Item return } op.skipCount++ - item.SendCtx(ctx, dst) + item.SendContext(ctx, dst) } func (op *skipLastOperator) err(ctx context.Context, item Item, dst chan<- Item, operatorOptions operatorOptions) { @@ -2016,11 +2016,11 @@ type skipWhileOperator struct { func (op *skipWhileOperator) next(ctx context.Context, item Item, dst chan<- Item, _ operatorOptions) { if !op.skip { - item.SendCtx(ctx, dst) + item.SendContext(ctx, dst) } else { if !op.apply(item.V) { op.skip = false - item.SendCtx(ctx, dst) + item.SendContext(ctx, dst) } } } @@ -2057,7 +2057,7 @@ func (o *ObservableImpl) StartWith(iterable Iterable, opts ...Option) Observable next <- i return } - i.SendCtx(ctx, next) + i.SendContext(ctx, next) } } observe = o.Observe(opts...) @@ -2071,10 +2071,10 @@ func (o *ObservableImpl) StartWith(iterable Iterable, opts ...Option) Observable break loop2 } if i.Error() { - i.SendCtx(ctx, next) + i.SendContext(ctx, next) return } - i.SendCtx(ctx, next) + i.SendContext(ctx, next) } } }() @@ -2180,7 +2180,7 @@ type takeOperator struct { func (op *takeOperator) next(ctx context.Context, item Item, dst chan<- Item, _ operatorOptions) { if op.takeCount < int(op.nth) { op.takeCount++ - item.SendCtx(ctx, dst) + item.SendContext(ctx, dst) } } @@ -2233,7 +2233,7 @@ func (op *takeLast) end(ctx context.Context, dst chan<- Item) { op.n = op.count } for i := 0; i < op.n; i++ { - Of(op.r.Value).SendCtx(ctx, dst) + Of(op.r.Value).SendContext(ctx, dst) op.r = op.r.Next() } } @@ -2257,7 +2257,7 @@ type takeUntilOperator struct { } func (op *takeUntilOperator) next(ctx context.Context, item Item, dst chan<- Item, operatorOptions operatorOptions) { - item.SendCtx(ctx, dst) + item.SendContext(ctx, dst) if op.apply(item.V) { operatorOptions.stop() return @@ -2294,7 +2294,7 @@ func (op *takeWhileOperator) next(ctx context.Context, item Item, dst chan<- Ite operatorOptions.stop() return } - item.SendCtx(ctx, dst) + item.SendContext(ctx, dst) } func (op *takeWhileOperator) err(ctx context.Context, item Item, dst chan<- Item, operatorOptions operatorOptions) { @@ -2323,7 +2323,7 @@ func (o *ObservableImpl) TimeInterval(opts ...Option) Observable { return } if item.Error() { - if !item.SendCtx(ctx, next) { + if !item.SendContext(ctx, next) { return } if option.getErrorStrategy() == StopOnError { @@ -2331,7 +2331,7 @@ func (o *ObservableImpl) TimeInterval(opts ...Option) Observable { } } else { now := time.Now().UTC() - if !Of(now.Sub(latest)).SendCtx(ctx, next) { + if !Of(now.Sub(latest)).SendContext(ctx, next) { return } latest = now @@ -2357,7 +2357,7 @@ func (op *timestampOperator) next(ctx context.Context, item Item, dst chan<- Ite Of(TimestampItem{ Timestamp: time.Now().UTC(), V: item.V, - }).SendCtx(ctx, dst) + }).SendContext(ctx, dst) } func (op *timestampOperator) err(ctx context.Context, item Item, dst chan<- Item, operatorOptions operatorOptions) { @@ -2390,7 +2390,7 @@ type toMapOperator struct { func (op *toMapOperator) next(ctx context.Context, item Item, dst chan<- Item, operatorOptions operatorOptions) { k, err := op.keySelector(ctx, item.V) if err != nil { - Error(err).SendCtx(ctx, dst) + Error(err).SendContext(ctx, dst) operatorOptions.stop() return } @@ -2402,7 +2402,7 @@ func (op *toMapOperator) err(ctx context.Context, item Item, dst chan<- Item, op } func (op *toMapOperator) end(ctx context.Context, dst chan<- Item) { - Of(op.m).SendCtx(ctx, dst) + Of(op.m).SendContext(ctx, dst) } func (op *toMapOperator) gatherNext(_ context.Context, _ Item, _ chan<- Item, _ operatorOptions) { @@ -2430,14 +2430,14 @@ type toMapWithValueSelector struct { func (op *toMapWithValueSelector) next(ctx context.Context, item Item, dst chan<- Item, operatorOptions operatorOptions) { k, err := op.keySelector(ctx, item.V) if err != nil { - Error(err).SendCtx(ctx, dst) + Error(err).SendContext(ctx, dst) operatorOptions.stop() return } v, err := op.valueSelector(ctx, item.V) if err != nil { - Error(err).SendCtx(ctx, dst) + Error(err).SendContext(ctx, dst) operatorOptions.stop() return } @@ -2450,7 +2450,7 @@ func (op *toMapWithValueSelector) err(ctx context.Context, item Item, dst chan<- } func (op *toMapWithValueSelector) end(ctx context.Context, dst chan<- Item) { - Of(op.m).SendCtx(ctx, dst) + Of(op.m).SendContext(ctx, dst) } func (op *toMapWithValueSelector) gatherNext(_ context.Context, _ Item, _ chan<- Item, _ operatorOptions) { @@ -2527,7 +2527,7 @@ func (op *windowWithCountOperator) pre(ctx context.Context, dst chan<- Item) { if op.currentChannel == nil { ch := op.option.buildChannel() op.currentChannel = ch - Of(FromChannel(ch)).SendCtx(ctx, dst) + Of(FromChannel(ch)).SendContext(ctx, dst) } } @@ -2537,7 +2537,7 @@ func (op *windowWithCountOperator) post(ctx context.Context, dst chan<- Item) { close(op.currentChannel) ch := op.option.buildChannel() op.currentChannel = ch - Of(FromChannel(ch)).SendCtx(ctx, dst) + Of(FromChannel(ch)).SendContext(ctx, dst) } } @@ -2577,7 +2577,7 @@ func (o *ObservableImpl) WindowWithTime(timespan Duration, opts ...Option) Obser observe := o.Observe(opts...) ch := option.buildChannel() empty := true - if !Of(FromChannel(ch)).SendCtx(ctx, next) { + if !Of(FromChannel(ch)).SendContext(ctx, next) { return } @@ -2592,7 +2592,7 @@ func (o *ObservableImpl) WindowWithTime(timespan Duration, opts ...Option) Obser return } if item.Error() { - if !item.SendCtx(ctx, ch) { + if !item.SendContext(ctx, ch) { return } if option.getErrorStrategy() == StopOnError { @@ -2600,7 +2600,7 @@ func (o *ObservableImpl) WindowWithTime(timespan Duration, opts ...Option) Obser return } } - if !item.SendCtx(ctx, ch) { + if !item.SendContext(ctx, ch) { return } empty = false @@ -2611,7 +2611,7 @@ func (o *ObservableImpl) WindowWithTime(timespan Duration, opts ...Option) Obser close(ch) ch = option.buildChannel() empty = true - if !Of(FromChannel(ch)).SendCtx(ctx, next) { + if !Of(FromChannel(ch)).SendContext(ctx, next) { return } } @@ -2636,7 +2636,7 @@ func (o *ObservableImpl) WindowWithTimeOrCount(timespan Duration, count int, opt observe := o.Observe(opts...) ch := option.buildChannel() iCount := 0 - if !Of(FromChannel(ch)).SendCtx(ctx, next) { + if !Of(FromChannel(ch)).SendContext(ctx, next) { return } @@ -2651,7 +2651,7 @@ func (o *ObservableImpl) WindowWithTimeOrCount(timespan Duration, count int, opt return } if item.Error() { - if !item.SendCtx(ctx, ch) { + if !item.SendContext(ctx, ch) { return } if option.getErrorStrategy() == StopOnError { @@ -2659,7 +2659,7 @@ func (o *ObservableImpl) WindowWithTimeOrCount(timespan Duration, count int, opt return } } - if !item.SendCtx(ctx, ch) { + if !item.SendContext(ctx, ch) { return } iCount++ @@ -2667,7 +2667,7 @@ func (o *ObservableImpl) WindowWithTimeOrCount(timespan Duration, count int, opt close(ch) ch = option.buildChannel() iCount = 0 - if !Of(FromChannel(ch)).SendCtx(ctx, next) { + if !Of(FromChannel(ch)).SendContext(ctx, next) { return } } @@ -2678,7 +2678,7 @@ func (o *ObservableImpl) WindowWithTimeOrCount(timespan Duration, count int, opt close(ch) ch = option.buildChannel() iCount = 0 - if !Of(FromChannel(ch)).SendCtx(ctx, next) { + if !Of(FromChannel(ch)).SendContext(ctx, next) { return } } @@ -2709,7 +2709,7 @@ func (o *ObservableImpl) ZipFromIterable(iterable Iterable, zipper Func2, opts . break loop } if i1.Error() { - i1.SendCtx(ctx, next) + i1.SendContext(ctx, next) return } for { @@ -2721,15 +2721,15 @@ func (o *ObservableImpl) ZipFromIterable(iterable Iterable, zipper Func2, opts . break loop } if i2.Error() { - i2.SendCtx(ctx, next) + i2.SendContext(ctx, next) return } v, err := zipper(ctx, i1.V, i2.V) if err != nil { - Error(err).SendCtx(ctx, next) + Error(err).SendContext(ctx, next) return } - Of(v).SendCtx(ctx, next) + Of(v).SendContext(ctx, next) continue loop } } diff --git a/observable_operator_test.go b/observable_operator_test.go index 153e6dcd..f2fdd69d 100644 --- a/observable_operator_test.go +++ b/observable_operator_test.go @@ -140,16 +140,14 @@ func Test_Observable_BackOffRetry(t *testing.T) { i := 0 backOffCfg := backoff.NewExponentialBackOff() backOffCfg.InitialInterval = time.Nanosecond - obs := Defer([]Producer{func(ctx context.Context, next chan<- Item, done func()) { + obs := Defer([]Producer{func(ctx context.Context, next chan<- Item) { next <- Of(1) next <- Of(2) if i == 2 { next <- Of(3) - done() } else { i++ next <- Error(errFoo) - done() } }}).BackOffRetry(backoff.WithMaxRetries(backOffCfg, 3)) Assert(context.Background(), t, obs, HasItems(1, 2, 1, 2, 1, 2, 3), HasNoError()) @@ -158,11 +156,10 @@ func Test_Observable_BackOffRetry(t *testing.T) { func Test_Observable_BackOffRetry_Error(t *testing.T) { backOffCfg := backoff.NewExponentialBackOff() backOffCfg.InitialInterval = time.Nanosecond - obs := Defer([]Producer{func(ctx context.Context, next chan<- Item, done func()) { + obs := Defer([]Producer{func(ctx context.Context, next chan<- Item) { next <- Of(1) next <- Of(2) next <- Error(errFoo) - done() }}).BackOffRetry(backoff.WithMaxRetries(backOffCfg, 3)) Assert(context.Background(), t, obs, HasItems(1, 2, 1, 2, 1, 2, 1, 2), HasError(errFoo)) } @@ -992,27 +989,24 @@ func Test_Observable_Repeat_Frequency(t *testing.T) { func Test_Observable_Retry(t *testing.T) { i := 0 - obs := Defer([]Producer{func(ctx context.Context, next chan<- Item, done func()) { + obs := Defer([]Producer{func(ctx context.Context, next chan<- Item) { next <- Of(1) next <- Of(2) if i == 2 { next <- Of(3) - done() } else { i++ next <- Error(errFoo) - done() } }}).Retry(3) Assert(context.Background(), t, obs, HasItems(1, 2, 1, 2, 1, 2, 3), HasNoError()) } func Test_Observable_Retry_Error(t *testing.T) { - obs := Defer([]Producer{func(ctx context.Context, next chan<- Item, done func()) { + obs := Defer([]Producer{func(ctx context.Context, next chan<- Item) { next <- Of(1) next <- Of(2) next <- Error(errFoo) - done() }}).Retry(3) Assert(context.Background(), t, obs, HasItems(1, 2, 1, 2, 1, 2, 1, 2), HasError(errFoo)) } diff --git a/single.go b/single.go index e939e60a..09e6e888 100644 --- a/single.go +++ b/single.go @@ -54,11 +54,11 @@ type mapOperatorSingle struct { func (op *mapOperatorSingle) next(ctx context.Context, item Item, dst chan<- Item, operatorOptions operatorOptions) { res, err := op.apply(ctx, item.V) if err != nil { - Error(err).SendCtx(ctx, dst) + Error(err).SendContext(ctx, dst) operatorOptions.stop() return } - Of(res).SendCtx(ctx, dst) + Of(res).SendContext(ctx, dst) } func (op *mapOperatorSingle) err(ctx context.Context, item Item, dst chan<- Item, operatorOptions operatorOptions) { @@ -73,7 +73,7 @@ func (op *mapOperatorSingle) gatherNext(ctx context.Context, item Item, dst chan case *mapOperatorSingle: return } - item.SendCtx(ctx, dst) + item.SendContext(ctx, dst) } // Observe observes a Single by returning its channel. @@ -87,7 +87,7 @@ type filterOperatorSingle struct { func (op *filterOperatorSingle) next(ctx context.Context, item Item, dst chan<- Item, _ operatorOptions) { if op.apply(item.V) { - item.SendCtx(ctx, dst) + item.SendContext(ctx, dst) } } diff --git a/types.go b/types.go index 3e29500e..1187b02a 100644 --- a/types.go +++ b/types.go @@ -32,7 +32,7 @@ type ( // Unmarshaller defines an unmarshaller type ([]byte to interface). Unmarshaller func([]byte, interface{}) error // Producer defines a producer implementation. - Producer func(ctx context.Context, next chan<- Item, done func()) + Producer func(ctx context.Context, next chan<- Item) // Supplier defines a function that supplies a result from nothing. Supplier func(ctx context.Context) Item // Disposed is a notification channel indicating when an Observable is closed.