Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x Flowable.flatMap maxConcurrency should be unbounded unless it is restricted #5126

Closed
mitermayer opened this issue Feb 22, 2017 · 3 comments

Comments

@mitermayer
Copy link

mitermayer commented Feb 22, 2017

When more than 128 publishers that not all emit events causes miss of any further message that would be sent by subsequent publishers. This is due to the underlying implementation defaults to bufferSize() which is 128 https://github.com/ReactiveX/RxJava/blob/2.x/src/main/java/io/reactivex/Flowable.java#L8239

According to http://reactivex.io/RxJava/javadoc/io/reactivex/Flowable.html#flatMap(io.reactivex.functions.Function)

The outer Publisher is consumed in unbounded mode

Example to replicate the issue: (assumes that publishers is greater than 128 and that none of the first 128 publishers are emitting any message):

public Flowable<MyType> foo() {
      return publishers.flatMap(publisher -> {
        return publisher
            .subscribeTo("bar")
            .doOnSubscribe(s -> System.out.println(s))
            .doOnError(t -> System.out.println(t))
            .map(MyClass::someOperation)
            .takeUntil(publisher.onClose().toFlowable());
     });
}

At the moment the work around is to use the overloaded flatMap to set manually set maxConcurrency to be unbounded.

@akarnokd
Copy link
Member

akarnokd commented Feb 22, 2017

Hi and thanks for reporting. flatMap(Function) should be bounded and the javadoc is wrong due to copy-paste error from merge() which is unbounded.

@benjchristensen
Copy link
Member

The reason it was never bounded in v1 is that it causes these types of hidden system hangs that are very hard to debug. This seems like a dangerous behavior to have changed.

@akarnokd
Copy link
Member

Yes, you have to unbound Flowable.flatMap manually for your use case as it defaults to a fixed number of active inner Publishers at once; this is a typical behavior for in-sequence operators that have an N:M relation between the input and output because Flowable is the backpressure-enabled, no-overflow base reactive type.

If you need unbounded behavior, consider using Observable. In v1, there was a theoretical problem when the same bounding on merge was attempted: GUI scenarios may merge more than 128 sources at once and bounding merge that late in the life of v1 could have caused unexpected hangs indeed.

The Javadoc has been fixed by and closing via #5127.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

3 participants