-
Notifications
You must be signed in to change notification settings - Fork 7.6k
Connectable Observable Operators
This section explains the ConnectableObservable
subclass and its operators:
-
ConnectableObservable.connect( )
— instructs a Connectable Observable to begin emitting items -
Observable.publish( )
andObservable.multicast( )
— represents an Observable as a Connectable Observable -
Observable.publishLast( )
— represent an Observable as a Connectable Observable that emits only the last item emitted by the source Observable -
Observable.replay( )
— ensures that all Subscribers see the same sequence of emitted items, even if they subscribe after the Observable begins emitting the items -
ConnectableObservable.refCount( )
— makes a Connectable Observable behave like an ordinary Observable
A Connectable Observable resembles an ordinary Observable, except that it does not begin emitting items when it is subscribed to, but only when its connect()
method is called. In this way you can wait for all intended Subscribers to subscribe to the Observable before the Observable begins emitting items.
The following example code shows two Subscribers subscribing to the same Observable. In the first case, they subscribe to an ordinary Observable; in the second case, they subscribe to a Connectable Observable that only connects after both Subscribers subscribe. Note the difference in the output:
Example #1:
def firstMillion = Observable.range( 1, 1000000 ).sample(7, java.util.concurrent.TimeUnit.MILLISECONDS);
firstMillion.subscribe(
{ println("Subscriber #1:" + it); }, // onNext
{ println("Error: " + it.getMessage()); }, // onError
{ println("Sequence #1 complete"); } // onCompleted
);
firstMillion.subscribe(
{ println("Subscriber #2:" + it); }, // onNext
{ println("Error: " + it.getMessage()); }, // onError
{ println("Sequence #2 complete"); } // onCompleted
);
Subscriber #1:211128
Subscriber #1:411633
Subscriber #1:629605
Subscriber #1:841903
Sequence #1 complete
Subscriber #2:244776
Subscriber #2:431416
Subscriber #2:621647
Subscriber #2:826996
Sequence #2 complete
Example #2:
def firstMillion = Observable.range( 1, 1000000 ).sample(7, java.util.concurrent.TimeUnit.MILLISECONDS).publish();
firstMillion.subscribe(
{ println("Subscriber #1:" + it); }, // onNext
{ println("Error: " + it.getMessage()); }, // onError
{ println("Sequence #1 complete"); } // onCompleted
);
firstMillion.subscribe(
{ println("Subscriber #2:" + it); }, // onNext
{ println("Error: " + it.getMessage()); }, // onError
{ println("Sequence #2 complete"); } // onCompleted
);
firstMillion.connect();
Subscriber #2:208683
Subscriber #1:208683
Subscriber #2:432509
Subscriber #1:432509
Subscriber #2:644270
Subscriber #1:644270
Subscriber #2:887885
Subscriber #1:887885
Sequence #2 complete
Sequence #1 complete
Call a Connectable Observable's connect( )
method to instruct it to begin emitting the items from its underlying Observable to its Subscribers.
The connect( )
method returns a Subscription
. You can call that object's unsubscribe( )
method to instruct the Observable to stop emitting items to its Subscribers.
You can also use the connect( )
method to instruct an Observable to begin emitting items (or, to begin generating items that would be emitted) even before any Subscriber has subscribed to it.
- javadoc:
connect()
- RxJS:
connect
- Linq:
Connect
- Introduction to Rx: Publish and Connect
To represent an Observable as a Connectable Observable, use the publish( )
or multicast()
method.
- javadoc:
multicast(subject)
- javadoc:
publish()
- RxJS:
multicast
andpublish
- Linq:
Multicast
andPublish
- Introduction to Rx: Publish and Connect
- Introduction to Rx: Multicast
represent an Observable as a Connectable Observable that emits only the last item emitted by the source Observable
- javadoc:
publishLast()
- RxJS:
publishLast
- Linq:
PublishLast
- Introduction to Rx: PublishLast
ensures that all Subscribers see the same sequence of emitted items, even if they subscribe after the Observable begins emitting items
There are varieties of replay( )
that return a ConnectableObservable that you then must use the publish( )
operator on so that Subscribers may connect to it:
And there are also varieties of replay( )
that accept a selector argument and return a simple Observable:
In each variety there are versions with which you can limit the number of replayable items either by quantity or by whether or not they were emitted within a particular timespan.
- javadoc:
replay()
- RxJS:
replay
- Linq:
Replay
- Introduction to Rx: Replay
You can represent a Connectable Observable so that it behaves much like an ordinary Observable by using the refCount( )
operator. This operator keeps track of how many Subscribers are subscribed to the resulting Observable and refrains from disconnecting from the source ConnectableObservable until all such Observables unsubscribe.
- javadoc:
refCount( )
- RxJS:
refCount
- Linq:
RefCount
- Introduction to Rx: RefCount
Copyright (c) 2016-present, RxJava Contributors.
Twitter @RxJava | Gitter @RxJava