Skip to content

Commit

Permalink
Filter operator
Browse files Browse the repository at this point in the history
  • Loading branch information
teivah committed Feb 15, 2020
1 parent 6fb958a commit e09b5aa
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 0 deletions.
12 changes: 12 additions & 0 deletions observable.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

type Observable interface {
Iterable
Filter(ctx context.Context, apply Predicate) Observable
ForEach(ctx context.Context, nextFunc NextFunc, errFunc ErrFunc, doneFunc DoneFunc)
Map(ctx context.Context, apply Func) Observable
SkipWhile(ctx context.Context, apply Predicate) Observable
Expand Down Expand Up @@ -64,6 +65,17 @@ func (o *observable) Next() <-chan Item {
return o.iterable.Next()
}

func (o *observable) Filter(ctx context.Context, apply Predicate) Observable {
return newOperator(ctx, o, func(item Item, dst chan<- Item, stop func()) {
if apply(item.Value) {
dst <- item
}
}, func(item Item, dst chan<- Item, stop func()) {
dst <- item
stop()
})
}

func (o *observable) ForEach(ctx context.Context, nextFunc NextFunc, errFunc ErrFunc, doneFunc DoneFunc) {
handler := func(ctx context.Context, src <-chan Item, dst chan<- Item) {
for {
Expand Down
11 changes: 11 additions & 0 deletions observable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,17 @@ import (
"testing"
)

func Test_Filter(t *testing.T) {
obs := FromChannel(channelValue(1, 2, 3, 4, closeCmd)).Filter(context.Background(),
func(i interface{}) bool {
if i.(int)%2 == 0 {
return true
}
return false
})
assertObservable(t, context.Background(), obs, hasItems(2, 4), hasNotRaisedError())
}

func Test_ForEach(t *testing.T) {
count := 0
var gotErr error
Expand Down

0 comments on commit e09b5aa

Please sign in to comment.