diff --git a/observable.go b/observable.go index 94c33734..343170b7 100644 --- a/observable.go +++ b/observable.go @@ -25,7 +25,7 @@ type Observable interface { BufferWithCount(count int, opts ...Option) Observable BufferWithTime(timespan Duration, opts ...Option) Observable BufferWithTimeOrCount(timespan Duration, count int, opts ...Option) Observable - Connect() (Disposed, Disposable) + Connect() (context.Context, Disposable) Contains(equal Predicate, opts ...Option) Single Count(opts ...Option) Single Debounce(timespan Duration, opts ...Option) Observable diff --git a/observable_operator.go b/observable_operator.go index d897b92b..3995a5df 100644 --- a/observable_operator.go +++ b/observable_operator.go @@ -594,15 +594,10 @@ func (o *ObservableImpl) BufferWithTimeOrCount(timespan Duration, count int, opt } // Connect instructs a connectable Observable to begin emitting items to its subscribers. -func (o *ObservableImpl) Connect() (Disposed, Disposable) { +func (o *ObservableImpl) Connect() (context.Context, Disposable) { ctx, cancel := context.WithCancel(context.Background()) o.Observe(WithContext(ctx), connect()) - ch := make(chan struct{}) - go func() { - <-ctx.Done() - close(ch) - }() - return ch, Disposable(cancel) + return ctx, Disposable(cancel) } // Contains determines whether an Observable emits a particular item or not.