Skip to content

Commit

Permalink
Optimize Mono.flatMap
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd committed Jun 5, 2016
1 parent 49d7d41 commit ebca3fa
Show file tree
Hide file tree
Showing 3 changed files with 276 additions and 13 deletions.
19 changes: 6 additions & 13 deletions src/main/java/reactor/core/publisher/Mono.java
Original file line number Diff line number Diff line change
Expand Up @@ -1507,7 +1507,7 @@ public final Mono<T> filter(final Predicate<? super T> tester) {
* @return a new {@link Flux} as the sequence is not guaranteed to be single at most
*/
public final <R> Flux<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper) {
return flatMap(mapper, PlatformDependent.SMALL_BUFFER_SIZE);
return new MonoFlatten<>(this, mapper);
}

/**
Expand All @@ -1523,17 +1523,10 @@ public final <R> Flux<R> flatMap(Function<? super T, ? extends Publisher<? exten
* @param <R> the merged sequence type
*
* @return a new {@link Flux} as the sequence is not guaranteed to be single at most
* @deprecated the single generated source can be streamed directly without prefetching and queueing
*/
public final <R> Flux<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper, int prefetch) {
return new FluxFlatMap<>(
this,
mapper,
false,
Integer.MAX_VALUE,
QueueSupplier.one(),
prefetch,
QueueSupplier.get(prefetch)
);
return flatMap(mapper);
}

/**
Expand Down Expand Up @@ -1584,7 +1577,7 @@ public final <R> Flux<R> flatMap(Function<? super T, ? extends Publisher<? exten
*
*/
public final <R> Flux<R> flatMapIterable(Function<? super T, ? extends Iterable<? extends R>> mapper) {
return flatMapIterable(mapper, PlatformDependent.SMALL_BUFFER_SIZE);
return new FluxFlattenIterable<>(this, mapper, Integer.MAX_VALUE, QueueSupplier.one());
}

/**
Expand All @@ -1600,10 +1593,10 @@ public final <R> Flux<R> flatMapIterable(Function<? super T, ? extends Iterable<
* @param <R> the merged output sequence type
*
* @return a merged {@link Flux}
*
* @deprecated prefetch amount is meaningless with a Mono source
*/
public final <R> Flux<R> flatMapIterable(Function<? super T, ? extends Iterable<? extends R>> mapper, int prefetch) {
return new FluxFlattenIterable<>(this, mapper, prefetch, QueueSupplier.get(prefetch));
return flatMapIterable(mapper);
}

