RxJava + Reactive Streams #2917

Closed
benjchristensen opened this issue Apr 27, 2015 · 14 comments

@benjchristensen
Member

I'd like to communicate clearly how Reactive Streams and RxJava relate and work together, along with recommendations for using them. I have started here: https://github.com/ReactiveX/RxJava/wiki/Reactive-Streams

I'd appreciate comments on what should be added, removed or changed.

@benjchristensen
Member Author

What's missing right now, and what I'd like to add, is information on how to use RxJava to implement a library while shading the dependency and exposing only Reactive Streams publicly, as I know this is something library authors need and want. It's easy to do, but we should show how to do it.

@mfiguiere

Good job! I think this page really clarifies the relationship between RxJava and Reactive Streams. A few comments:

  • In the "Pros of Exposing Reactive Stream APIs instead of RxJava" section, you describe it as future proof, whereas Reactive Streams will actually be relevant for the next 2 years before being deprecated, as Flow will be the de facto choice. So do you mean future proof in the sense that a Publisher and a Flow are designed alike, which will make refactoring trivial when moving to JDK 9? If that's the case, it's a somewhat simplified view, as moving to Flow will still require a breaking change in any API that exposes Publisher.
  • The recommended approach with the example of the database driver seems a bit unrealistic to me, as it's hard to expect each vendor of such a DB or MQ to deliver a separate module for each possible Reactive Streams implementation on the market. The underlying problem seems to be that Reactive Streams provides a generic interface to represent these streams, but no API to combine them and work with them. The whole point of having a standard reactive stream API is to actually allow vendors to expose it without having to expose the underlying implementation, which is the opposite of what this example does, at the end of the day.

@benjchristensen
Member Author

Thank you for the feedback.

future proof

Valid point about Flow, though Reactive Streams itself will not stop working because Flow is released. The point about future proofing is that the Reactive Streams API will itself not be changing.

And yes, moving to Flow will be easy. In fact, the same class should be capable of implementing both a Reactive Streams Publisher and a Flow.Publisher, since Flow is a JDK 9 copy/paste/rename of Reactive Streams.

It will be interesting to see whether Flow picks up quickly when Java 9 is released or takes until Java 10/11. People often take a long time to adopt JDKs so libraries are often slow to adopt. I'm curious to see whether multi-versioned jars make it into Java 8u60 and Java 9 to make it easier for libraries to adopt new language features and APIs and therefore speed up adoption of something like Flow.

The recommended approach with the example of the database driver seems a bit unrealistic to me.

The recommendation is not to support "each possible implementation", but the popular one or two that customers are asking for. It is definitely not required; it is a recommendation to alleviate the "cons" of exposing only interfaces.

Current examples are Couchbase, which has natively adopted RxJava 1.x, and MongoDB, which is exposing variants for RxJava (http://mongodb.github.io/mongo-java-driver-rx/) and Reactive Streams (http://mongodb.github.io/mongo-java-driver-reactivestreams/).

Of course, using just Reactive Streams is sufficient, and a developer can then convert it themselves to whatever implementation they wish, or create their own wrapper of the APIs.

The whole point of having a standard reactive stream API is to actually allow vendors to expose it without having to expose the underlying implementation

Yes, but note that the Reactive Streams API only standardizes the protocol, not the higher-order functions, scheduling or composition. The protocol of subscribe, request, onNext, onComplete, onError is important, but it's just the foundation. Anybody using Reactive Streams will need operators to make it usable, and the Reactive Streams/Flow standard APIs don't offer any of those. And honestly, I don't imagine that level of detail can or should be standardized beyond possibly map/flatMap.
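
To make the protocol/operator split concrete, here is a minimal sketch. It assumes JDK 9+ and uses java.util.concurrent.Flow as a stand-in for the org.reactivestreams interfaces, so that it runs without extra dependencies. RangePublisher, map and collect are hypothetical names invented for illustration; they are not taken from RxJava or from the Reactive Streams specification. The sketch is single-threaded and skips several rules a compliant implementation would have to honor (for example, handling exceptions thrown by the mapping function).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Function;

class ProtocolSketch {

    /** Hypothetical source: emits start..start+count-1, but only as many items as were requested. */
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int start;
        private final int count;

        RangePublisher(int start, int count) {
            this.start = start;
            this.count = count;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = start;
                private long requested;   // outstanding demand from the subscriber
                private boolean emitting; // guards against re-entrant request() calls
                private boolean done;

                @Override
                public void request(long n) {
                    if (done) return;
                    if (n <= 0) {
                        done = true;
                        subscriber.onError(new IllegalArgumentException("request must be > 0"));
                        return;
                    }
                    requested += n;
                    if (requested < 0) requested = Long.MAX_VALUE; // cap on overflow
                    if (emitting) return; // the outer loop will pick up the new demand
                    emitting = true;
                    while (!done && requested > 0 && next < start + count) {
                        requested--;
                        subscriber.onNext(next++);
                    }
                    emitting = false;
                    if (!done && next == start + count) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    /** Hypothetical operator: the interfaces define no map(), so a library has to supply it. */
    static <T, R> Flow.Publisher<R> map(Flow.Publisher<T> source, Function<? super T, ? extends R> fn) {
        return downstream -> source.subscribe(new Flow.Subscriber<T>() {
            @Override public void onSubscribe(Flow.Subscription s) { downstream.onSubscribe(s); }
            @Override public void onNext(T item) { downstream.onNext(fn.apply(item)); }
            @Override public void onError(Throwable t) { downstream.onError(t); }
            @Override public void onComplete() { downstream.onComplete(); }
        });
    }

    /** Subscribes, requests one item at a time, and returns everything received (synchronous sources only). */
    static <T> List<T> collect(Flow.Publisher<T> publisher) {
        List<T> received = new ArrayList<>();
        publisher.subscribe(new Flow.Subscriber<T>() {
            private Flow.Subscription subscription;
            @Override public void onSubscribe(Flow.Subscription s) { subscription = s; s.request(1); }
            @Override public void onNext(T item) { received.add(item); subscription.request(1); }
            @Override public void onError(Throwable t) { /* ignored in this sketch */ }
            @Override public void onComplete() { }
        });
        return received;
    }

    public static void main(String[] args) {
        Flow.Publisher<String> labels = map(new RangePublisher(1, 3), i -> "item-" + i);
        System.out.println(collect(labels));
    }
}
```

Everything except the Flow interfaces had to be written by hand here, which is the gap an operator library such as RxJava fills.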

How do you think I should adjust my recommendation to be more realistic?

@rkuhn

rkuhn commented Apr 28, 2015

Looking great, @benjchristensen, thanks for writing this up!

@akarnokd
Member

Since RS is very strict on backpressure, I'd mention our backpressure control mechanisms because many of our operators (the timed ones especially) don't support backpressure and observing them from e.g. Akka Streams could be a problem. I think these operators can't get RS compliant ever so RxJava 2.0 as a whole library won't be RS compliant.

@benjchristensen
Member Author

I think these operators can't get RS compliant ever so RxJava 2.0 as a whole library won't be RS compliant.

It can be compliant in that an Observable that implements Publisher would call onError if it needs to emit but has no outstanding requests. This applies even to "hot" sources like a mouse event stream. The source will keep firing, but if the consumer can't keep up and the requested count hits 0, then all the Observable/Publisher can do is emit an error via onError.

many of our operators (the timed ones especially) don't support backpressure and observing them from e.g. Akka Streams could be a problem

The RxJavaReactiveStreams bridge would emit an error in this case for 1.x. Version 2.x would need to behave as explained above. A valid outcome is to invoke onError if overflow is occurring and tell the user they need to choose a strategy. There are really only 2 options (since buffering is not allowed):

  1. drop messages silently
  2. emit an error

We choose strategy 2 as our default.
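
The two options can be sketched as follows. This is an illustration only, assuming JDK 9+ and java.util.concurrent.Flow in place of the org.reactivestreams interfaces. HotSource, Overflow and Recorder are hypothetical names, IllegalStateException stands in for whatever exception type a real library would use, and none of this is the RxJavaReactiveStreams bridge code. The source is single-threaded and supports one subscriber.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class OverflowSketch {

    /** What to do when an item arrives while the subscriber has no outstanding demand. */
    enum Overflow { DROP, ERROR }

    /** Hypothetical "hot" source: emit() is driven from outside, regardless of demand. */
    static final class HotSource<T> implements Flow.Publisher<T> {
        private final Overflow strategy;
        private Flow.Subscriber<? super T> subscriber; // single subscriber, for brevity
        private long requested;
        private boolean done;

        HotSource(Overflow strategy) {
            this.strategy = strategy;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super T> s) {
            subscriber = s;
            s.onSubscribe(new Flow.Subscription() {
                @Override
                public void request(long n) {
                    if (done || n <= 0) return; // invalid requests are ignored in this sketch
                    requested += n;
                    if (requested < 0) requested = Long.MAX_VALUE; // cap on overflow
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }

        /** Called by the event producer (for example a mouse listener) whenever an item occurs. */
        void emit(T item) {
            if (done || subscriber == null) return;
            if (requested > 0) {
                if (requested != Long.MAX_VALUE) requested--; // MAX_VALUE means unbounded demand
                subscriber.onNext(item);
            } else if (strategy == Overflow.ERROR) {
                done = true; // onError is terminal, so nothing is emitted afterwards
                subscriber.onError(new IllegalStateException("no outstanding requests; choose an overflow strategy"));
            }
            // Overflow.DROP: the item is discarded silently.
        }
    }

    /** Records the signals it receives and requests a fixed number of items up front. */
    static final class Recorder implements Flow.Subscriber<String> {
        final List<String> events = new ArrayList<>();
        private final long initialRequest;

        Recorder(long initialRequest) {
            this.initialRequest = initialRequest;
        }

        @Override public void onSubscribe(Flow.Subscription s) { s.request(initialRequest); }
        @Override public void onNext(String item) { events.add(item); }
        @Override public void onError(Throwable t) { events.add("error"); }
        @Override public void onComplete() { events.add("complete"); }
    }

    /** Emits four items to a subscriber that only asked for two. */
    static List<String> run(Overflow strategy) {
        HotSource<String> source = new HotSource<>(strategy);
        Recorder recorder = new Recorder(2);
        source.subscribe(recorder);
        for (String item : List.of("a", "b", "c", "d")) {
            source.emit(item);
        }
        return recorder.events;
    }

    public static void main(String[] args) {
        System.out.println("DROP:  " + run(Overflow.DROP));
        System.out.println("ERROR: " + run(Overflow.ERROR));
    }
}
```

With DROP the subscriber sees only the two items it asked for and never learns that the others were lost. With ERROR it sees the two items followed by a terminal error, which makes the overflow visible and pushes the user toward picking a strategy explicitly.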

@akarnokd
Member

It can be compliant in that an Observable that implements Publisher would call onError if it needs to emit but has no outstanding requests. This applies even to "hot" sources like a mouse event stream. The source will keep firing, but if the consumer can't keep up and the requested count hits 0, then all the Observable/Publisher can do is emit an error via onError.

Good idea. How should we proceed?

  • Refit the operators to watch the downstream request count and emit BackpressureNotSupportedException.
  • Add an operator onBackpressureError() which checks and emits this exception.
  • Throw/onError immediately if downstream's first request is not 0 or Long.MAX_VALUE, forcing the developer to always add one of the onBackpressureXXX operators.

@benjchristensen
Member Author

For RxJava 1.x there isn't anything we need to do. The RxJavaReactiveStreams bridge handles things (or should, if it doesn't and we missed something).

For RxJava 2.x I was experimenting with making the Observable/Publisher abstractions handle this, but got distracted recently. Let's take this up in the 2.x discussion issues instead of here.

@benjchristensen
Member Author

Thanks @rkuhn for the review.

@akarnokd
Member

Currently, the PublisherAdapter doesn't verify whether the upstream calls onNext without an outstanding request, and it still has the concurrency issue with the HashMap. Do you want to do a PR for these?

Edit: I'll post a PR there anyway.

@benjchristensen
Member Author

I see you're already doing a PR for it, thank you.

@benjchristensen
Member Author

I have updated the wiki page based on feedback I've received and a re-read of it. Hopefully it further clarifies some things without defeating my goal of being brief.

@benjchristensen
Member Author

Publishing the page now. If you have any further improvements, please comment here.

@rstoyanchev

The following is interesting but not completely clear:

RxJava 2 will truly be "Reactive Extensions" now that there is an interface to extend.
RxJava 1 didn't have a base interface or contract to extend so had to define it from scratch.

The wiki page is about RS, so "base interface" could be interpreted as the RS interfaces. Yet the mention of "Reactive Extensions" sounds more like an interface for Rx operations? Perhaps this could be rephrased a little to disambiguate. Also, the next sentence ("RxJava 2 intends on being a high performing...") seems like it should be a new paragraph?
