diff --git a/factory.go b/factory.go index 3873e051..dd64d0e7 100644 --- a/factory.go +++ b/factory.go @@ -4,12 +4,23 @@ func Empty() Observable { next := make(chan Item) close(next) return &observable{ - iterable: newIterable(next), + iterable: newChannelIterable(next), } } func FromChannel(next <-chan Item) Observable { return &observable{ - iterable: newIterable(next), + iterable: newChannelIterable(next), + } +} + +func Just(item Item, items ...Item) Observable { + if len(items) > 0 { + items = append([]Item{item}, items...) + } else { + items = []Item{item} + } + return &observable{ + iterable: newSliceIterable(items), } } diff --git a/factory_test.go b/factory_test.go index 8704824e..beb614bf 100644 --- a/factory_test.go +++ b/factory_test.go @@ -15,3 +15,9 @@ func Test_FromChannel(t *testing.T) { obs := FromChannel(next) assertObservable(t, context.Background(), obs, hasItems(1, 2, 3), hasNotRaisedError()) } + +func Test_Just(t *testing.T) { + obs := Just(FromValue(1), FromValue(2), FromValue(3)) + assertObservable(t, context.Background(), obs, hasItems(1, 2, 3), hasNotRaisedError()) + assertObservable(t, context.Background(), obs, hasItems(1, 2, 3), hasNotRaisedError()) +} diff --git a/iterable.go b/iterable.go index ed2fbbdb..c91293ab 100644 --- a/iterable.go +++ b/iterable.go @@ -3,15 +3,3 @@ package rxgo type Iterable interface { Next() <-chan Item } - -type Source struct { - next <-chan Item -} - -func newIterable(next <-chan Item) Iterable { - return &Source{next: next} -} - -func (s *Source) Next() <-chan Item { - return s.next -} diff --git a/iterable_channel.go b/iterable_channel.go new file mode 100644 index 00000000..61fdc34b --- /dev/null +++ b/iterable_channel.go @@ -0,0 +1,13 @@ +package rxgo + +type channelIterable struct { + next <-chan Item +} + +func newChannelIterable(next <-chan Item) Iterable { + return &channelIterable{next: next} +} + +func (i *channelIterable) Next() <-chan Item { + return i.next +} diff --git a/iterable_slice.go b/iterable_slice.go new file mode 100644 index 00000000..4725835d --- /dev/null +++ b/iterable_slice.go @@ -0,0 +1,20 @@ +package rxgo + +type sliceIterable struct { + items []Item +} + +func newSliceIterable(items []Item) Iterable { + return &sliceIterable{items: items} +} + +func (i *sliceIterable) Next() <-chan Item { + next := make(chan Item) + go func() { + for _, item := range i.items { + next <- item + } + close(next) + }() + return next +} diff --git a/observable.go b/observable.go index 263c96e0..ecf605b1 100644 --- a/observable.go +++ b/observable.go @@ -23,7 +23,7 @@ func newObservable(ctx context.Context, source Observable, handler Handler) Obse go handler(ctx, source.Next(), next) return &observable{ - iterable: newIterable(next), + iterable: newChannelIterable(next), handler: handler, } } @@ -57,7 +57,7 @@ func newOperator(ctx context.Context, source Observable, nextFunc Operator, errF }() return &observable{ - iterable: newIterable(next), + iterable: newChannelIterable(next), } }