Skip to content

Commit

Permalink
Remove options passed to Observe()
Browse files Browse the repository at this point in the history
  • Loading branch information
teivah committed Feb 17, 2020
1 parent e2d3fe9 commit cad74dc
Show file tree
Hide file tree
Showing 15 changed files with 81 additions and 74 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ The following documentation gives an overview of RxGo. If you need more informat
Let's create our first Observable and consume an item:

```go
observable := rxgo.Just(rxgo.Of("Hello, World!"))
observable := rxgo.Just([]rxgo.Item{rxgo.Of("Hello, World!")})
ch := observable.Observe()
item := <-ch
fmt.Println(item.V)
Expand Down Expand Up @@ -188,12 +188,12 @@ It means, the first Observer consumed already all the items.
On the other hand, let's create a hot Observable using `Defer` operator:

```go
observable := rxgo.Defer(func(_ context.Context, ch chan<- rxgo.Item, done func()) {
observable := rxgo.Defer([]Producer{func(_ context.Context, ch chan<- rxgo.Item, done func()) {
for i := 0; i < 3; i++ {
ch <- rxgo.Of(i)
}
done()
})
}})

// First Observer
for item := range observable.Observe() {
Expand Down
23 changes: 9 additions & 14 deletions factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,9 +163,9 @@ func Concat(observables []Observable, opts ...Option) Observable {
}

// Defer creates an observable from multiple functions.
func Defer(f ...ProducerFunc) Observable {
func Defer(f []Producer, opts ...Option) Observable {
return &observable{
iterable: newFuncsIterable(f...),
iterable: newDeferIterable(f, opts...),
}
}

Expand Down Expand Up @@ -195,9 +195,9 @@ func FromEventSource(next <-chan Item, opts ...Option) Observable {
}

// FromSlice creates an observable from a slice.
func FromSlice(s []Item) Single {
func FromSlice(s []Item, opts ...Option) Single {
return &single{
iterable: newSliceIterable(s),
iterable: newSliceIterable(s, opts...),
}
}

Expand Down Expand Up @@ -227,19 +227,14 @@ func Interval(interval Duration, opts ...Option) Observable {
}

// Just creates an Observable with the provided items.
func Just(item Item, items ...Item) Observable {
if len(items) > 0 {
items = append([]Item{item}, items...)
} else {
items = []Item{item}
}
func Just(items []Item, opts ...Option) Observable {
return &observable{
iterable: newSliceIterable(items),
iterable: newSliceIterable(items, opts...),
}
}

// JustItem creates a single from one item.
func JustItem(item Item) Single {
func JustItem(item Item, opts ...Option) Single {
return &single{
iterable: newSliceIterable([]Item{item}),
}
Expand Down Expand Up @@ -295,15 +290,15 @@ func Never() Observable {
}

// Range creates an Observable that emits a particular range of sequential integers.
func Range(start, count int) Observable {
func Range(start, count int, opts ...Option) Observable {
if count < 0 {
return newObservableFromError(errors.Wrap(&IllegalInputError{}, "count must be positive"))
}
if start+count-1 > math.MaxInt32 {
return newObservableFromError(errors.Wrap(&IllegalInputError{}, "max value is bigger than math.MaxInt32"))
}
return &observable{
iterable: newRangeIterable(start, count),
iterable: newRangeIterable(start, count, opts...),
}
}

Expand Down
56 changes: 28 additions & 28 deletions factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,56 +81,56 @@ func Test_Concat_OneEmptyObservable(t *testing.T) {
}

func Test_Defer(t *testing.T) {
obs := Defer(func(ctx context.Context, next chan<- Item, done func()) {
obs := Defer([]Producer{func(ctx context.Context, next chan<- Item, done func()) {
next <- Of(1)
next <- Of(2)
next <- Of(3)
done()
})
}})
Assert(context.Background(), t, obs, HasItems(1, 2, 3), HasNotRaisedError())
}

func Test_Defer_Multiple(t *testing.T) {
obs := Defer(func(ctx context.Context, next chan<- Item, done func()) {
obs := Defer([]Producer{func(ctx context.Context, next chan<- Item, done func()) {
next <- Of(1)
next <- Of(2)
done()
}, func(ctx context.Context, next chan<- Item, done func()) {
next <- Of(10)
next <- Of(20)
done()
})
}})
Assert(context.Background(), t, obs, HasItemsNoParticularOrder(1, 2, 10, 20), HasNotRaisedError())
}

func Test_Defer_Close(t *testing.T) {
obs := Defer(func(ctx context.Context, next chan<- Item, done func()) {
obs := Defer([]Producer{func(ctx context.Context, next chan<- Item, done func()) {
next <- Of(1)
next <- Of(2)
next <- Of(3)
done()
})
}})
Assert(context.Background(), t, obs, HasItems(1, 2, 3), HasNotRaisedError())
}

func Test_Defer_SingleDup(t *testing.T) {
obs := Defer(func(ctx context.Context, next chan<- Item, done func()) {
obs := Defer([]Producer{func(ctx context.Context, next chan<- Item, done func()) {
next <- Of(1)
next <- Of(2)
next <- Of(3)
done()
})
}})
Assert(context.Background(), t, obs, HasItems(1, 2, 3), HasNotRaisedError())
Assert(context.Background(), t, obs, HasItems(1, 2, 3), HasNotRaisedError())
}

func Test_Defer_ComposedDup(t *testing.T) {
obs := Defer(func(ctx context.Context, next chan<- Item, done func()) {
obs := Defer([]Producer{func(ctx context.Context, next chan<- Item, done func()) {
next <- Of(1)
next <- Of(2)
next <- Of(3)
done()
}).Map(func(i interface{}) (_ interface{}, _ error) {
}}).Map(func(i interface{}) (_ interface{}, _ error) {
return i.(int) + 1, nil
}).Map(func(i interface{}) (_ interface{}, _ error) {
return i.(int) + 1, nil
Expand All @@ -140,12 +140,12 @@ func Test_Defer_ComposedDup(t *testing.T) {
}

func Test_Defer_ComposedDup_EagerObservation(t *testing.T) {
obs := Defer(func(ctx context.Context, next chan<- Item, done func()) {
obs := Defer([]Producer{func(ctx context.Context, next chan<- Item, done func()) {
next <- Of(1)
next <- Of(2)
next <- Of(3)
done()
}).Map(func(i interface{}) (_ interface{}, _ error) {
}}).Map(func(i interface{}) (_ interface{}, _ error) {
return i.(int) + 1, nil
}, WithEagerObservation()).Map(func(i interface{}) (_ interface{}, _ error) {
return i.(int) + 1, nil
Expand All @@ -157,34 +157,34 @@ func Test_Defer_ComposedDup_EagerObservation(t *testing.T) {
}

func Test_Defer_Error(t *testing.T) {
obs := Defer(func(ctx context.Context, next chan<- Item, done func()) {
obs := Defer([]Producer{func(ctx context.Context, next chan<- Item, done func()) {
next <- Of(1)
next <- Of(2)
next <- Error(errFoo)
done()
})
}})
Assert(context.Background(), t, obs, HasItems(1, 2), HasRaisedError(errFoo))
}

func Test_Defer_SimpleCapacity(t *testing.T) {
ch := Defer(func(_ context.Context, _ chan<- Item, done func()) {
ch := Defer([]Producer{func(_ context.Context, _ chan<- Item, done func()) {
done()
}).Observe(WithBufferedChannel(5))
}}, WithBufferedChannel(5)).Observe()
assert.Equal(t, 5, cap(ch))
}

func Test_Defer_ComposedCapacity(t *testing.T) {
obs1 := Defer(func(_ context.Context, _ chan<- Item, done func()) {
obs1 := Defer([]Producer{func(_ context.Context, _ chan<- Item, done func()) {
done()
}).Map(func(_ interface{}) (interface{}, error) {
}}).Map(func(_ interface{}) (interface{}, error) {
return 1, nil
}, WithBufferedChannel(11))
assert.Equal(t, 11, cap(obs1.Observe(WithBufferedChannel(13))))
assert.Equal(t, 11, cap(obs1.Observe()))

obs2 := obs1.Map(func(_ interface{}) (interface{}, error) {
return 1, nil
}, WithBufferedChannel(12))
assert.Equal(t, 12, cap(obs2.Observe(WithBufferedChannel(13))))
assert.Equal(t, 12, cap(obs2.Observe()))
}

func Test_Empty(t *testing.T) {
Expand All @@ -205,7 +205,7 @@ func Test_FromChannel(t *testing.T) {
}

func Test_FromChannel_SimpleCapacity(t *testing.T) {
ch := FromChannel(make(chan Item, 10)).Observe(WithBufferedChannel(11))
ch := FromChannel(make(chan Item, 10)).Observe()
assert.Equal(t, 10, cap(ch))
}

Expand All @@ -214,12 +214,12 @@ func Test_FromChannel_ComposedCapacity(t *testing.T) {
Map(func(_ interface{}) (interface{}, error) {
return 1, nil
}, WithBufferedChannel(11))
assert.Equal(t, 11, cap(obs1.Observe(WithBufferedChannel(13))))
assert.Equal(t, 11, cap(obs1.Observe()))

obs2 := obs1.Map(func(_ interface{}) (interface{}, error) {
return 1, nil
}, WithBufferedChannel(12))
assert.Equal(t, 12, cap(obs2.Observe(WithBufferedChannel(13))))
assert.Equal(t, 12, cap(obs2.Observe()))
}

func Test_FromItem(t *testing.T) {
Expand All @@ -229,26 +229,26 @@ func Test_FromItem(t *testing.T) {
}

func Test_FromItems(t *testing.T) {
obs := Just(Of(1), Of(2), Of(3))
obs := Just([]Item{Of(1), Of(2), Of(3)})
Assert(context.Background(), t, obs, HasItems(1, 2, 3), HasNotRaisedError())
Assert(context.Background(), t, obs, HasItems(1, 2, 3), HasNotRaisedError())
}

func Test_FromItems_SimpleCapacity(t *testing.T) {
ch := Just(Of(1)).Observe(WithBufferedChannel(5))
ch := Just([]Item{Of(1)}, WithBufferedChannel(5)).Observe()
assert.Equal(t, 5, cap(ch))
}

func Test_FromItems_ComposedCapacity(t *testing.T) {
obs1 := Just(Of(1)).Map(func(_ interface{}) (interface{}, error) {
obs1 := Just([]Item{Of(1)}).Map(func(_ interface{}) (interface{}, error) {
return 1, nil
}, WithBufferedChannel(11))
assert.Equal(t, 11, cap(obs1.Observe(WithBufferedChannel(13))))
assert.Equal(t, 11, cap(obs1.Observe()))

obs2 := obs1.Map(func(_ interface{}) (interface{}, error) {
return 1, nil
}, WithBufferedChannel(12))
assert.Equal(t, 12, cap(obs2.Observe(WithBufferedChannel(13))))
assert.Equal(t, 12, cap(obs2.Observe()))
}

func Test_FromEventSource_ObservationAfterAllSent(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion iterable.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@ package rxgo

// Iterable is the interface returning an iterable channel.
type Iterable interface {
Observe(opts ...Option) <-chan Item
Observe() <-chan Item
}
2 changes: 1 addition & 1 deletion iterable_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@ func newChannelIterable(next <-chan Item) Iterable {
return &channelIterable{next: next}
}

func (i *channelIterable) Observe(_ ...Option) <-chan Item {
func (i *channelIterable) Observe() <-chan Item {
return i.next
}
16 changes: 10 additions & 6 deletions iterable_funcs.go → iterable_defer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,20 @@ import (
"sync"
)

type funcsIterable struct {
f []ProducerFunc
type deferIterable struct {
f []Producer
opts []Option
}

func newFuncsIterable(f ...ProducerFunc) Iterable {
return &funcsIterable{f: f}
func newDeferIterable(f []Producer, opts ...Option) Iterable {
return &deferIterable{
f: f,
opts: opts,
}
}

func (i *funcsIterable) Observe(opts ...Option) <-chan Item {
option := parseOptions(opts...)
func (i *deferIterable) Observe() <-chan Item {
option := parseOptions(i.opts...)
next := option.buildChannel()
ctx := option.buildContext()

Expand Down
8 changes: 5 additions & 3 deletions iterable_eventsource.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@ type eventSourceIterable struct {
sync.RWMutex
observers []chan Item
disposed bool
opts []Option
}

func newEventSourceIterable(ctx context.Context, next <-chan Item, strategy BackpressureStrategy) Iterable {
func newEventSourceIterable(ctx context.Context, next <-chan Item, strategy BackpressureStrategy, opts ...Option) Iterable {
it := &eventSourceIterable{
observers: make([]chan Item, 0),
opts: opts,
}

go func() {
Expand Down Expand Up @@ -60,8 +62,8 @@ func (i *eventSourceIterable) closeAllObservers() {
i.Unlock()
}

func (i *eventSourceIterable) Observe(opts ...Option) <-chan Item {
option := parseOptions(opts...)
func (i *eventSourceIterable) Observe() <-chan Item {
option := parseOptions(i.opts...)
next := option.buildChannel()

i.Lock()
Expand Down
2 changes: 1 addition & 1 deletion iterable_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@ func newColdIterable(factory func() <-chan Item) Iterable {
return &factoryIterable{factory: factory}
}

func (i *factoryIterable) Observe(_ ...Option) <-chan Item {
func (i *factoryIterable) Observe() <-chan Item {
return i.factory()
}
8 changes: 5 additions & 3 deletions iterable_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,19 @@ package rxgo

type rangeIterable struct {
start, count int
opts []Option
}

func newRangeIterable(start, count int) Iterable {
func newRangeIterable(start, count int, opts ...Option) Iterable {
return &rangeIterable{
start: start,
count: count,
opts: opts,
}
}

func (i *rangeIterable) Observe(opts ...Option) <-chan Item {
option := parseOptions(opts...)
func (i *rangeIterable) Observe() <-chan Item {
option := parseOptions(i.opts...)
next := option.buildChannel()

go func() {
Expand Down
12 changes: 8 additions & 4 deletions iterable_slice.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,18 @@ package rxgo

type sliceIterable struct {
items []Item
opts []Option
}

func newSliceIterable(items []Item) Iterable {
return &sliceIterable{items: items}
func newSliceIterable(items []Item, opts ...Option) Iterable {
return &sliceIterable{
items: items,
opts: opts,
}
}

func (i *sliceIterable) Observe(opts ...Option) <-chan Item {
option := parseOptions(opts...)
func (i *sliceIterable) Observe() <-chan Item {
option := parseOptions(i.opts...)
next := option.buildChannel()

go func() {
Expand Down
Loading

0 comments on commit cad74dc

Please sign in to comment.