Skip to content

Commit

Permalink
Timeout operator, pass context to Iterator() and Next(), errors manag…
Browse files Browse the repository at this point in the history
…ement
  • Loading branch information
teivah committed Apr 22, 2019
1 parent bf53b6e commit 633eef2
Show file tree
Hide file tree
Showing 27 changed files with 313 additions and 747 deletions.
3 changes: 1 addition & 2 deletions assert.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"errors"

"github.com/reactivex/rxgo/handlers"
"github.com/reactivex/rxgo/optional"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -215,7 +214,7 @@ func AssertThatOptionalSingle(t *testing.T, optionalSingle OptionalSingle, asser
assert.Fail(t, "error while retrieving OptionalSingle results")
}

if optional, ok := v.(optional.Optional); ok {
if optional, ok := v.(Optional); ok {
checkIsEmpty, empty := ass.isEmptyFunc()
if checkIsEmpty {
if empty {
Expand Down
12 changes: 5 additions & 7 deletions assert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ package rxgo
import (
"errors"
"testing"

"github.com/reactivex/rxgo/optional"
)

func TestAssertThatObservableHasItems(t *testing.T) {
Expand Down Expand Up @@ -37,21 +35,21 @@ func TestAssertThatSingleNotError(t *testing.T) {
}

func TestAssertThatOptionalSingleIsEmpty(t *testing.T) {
AssertThatOptionalSingle(t, newOptionalSingleFrom(optional.Empty()), IsEmpty())
AssertThatOptionalSingle(t, newOptionalSingleFrom(EmptyOptional()), IsEmpty())
}

func TestAssertThatOptionalSingleIsNotEmpty(t *testing.T) {
AssertThatOptionalSingle(t, newOptionalSingleFrom(optional.Of(1)), IsNotEmpty())
AssertThatOptionalSingle(t, newOptionalSingleFrom(Of(1)), IsNotEmpty())
}

func TestAssertThatOptionalSingleHasValue(t *testing.T) {
AssertThatOptionalSingle(t, newOptionalSingleFrom(optional.Of(1)), HasValue(1))
AssertThatOptionalSingle(t, newOptionalSingleFrom(Of(1)), HasValue(1))
}

func TestAssertThatOptionalSingleHasRaisedAnError(t *testing.T) {
AssertThatOptionalSingle(t, newOptionalSingleFrom(optional.Of(errors.New("foo"))), HasRaisedAnError())
AssertThatOptionalSingle(t, newOptionalSingleFrom(Of(errors.New("foo"))), HasRaisedAnError())
}

func TestAssertThatOptionalSingleHasRaisedError(t *testing.T) {
AssertThatOptionalSingle(t, newOptionalSingleFrom(optional.Of(errors.New("foo"))), HasRaisedError(errors.New("foo")))
AssertThatOptionalSingle(t, newOptionalSingleFrom(Of(errors.New("foo"))), HasRaisedError(errors.New("foo")))
}
12 changes: 7 additions & 5 deletions connectableobservable.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package rxgo

import (
"context"
"sync"

"github.com/reactivex/rxgo/handlers"
"github.com/reactivex/rxgo/options"
"sync"
)

type ConnectableObservable interface {
Expand All @@ -23,8 +25,8 @@ func newConnectableObservableFromObservable(observable Observable) ConnectableOb
}
}

func (c *connectableObservable) Iterator() Iterator {
return c.observable.Iterator()
func (c *connectableObservable) Iterator(ctx context.Context) Iterator {
return c.observable.Iterator(context.Background())
}

func (c *connectableObservable) All(predicate Predicate) Single {
Expand Down Expand Up @@ -74,9 +76,9 @@ func (c *connectableObservable) BufferWithTimeOrCount(timespan Duration, count i
func (c *connectableObservable) Connect() Observer {
out := NewObserver()
go func() {
it := c.observable.Iterator()
it := c.observable.Iterator(context.Background())
for {
if item, err := it.Next(); err == nil {
if item, err := it.Next(context.Background()); err == nil {
c.observersMutex.Lock()
for _, observer := range c.observers {
c.observersMutex.Unlock()
Expand Down
44 changes: 44 additions & 0 deletions errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package rxgo

type CancelledIteratorError struct {
}

type EndOfIteratorError struct {
}

type IllegalInputError struct {
reason string
}

type IndexOutOfBoundError struct {
}

type NoSuchElementError struct {
}

type TimeoutError struct {
}

func (e *CancelledIteratorError) Error() string {
return "CancelledIteratorError"
}

func (e *EndOfIteratorError) Error() string {
return "EndOfIteratorError"
}

func (e *IllegalInputError) Error() string {
return e.reason
}

func (e *IndexOutOfBoundError) Error() string {
return "IndexOutOfBoundError"
}

func (e *NoSuchElementError) Error() string {
return "NoSuchElementError"
}

func (e *TimeoutError) Error() string {
return "TimeoutError"
}
17 changes: 0 additions & 17 deletions errors/errorcode_string.go

This file was deleted.

47 changes: 0 additions & 47 deletions errors/errors.go

This file was deleted.

60 changes: 0 additions & 60 deletions errors/errors_test.go

This file was deleted.

26 changes: 0 additions & 26 deletions examples/flatmap/flatmap_slice.go

This file was deleted.

31 changes: 0 additions & 31 deletions examples/flatmap/flatmap_slice_test.go

This file was deleted.

76 changes: 0 additions & 76 deletions examples/reactive_sum/sum.go

This file was deleted.

Loading

0 comments on commit 633eef2

Please sign in to comment.