Skip to content

Commit

Permalink
Error strategy
Browse files Browse the repository at this point in the history
  • Loading branch information
teivah committed Feb 20, 2020
1 parent ffbf019 commit d8026a5
Show file tree
Hide file tree
Showing 19 changed files with 158 additions and 90 deletions.
38 changes: 30 additions & 8 deletions assert.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type RxAssert interface {
noItemsToBeChecked() bool
someItemsToBeChecked() bool
raisedErrorToBeChecked() (bool, error)
raisedErrorsToBeChecked() (bool, []error)
raisedAnErrorToBeChecked() (bool, error)
notRaisedErrorToBeChecked() bool
itemToBeChecked() (bool, interface{})
Expand All @@ -34,7 +35,9 @@ type rxAssert struct {
checkHasItemsNoOrder bool
itemsNoOrder []interface{}
checkHasRaisedError bool
error error
err error
checkHasRaisedErrors bool
errs []error
checkHasRaisedAnError bool
checkHasNotRaisedError bool
checkHasItem bool
Expand Down Expand Up @@ -65,11 +68,15 @@ func (ass *rxAssert) someItemsToBeChecked() bool {
}

func (ass *rxAssert) raisedErrorToBeChecked() (bool, error) {
return ass.checkHasRaisedError, ass.error
return ass.checkHasRaisedError, ass.err
}

func (ass *rxAssert) raisedErrorsToBeChecked() (bool, []error) {
return ass.checkHasRaisedErrors, ass.errs
}

func (ass *rxAssert) raisedAnErrorToBeChecked() (bool, error) {
return ass.checkHasRaisedAnError, ass.error
return ass.checkHasRaisedAnError, ass.err
}

func (ass *rxAssert) notRaisedErrorToBeChecked() bool {
Expand Down Expand Up @@ -128,7 +135,7 @@ func HasItemsNoParticularOrder(items ...interface{}) RxAssert {
func HasRaisedError(err error) RxAssert {
return newAssertion(func(a *rxAssert) {
a.checkHasRaisedError = true
a.error = err
a.err = err
})
}

Expand All @@ -139,6 +146,14 @@ func HasRaisedAnError() RxAssert {
})
}

// HasRaisedErrors checks that the observable has produce a set of errors.
func HasRaisedErrors(errs ...error) RxAssert {
return newAssertion(func(a *rxAssert) {
a.checkHasRaisedErrors = true
a.errs = errs
})
}

// HasNotRaisedError checks that the observable has not raised any error.
func HasNotRaisedError() RxAssert {
return newAssertion(func(a *rxAssert) {
Expand Down Expand Up @@ -185,7 +200,7 @@ func Assert(ctx context.Context, t *testing.T, iterable Iterable, assertions ...
ass := parseAssertions(assertions...)

got := make([]interface{}, 0)
var err error
errs := make([]error, 0)

observe := iterable.Observe()
loop:
Expand All @@ -198,7 +213,7 @@ loop:
break loop
}
if item.Error() {
err = item.E
errs = append(errs, item.E)
} else {
got = append(got, item.V)
}
Expand Down Expand Up @@ -237,12 +252,19 @@ loop:
assert.NotEqual(t, 0, len(got))
}
if checkHasRaisedError, expectedError := ass.raisedErrorToBeChecked(); checkHasRaisedError {
assert.Equal(t, expectedError, err)
if expectedError == nil {
assert.Equal(t, 0, len(errs))
} else {
assert.Equal(t, expectedError, errs[0])
}
}
if checkHasRaisedErrors, expectedErrors := ass.raisedErrorsToBeChecked(); checkHasRaisedErrors {
assert.Equal(t, expectedErrors, errs)
}
if checkHasRaisedAnError, expectedError := ass.raisedAnErrorToBeChecked(); checkHasRaisedAnError {
assert.Nil(t, expectedError)
}
if ass.notRaisedErrorToBeChecked() {
assert.Nil(t, err)
assert.Equal(t, 0, len(errs))
}
}
4 changes: 2 additions & 2 deletions factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func FromEventSource(next <-chan Item, opts ...Option) Observable {
option := parseOptions(opts...)

return &ObservableImpl{
iterable: newEventSourceIterable(option.buildContext(), next, option.buildBackPressureStrategy()),
iterable: newEventSourceIterable(option.buildContext(), next, option.getBackPressureStrategy()),
}
}

Expand All @@ -221,7 +221,7 @@ func Interval(interval Duration, opts ...Option) Observable {
}
}()
return &ObservableImpl{
iterable: newEventSourceIterable(ctx, next, option.buildBackPressureStrategy()),
iterable: newEventSourceIterable(ctx, next, option.getBackPressureStrategy()),
}
}

Expand Down
21 changes: 0 additions & 21 deletions factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,27 +187,6 @@ func Test_Defer_Error(t *testing.T) {
Assert(context.Background(), t, obs, HasItems(1, 2), HasRaisedError(errFoo))
}

func Test_Defer_SimpleCapacity(t *testing.T) {
ch := Defer([]Producer{func(_ context.Context, _ chan<- Item, done func()) {
done()
}}, WithBufferedChannel(5)).Observe()
assert.Equal(t, 5, cap(ch))
}

func Test_Defer_ComposedCapacity(t *testing.T) {
obs1 := Defer([]Producer{func(_ context.Context, _ chan<- Item, done func()) {
done()
}}).Map(func(_ interface{}) (interface{}, error) {
return 1, nil
}, WithBufferedChannel(11))
assert.Equal(t, 11, cap(obs1.Observe()))

obs2 := obs1.Map(func(_ interface{}) (interface{}, error) {
return 1, nil
}, WithBufferedChannel(12))
assert.Equal(t, 12, cap(obs2.Observe()))
}

func Test_Empty(t *testing.T) {
obs := Empty()
Assert(context.Background(), t, obs, HasNoItems())
Expand Down
2 changes: 1 addition & 1 deletion iterable.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@ package rxgo

// Iterable is the basic type that can be observed.
type Iterable interface {
Observe() <-chan Item
Observe(opts ...Option) <-chan Item
}
2 changes: 1 addition & 1 deletion iterable_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@ func newChannelIterable(next <-chan Item) Iterable {
return &channelIterable{next: next}
}

func (i *channelIterable) Observe() <-chan Item {
func (i *channelIterable) Observe(_ ...Option) <-chan Item {
return i.next
}
2 changes: 1 addition & 1 deletion iterable_create.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,6 @@ func newCreateIterable(fs []Producer, opts ...Option) Iterable {
}
}

func (i *createIterable) Observe() <-chan Item {
func (i *createIterable) Observe(_ ...Option) <-chan Item {
return i.next
}
4 changes: 2 additions & 2 deletions iterable_defer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ func newDeferIterable(f []Producer, opts ...Option) Iterable {
}
}

func (i *deferIterable) Observe() <-chan Item {
option := parseOptions(i.opts...)
func (i *deferIterable) Observe(opts ...Option) <-chan Item {
option := parseOptions(append(i.opts, opts...)...)
next := option.buildChannel()
ctx := option.buildContext()

Expand Down
4 changes: 2 additions & 2 deletions iterable_eventsource.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ func (i *eventSourceIterable) closeAllObservers() {
i.Unlock()
}

func (i *eventSourceIterable) Observe() <-chan Item {
option := parseOptions(i.opts...)
func (i *eventSourceIterable) Observe(opts ...Option) <-chan Item {
option := parseOptions(append(i.opts, opts...)...)
next := option.buildChannel()

i.Lock()
Expand Down
9 changes: 5 additions & 4 deletions iterable_factory.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package rxgo

type factoryIterable struct {
factory func() <-chan Item
factory func(opts ...Option) <-chan Item
}

func newColdIterable(factory func() <-chan Item) Iterable {
// TODO Replace by create
func newColdIterable(factory func(opts ...Option) <-chan Item) Iterable {
return &factoryIterable{factory: factory}
}

func (i *factoryIterable) Observe() <-chan Item {
return i.factory()
func (i *factoryIterable) Observe(opts ...Option) <-chan Item {
return i.factory(opts...)
}
4 changes: 2 additions & 2 deletions iterable_just.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ func newJustIterable(items interface{}, opts ...Option) Iterable {
}
}

func (i *justIterable) Observe() <-chan Item {
option := parseOptions(i.opts...)
func (i *justIterable) Observe(opts ...Option) <-chan Item {
option := parseOptions(append(i.opts, opts...)...)
next := option.buildChannel()

go SendItems(next, CloseChannel, i.items)
Expand Down
4 changes: 2 additions & 2 deletions iterable_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ func newRangeIterable(start, count int, opts ...Option) Iterable {
}
}

func (i *rangeIterable) Observe() <-chan Item {
option := parseOptions(i.opts...)
func (i *rangeIterable) Observe(opts ...Option) <-chan Item {
option := parseOptions(append(i.opts, opts...)...)
next := option.buildChannel()

go func() {
Expand Down
4 changes: 2 additions & 2 deletions iterable_slice.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ func newSliceIterable(items []Item, opts ...Option) Iterable {
}
}

func (i *sliceIterable) Observe() <-chan Item {
option := parseOptions(i.opts...)
func (i *sliceIterable) Observe(opts ...Option) <-chan Item {
option := parseOptions(append(i.opts, opts...)...)
next := option.buildChannel()

go func() {
Expand Down
37 changes: 22 additions & 15 deletions observable.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,42 +96,47 @@ func defaultEndFuncOperator(_ context.Context, _ chan<- Item) {}
func operator(iterable Iterable, nextFunc, errFunc operatorItem, endFunc operatorEnd, opts ...Option) Iterable {
option := parseOptions(opts...)

if option.withEagerObservation() {
if option.isEagerObservation() {
next := option.buildChannel()
ctx := option.buildContext()
if withPool, pool := option.withPool(); withPool {
parallel(ctx, pool, next, iterable, nextFunc, errFunc, endFunc)
if withPool, pool := option.getPool(); withPool {
parallel(ctx, pool, next, iterable, nextFunc, errFunc, endFunc, option, opts...)
} else {
seq(ctx, next, iterable, nextFunc, errFunc, endFunc)
seq(ctx, next, iterable, nextFunc, errFunc, endFunc, option, opts...)
}

return newChannelIterable(next)
}

return &ObservableImpl{
iterable: newColdIterable(func() <-chan Item {
iterable: newColdIterable(func(propagatedOptions ...Option) <-chan Item {
mergedOptions := append(opts, propagatedOptions...)
option = parseOptions(mergedOptions...)

next := option.buildChannel()
ctx := option.buildContext()
if withPool, pool := option.withPool(); withPool {
parallel(ctx, pool, next, iterable, nextFunc, errFunc, endFunc)
if withPool, pool := option.getPool(); withPool {
parallel(ctx, pool, next, iterable, nextFunc, errFunc, endFunc, option, mergedOptions...)
} else {
seq(ctx, next, iterable, nextFunc, errFunc, endFunc)
seq(ctx, next, iterable, nextFunc, errFunc, endFunc, option, mergedOptions...)
}
return next
}),
}
}

func seq(ctx context.Context, next chan Item, iterable Iterable, nextFunc, errFunc operatorItem, endFunc operatorEnd) {
func seq(ctx context.Context, next chan Item, iterable Iterable, nextFunc, errFunc operatorItem, endFunc operatorEnd, option Option, opts ...Option) {
go func() {
stopped := false
observe := iterable.Observe()
observe := iterable.Observe(opts...)
operator := operatorOptions{
stop: func() {
stopped = true
if option.getErrorStrategy() == StopOnError {
stopped = true
}
},
resetIterable: func(newIterable Iterable) {
observe = newIterable.Observe()
observe = newIterable.Observe(opts...)
},
}

Expand All @@ -156,12 +161,14 @@ func seq(ctx context.Context, next chan Item, iterable Iterable, nextFunc, errFu
}()
}

func parallel(ctx context.Context, pool int, next chan Item, iterable Iterable, nextFunc, errFunc operatorItem, endFunc operatorEnd) {
func parallel(ctx context.Context, pool int, next chan Item, iterable Iterable, nextFunc, errFunc operatorItem, endFunc operatorEnd, option Option, opts ...Option) {
stopped := abool.New()
observe := iterable.Observe()
observe := iterable.Observe(opts...)
operator := operatorOptions{
stop: func() {
stopped.Set()
if option.getErrorStrategy() == StopOnError {
stopped.Set()
}
},
// TODO Can we implement a reset strategy with a parallel implementation
resetIterable: func(_ Iterable) {},
Expand Down
4 changes: 2 additions & 2 deletions observable_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -867,8 +867,8 @@ func (o *ObservableImpl) Min(comparator Comparator, opts ...Option) OptionalSing
}

// Observe observes an Observable by returning its channel.
func (o *ObservableImpl) Observe() <-chan Item {
return o.iterable.Observe()
func (o *ObservableImpl) Observe(opts ...Option) <-chan Item {
return o.iterable.Observe(opts...)
}

// OnErrorResumeNext instructs an Observable to pass control to another Observable rather than invoking
Expand Down
45 changes: 45 additions & 0 deletions observable_operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1125,3 +1125,48 @@ func Test_Observable_ZipFromObservable_DifferentLength2(t *testing.T) {
zip := obs1.ZipFromIterable(obs2, zipper)
Assert(context.Background(), t, zip, HasItems(11, 22))
}

func Test_Observable_Option_WithOnErrorStrategy_Single(t *testing.T) {
obs := testObservable(1, 2, 3).
Map(func(i interface{}) (interface{}, error) {
if i == 2 {
return nil, errFoo
}
return i, nil
}, WithErrorStrategy(ContinueOnError))
Assert(context.Background(), t, obs, HasItems(1, 3), HasRaisedError(errFoo))
}

func Test_Observable_Option_WithOnErrorStrategy_Propagate(t *testing.T) {
obs := testObservable(1, 2, 3).
Map(func(i interface{}) (interface{}, error) {
if i == 1 {
return nil, errFoo
}
return i, nil
}).
Map(func(i interface{}) (interface{}, error) {
if i == 2 {
return nil, errBar
}
return i, nil
}, WithErrorStrategy(ContinueOnError))
Assert(context.Background(), t, obs, HasItems(3), HasRaisedErrors(errFoo, errBar))
}

func Test_Observable_Option_SimpleCapacity(t *testing.T) {
ch := Just(1, WithBufferedChannel(5)).Observe()
assert.Equal(t, 5, cap(ch))
}

func Test_Observable_Option_ComposedCapacity(t *testing.T) {
obs1 := Just(1).Map(func(_ interface{}) (interface{}, error) {
return 1, nil
}, WithBufferedChannel(11))
obs2 := obs1.Map(func(_ interface{}) (interface{}, error) {
return 1, nil
}, WithBufferedChannel(12))

assert.Equal(t, 11, cap(obs1.Observe()))
assert.Equal(t, 12, cap(obs2.Observe()))
}
4 changes: 2 additions & 2 deletions optionalsingle.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@ type OptionalSingleImpl struct {
}

// Observe observes an OptionalSingle by returning its channel.
func (o *OptionalSingleImpl) Observe() <-chan Item {
return o.iterable.Observe()
func (o *OptionalSingleImpl) Observe(opts ...Option) <-chan Item {
return o.iterable.Observe(opts...)
}
Loading

0 comments on commit d8026a5

Please sign in to comment.