Skip to content

Commit

Permalink
fix #1270 Explicitly log() QueueSubcription as toString during onNext
Browse files Browse the repository at this point in the history
  • Loading branch information
simonbasle committed Jun 29, 2018
1 parent abd5ffe commit a20b840
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -234,11 +234,18 @@ else if (s instanceof Fuseable.QueueSubscription) {
@Nullable
public Consumer<? super IN> onNextCall() {
if ((options & ON_NEXT) == ON_NEXT && (level != Level.INFO || log.isInfoEnabled())) {
return d -> log(SignalType.ON_NEXT, d, source);
return d -> log(SignalType.ON_NEXT, safeLog(d), source);
}
return null;
}

protected Object safeLog(Object value) {
if (value instanceof Fuseable.QueueSubscription) {
return value.toString();
}
return value;
}

@Override
@Nullable
public Consumer<? super Throwable> onErrorCall() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,11 @@
package reactor.core.publisher;

import java.util.Collection;
import java.util.Objects;
import java.util.logging.Level;

import org.assertj.core.api.Assertions;
import org.hamcrest.CoreMatchers;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.reactivestreams.Subscription;
import reactor.core.Fuseable;
Expand All @@ -48,7 +47,7 @@ public void testLogCollectionSubscription() {
false, CollectionSpecialLogger::new,
SignalType.ON_SUBSCRIBE);

StepVerifier.create(flux.doOnSubscribe(signalLogger.onSubscribeCall()))
StepVerifier.create(flux.doOnSubscribe(Objects.requireNonNull(signalLogger.onSubscribeCall())))
.expectSubscription()
.expectNext(1, 2, 3)
.expectComplete()
Expand All @@ -62,6 +61,31 @@ public void testLogCollectionSubscription() {
.verify();
}

@Test
public void testLogQueueSubscriptionValue() {
Flux<Flux<Integer>> source = Flux.just(1, 2, 3)
.window(2); //windows happen to be UnicastProcessor, which implements QueueSubscription directly :o

FluxPeek<Flux<Integer>> flux = new FluxPeek<>(source, null, null, null, null, null, null, null);
SignalLogger<Flux<Integer>> signalLogger = new SignalLogger<>(flux,
"test",
Level.INFO,
false, CollectionSpecialLogger::new,
SignalType.ON_NEXT);

StepVerifier.create(flux.doOnNext(Objects.requireNonNull(signalLogger.onNextCall())))
.expectNextCount(2)
.expectComplete()
.verify();

//verify that passing the QueueSubscription directly to logger would have considered
// it a Collection and thus failed with this custom Logger.
StepVerifier.create(flux.doOnNext(w -> signalLogger.log("{}", w)))
.expectErrorMatches(t -> t instanceof UnsupportedOperationException &&
t.getMessage().equals("Operators should not use this method!"))
.verify();
}

@Test
public void nullSubscriptionAsString() {
assertThat(SignalLogger.subscriptionAsString(null), is("null subscription"));
Expand Down

0 comments on commit a20b840

Please sign in to comment.