Skip to content

Commit

Permalink
SendItems utility function
Browse files Browse the repository at this point in the history
  • Loading branch information
teivah committed Feb 18, 2020
1 parent a0acd5f commit 1de77d1
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 1 deletion.
46 changes: 45 additions & 1 deletion item.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,26 @@
package rxgo

import "context"
import (
"context"
"reflect"
)

type (
// Item is a wrapper having either a value or an error.
Item struct {
V interface{}
E error
}

// CloseChannelStrategy indicates a strategy on whether to close a channel.
CloseChannelStrategy uint32
)

const (
// LeaveChannelOpen indicates to leave the channel open after completion.
LeaveChannelOpen CloseChannelStrategy = iota
// CloseChannel indicates to close the channel open after completion.
CloseChannel
)

// Of creates an item from a value.
Expand All @@ -20,6 +33,37 @@ func Error(err error) Item {
return Item{E: err}
}

// SendItems is an utility function that send a list of interface{} and indicate a strategy on whether to close
// the channel once the function completes.
func SendItems(ch chan<- Item, strategy CloseChannelStrategy, items ...interface{}) {
if strategy == CloseChannel {
defer close(ch)
}
for _, currentItem := range items {
switch item := currentItem.(type) {
default:
rt := reflect.TypeOf(item)
switch rt.Kind() {
default:
ch <- Of(item)
case reflect.Slice:
s := reflect.ValueOf(currentItem)
for i := 0; i < s.Len(); i++ {
currentItem := s.Index(i).Interface()
switch item := currentItem.(type) {
default:
ch <- Of(item)
case error:
ch <- Error(item)
}
}
}
case error:
ch <- Error(item)
}
}
}

// Error checks if an item is an error.
func (i Item) Error() bool {
return i.E != nil
Expand Down
24 changes: 24 additions & 0 deletions item_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,30 @@ import (
"github.com/stretchr/testify/assert"
)

func Test_SendItems_Variadic(t *testing.T) {
ch := make(chan Item, 3)
go SendItems(ch, CloseChannel, 1, 2, 3)
Assert(context.Background(), t, FromChannel(ch), HasItems(1, 2, 3), HasNotRaisedError())
}

func Test_SendItems_VariadicWithError(t *testing.T) {
ch := make(chan Item, 3)
go SendItems(ch, CloseChannel, 1, errFoo, 3)
Assert(context.Background(), t, FromChannel(ch), HasItems(1, 3), HasRaisedError(errFoo))
}

func Test_SendItems_Slice(t *testing.T) {
ch := make(chan Item, 3)
go SendItems(ch, CloseChannel, []int{1, 2, 3})
Assert(context.Background(), t, FromChannel(ch), HasItems(1, 2, 3), HasNotRaisedError())
}

func Test_SendItems_SliceWithError(t *testing.T) {
ch := make(chan Item, 3)
go SendItems(ch, CloseChannel, []interface{}{1, errFoo, 3})
Assert(context.Background(), t, FromChannel(ch), HasItems(1, 3), HasRaisedError(errFoo))
}

func Test_Item_SendBlocking(t *testing.T) {
ch := make(chan Item, 1)
defer close(ch)
Expand Down

0 comments on commit 1de77d1

Please sign in to comment.