/**
Expand Down
210 changes: 210 additions & 0 deletions src/main/java/reactor/core/publisher/MonoFlatten.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
/*
* Copyright (c) 2011-2016 Pivotal Software Inc, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package reactor.core.publisher;

import java.util.concurrent.Callable;
import java.util.concurrent.atomic.*;
import java.util.function.Function;

import org.reactivestreams.*;

import reactor.core.util.*;

public final class MonoFlatten<T, R> extends Flux<R> {
final Mono<? extends T> source;

final Function<? super T, ? extends Publisher<? extends R>> mapper;

public MonoFlatten(Mono<? extends T> source, Function<? super T, ? extends Publisher<? extends R>> mapper) {
this.source = source;
this.mapper = mapper;
}

@Override
public void subscribe(Subscriber<? super R> s) {
if (FluxFlatMap.trySubscribeScalarMap(source, s, mapper, false)) {
return;
}
source.subscribe(new FlattenSubscriber<>(s, mapper));
}

static final class FlattenSubscriber<T, R> implements Subscriber<T>, Subscription {
final Subscriber<? super R> actual;

final Function<? super T, ? extends Publisher<? extends R>> mapper;

Subscription main;

volatile Subscription inner;
@SuppressWarnings("rawtypes")
static final AtomicReferenceFieldUpdater<FlattenSubscriber, Subscription> INNER =
AtomicReferenceFieldUpdater.newUpdater(FlattenSubscriber.class, Subscription.class, "inner");

volatile long requested;
@SuppressWarnings("rawtypes")
static final AtomicLongFieldUpdater<FlattenSubscriber> REQUESTED =
AtomicLongFieldUpdater.newUpdater(FlattenSubscriber.class, "requested");

boolean hasValue;

public FlattenSubscriber(Subscriber<? super R> actual,
Function<? super T, ? extends Publisher<? extends R>> mapper) {
this.actual = actual;
this.mapper = mapper;
}

@Override
public void request(long n) {
Subscription a = inner;
if (a != null) {
a.request(n);
} else {
if (BackpressureUtils.validate(n)) {
BackpressureUtils.getAndAddCap(REQUESTED, this, n);
a = inner;
if (a != null) {
n = REQUESTED.getAndSet(this, 0L);
if (n != 0L) {
a.request(n);
}
}
}
}
}

@Override
public void cancel() {
main.cancel();
BackpressureUtils.terminate(INNER, this);
}

@Override
public void onSubscribe(Subscription s) {
if (BackpressureUtils.validate(this.main, s)) {
this.main = s;

actual.onSubscribe(this);

s.request(Long.MAX_VALUE);
}
}

boolean onSubscribeInner(Subscription s) {
if (BackpressureUtils.setOnce(INNER, this, s)) {

long r = REQUESTED.getAndSet(this, 0L);
if (r != 0) {
s.request(r);
}

return true;
}
return false;
}

@SuppressWarnings("unchecked")
@Override
public void onNext(T t) {
hasValue = true;

Publisher<? extends R> p;

try {
p = mapper.apply(t);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
actual.onError(ex);
return;
}

if (p == null) {
actual.onError(new NullPointerException("The mapper returned a null Publisher."));
return;
}

if (p instanceof Callable) {
R v;

try {
v = ((Callable<R>)p).call();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
actual.onError(ex);
return;
}

if (v == null) {
actual.onComplete();
} else {
onSubscribeInner(new ScalarSubscription<>(actual, v));
}

return;
}

p.subscribe(new InnerSubscriber<>(this, actual));
}

@Override
public void onError(Throwable t) {
if (hasValue) {
Exceptions.onErrorDropped(t);
return;
}
actual.onError(t);
}

@Override
public void onComplete() {
if (!hasValue) {
actual.onComplete();
}
}

static final class InnerSubscriber<R> implements Subscriber<R> {

final FlattenSubscriber<?, R> parent;

final Subscriber<? super R> actual;

public InnerSubscriber(FlattenSubscriber<?, R> parent, Subscriber<? super R> actual) {
this.parent = parent;
this.actual = actual;
}

@Override
public void onSubscribe(Subscription s) {
parent.onSubscribeInner(s);
}

@Override
public void onNext(R t) {
actual.onNext(t);
}

@Override
public void onError(Throwable t) {
actual.onError(t);
}

@Override
public void onComplete() {
actual.onComplete();
}

}
}
}
60 changes: 60 additions & 0 deletions src/test/java/reactor/core/publisher/MonoFlattenTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright (c) 2011-2016 Pivotal Software Inc, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package reactor.core.publisher;

import org.junit.Test;

import reactor.core.test.TestSubscriber;

public class MonoFlattenTest {

@Test
public void normal() {
TestSubscriber<Integer> ts = TestSubscriber.create();

Mono.just(1).hide().flatMap(v -> Flux.just(2).hide())
.subscribe(ts);

ts.assertValues(2)
.assertNoError()
.assertComplete();
}

@Test
public void normalInnerJust() {
TestSubscriber<Integer> ts = TestSubscriber.create();

Mono.just(1).hide().flatMap(v -> Flux.just(2))
.subscribe(ts);

ts.assertValues(2)
.assertNoError()
.assertComplete();
}

@Test
public void normalInnerEmpty() {
TestSubscriber<Integer> ts = TestSubscriber.create();

Mono.just(1).hide().flatMap(v -> Flux.<Integer>empty())
.subscribe(ts);

ts.assertNoValues()
.assertNoError()
.assertComplete();
}

}

0 comments on commit ebca3fa

Please sign in to comment.