Skip to content

Commit

Permalink
Return context connect
Browse files Browse the repository at this point in the history
  • Loading branch information
teivah committed May 13, 2020
1 parent 818dca5 commit 064c558
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 8 deletions.
2 changes: 1 addition & 1 deletion observable.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type Observable interface {
BufferWithCount(count int, opts ...Option) Observable
BufferWithTime(timespan Duration, opts ...Option) Observable
BufferWithTimeOrCount(timespan Duration, count int, opts ...Option) Observable
Connect() (Disposed, Disposable)
Connect() (context.Context, Disposable)
Contains(equal Predicate, opts ...Option) Single
Count(opts ...Option) Single
Debounce(timespan Duration, opts ...Option) Observable
Expand Down
9 changes: 2 additions & 7 deletions observable_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -594,15 +594,10 @@ func (o *ObservableImpl) BufferWithTimeOrCount(timespan Duration, count int, opt
}

// Connect instructs a connectable Observable to begin emitting items to its subscribers.
func (o *ObservableImpl) Connect() (Disposed, Disposable) {
func (o *ObservableImpl) Connect() (context.Context, Disposable) {
ctx, cancel := context.WithCancel(context.Background())
o.Observe(WithContext(ctx), connect())
ch := make(chan struct{})
go func() {
<-ctx.Done()
close(ch)
}()
return ch, Disposable(cancel)
return ctx, Disposable(cancel)
}

// Contains determines whether an Observable emits a particular item or not.
Expand Down

0 comments on commit 064c558

Please sign in to comment.