Skip to content

Commit

Permalink
fix reactor#1003 Add fuseable/conditional xxxDoOnEach
Browse files Browse the repository at this point in the history
  • Loading branch information
simonbasle authored Apr 12, 2018
1 parent 0254f45 commit 3c8008e
Show file tree
Hide file tree
Showing 10 changed files with 569 additions and 49 deletions.
4 changes: 3 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ ext {
// Testing
assertJVersion = '3.9.0'
mockitoVersion = '2.10.0'
jUnitParamsVersion = '1.1.1'

javadocLinks = ["https://docs.oracle.com/javase/8/docs/api/",
"https://docs.oracle.com/javaee/6/api/",
Expand Down Expand Up @@ -265,7 +266,8 @@ project('reactor-core') {
"org.testng:testng:6.8.5",
"org.assertj:assertj-core:$assertJVersion",
"org.mockito:mockito-core:$mockitoVersion",
"org.openjdk.jol:jol-core:0.9"
"org.openjdk.jol:jol-core:0.9",
"pl.pragmatists:JUnitParams:$jUnitParamsVersion"

if ("$compatibleVersion" != "SKIP") {
baseline("io.projectreactor:reactor-core:$compatibleVersion") {
Expand Down
3 changes: 3 additions & 0 deletions reactor-core/src/main/java/reactor/core/publisher/Flux.java
Original file line number Diff line number Diff line change
Expand Up @@ -3675,6 +3675,9 @@ public final Flux<T> doOnComplete(Runnable onComplete) {
* @see Signal
*/
public final Flux<T> doOnEach(Consumer<? super Signal<T>> signalConsumer) {
if (this instanceof Fuseable) {
return onAssembly(new FluxDoOnEachFuseable<>(this, signalConsumer));
}
return onAssembly(new FluxDoOnEach<>(this, signalConsumer));
}

Expand Down
134 changes: 128 additions & 6 deletions reactor-core/src/main/java/reactor/core/publisher/FluxDoOnEach.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Exceptions;
import reactor.core.Fuseable;
import reactor.core.Fuseable.ConditionalSubscriber;
import reactor.util.annotation.Nullable;
import reactor.util.context.Context;

Expand All @@ -41,14 +43,28 @@ final class FluxDoOnEach<T> extends FluxOperator<T, T> {
this.onSignal = Objects.requireNonNull(onSignal, "onSignal");
}

@SuppressWarnings("unchecked")
static <T> DoOnEachSubscriber<T> createSubscriber(CoreSubscriber<? super T> actual,
Consumer<? super Signal<T>> onSignal, boolean fuseable) {
if (fuseable) {
if(actual instanceof ConditionalSubscriber) {
return new DoOnEachFuseableConditionalSubscriber<>((ConditionalSubscriber<? super T>) actual, onSignal);
}
return new DoOnEachFuseableSubscriber<>(actual, onSignal);
}

if (actual instanceof ConditionalSubscriber) {
return new DoOnEachConditionalSubscriber<>((ConditionalSubscriber<? super T>) actual, onSignal);
}
return new DoOnEachSubscriber<>(actual, onSignal);
}

@Override
public void subscribe(CoreSubscriber<? super T> actual) {
//TODO fuseable version?
//TODO conditional version?
source.subscribe(new DoOnEachSubscriber<>(actual, onSignal));
source.subscribe(createSubscriber(actual, onSignal, false));
}

static final class DoOnEachSubscriber<T> implements InnerOperator<T, T>, Signal<T> {
static class DoOnEachSubscriber<T> implements InnerOperator<T, T>, Signal<T> {

final CoreSubscriber<? super T> actual;
final Context cachedContext;
Expand All @@ -57,6 +73,8 @@ static final class DoOnEachSubscriber<T> implements InnerOperator<T, T>, Signal<
T t;

Subscription s;
@Nullable
Fuseable.QueueSubscription<T> qs;

boolean done;

Expand All @@ -79,8 +97,11 @@ public void cancel() {

@Override
public void onSubscribe(Subscription s) {
this.s = s;
actual.onSubscribe(this);
if (Operators.validate(this.s, s)) {
this.s = s;
this.qs = Operators.as(s);
actual.onSubscribe(this);
}
}

@Override
Expand Down Expand Up @@ -203,4 +224,105 @@ public String toString() {
return "doOnEach_onNext(" + t + ")";
}
}

static class DoOnEachFuseableSubscriber<T> extends DoOnEachSubscriber<T>
implements Fuseable, Fuseable.QueueSubscription<T> {

boolean syncFused;

DoOnEachFuseableSubscriber(CoreSubscriber<? super T> actual,
Consumer<? super Signal<T>> onSignal) {
super(actual, onSignal);
}

@Override
public int requestFusion(int mode) {
QueueSubscription<T> qs = this.qs;
if (qs != null && (mode & Fuseable.THREAD_BARRIER) == 0) {
int m = qs.requestFusion(mode);
if (m != Fuseable.NONE) {
syncFused = m == Fuseable.SYNC;
}
return m;
}
return Fuseable.NONE;
}

@Override
public void clear() {
qs.clear(); //throws NPE, but should only be called after onSubscribe on a Fuseable
}

@Override
public boolean isEmpty() {
return qs == null || qs.isEmpty();
}

@Override
@Nullable
public T poll() {
if (qs == null) {
return null;
}
T v = qs.poll();
if (v == null && syncFused) {
done = true;
try {
onSignal.accept(Signal.complete(cachedContext));
}
catch (Throwable e) {
throw e;
}
} else if (v != null) {
this.t = v;
onSignal.accept(this); //throws in case of error
}
return v;
}

@Override
public int size() {
return qs == null ? 0 : qs.size();
}
}

static final class DoOnEachConditionalSubscriber<T> extends DoOnEachSubscriber<T>
implements ConditionalSubscriber<T> {

DoOnEachConditionalSubscriber(ConditionalSubscriber<? super T> actual,
Consumer<? super Signal<T>> onSignal) {
super(actual, onSignal);
}

@Override
@SuppressWarnings("unchecked")
public boolean tryOnNext(T t) {
boolean result = ((ConditionalSubscriber<? super T>)actual).tryOnNext(t);
if (result) {
this.t = t;
onSignal.accept(this);
}
return result;
}
}

static final class DoOnEachFuseableConditionalSubscriber<T> extends DoOnEachFuseableSubscriber<T>
implements ConditionalSubscriber<T> {

DoOnEachFuseableConditionalSubscriber(ConditionalSubscriber<? super T> actual,
Consumer<? super Signal<T>> onSignal) {
super(actual, onSignal);
}

@Override
@SuppressWarnings("unchecked")
public boolean tryOnNext(T t) {
boolean result = ((ConditionalSubscriber<? super T>) actual).tryOnNext(t);
if (result) {
this.t = t;
onSignal.accept(this);
}
return result;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright (c) 2011-2018 Pivotal Software Inc, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package reactor.core.publisher;

import java.util.Objects;
import java.util.function.Consumer;

import reactor.core.CoreSubscriber;
import reactor.core.Fuseable;

/**
* Peek into the lifecycle events and signals of a sequence, {@link Fuseable} version of
* {@link FluxDoOnEach}.
*
* @param <T> the value type
*
* @see <a href="https://github.com/reactor/reactive-streams-commons">Reactive-Streams-Commons</a>
*/
final class FluxDoOnEachFuseable<T> extends FluxOperator<T, T> implements Fuseable {

final Consumer<? super Signal<T>> onSignal;

FluxDoOnEachFuseable(Flux<? extends T> source, Consumer<? super Signal<T>> onSignal) {
super(source);
this.onSignal = Objects.requireNonNull(onSignal, "onSignal");
}

@Override
public void subscribe(CoreSubscriber<? super T> actual) {
this.source.subscribe(FluxDoOnEach.createSubscriber(actual, this.onSignal, true));
}
}
3 changes: 3 additions & 0 deletions reactor-core/src/main/java/reactor/core/publisher/Mono.java
Original file line number Diff line number Diff line change
Expand Up @@ -1774,6 +1774,9 @@ public final Mono<T> doOnSuccess(Consumer<? super T> onSuccess) {
*/
public final Mono<T> doOnEach(Consumer<? super Signal<T>> signalConsumer) {
Objects.requireNonNull(signalConsumer, "signalConsumer");
if (this instanceof Fuseable) {
return onAssembly(new MonoDoOnEachFuseable<>(this, signalConsumer));
}
return onAssembly(new MonoDoOnEach<>(this, signalConsumer));

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@ final class MonoDoOnEach<T> extends MonoOperator<T, T> {

@Override
public void subscribe(CoreSubscriber<? super T> actual) {
//TODO fuseable version?
//TODO conditional version?
source.subscribe(new FluxDoOnEach.DoOnEachSubscriber<>(actual, onSignal));
source.subscribe(FluxDoOnEach.createSubscriber(actual, onSignal, false));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright (c) 2011-2018 Pivotal Software Inc, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package reactor.core.publisher;

import java.util.Objects;
import java.util.function.Consumer;

import reactor.core.CoreSubscriber;
import reactor.core.Fuseable;

/**
* Peek into the lifecycle events and signals of a sequence, {@link reactor.core.Fuseable}
* version of {@link MonoDoOnEach}.
*
* @param <T> the value type
*
* @see <a href="https://github.com/reactor/reactive-streams-commons">Reactive-Streams-Commons</a>
*/
final class MonoDoOnEachFuseable<T> extends MonoOperator<T, T> implements Fuseable {

final Consumer<? super Signal<T>> onSignal;

MonoDoOnEachFuseable(Mono<? extends T> source, Consumer<? super Signal<T>> onSignal) {
super(source);
this.onSignal = Objects.requireNonNull(onSignal, "onSignal");
}

@Override
public void subscribe(CoreSubscriber<? super T> actual) {
source.subscribe(FluxDoOnEach.createSubscriber(actual, onSignal, true));
}
}
Loading

0 comments on commit 3c8008e

Please sign in to comment.