Skip to content

Commit

Permalink
Just operator
Browse files Browse the repository at this point in the history
  • Loading branch information
teivah committed Feb 15, 2020
1 parent 3f30d5e commit c447f37
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 16 deletions.
15 changes: 13 additions & 2 deletions factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,23 @@ func Empty() Observable {
next := make(chan Item)
close(next)
return &observable{
iterable: newIterable(next),
iterable: newChannelIterable(next),
}
}

func FromChannel(next <-chan Item) Observable {
return &observable{
iterable: newIterable(next),
iterable: newChannelIterable(next),
}
}

func Just(item Item, items ...Item) Observable {
if len(items) > 0 {
items = append([]Item{item}, items...)
} else {
items = []Item{item}
}
return &observable{
iterable: newSliceIterable(items),
}
}
6 changes: 6 additions & 0 deletions factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,9 @@ func Test_FromChannel(t *testing.T) {
obs := FromChannel(next)
assertObservable(t, context.Background(), obs, hasItems(1, 2, 3), hasNotRaisedError())
}

func Test_Just(t *testing.T) {
obs := Just(FromValue(1), FromValue(2), FromValue(3))
assertObservable(t, context.Background(), obs, hasItems(1, 2, 3), hasNotRaisedError())
assertObservable(t, context.Background(), obs, hasItems(1, 2, 3), hasNotRaisedError())
}
12 changes: 0 additions & 12 deletions iterable.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,3 @@ package rxgo
type Iterable interface {
Next() <-chan Item
}

type Source struct {
next <-chan Item
}

func newIterable(next <-chan Item) Iterable {
return &Source{next: next}
}

func (s *Source) Next() <-chan Item {
return s.next
}
13 changes: 13 additions & 0 deletions iterable_channel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package rxgo

type channelIterable struct {
next <-chan Item
}

func newChannelIterable(next <-chan Item) Iterable {
return &channelIterable{next: next}
}

func (i *channelIterable) Next() <-chan Item {
return i.next
}
20 changes: 20 additions & 0 deletions iterable_slice.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package rxgo

type sliceIterable struct {
items []Item
}

func newSliceIterable(items []Item) Iterable {
return &sliceIterable{items: items}
}

func (i *sliceIterable) Next() <-chan Item {
next := make(chan Item)
go func() {
for _, item := range i.items {
next <- item
}
close(next)
}()
return next
}
4 changes: 2 additions & 2 deletions observable.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func newObservable(ctx context.Context, source Observable, handler Handler) Obse
go handler(ctx, source.Next(), next)

return &observable{
iterable: newIterable(next),
iterable: newChannelIterable(next),
handler: handler,
}
}
Expand Down Expand Up @@ -57,7 +57,7 @@ func newOperator(ctx context.Context, source Observable, nextFunc Operator, errF
}()

return &observable{
iterable: newIterable(next),
iterable: newChannelIterable(next),
}
}

Expand Down

0 comments on commit c447f37

Please sign in to comment.