3.x: ConnectableObservable redesign #5628

Closed
akarnokd opened this issue Sep 29, 2017 · 2 comments

akarnokd commented Sep 29, 2017

Problem

The ConnectableObservable (and ConnectableFlowable) implementations, publish and replay, in RxJava 1.x and 2.x are inconsistent in their terminal behavior.

When publish terminates, its ConnectableObservable appears fresh to new subscribers. The drawback is that such subscribers may hang, because connect may never be called again.

In contrast, replay stays terminated along with any cached items, and new subscribers can still receive those events. The drawback here is that a new connect clears the internal storage and starts consuming the main source again, without giving subscribers any chance to prepare and receive that stream of events from the start if the replay is bounded.

Dealing with this inconsistency currently requires refCount to trigger a reset through an unofficial channel: casting the ConnectableObservable to Disposable if possible and disposing it when the count reaches zero again.

Suggested solution

I suggest changing the API to include an explicit reset() method and changing the logic to have 3 states:

fresh --> connect() --> running --> onComplete()/onError() --> terminated --> reset() --> fresh

and possibly:

terminated --> connect() --> running

In the fresh state, consumers can pile up and be ready to receive events. An atomic state change to running will begin streaming events until a terminal event is reached, at which point the state atomically changes to terminated. Consumers subscribing in this state will always receive the terminal event, and in case of replay, the cached items as well.

A call to reset() will clear the internal storage of the ConnectableObservable and start out as fresh again, allowing new consumers to gather around and get all fresh events from the beginning.
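The main cycle described above could be sketched roughly as follows. This is only an illustration under stated assumptions, not RxJava code: the class `LifecycleSketch`, its `State` enum, and the `terminate()` method (standing in for onComplete()/onError()) are hypothetical names, and subscriber handling and cached items are left out entirely.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of the proposed three-state lifecycle; not RxJava's implementation.
final class LifecycleSketch {
    enum State { FRESH, RUNNING, TERMINATED }

    private final AtomicReference<State> state = new AtomicReference<>(State.FRESH);

    State state() {
        return state.get();
    }

    // fresh -> running; returns false if the source was not fresh.
    boolean connect() {
        return state.compareAndSet(State.FRESH, State.RUNNING);
    }

    // running -> terminated; stands in for onComplete()/onError().
    boolean terminate() {
        return state.compareAndSet(State.RUNNING, State.TERMINATED);
    }

    // terminated -> fresh; a real implementation would also clear cached items here.
    boolean reset() {
        return state.compareAndSet(State.TERMINATED, State.FRESH);
    }

    public static void main(String[] args) {
        LifecycleSketch s = new LifecycleSketch();
        s.connect();
        s.terminate();
        System.out.println(s.state()); // TERMINATED
        s.reset();
        System.out.println(s.state()); // FRESH
    }
}
```

Each transition is a single compare-and-set, so two threads racing to connect or reset cannot both succeed. The optional terminated --> connect() transition is deliberately not modeled here.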

It is possible to support calling connect() in the terminated state, which skips the fresh state. Preventing this transition, however, may be more involved: connect() would have to signal the illegal transition in some way, and there would need to be a soft way of checking whether connect() will succeed. Note that calling connect on a running ConnectableObservable is a no-op in 1.x and 2.x.
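To make the two options concrete, here is a hedged sketch that models both behind a flag. Everything in it is an assumption for illustration: the class `TerminatedConnectPolicy`, the `allowReconnect` flag, the `canConnect()` soft check, and the choice of `IllegalStateException` as the signal are all hypothetical and not part of any RxJava API.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of the two options for connect() in the terminated state.
final class TerminatedConnectPolicy {
    enum State { FRESH, RUNNING, TERMINATED }

    private final AtomicReference<State> state = new AtomicReference<>(State.FRESH);
    private final boolean allowReconnect;

    TerminatedConnectPolicy(boolean allowReconnect) {
        this.allowReconnect = allowReconnect;
    }

    State state() {
        return state.get();
    }

    // Soft check: false only when connect() would currently throw.
    // The answer can be stale by the time connect() is actually called.
    boolean canConnect() {
        return allowReconnect || state.get() != State.TERMINATED;
    }

    void connect() {
        for (;;) {
            State s = state.get();
            if (s == State.RUNNING) {
                return; // no-op, as in 1.x and 2.x
            }
            if (s == State.TERMINATED && !allowReconnect) {
                throw new IllegalStateException("terminated; call reset() first");
            }
            if (state.compareAndSet(s, State.RUNNING)) {
                return; // fresh -> running, or terminated -> running when allowed
            }
        }
    }

    // running -> terminated; stands in for onComplete()/onError().
    void terminate() {
        state.compareAndSet(State.RUNNING, State.TERMINATED);
    }

    // terminated -> fresh.
    void reset() {
        state.compareAndSet(State.TERMINATED, State.FRESH);
    }
}
```

The strict variant shows why the soft check is awkward: between `canConnect()` and `connect()` another thread may terminate or reset the source, so callers would still need to handle the exception.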

@OlegDokuka

Hey @akarnokd. When are you planning to start working on that? I believe we could collaborate on it, so that I can port the same design to Reactor 3.

cc/ @smaldini

@akarnokd

Closing via #6519
