Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 44 additions & 4 deletions RxSwift/Observables/Multicast.swift
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,20 @@ extension ConnectableObservableType {
public func refCount() -> Observable<Element> {
RefCount(source: self)
}

/**
Returns an observable sequence that stays connected to the source as long as there is at least one subscription to the observable sequence.
Waits the given amount of time on the given scheduler after the last subscription has been disposed before disconnecting from the source.

- seealso: [refCount operator on reactivex.io](http://reactivex.io/documentation/operators/refcount.html)

- parameter timeout: Time interval to wait after the last subscription to the result is disposed before disconnecting from the source.
- parameter scheduler: Scheduler to run the timeout timer on.
- returns: An observable sequence that stays connected to the source as long as there is at least one subscription to the observable sequence.
*/
public func refCount(timeout: RxTimeInterval, scheduler: SchedulerType) -> Observable<Element> {
RefCount(source: self, timeout: RefCount.TimeoutConfig(interval: timeout, scheduler: scheduler))
}
}

extension ObservableType {
Expand Down Expand Up @@ -270,6 +284,8 @@ final private class RefCountSink<ConnectableSource: ConnectableObservableType, O

if self.parent.count == 0 {
self.parent.count = 1
self.parent.timeoutSubscription?.dispose()
self.parent.timeoutSubscription = nil
self.parent.connectableSubscription = self.parent.source.connect()
}
else {
Expand All @@ -288,8 +304,22 @@ final private class RefCountSink<ConnectableSource: ConnectableObservableType, O
return
}

connectableSubscription.dispose()
self.parent.connectableSubscription = nil
if let timeout = self.parent.timeout {
self.parent.timeoutSubscription = timeout.scheduler.scheduleRelative((), dueTime: timeout.interval) { _ in
self.parent.lock.lock(); defer { self.parent.lock.unlock() }
guard self.parent.count == 0,
self.parent.connectionId == self.connectionIdSnapshot,
let connectableSubscription = self.parent.connectableSubscription else {
return Disposables.create()
}
connectableSubscription.dispose()
self.parent.connectableSubscription = nil
return Disposables.create()
}
} else {
connectableSubscription.dispose()
self.parent.connectableSubscription = nil
}
}
else if self.parent.count > 1 {
self.parent.count -= 1
Expand All @@ -308,10 +338,12 @@ final private class RefCountSink<ConnectableSource: ConnectableObservableType, O
self.parent.lock.lock()
if self.parent.connectionId == self.connectionIdSnapshot {
let connection = self.parent.connectableSubscription
defer { connection?.dispose() }
let timeout = self.parent.timeoutSubscription
defer { connection?.dispose(); timeout?.dispose() }
self.parent.count = 0
self.parent.connectionId = self.parent.connectionId &+ 1
self.parent.connectableSubscription = nil
self.parent.timeoutSubscription = nil
}
self.parent.lock.unlock()
self.forwardOn(event)
Expand All @@ -321,17 +353,25 @@ final private class RefCountSink<ConnectableSource: ConnectableObservableType, O
}

final private class RefCount<ConnectableSource: ConnectableObservableType>: Producer<ConnectableSource.Element> {
struct TimeoutConfig {
let interval: RxTimeInterval
let scheduler: SchedulerType
}

fileprivate let lock = RecursiveLock()

// state
fileprivate var count = 0
fileprivate var connectionId: Int64 = 0
fileprivate var connectableSubscription = nil as Disposable?
fileprivate var timeoutSubscription = nil as Disposable?

fileprivate let source: ConnectableSource
fileprivate let timeout: TimeoutConfig?

init(source: ConnectableSource) {
init(source: ConnectableSource, timeout: TimeoutConfig? = nil) {
self.source = source
self.timeout = timeout
}

override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable)
Expand Down
99 changes: 99 additions & 0 deletions Tests/RxSwiftTests/Observable+MulticastTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -1269,6 +1269,105 @@ extension ObservableMulticastTest {
])
}

func testRefCount_timeoutKeepsConnectionAlive() {
let scheduler = TestScheduler(initialClock: 0)

let xs = scheduler.createHotObservable([
.next(210, 1),
.next(220, 2),
.next(230, 3),
.next(240, 4),
.next(250, 5),
.next(260, 6),
.next(270, 7),
.next(280, 8),
.next(290, 9),
.completed(300)
])

let res = scheduler.start(disposed: 245) {
xs.publish().refCount(timeout: .seconds(50), scheduler: scheduler)
}

XCTAssertEqual(res.events, [
.next(210, 1),
.next(220, 2),
.next(230, 3),
.next(240, 4)
])

XCTAssertEqual(xs.subscriptions, [Subscription(200, 295)])
}

func testRefCount_timeoutIsCancelledOnceAnotherSubscriberIsAdded() {
let scheduler = TestScheduler(initialClock: 0)

let xs = scheduler.createHotObservable([
.next(210, 1),
.next(220, 2),
.next(230, 3),
.next(240, 4),
.next(250, 5),
.next(260, 6),
.next(270, 7),
.next(280, 8),
.next(290, 9),
.completed(300)
])

let refCounted = xs.publish().refCount(timeout: .seconds(50), scheduler: scheduler)

let dis1 = refCounted.subscribe()
scheduler.scheduleAt(200, action: dis1.dispose)

let res = scheduler.start {
refCounted
}

XCTAssertEqual(res.events, [
.next(210, 1),
.next(220, 2),
.next(230, 3),
.next(240, 4),
.next(250, 5),
.next(260, 6),
.next(270, 7),
.next(280, 8),
.next(290, 9),
.completed(300)
])

XCTAssertEqual(xs.subscriptions, [Subscription(0, 300)])
}

func testRefCount_timeoutNotUsedWhenOneOfTwoSubscribersDisconnects() {
let scheduler = TestScheduler(initialClock: 0)

let xs = scheduler.createHotObservable([
.next(210, 1),
.next(220, 2),
.next(230, 3),
.next(240, 4),
.next(250, 5),
.next(260, 6),
.next(270, 7),
.next(280, 8),
.next(290, 9),
.completed(300)
])

let refCounted = xs.publish().refCount(timeout: .seconds(50), scheduler: scheduler)

let dis1 = refCounted.subscribe()
defer { dis1.dispose() }

_ = scheduler.start(disposed: 240) {
refCounted
}

XCTAssertEqual(xs.subscriptions, [Subscription(0, 300)])
}

#if TRACE_RESOURCES
func testRefCountReleasesResourcesOnComplete() {
_ = Observable<Int>.just(1).publish().refCount().subscribe()
Expand Down
Loading