Skip to content

Commit

Permalink
Modify Just to consume from a channel
Browse files Browse the repository at this point in the history
  • Loading branch information
teivah committed Feb 25, 2020
1 parent e586431 commit 54fcd1d
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 10 deletions.
7 changes: 7 additions & 0 deletions doc/just.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ Output:
3
```

### Channel

```go
externalCh := make(chan int)
observable := rxgo.Just(externalCh)
```

## Options

### WithBufferedChannel
Expand Down
12 changes: 12 additions & 0 deletions factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,18 @@ func Test_Just_CustomStructure(t *testing.T) {
Assert(context.Background(), t, obs, HasItems(customer{id: 1}, customer{id: 2}, customer{id: 3}), HasNoError())
}

func Test_Just_Channel(t *testing.T) {
ch := make(chan int, 1)
go func() {
ch <- 1
ch <- 2
ch <- 3
close(ch)
}()
obs := Just(ch)
Assert(context.Background(), t, obs, HasItems(1, 2, 3))
}

func Test_Just_SimpleCapacity(t *testing.T) {
ch := Just([]Item{Of(1)}, WithBufferedChannel(5)).Observe()
assert.Equal(t, 5, cap(ch))
Expand Down
25 changes: 20 additions & 5 deletions item.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func Error(err error) Item {

// SendItems is an utility function that send a list of interface{} and indicate a strategy on whether to close
// the channel once the function completes.
func SendItems(ch chan<- Item, strategy CloseChannelStrategy, items ...interface{}) {
func SendItems(ctx context.Context, ch chan<- Item, strategy CloseChannelStrategy, items ...interface{}) {
if strategy == CloseChannel {
defer close(ch)
}
Expand All @@ -52,21 +52,36 @@ func SendItems(ch chan<- Item, strategy CloseChannelStrategy, items ...interface
rt := reflect.TypeOf(item)
switch rt.Kind() {
default:
ch <- Of(item)
Of(item).SendContext(ctx, ch)
case reflect.Chan:
in := reflect.ValueOf(currentItem)
for {
v, ok := in.Recv()
if !ok {
return
}
currentItem := v.Interface()
switch item := currentItem.(type) {
default:
Of(item).SendContext(ctx, ch)
case error:
Error(item).SendContext(ctx, ch)
}
}
case reflect.Slice:
s := reflect.ValueOf(currentItem)
for i := 0; i < s.Len(); i++ {
currentItem := s.Index(i).Interface()
switch item := currentItem.(type) {
default:
ch <- Of(item)
Of(item).SendContext(ctx, ch)
case error:
ch <- Error(item)
Error(item).SendContext(ctx, ch)
}
}
}
case error:
ch <- Error(item)
Error(item).SendContext(ctx, ch)
}
}
}
Expand Down
8 changes: 4 additions & 4 deletions item_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,25 +9,25 @@ import (

func Test_SendItems_Variadic(t *testing.T) {
ch := make(chan Item, 3)
go SendItems(ch, CloseChannel, 1, 2, 3)
go SendItems(context.Background(), ch, CloseChannel, 1, 2, 3)
Assert(context.Background(), t, FromChannel(ch), HasItems(1, 2, 3), HasNoError())
}

func Test_SendItems_VariadicWithError(t *testing.T) {
ch := make(chan Item, 3)
go SendItems(ch, CloseChannel, 1, errFoo, 3)
go SendItems(context.Background(), ch, CloseChannel, 1, errFoo, 3)
Assert(context.Background(), t, FromChannel(ch), HasItems(1, 3), HasError(errFoo))
}

func Test_SendItems_Slice(t *testing.T) {
ch := make(chan Item, 3)
go SendItems(ch, CloseChannel, []int{1, 2, 3})
go SendItems(context.Background(), ch, CloseChannel, []int{1, 2, 3})
Assert(context.Background(), t, FromChannel(ch), HasItems(1, 2, 3), HasNoError())
}

func Test_SendItems_SliceWithError(t *testing.T) {
ch := make(chan Item, 3)
go SendItems(ch, CloseChannel, []interface{}{1, errFoo, 3})
go SendItems(context.Background(), ch, CloseChannel, []interface{}{1, errFoo, 3})
Assert(context.Background(), t, FromChannel(ch), HasItems(1, 3), HasError(errFoo))
}

Expand Down
2 changes: 1 addition & 1 deletion iterable_just.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,6 @@ func (i *justIterable) Observe(opts ...Option) <-chan Item {
option := parseOptions(append(i.opts, opts...)...)
next := option.buildChannel()

go SendItems(next, CloseChannel, i.items)
go SendItems(option.buildContext(), next, CloseChannel, i.items)
return next
}

0 comments on commit 54fcd1d

Please sign in to comment.