Skip to content

Commit

Permalink
Observable behavior refactoring.
Browse files Browse the repository at this point in the history
  • Loading branch information
teivah committed Dec 5, 2018
1 parent f5d04df commit c8dc5d8
Show file tree
Hide file tree
Showing 11 changed files with 419 additions and 690 deletions.
17 changes: 11 additions & 6 deletions connectableobservable.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,28 @@ import (
)

type ConnectableObservable interface {
Iterable
Connect() Observer
Subscribe(handler handlers.EventHandler, opts ...options.Option) Observer
}

type connectableObservable struct {
iterator Iterator
observable Observable
observers []Observer
}

func NewConnectableObservable(observable Observable) ConnectableObservable {
func newConnectableObservableFromObservable(observable Observable) ConnectableObservable {
return &connectableObservable{
observable: observable,
iterator: observable.Iterator(),
}
}

func (c *connectableObservable) Iterator() Iterator {
return c.iterator
}

func (c *connectableObservable) Subscribe(handler handlers.EventHandler, opts ...options.Option) Observer {
ob := CheckEventHandler(handler)
c.observers = append(c.observers, ob)
Expand All @@ -32,11 +39,9 @@ func (c *connectableObservable) Subscribe(handler handlers.EventHandler, opts ..
func (c *connectableObservable) Connect() Observer {
source := make([]interface{}, 0)

for {
item, err := c.observable.Next()
if err != nil {
break
}
it := c.iterator
for it.Next() {
item := it.Value()
source = append(source, item)
}

Expand Down
14 changes: 5 additions & 9 deletions flatmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@ func (o *observable) flatMap(

go flatteningFunc(out, o, apply, maxInParallel)

return &observable{
ch: out,
}
return NewObservableFromChannel(out)
}

func flatObservedSequence(out chan interface{}, o Observable, apply func(interface{}) Observable, maxInParallel uint) {
Expand All @@ -43,12 +41,10 @@ func flatObservedSequence(out chan interface{}, o Observable, apply func(interfa

count = 0

for {
element, err := o.Next()
if err != nil {
break
}
sequence = apply(element)
it := o.Iterator()
for it.Next() {
item := it.Value()
sequence = apply(item)
count++
wg.Add(1)
go func() {
Expand Down
5 changes: 5 additions & 0 deletions iterable.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package rxgo

type Iterable interface {
Iterator() Iterator
}
38 changes: 0 additions & 38 deletions iterable/iterable.go

This file was deleted.

99 changes: 0 additions & 99 deletions iterable/iterable_test.go

This file was deleted.

28 changes: 26 additions & 2 deletions iterator.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,30 @@
package rxgo

// Iterator type is implemented by Iterable.
type Iterator interface {
Next() (interface{}, error)
Next() bool
Value() interface{}
}

type iteratorFromChannel struct {
item interface{}
ch chan interface{}
}

func (s *iteratorFromChannel) Next() bool {
if v, ok := <-s.ch; ok {
s.item = v
return true
}

return false
}

func (s *iteratorFromChannel) Value() interface{} {
return s.item
}

func NewIteratorFromChannel(ch chan interface{}) Iterator {
return &iteratorFromChannel{
ch: ch,
}
}
23 changes: 23 additions & 0 deletions iterator_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package rxgo

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestIteratorFromChannel(t *testing.T) {
ch := make(chan interface{}, 1)
it := NewIteratorFromChannel(ch)

ch <- 1
assert.True(t, it.Next())
assert.Equal(t, 1, it.Value())

ch <- 2
assert.True(t, it.Next())
assert.Equal(t, 2, it.Value())

close(ch)
assert.False(t, it.Next())
}
Loading

0 comments on commit c8dc5d8

Please sign in to comment.