Skip to content

Commit

Permalink
Single and optional single
Browse files Browse the repository at this point in the history
  • Loading branch information
teivah committed Feb 15, 2020
1 parent 1f68290 commit 555edd3
Show file tree
Hide file tree
Showing 13 changed files with 302 additions and 30 deletions.
90 changes: 90 additions & 0 deletions assert.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ type RxAssert interface {
noItemsToBeChecked() bool
raisedErrorToBeChecked() (bool, error)
notRaisedErrorToBeChecked() bool
valueToBeChecked() (bool, interface{})
noValueToBeChecked() (bool, interface{})
}

type rxAssert struct {
Expand All @@ -24,6 +26,9 @@ type rxAssert struct {
checkHasRaisedError bool
error error
checkHasNotRaisedError bool
checkHasValue bool
value interface{}
checkHasNoValue bool
}

func (ass *rxAssert) apply(do *rxAssert) {
Expand All @@ -46,6 +51,14 @@ func (ass *rxAssert) notRaisedErrorToBeChecked() bool {
return ass.checkHasNotRaisedError
}

func (ass *rxAssert) valueToBeChecked() (bool, interface{}) {
return ass.checkHasValue, ass.value
}

func (ass *rxAssert) noValueToBeChecked() (bool, interface{}) {
return ass.checkHasNoValue, ass.value
}

func newAssertion(f func(*rxAssert)) *rxAssert {
return &rxAssert{
f: f,
Expand Down Expand Up @@ -82,6 +95,19 @@ func HasNotRaisedError() RxAssert {
})
}

func HasValue(i interface{}) RxAssert {
return newAssertion(func(a *rxAssert) {
a.checkHasValue = true
a.value = i
})
}

func HasNoValue() RxAssert {
return newAssertion(func(a *rxAssert) {
a.checkHasNoValue = true
})
}

func parseAssertions(assertions ...RxAssert) RxAssert {
ass := new(rxAssert)
for _, assertion := range assertions {
Expand Down Expand Up @@ -125,3 +151,67 @@ func AssertObservable(ctx context.Context, t *testing.T, observable Observable,
assert.Nil(t, err)
}
}

func AssertSingle(ctx context.Context, t *testing.T, single Single, assertions ...RxAssert) {
ass := parseAssertions(assertions...)

var got interface{}
var err error

for item := range single.Observe() {
if item.IsError() {
err = item.Err
break
}
got = item.Value
break
}

if checkHasValue, value := ass.valueToBeChecked(); checkHasValue {
assert.Equal(t, value, got)
}

if checkHasNoValue, value := ass.noValueToBeChecked(); checkHasNoValue {
assert.Equal(t, value, got)
}

if checkHasRaisedError, expectedError := ass.raisedErrorToBeChecked(); checkHasRaisedError {
assert.Equal(t, expectedError, err)
}

if ass.notRaisedErrorToBeChecked() {
assert.Nil(t, err)
}
}

func AssertOptionalSingle(ctx context.Context, t *testing.T, optionalSingle OptionalSingle, assertions ...RxAssert) {
ass := parseAssertions(assertions...)

var got interface{}
var err error

for item := range optionalSingle.Observe() {
if item.IsError() {
err = item.Err
break
}
got = item.Value
break
}

if checkHasValue, value := ass.valueToBeChecked(); checkHasValue {
assert.Equal(t, value, got)
}

if checkHasNoValue, value := ass.noValueToBeChecked(); checkHasNoValue {
assert.Equal(t, value, got)
}

if checkHasRaisedError, expectedError := ass.raisedErrorToBeChecked(); checkHasRaisedError {
assert.Equal(t, expectedError, err)
}

if ass.notRaisedErrorToBeChecked() {
assert.Nil(t, err)
}
}
10 changes: 8 additions & 2 deletions factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,14 @@ func FromChannel(next <-chan Item) Observable {
}
}

// Just creates an Observable with the provided items.
func Just(item Item, items ...Item) Observable {
func FromItem(item Item) Single {
return &single{
iterable: newSliceIterable([]Item{item}),
}
}

// FromItems creates an Observable with the provided items.
func FromItems(item Item, items ...Item) Observable {
if len(items) > 0 {
items = append([]Item{item}, items...)
} else {
Expand Down
10 changes: 8 additions & 2 deletions factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,14 @@ func Test_FromChannel(t *testing.T) {
AssertObservable(context.Background(), t, obs, HasItems(1, 2, 3), HasNotRaisedError())
}

func Test_Just(t *testing.T) {
obs := Just(FromValue(1), FromValue(2), FromValue(3))
func Test_FromItem(t *testing.T) {
single := FromItem(FromValue(1))
AssertSingle(context.Background(), t, single, HasValue(1), HasNotRaisedError())
AssertSingle(context.Background(), t, single, HasValue(1), HasNotRaisedError())
}

func Test_FromItems(t *testing.T) {
obs := FromItems(FromValue(1), FromValue(2), FromValue(3))
AssertObservable(context.Background(), t, obs, HasItems(1, 2, 3), HasNotRaisedError())
AssertObservable(context.Background(), t, obs, HasItems(1, 2, 3), HasNotRaisedError())
}
2 changes: 1 addition & 1 deletion iterable.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@ package rxgo

// Iterable is the interface returning an iterable channel.
type Iterable interface {
Next() <-chan Item
Observe() <-chan Item
}
2 changes: 1 addition & 1 deletion iterable_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@ func newChannelIterable(next <-chan Item) Iterable {
return &channelIterable{next: next}
}

func (i *channelIterable) Next() <-chan Item {
func (i *channelIterable) Observe() <-chan Item {
return i.next
}
2 changes: 1 addition & 1 deletion iterable_slice.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ func newSliceIterable(items []Item) Iterable {
return &sliceIterable{items: items}
}

func (i *sliceIterable) Next() <-chan Item {
func (i *sliceIterable) Observe() <-chan Item {
next := make(chan Item)
go func() {
for _, item := range i.items {
Expand Down
26 changes: 12 additions & 14 deletions observable.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,27 +9,25 @@ type Observable interface {
Iterable
Filter(ctx context.Context, apply Predicate) Observable
ForEach(ctx context.Context, nextFunc NextFunc, errFunc ErrFunc, doneFunc DoneFunc)
Map(ctx context.Context, apply Func) Observable
Map(ctx context.Context, apply Function) Observable
SkipWhile(ctx context.Context, apply Predicate) Observable
}

type observable struct {
iterable Iterable
handler Handler
}

func newObservable(ctx context.Context, source Observable, handler Handler) Observable {
func newObservableFromHandler(ctx context.Context, source Observable, handler Handler) Observable {
next := make(chan Item)

go handler(ctx, source.Next(), next)
go handler(ctx, source.Observe(), next)

return &observable{
iterable: newChannelIterable(next),
handler: handler,
}
}

func newOperator(ctx context.Context, source Observable, nextFunc Operator, errFunc Operator) Observable {
func newObservableFromOperator(ctx context.Context, source Observable, nextFunc Operator, errFunc Operator) Observable {
next := make(chan Item)

stopped := false
Expand All @@ -42,7 +40,7 @@ func newOperator(ctx context.Context, source Observable, nextFunc Operator, errF
case <-ctx.Done():
close(next)
return
case i, ok := <-source.Next():
case i, ok := <-source.Observe():
if !ok {
close(next)
return
Expand All @@ -62,12 +60,12 @@ func newOperator(ctx context.Context, source Observable, nextFunc Operator, errF
}
}

func (o *observable) Next() <-chan Item {
return o.iterable.Next()
func (o *observable) Observe() <-chan Item {
return o.iterable.Observe()
}

func (o *observable) Filter(ctx context.Context, apply Predicate) Observable {
return newOperator(ctx, o, func(item Item, dst chan<- Item, stop func()) {
return newObservableFromOperator(ctx, o, func(item Item, dst chan<- Item, stop func()) {
if apply(item.Value) {
dst <- item
}
Expand Down Expand Up @@ -97,11 +95,11 @@ func (o *observable) ForEach(ctx context.Context, nextFunc NextFunc, errFunc Err
}
}
}
newObservable(ctx, o, handler)
newObservableFromHandler(ctx, o, handler)
}

func (o *observable) Map(ctx context.Context, apply Func) Observable {
return newOperator(ctx, o, func(item Item, dst chan<- Item, stop func()) {
func (o *observable) Map(ctx context.Context, apply Function) Observable {
return newObservableFromOperator(ctx, o, func(item Item, dst chan<- Item, stop func()) {
res, err := apply(item.Value)
if err != nil {
dst <- FromError(err)
Expand All @@ -117,7 +115,7 @@ func (o *observable) Map(ctx context.Context, apply Func) Observable {
func (o *observable) SkipWhile(ctx context.Context, apply Predicate) Observable {
skip := true

return newOperator(ctx, o, func(item Item, dst chan<- Item, stop func()) {
return newObservableFromOperator(ctx, o, func(item Item, dst chan<- Item, stop func()) {
if !skip {
dst <- item
} else {
Expand Down
23 changes: 16 additions & 7 deletions observable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@ import (
"github.com/stretchr/testify/assert"
)

func Test_Filter(t *testing.T) {
func Test_Observable_Filter(t *testing.T) {
obs := FromChannel(channelValue(1, 2, 3, 4, closeCmd)).Filter(context.Background(),
func(i interface{}) bool {
return i.(int)%2 == 0
})
AssertObservable(context.Background(), t, obs, HasItems(2, 4), HasNotRaisedError())
}

func Test_ForEach(t *testing.T) {
func Test_Observable_ForEach(t *testing.T) {
count := 0
var gotErr error
done := make(chan struct{})
Expand All @@ -35,7 +35,7 @@ func Test_ForEach(t *testing.T) {
assert.Equal(t, errFoo, gotErr)
}

func Test_Map_One(t *testing.T) {
func Test_Observable_Map_One(t *testing.T) {
next := channelValue(1, 2, 3, closeCmd)

obs := FromChannel(next).Map(context.Background(), func(i interface{}) (interface{}, error) {
Expand All @@ -44,7 +44,7 @@ func Test_Map_One(t *testing.T) {
AssertObservable(context.Background(), t, obs, HasItems(2, 3, 4), HasNotRaisedError())
}

func Test_Map_Multiple(t *testing.T) {
func Test_Observable_Map_Multiple(t *testing.T) {
next := channelValue(1, 2, 3, closeCmd)

obs := FromChannel(next).Map(context.Background(), func(i interface{}) (interface{}, error) {
Expand All @@ -55,7 +55,7 @@ func Test_Map_Multiple(t *testing.T) {
AssertObservable(context.Background(), t, obs, HasItems(20, 30, 40), HasNotRaisedError())
}

func Test_Map_Error(t *testing.T) {
func Test_Observable_Map_Error(t *testing.T) {
next := channelValue(1, 2, 3, errFoo)

obs := FromChannel(next).Map(context.Background(), func(i interface{}) (interface{}, error) {
Expand All @@ -64,7 +64,7 @@ func Test_Map_Error(t *testing.T) {
AssertObservable(context.Background(), t, obs, HasItems(2, 3, 4), HasRaisedError(errFoo))
}

func Test_Map_Cancel(t *testing.T) {
func Test_Observable_Map_Cancel(t *testing.T) {
next := make(chan Item)

ctx, cancel := context.WithCancel(context.Background())
Expand All @@ -75,7 +75,16 @@ func Test_Map_Cancel(t *testing.T) {
AssertObservable(context.Background(), t, obs, HasNoItems(), HasNotRaisedError())
}

func Test_SkipWhile(t *testing.T) {
func Test_Observable_Observe(t *testing.T) {
got := make([]int, 0)
ch := FromChannel(channelValue(1, 2, 3, closeCmd)).Observe()
for item := range ch {
got = append(got, item.Value.(int))
}
assert.Equal(t, []int{1, 2, 3}, got)
}

func Test_Observable_SkipWhile(t *testing.T) {
next := channelValue(1, 2, 3, 4, 5, closeCmd)

obs := FromChannel(next).SkipWhile(context.Background(), func(i interface{}) bool {
Expand Down
48 changes: 48 additions & 0 deletions optionalsingle.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package rxgo

import "context"

type OptionalSingle interface {
Iterable
}

func newOptionalSingleFromOperator(ctx context.Context, source Single, nextFunc Operator, errFunc Operator) OptionalSingle {
next := make(chan Item)

stop := func() {}
go func() {
for {
select {
case <-ctx.Done():
close(next)
return
case i, ok := <-source.Observe():
if !ok {
close(next)
return
}
if i.IsError() {
errFunc(i, next, stop)
close(next)
return
} else {
nextFunc(i, next, stop)
close(next)
return
}
}
}
}()

return &optionalSingle{
iterable: newChannelIterable(next),
}
}

type optionalSingle struct {
iterable Iterable
}

func (o *optionalSingle) Observe() <-chan Item {
return o.iterable.Observe()
}
13 changes: 13 additions & 0 deletions optionalsingle_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package rxgo

import (
"context"
"testing"
)

func Test_OptionalSingle_Observe(t *testing.T) {
os := FromItem(FromValue(1)).Filter(context.Background(), func(i interface{}) bool {
return i == 1
})
AssertOptionalSingle(context.Background(), t, os, HasValue(1), HasNotRaisedError())
}
Loading

0 comments on commit 555edd3

Please sign in to comment.