Skip to content

Commit

Permalink
fix #1270 Explicitly log() QueueSubcription as toString during onNext
Browse files Browse the repository at this point in the history
This is done by catching `UnsupportedOperationException` in
`SignalLogger`. The exception is additionally logged in DEBUG mode.

`SignalLogger` has been reworked a bit (`log(...)` was called with a
3rd unused parameter), and the `QueueSubcription` UOE message has been
reworded.
  • Loading branch information
simonbasle authored Jul 2, 2018
1 parent abd5ffe commit b669827
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 48 deletions.
33 changes: 19 additions & 14 deletions reactor-core/src/main/java/reactor/core/Fuseable.java
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ interface ConditionalSubscriber<T> extends CoreSubscriber<T> {
* @param <T> the value type emitted
*/
interface QueueSubscription<T> extends Queue<T>, Subscription {

String NOT_SUPPORTED_MESSAGE = "Although QueueSubscription extends Queue it is purely internal" +
" and only guarantees support for poll/clear/size/isEmpty." +
" Instances shouldn't be used/exposed as Queue outside of Reactor operators.";

/**
* Request a specific fusion mode from this QueueSubscription.
Expand All @@ -104,75 +108,76 @@ interface QueueSubscription<T> extends Queue<T>, Subscription {
*/
int requestFusion(int requestedMode);


@Override
@Nullable
default T peek() {
throw new UnsupportedOperationException("Operators should not use this method!");
throw new UnsupportedOperationException(NOT_SUPPORTED_MESSAGE);
}

@Override
default boolean add(@Nullable T t) {
throw new UnsupportedOperationException("Operators should not use this method!");
throw new UnsupportedOperationException(NOT_SUPPORTED_MESSAGE);
}

@Override
default boolean offer(@Nullable T t) {
throw new UnsupportedOperationException("Operators should not use this method!");
throw new UnsupportedOperationException(NOT_SUPPORTED_MESSAGE);
}

@Override
default T remove() {
throw new UnsupportedOperationException("Operators should not use this method!");
throw new UnsupportedOperationException(NOT_SUPPORTED_MESSAGE);
}

@Override
default T element() {
throw new UnsupportedOperationException("Operators should not use this method!");
throw new UnsupportedOperationException(NOT_SUPPORTED_MESSAGE);
}

@Override
default boolean contains(@Nullable Object o) {
throw new UnsupportedOperationException("Operators should not use this method!");
throw new UnsupportedOperationException(NOT_SUPPORTED_MESSAGE);
}

@Override
default Iterator<T> iterator() {
throw new UnsupportedOperationException("Operators should not use this method!");
throw new UnsupportedOperationException(NOT_SUPPORTED_MESSAGE);
}

@Override
default Object[] toArray() {
throw new UnsupportedOperationException("Operators should not use this method!");
throw new UnsupportedOperationException(NOT_SUPPORTED_MESSAGE);
}

@Override
default <T1> T1[] toArray(T1[] a) {
throw new UnsupportedOperationException("Operators should not use this method!");
throw new UnsupportedOperationException(NOT_SUPPORTED_MESSAGE);
}

@Override
default boolean remove(@Nullable Object o) {
throw new UnsupportedOperationException("Operators should not use this method!");
throw new UnsupportedOperationException(NOT_SUPPORTED_MESSAGE);
}

@Override
default boolean containsAll(Collection<?> c) {
throw new UnsupportedOperationException("Operators should not use this method!");
throw new UnsupportedOperationException(NOT_SUPPORTED_MESSAGE);
}

@Override
default boolean addAll(Collection<? extends T> c) {
throw new UnsupportedOperationException("Operators should not use this method!");
throw new UnsupportedOperationException(NOT_SUPPORTED_MESSAGE);
}

@Override
default boolean removeAll(Collection<?> c) {
throw new UnsupportedOperationException("Operators should not use this method!");
throw new UnsupportedOperationException(NOT_SUPPORTED_MESSAGE);
}

@Override
default boolean retainAll(Collection<?> c) {
throw new UnsupportedOperationException("Operators should not use this method!");
throw new UnsupportedOperationException(NOT_SUPPORTED_MESSAGE);
}
}

Expand Down
80 changes: 53 additions & 27 deletions reactor-core/src/main/java/reactor/core/publisher/SignalLogger.java
Original file line number Diff line number Diff line change
Expand Up @@ -165,47 +165,55 @@ public Object scanUnsafe(Attr key) {
return null;
}

void log(Object... args) {
/**
* Structured logging with level adaptation and operator ascii graph if required.
*
* @param signalType the type of signal being logged
* @param signalValue the value for the signal (use empty string if not required)
*/
void log(SignalType signalType, Object signalValue) {
String line = fuseable ? LOG_TEMPLATE_FUSEABLE : LOG_TEMPLATE;
if (operatorLine != null) {
line = line + " " + operatorLine;
}
if (level == Level.FINEST) {
log.trace(line, args);
log.trace(line, signalType, signalValue);
}
else if (level == Level.FINE) {
log.debug(line, args);
log.debug(line, signalType, signalValue);
}
else if (level == Level.INFO) {
log.info(line, args);
log.info(line, signalType, signalValue);
}
else if (level == Level.WARNING) {
log.warn(line, args);
log.warn(line, signalType, signalValue);
}
else if (level == Level.SEVERE) {
log.error(line, args);
log.error(line, signalType, signalValue);
}
}

@Override
@Nullable
public Consumer<? super Subscription> onSubscribeCall() {
if ((options & ON_SUBSCRIBE) == ON_SUBSCRIBE && (level != Level.INFO || log.isInfoEnabled())) {
return s -> log(SignalType.ON_SUBSCRIBE, subscriptionAsString(s), source);
/**
* Structured logging with level adaptation and operator ascii graph if required +
* protection against loggers that detect objects like {@link Fuseable.QueueSubscription}
* as {@link java.util.Collection} and attempt to use their iterator for logging.
*
* @see #log
*/
void safeLog(SignalType signalType, Object signalValue) {
try {
log(signalType, signalValue);
}
return null;
}

@Nullable
@Override
public Consumer<? super Context> onCurrentContextCall() {
if ((options & CONTEXT_PARENT) == CONTEXT_PARENT && (level != Level.INFO || log
.isInfoEnabled())) {
return c -> log(SignalType.ON_CONTEXT, c, source);
catch (UnsupportedOperationException uoe) {
log(signalType, String.valueOf(signalValue));
if (log.isDebugEnabled()) {
log.debug("UnsupportedOperationException has been raised by the logging framework, does your log() placement make sense? " +
"eg. 'window(2).log()' instead of 'window(2).flatMap(w -> w.log())'", uoe);
}
}
return null;
}


static String subscriptionAsString(@Nullable Subscription s) {
if (s == null) {
return "null subscription";
Expand All @@ -230,11 +238,30 @@ else if (s instanceof Fuseable.QueueSubscription) {
return asString.toString();
}

@Override
@Nullable
public Consumer<? super Subscription> onSubscribeCall() {
if ((options & ON_SUBSCRIBE) == ON_SUBSCRIBE && (level != Level.INFO || log.isInfoEnabled())) {
return s -> log(SignalType.ON_SUBSCRIBE, subscriptionAsString(s));
}
return null;
}

@Nullable
@Override
public Consumer<? super Context> onCurrentContextCall() {
if ((options & CONTEXT_PARENT) == CONTEXT_PARENT && (level != Level.INFO || log
.isInfoEnabled())) {
return c -> log(SignalType.ON_CONTEXT, c);
}
return null;
}

@Override
@Nullable
public Consumer<? super IN> onNextCall() {
if ((options & ON_NEXT) == ON_NEXT && (level != Level.INFO || log.isInfoEnabled())) {
return d -> log(SignalType.ON_NEXT, d, source);
return d -> safeLog(SignalType.ON_NEXT, d);
}
return null;
}
Expand Down Expand Up @@ -278,7 +305,7 @@ else if (shouldLogAsDebug) {
@Nullable
public Runnable onCompleteCall() {
if ((options & ON_COMPLETE) == ON_COMPLETE && (level != Level.INFO || log.isInfoEnabled())) {
return () -> log(SignalType.ON_COMPLETE, "", source);
return () -> log(SignalType.ON_COMPLETE, "");
}
return null;
}
Expand All @@ -287,7 +314,7 @@ public Runnable onCompleteCall() {
@Nullable
public Runnable onAfterTerminateCall() {
if ((options & AFTER_TERMINATE) == AFTER_TERMINATE && (level != Level.INFO || log.isInfoEnabled())) {
return () -> log(SignalType.AFTER_TERMINATE, "", source);
return () -> log(SignalType.AFTER_TERMINATE, "");
}
return null;
}
Expand All @@ -297,8 +324,7 @@ public Runnable onAfterTerminateCall() {
public LongConsumer onRequestCall() {
if ((options & REQUEST) == REQUEST && (level != Level.INFO || log.isInfoEnabled())) {
return n -> log(SignalType.REQUEST,
Long.MAX_VALUE == n ? "unbounded" : n,
source);
Long.MAX_VALUE == n ? "unbounded" : n);
}
return null;
}
Expand All @@ -307,7 +333,7 @@ public LongConsumer onRequestCall() {
@Nullable
public Runnable onCancelCall() {
if ((options & CANCEL) == CANCEL && (level != Level.INFO || log.isInfoEnabled())) {
return () -> log(SignalType.CANCEL, "", source);
return () -> log(SignalType.CANCEL, "");
}
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,11 @@
package reactor.core.publisher;

import java.util.Collection;
import java.util.Objects;
import java.util.logging.Level;

import org.assertj.core.api.Assertions;
import org.hamcrest.CoreMatchers;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.reactivestreams.Subscription;
import reactor.core.Fuseable;
Expand All @@ -48,17 +47,42 @@ public void testLogCollectionSubscription() {
false, CollectionSpecialLogger::new,
SignalType.ON_SUBSCRIBE);

StepVerifier.create(flux.doOnSubscribe(signalLogger.onSubscribeCall()))
StepVerifier.create(flux.doOnSubscribe(Objects.requireNonNull(signalLogger.onSubscribeCall())))
.expectSubscription()
.expectNext(1, 2, 3)
.expectComplete()
.verify();

//verify that passing the subscription directly to logger would have considered
// it a Collection and thus failed with this custom Logger.
StepVerifier.create(flux.doOnSubscribe(s -> signalLogger.log(s, "")))
StepVerifier.create(flux.doOnSubscribe(s -> signalLogger.log(SignalType.ON_SUBSCRIBE, s)))
.expectErrorMatches(t -> t instanceof UnsupportedOperationException &&
t.getMessage().equals("Operators should not use this method!"))
t.getMessage().equals(Fuseable.QueueSubscription.NOT_SUPPORTED_MESSAGE))
.verify();
}

@Test
public void testLogQueueSubscriptionValue() {
Flux<Flux<Integer>> source = Flux.just(1, 2, 3)
.window(2); //windows happen to be UnicastProcessor, which implements QueueSubscription directly :o

FluxPeek<Flux<Integer>> flux = new FluxPeek<>(source, null, null, null, null, null, null, null);
SignalLogger<Flux<Integer>> signalLogger = new SignalLogger<>(flux,
"test",
Level.INFO,
false, CollectionSpecialLogger::new,
SignalType.ON_NEXT);

StepVerifier.create(flux.doOnNext(Objects.requireNonNull(signalLogger.onNextCall())))
.expectNextCount(2)
.expectComplete()
.verify();

//verify that passing the QueueSubscription directly to logger would have considered
// it a Collection and thus failed with this custom Logger.
StepVerifier.create(flux.doOnNext(w -> signalLogger.log(SignalType.ON_NEXT, w)))
.expectErrorMatches(t -> t instanceof UnsupportedOperationException &&
t.getMessage().equals(Fuseable.QueueSubscription.NOT_SUPPORTED_MESSAGE))
.verify();
}

Expand Down Expand Up @@ -236,7 +260,7 @@ public void fluxLogWithGivenLogger() {
.subscribe();

verify(mockLogger, only()).warn(anyString(), eq(SignalType.ON_NEXT),
eq("foo"), any());
eq("foo"));
verifyNoMoreInteractions(mockLogger);
}

Expand All @@ -251,7 +275,7 @@ public void monoLogWithGivenLogger() {
.subscribe();

verify(mockLogger, only()).warn(anyString(), eq(SignalType.ON_NEXT),
eq("foo"), any());
eq("foo"));
verifyNoMoreInteractions(mockLogger);
}

Expand Down

0 comments on commit b669827

Please sign in to comment.