Problem
The ConnectableObservable (and ConnectableFlowable) implementors, publish and replay, in RxJava 1.x and 2.x are inconsistent in their terminal behavior.
When publish terminates, its ConnectableObservable appears fresh to new subscribers. The drawback is that such subscribers may hang, because connect may never be called again.
In contrast, replay stays terminated along with any cached items, and new subscribers can still get those events. The drawback here is that a new connect clears the internal storage and starts consuming the main source without giving subscribers any chance to prepare and receive that stream of events from the start if the replay is bounded.
Dealing with this inconsistency currently requires refCount to trigger a reset through an unofficial channel: casting the ConnectableObservable into Disposable if possible and disposing it when the count reaches zero again.
Suggested solution
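I suggest changing the API to include an explicit reset() method and changing the logic to have 3 states:
fresh --> connect() --> running --> onComplete() / onError() --> terminated --> reset() --> fresh
and possibly:
terminated --> connect() --> running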
In the fresh state, consumers can pile up and be ready to receive events. An atomic state change to running will begin streaming events until a terminal event is reached, at which point the state atomically changes to terminated. Consumers subscribing in the terminated state will always receive the terminal event and, in the case of replay, the cached items as well.
A call to reset() will clear the internal storage of the ConnectableObservable and start out as fresh again, allowing new consumers to gather around and get all fresh events from the beginning.
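The lifecycle described above can be modeled as a small state machine. The following is only a minimal sketch in plain Java, not RxJava code: the class ConnectableLifecycle, its State enum, and the terminate() method (standing in for onComplete() / onError()) are hypothetical names chosen for illustration. It assumes the stricter reading in which connect() succeeds only from the fresh state and reset() only from the terminated state.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical model of the proposed lifecycle; none of these names are RxJava API.
final class ConnectableLifecycle {

    enum State { FRESH, RUNNING, TERMINATED }

    private final AtomicReference<State> state = new AtomicReference<>(State.FRESH);

    State state() {
        return state.get();
    }

    // fresh --> running. Returns false in any other state, which includes
    // the no-op case of calling connect() on a running instance.
    boolean connect() {
        return state.compareAndSet(State.FRESH, State.RUNNING);
    }

    // running --> terminated. Stands in for onComplete() / onError()
    // arriving from the main source.
    boolean terminate() {
        return state.compareAndSet(State.RUNNING, State.TERMINATED);
    }

    // terminated --> fresh. A real implementation would also clear any
    // cached items at this point.
    boolean reset() {
        return state.compareAndSet(State.TERMINATED, State.FRESH);
    }

    public static void main(String[] args) {
        ConnectableLifecycle c = new ConnectableLifecycle();
        System.out.println(c.connect());   // true: fresh --> running
        System.out.println(c.connect());   // false: already running
        System.out.println(c.reset());     // false: not terminated yet
        System.out.println(c.terminate()); // true: running --> terminated
        System.out.println(c.reset());     // true: terminated --> fresh
        System.out.println(c.state());     // FRESH
    }
}
```

Each transition is a single compare-and-set, so two threads racing on connect() cannot both start the stream; exactly one of them observes true.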
It is possible to support calling connect in the terminated state, skipping the fresh state. Preventing this transition, however, may be more involved: connect() would have to signal somehow that the transition is illegal, and there would also need to be a soft way of checking whether connect would succeed. Note that calling connect on a running ConnectableObservable is a no-op in 1.x and 2.x.
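To make the two options concrete, here is a hedged sketch of both policies side by side. Everything in it is an assumption made for illustration: the class ConnectPolicySketch, the allowReconnect flag, the canConnect() soft check, and the choice of IllegalStateException to signal the illegal transition are not part of RxJava or of the proposal itself.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch; the names and the boolean flag are illustrative only.
final class ConnectPolicySketch {

    enum State { FRESH, RUNNING, TERMINATED }

    private final AtomicReference<State> state = new AtomicReference<>(State.FRESH);
    private final boolean allowReconnect;

    ConnectPolicySketch(boolean allowReconnect) {
        this.allowReconnect = allowReconnect;
    }

    State state() {
        return state.get();
    }

    // Soft check: reports whether connect() would start a run right now,
    // without changing the state. Under concurrency this is only a hint.
    boolean canConnect() {
        State s = state.get();
        return s == State.FRESH || (s == State.TERMINATED && allowReconnect);
    }

    // Returns true if this call started a run and false for the no-op on a
    // running instance. Throws if terminated --> running is not allowed.
    boolean connect() {
        for (;;) {
            State s = state.get();
            if (s == State.RUNNING) {
                return false;
            }
            if (s == State.TERMINATED && !allowReconnect) {
                throw new IllegalStateException("terminated; call reset() first");
            }
            if (state.compareAndSet(s, State.RUNNING)) {
                return true;
            }
            // Lost a race with another thread; re-read the state and retry.
        }
    }

    // running --> terminated, standing in for onComplete() / onError().
    boolean terminate() {
        return state.compareAndSet(State.RUNNING, State.TERMINATED);
    }

    // terminated --> fresh.
    boolean reset() {
        return state.compareAndSet(State.TERMINATED, State.FRESH);
    }

    public static void main(String[] args) {
        ConnectPolicySketch lenient = new ConnectPolicySketch(true);
        lenient.connect();
        lenient.terminate();
        System.out.println(lenient.connect());    // true: terminated --> running

        ConnectPolicySketch strict = new ConnectPolicySketch(false);
        strict.connect();
        strict.terminate();
        System.out.println(strict.canConnect());  // false: reset() is required first
    }
}
```

With allowReconnect set to true the fresh state is skipped, as in the optional transition; with false, a caller can consult canConnect() beforehand instead of relying on the exception.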