diff --git a/assert.go b/assert.go index 84be548c..05b41209 100644 --- a/assert.go +++ b/assert.go @@ -18,6 +18,7 @@ type RxAssert interface { noItemsToBeChecked() bool someItemsToBeChecked() bool raisedErrorToBeChecked() (bool, error) + raisedErrorsToBeChecked() (bool, []error) raisedAnErrorToBeChecked() (bool, error) notRaisedErrorToBeChecked() bool itemToBeChecked() (bool, interface{}) @@ -34,7 +35,9 @@ type rxAssert struct { checkHasItemsNoOrder bool itemsNoOrder []interface{} checkHasRaisedError bool - error error + err error + checkHasRaisedErrors bool + errs []error checkHasRaisedAnError bool checkHasNotRaisedError bool checkHasItem bool @@ -65,11 +68,15 @@ func (ass *rxAssert) someItemsToBeChecked() bool { } func (ass *rxAssert) raisedErrorToBeChecked() (bool, error) { - return ass.checkHasRaisedError, ass.error + return ass.checkHasRaisedError, ass.err +} + +func (ass *rxAssert) raisedErrorsToBeChecked() (bool, []error) { + return ass.checkHasRaisedErrors, ass.errs } func (ass *rxAssert) raisedAnErrorToBeChecked() (bool, error) { - return ass.checkHasRaisedAnError, ass.error + return ass.checkHasRaisedAnError, ass.err } func (ass *rxAssert) notRaisedErrorToBeChecked() bool { @@ -128,7 +135,7 @@ func HasItemsNoParticularOrder(items ...interface{}) RxAssert { func HasRaisedError(err error) RxAssert { return newAssertion(func(a *rxAssert) { a.checkHasRaisedError = true - a.error = err + a.err = err }) } @@ -139,6 +146,14 @@ func HasRaisedAnError() RxAssert { }) } +// HasRaisedErrors checks that the observable has produce a set of errors. +func HasRaisedErrors(errs ...error) RxAssert { + return newAssertion(func(a *rxAssert) { + a.checkHasRaisedErrors = true + a.errs = errs + }) +} + // HasNotRaisedError checks that the observable has not raised any error. func HasNotRaisedError() RxAssert { return newAssertion(func(a *rxAssert) { @@ -185,7 +200,7 @@ func Assert(ctx context.Context, t *testing.T, iterable Iterable, assertions ... ass := parseAssertions(assertions...) got := make([]interface{}, 0) - var err error + errs := make([]error, 0) observe := iterable.Observe() loop: @@ -198,7 +213,7 @@ loop: break loop } if item.Error() { - err = item.E + errs = append(errs, item.E) } else { got = append(got, item.V) } @@ -237,12 +252,19 @@ loop: assert.NotEqual(t, 0, len(got)) } if checkHasRaisedError, expectedError := ass.raisedErrorToBeChecked(); checkHasRaisedError { - assert.Equal(t, expectedError, err) + if expectedError == nil { + assert.Equal(t, 0, len(errs)) + } else { + assert.Equal(t, expectedError, errs[0]) + } + } + if checkHasRaisedErrors, expectedErrors := ass.raisedErrorsToBeChecked(); checkHasRaisedErrors { + assert.Equal(t, expectedErrors, errs) } if checkHasRaisedAnError, expectedError := ass.raisedAnErrorToBeChecked(); checkHasRaisedAnError { assert.Nil(t, expectedError) } if ass.notRaisedErrorToBeChecked() { - assert.Nil(t, err) + assert.Equal(t, 0, len(errs)) } } diff --git a/factory.go b/factory.go index ea4d6e27..ce28cb6f 100644 --- a/factory.go +++ b/factory.go @@ -196,7 +196,7 @@ func FromEventSource(next <-chan Item, opts ...Option) Observable { option := parseOptions(opts...) return &ObservableImpl{ - iterable: newEventSourceIterable(option.buildContext(), next, option.buildBackPressureStrategy()), + iterable: newEventSourceIterable(option.buildContext(), next, option.getBackPressureStrategy()), } } @@ -221,7 +221,7 @@ func Interval(interval Duration, opts ...Option) Observable { } }() return &ObservableImpl{ - iterable: newEventSourceIterable(ctx, next, option.buildBackPressureStrategy()), + iterable: newEventSourceIterable(ctx, next, option.getBackPressureStrategy()), } } diff --git a/factory_test.go b/factory_test.go index fac3cfda..49140b51 100644 --- a/factory_test.go +++ b/factory_test.go @@ -187,27 +187,6 @@ func Test_Defer_Error(t *testing.T) { Assert(context.Background(), t, obs, HasItems(1, 2), HasRaisedError(errFoo)) } -func Test_Defer_SimpleCapacity(t *testing.T) { - ch := Defer([]Producer{func(_ context.Context, _ chan<- Item, done func()) { - done() - }}, WithBufferedChannel(5)).Observe() - assert.Equal(t, 5, cap(ch)) -} - -func Test_Defer_ComposedCapacity(t *testing.T) { - obs1 := Defer([]Producer{func(_ context.Context, _ chan<- Item, done func()) { - done() - }}).Map(func(_ interface{}) (interface{}, error) { - return 1, nil - }, WithBufferedChannel(11)) - assert.Equal(t, 11, cap(obs1.Observe())) - - obs2 := obs1.Map(func(_ interface{}) (interface{}, error) { - return 1, nil - }, WithBufferedChannel(12)) - assert.Equal(t, 12, cap(obs2.Observe())) -} - func Test_Empty(t *testing.T) { obs := Empty() Assert(context.Background(), t, obs, HasNoItems()) diff --git a/iterable.go b/iterable.go index 6a919a79..efaa40f2 100644 --- a/iterable.go +++ b/iterable.go @@ -2,5 +2,5 @@ package rxgo // Iterable is the basic type that can be observed. type Iterable interface { - Observe() <-chan Item + Observe(opts ...Option) <-chan Item } diff --git a/iterable_channel.go b/iterable_channel.go index 1323a611..05887afb 100644 --- a/iterable_channel.go +++ b/iterable_channel.go @@ -8,6 +8,6 @@ func newChannelIterable(next <-chan Item) Iterable { return &channelIterable{next: next} } -func (i *channelIterable) Observe() <-chan Item { +func (i *channelIterable) Observe(_ ...Option) <-chan Item { return i.next } diff --git a/iterable_create.go b/iterable_create.go index a47d6a8c..b762d92b 100644 --- a/iterable_create.go +++ b/iterable_create.go @@ -33,6 +33,6 @@ func newCreateIterable(fs []Producer, opts ...Option) Iterable { } } -func (i *createIterable) Observe() <-chan Item { +func (i *createIterable) Observe(_ ...Option) <-chan Item { return i.next } diff --git a/iterable_defer.go b/iterable_defer.go index 7aa078b2..51274c15 100644 --- a/iterable_defer.go +++ b/iterable_defer.go @@ -16,8 +16,8 @@ func newDeferIterable(f []Producer, opts ...Option) Iterable { } } -func (i *deferIterable) Observe() <-chan Item { - option := parseOptions(i.opts...) +func (i *deferIterable) Observe(opts ...Option) <-chan Item { + option := parseOptions(append(i.opts, opts...)...) next := option.buildChannel() ctx := option.buildContext() diff --git a/iterable_eventsource.go b/iterable_eventsource.go index f213b536..d79a49a3 100644 --- a/iterable_eventsource.go +++ b/iterable_eventsource.go @@ -62,8 +62,8 @@ func (i *eventSourceIterable) closeAllObservers() { i.Unlock() } -func (i *eventSourceIterable) Observe() <-chan Item { - option := parseOptions(i.opts...) +func (i *eventSourceIterable) Observe(opts ...Option) <-chan Item { + option := parseOptions(append(i.opts, opts...)...) next := option.buildChannel() i.Lock() diff --git a/iterable_factory.go b/iterable_factory.go index d04c0a4a..807553f8 100644 --- a/iterable_factory.go +++ b/iterable_factory.go @@ -1,13 +1,14 @@ package rxgo type factoryIterable struct { - factory func() <-chan Item + factory func(opts ...Option) <-chan Item } -func newColdIterable(factory func() <-chan Item) Iterable { +// TODO Replace by create +func newColdIterable(factory func(opts ...Option) <-chan Item) Iterable { return &factoryIterable{factory: factory} } -func (i *factoryIterable) Observe() <-chan Item { - return i.factory() +func (i *factoryIterable) Observe(opts ...Option) <-chan Item { + return i.factory(opts...) } diff --git a/iterable_just.go b/iterable_just.go index 6b323c4f..737cde28 100644 --- a/iterable_just.go +++ b/iterable_just.go @@ -12,8 +12,8 @@ func newJustIterable(items interface{}, opts ...Option) Iterable { } } -func (i *justIterable) Observe() <-chan Item { - option := parseOptions(i.opts...) +func (i *justIterable) Observe(opts ...Option) <-chan Item { + option := parseOptions(append(i.opts, opts...)...) next := option.buildChannel() go SendItems(next, CloseChannel, i.items) diff --git a/iterable_range.go b/iterable_range.go index 650d0957..0a705772 100644 --- a/iterable_range.go +++ b/iterable_range.go @@ -13,8 +13,8 @@ func newRangeIterable(start, count int, opts ...Option) Iterable { } } -func (i *rangeIterable) Observe() <-chan Item { - option := parseOptions(i.opts...) +func (i *rangeIterable) Observe(opts ...Option) <-chan Item { + option := parseOptions(append(i.opts, opts...)...) next := option.buildChannel() go func() { diff --git a/iterable_slice.go b/iterable_slice.go index a904c2b8..d096d96c 100644 --- a/iterable_slice.go +++ b/iterable_slice.go @@ -12,8 +12,8 @@ func newSliceIterable(items []Item, opts ...Option) Iterable { } } -func (i *sliceIterable) Observe() <-chan Item { - option := parseOptions(i.opts...) +func (i *sliceIterable) Observe(opts ...Option) <-chan Item { + option := parseOptions(append(i.opts, opts...)...) next := option.buildChannel() go func() { diff --git a/observable.go b/observable.go index 337a57c8..4dca73cd 100644 --- a/observable.go +++ b/observable.go @@ -96,42 +96,47 @@ func defaultEndFuncOperator(_ context.Context, _ chan<- Item) {} func operator(iterable Iterable, nextFunc, errFunc operatorItem, endFunc operatorEnd, opts ...Option) Iterable { option := parseOptions(opts...) - if option.withEagerObservation() { + if option.isEagerObservation() { next := option.buildChannel() ctx := option.buildContext() - if withPool, pool := option.withPool(); withPool { - parallel(ctx, pool, next, iterable, nextFunc, errFunc, endFunc) + if withPool, pool := option.getPool(); withPool { + parallel(ctx, pool, next, iterable, nextFunc, errFunc, endFunc, option, opts...) } else { - seq(ctx, next, iterable, nextFunc, errFunc, endFunc) + seq(ctx, next, iterable, nextFunc, errFunc, endFunc, option, opts...) } return newChannelIterable(next) } return &ObservableImpl{ - iterable: newColdIterable(func() <-chan Item { + iterable: newColdIterable(func(propagatedOptions ...Option) <-chan Item { + mergedOptions := append(opts, propagatedOptions...) + option = parseOptions(mergedOptions...) + next := option.buildChannel() ctx := option.buildContext() - if withPool, pool := option.withPool(); withPool { - parallel(ctx, pool, next, iterable, nextFunc, errFunc, endFunc) + if withPool, pool := option.getPool(); withPool { + parallel(ctx, pool, next, iterable, nextFunc, errFunc, endFunc, option, mergedOptions...) } else { - seq(ctx, next, iterable, nextFunc, errFunc, endFunc) + seq(ctx, next, iterable, nextFunc, errFunc, endFunc, option, mergedOptions...) } return next }), } } -func seq(ctx context.Context, next chan Item, iterable Iterable, nextFunc, errFunc operatorItem, endFunc operatorEnd) { +func seq(ctx context.Context, next chan Item, iterable Iterable, nextFunc, errFunc operatorItem, endFunc operatorEnd, option Option, opts ...Option) { go func() { stopped := false - observe := iterable.Observe() + observe := iterable.Observe(opts...) operator := operatorOptions{ stop: func() { - stopped = true + if option.getErrorStrategy() == StopOnError { + stopped = true + } }, resetIterable: func(newIterable Iterable) { - observe = newIterable.Observe() + observe = newIterable.Observe(opts...) }, } @@ -156,12 +161,14 @@ func seq(ctx context.Context, next chan Item, iterable Iterable, nextFunc, errFu }() } -func parallel(ctx context.Context, pool int, next chan Item, iterable Iterable, nextFunc, errFunc operatorItem, endFunc operatorEnd) { +func parallel(ctx context.Context, pool int, next chan Item, iterable Iterable, nextFunc, errFunc operatorItem, endFunc operatorEnd, option Option, opts ...Option) { stopped := abool.New() - observe := iterable.Observe() + observe := iterable.Observe(opts...) operator := operatorOptions{ stop: func() { - stopped.Set() + if option.getErrorStrategy() == StopOnError { + stopped.Set() + } }, // TODO Can we implement a reset strategy with a parallel implementation resetIterable: func(_ Iterable) {}, diff --git a/observable_operator.go b/observable_operator.go index 877c0ca4..42585009 100644 --- a/observable_operator.go +++ b/observable_operator.go @@ -867,8 +867,8 @@ func (o *ObservableImpl) Min(comparator Comparator, opts ...Option) OptionalSing } // Observe observes an Observable by returning its channel. -func (o *ObservableImpl) Observe() <-chan Item { - return o.iterable.Observe() +func (o *ObservableImpl) Observe(opts ...Option) <-chan Item { + return o.iterable.Observe(opts...) } // OnErrorResumeNext instructs an Observable to pass control to another Observable rather than invoking diff --git a/observable_operator_test.go b/observable_operator_test.go index b0ad76f4..20e2869b 100644 --- a/observable_operator_test.go +++ b/observable_operator_test.go @@ -1125,3 +1125,48 @@ func Test_Observable_ZipFromObservable_DifferentLength2(t *testing.T) { zip := obs1.ZipFromIterable(obs2, zipper) Assert(context.Background(), t, zip, HasItems(11, 22)) } + +func Test_Observable_Option_WithOnErrorStrategy_Single(t *testing.T) { + obs := testObservable(1, 2, 3). + Map(func(i interface{}) (interface{}, error) { + if i == 2 { + return nil, errFoo + } + return i, nil + }, WithErrorStrategy(ContinueOnError)) + Assert(context.Background(), t, obs, HasItems(1, 3), HasRaisedError(errFoo)) +} + +func Test_Observable_Option_WithOnErrorStrategy_Propagate(t *testing.T) { + obs := testObservable(1, 2, 3). + Map(func(i interface{}) (interface{}, error) { + if i == 1 { + return nil, errFoo + } + return i, nil + }). + Map(func(i interface{}) (interface{}, error) { + if i == 2 { + return nil, errBar + } + return i, nil + }, WithErrorStrategy(ContinueOnError)) + Assert(context.Background(), t, obs, HasItems(3), HasRaisedErrors(errFoo, errBar)) +} + +func Test_Observable_Option_SimpleCapacity(t *testing.T) { + ch := Just(1, WithBufferedChannel(5)).Observe() + assert.Equal(t, 5, cap(ch)) +} + +func Test_Observable_Option_ComposedCapacity(t *testing.T) { + obs1 := Just(1).Map(func(_ interface{}) (interface{}, error) { + return 1, nil + }, WithBufferedChannel(11)) + obs2 := obs1.Map(func(_ interface{}) (interface{}, error) { + return 1, nil + }, WithBufferedChannel(12)) + + assert.Equal(t, 11, cap(obs1.Observe())) + assert.Equal(t, 12, cap(obs2.Observe())) +} diff --git a/optionalsingle.go b/optionalsingle.go index b69b7012..a747c244 100644 --- a/optionalsingle.go +++ b/optionalsingle.go @@ -18,6 +18,6 @@ type OptionalSingleImpl struct { } // Observe observes an OptionalSingle by returning its channel. -func (o *OptionalSingleImpl) Observe() <-chan Item { - return o.iterable.Observe() +func (o *OptionalSingleImpl) Observe(opts ...Option) <-chan Item { + return o.iterable.Observe(opts...) } diff --git a/options.go b/options.go index 8a66a313..a58c8de4 100644 --- a/options.go +++ b/options.go @@ -8,59 +8,61 @@ import ( // Option handles configurable options. type Option interface { apply(*funcOption) - withBuffer() (bool, int) - withContext() (bool, context.Context) - withEagerObservation() bool - withPool() (bool, int) + toPropagate() bool + isEagerObservation() bool + getPool() (bool, int) buildChannel() chan Item buildContext() context.Context - buildBackPressureStrategy() BackpressureStrategy + getBackPressureStrategy() BackpressureStrategy + getErrorStrategy() OnErrorStrategy } type funcOption struct { f func(*funcOption) - toBuffer bool + isBuffer bool buffer int ctx context.Context eagerObservation bool pool int backPressureStrategy BackpressureStrategy + onErrorStrategy OnErrorStrategy + propagate bool } -func (fdo *funcOption) withBuffer() (bool, int) { - return fdo.toBuffer, fdo.buffer +func (fdo *funcOption) toPropagate() bool { + return fdo.propagate } -func (fdo *funcOption) withContext() (bool, context.Context) { - return fdo.ctx != nil, fdo.ctx -} - -func (fdo *funcOption) withEagerObservation() bool { +func (fdo *funcOption) isEagerObservation() bool { return fdo.eagerObservation } -func (fdo *funcOption) withPool() (bool, int) { +func (fdo *funcOption) getPool() (bool, int) { return fdo.pool > 0, fdo.pool } func (fdo *funcOption) buildChannel() chan Item { - if toBeBuffered, cap := fdo.withBuffer(); toBeBuffered { - return make(chan Item, cap) + if fdo.isBuffer { + return make(chan Item, fdo.buffer) } return make(chan Item) } func (fdo *funcOption) buildContext() context.Context { - if withContext, c := fdo.withContext(); withContext { - return c + if fdo.ctx == nil { + return context.Background() } - return context.Background() + return fdo.ctx } -func (fdo *funcOption) buildBackPressureStrategy() BackpressureStrategy { +func (fdo *funcOption) getBackPressureStrategy() BackpressureStrategy { return fdo.backPressureStrategy } +func (fdo *funcOption) getErrorStrategy() OnErrorStrategy { + return fdo.onErrorStrategy +} + func (fdo *funcOption) apply(do *funcOption) { fdo.f(do) } @@ -82,7 +84,7 @@ func parseOptions(opts ...Option) Option { // WithBufferedChannel allows to configure the capacity of a buffered channel. func WithBufferedChannel(capacity int) Option { return newFuncOption(func(options *funcOption) { - options.toBuffer = true + options.isBuffer = true options.buffer = capacity }) } @@ -121,3 +123,9 @@ func WithBackPressureStrategy(strategy BackpressureStrategy) Option { options.backPressureStrategy = strategy }) } + +func WithErrorStrategy(strategy OnErrorStrategy) Option { + return newFuncOption(func(options *funcOption) { + options.onErrorStrategy = strategy + }) +} diff --git a/single.go b/single.go index 25bdc63b..40a3ac97 100644 --- a/single.go +++ b/single.go @@ -21,8 +21,8 @@ func newSingleFromOperator(iterable Iterable, nextFunc, errFunc operatorItem, en } // Observe observes a Single by returning its channel. -func (s *SingleImpl) Observe() <-chan Item { - return s.iterable.Observe() +func (s *SingleImpl) Observe(opts ...Option) <-chan Item { + return s.iterable.Observe(opts...) } // Filter emits only those items from a Single that pass a predicate test. diff --git a/types.go b/types.go index d0a184b2..fd11337d 100644 --- a/types.go +++ b/types.go @@ -5,6 +5,7 @@ import "context" type ( // BackpressureStrategy is the backpressure strategy type BackpressureStrategy uint32 + OnErrorStrategy uint32 operatorOptions struct { stop func() @@ -58,3 +59,8 @@ const ( // Drop drops the message. Drop ) + +const ( + StopOnError OnErrorStrategy = iota + ContinueOnError +)