From b6698270a740c6dd725def6de4a66ae3951513a2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20Basl=C3=A9?= Date: Mon, 2 Jul 2018 11:05:14 +0200 Subject: [PATCH] fix #1270 Explicitly log() QueueSubcription as toString during onNext This is done by catching `UnsupportedOperationException` in `SignalLogger`. The exception is additionally logged in DEBUG mode. `SignalLogger` has been reworked a bit (`log(...)` was called with a 3rd unused parameter), and the `QueueSubcription` UOE message has been reworded. --- .../src/main/java/reactor/core/Fuseable.java | 33 ++++---- .../reactor/core/publisher/SignalLogger.java | 80 ++++++++++++------- .../core/publisher/SignalLoggerTests.java | 38 +++++++-- 3 files changed, 103 insertions(+), 48 deletions(-) diff --git a/reactor-core/src/main/java/reactor/core/Fuseable.java b/reactor-core/src/main/java/reactor/core/Fuseable.java index 5d9599d14f..a848ad5b4a 100644 --- a/reactor-core/src/main/java/reactor/core/Fuseable.java +++ b/reactor-core/src/main/java/reactor/core/Fuseable.java @@ -86,6 +86,10 @@ interface ConditionalSubscriber extends CoreSubscriber { * @param the value type emitted */ interface QueueSubscription extends Queue, Subscription { + + String NOT_SUPPORTED_MESSAGE = "Although QueueSubscription extends Queue it is purely internal" + + " and only guarantees support for poll/clear/size/isEmpty." + + " Instances shouldn't be used/exposed as Queue outside of Reactor operators."; /** * Request a specific fusion mode from this QueueSubscription. @@ -104,75 +108,76 @@ interface QueueSubscription extends Queue, Subscription { */ int requestFusion(int requestedMode); + @Override @Nullable default T peek() { - throw new UnsupportedOperationException("Operators should not use this method!"); + throw new UnsupportedOperationException(NOT_SUPPORTED_MESSAGE); } @Override default boolean add(@Nullable T t) { - throw new UnsupportedOperationException("Operators should not use this method!"); + throw new UnsupportedOperationException(NOT_SUPPORTED_MESSAGE); } @Override default boolean offer(@Nullable T t) { - throw new UnsupportedOperationException("Operators should not use this method!"); + throw new UnsupportedOperationException(NOT_SUPPORTED_MESSAGE); } @Override default T remove() { - throw new UnsupportedOperationException("Operators should not use this method!"); + throw new UnsupportedOperationException(NOT_SUPPORTED_MESSAGE); } @Override default T element() { - throw new UnsupportedOperationException("Operators should not use this method!"); + throw new UnsupportedOperationException(NOT_SUPPORTED_MESSAGE); } @Override default boolean contains(@Nullable Object o) { - throw new UnsupportedOperationException("Operators should not use this method!"); + throw new UnsupportedOperationException(NOT_SUPPORTED_MESSAGE); } @Override default Iterator iterator() { - throw new UnsupportedOperationException("Operators should not use this method!"); + throw new UnsupportedOperationException(NOT_SUPPORTED_MESSAGE); } @Override default Object[] toArray() { - throw new UnsupportedOperationException("Operators should not use this method!"); + throw new UnsupportedOperationException(NOT_SUPPORTED_MESSAGE); } @Override default T1[] toArray(T1[] a) { - throw new UnsupportedOperationException("Operators should not use this method!"); + throw new UnsupportedOperationException(NOT_SUPPORTED_MESSAGE); } @Override default boolean remove(@Nullable Object o) { - throw new UnsupportedOperationException("Operators should not use this method!"); + throw new UnsupportedOperationException(NOT_SUPPORTED_MESSAGE); } @Override default boolean containsAll(Collection c) { - throw new UnsupportedOperationException("Operators should not use this method!"); + throw new UnsupportedOperationException(NOT_SUPPORTED_MESSAGE); } @Override default boolean addAll(Collection c) { - throw new UnsupportedOperationException("Operators should not use this method!"); + throw new UnsupportedOperationException(NOT_SUPPORTED_MESSAGE); } @Override default boolean removeAll(Collection c) { - throw new UnsupportedOperationException("Operators should not use this method!"); + throw new UnsupportedOperationException(NOT_SUPPORTED_MESSAGE); } @Override default boolean retainAll(Collection c) { - throw new UnsupportedOperationException("Operators should not use this method!"); + throw new UnsupportedOperationException(NOT_SUPPORTED_MESSAGE); } } diff --git a/reactor-core/src/main/java/reactor/core/publisher/SignalLogger.java b/reactor-core/src/main/java/reactor/core/publisher/SignalLogger.java index f54c2a5cec..7b9fb6aecc 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/SignalLogger.java +++ b/reactor-core/src/main/java/reactor/core/publisher/SignalLogger.java @@ -165,47 +165,55 @@ public Object scanUnsafe(Attr key) { return null; } - void log(Object... args) { + /** + * Structured logging with level adaptation and operator ascii graph if required. + * + * @param signalType the type of signal being logged + * @param signalValue the value for the signal (use empty string if not required) + */ + void log(SignalType signalType, Object signalValue) { String line = fuseable ? LOG_TEMPLATE_FUSEABLE : LOG_TEMPLATE; if (operatorLine != null) { line = line + " " + operatorLine; } if (level == Level.FINEST) { - log.trace(line, args); + log.trace(line, signalType, signalValue); } else if (level == Level.FINE) { - log.debug(line, args); + log.debug(line, signalType, signalValue); } else if (level == Level.INFO) { - log.info(line, args); + log.info(line, signalType, signalValue); } else if (level == Level.WARNING) { - log.warn(line, args); + log.warn(line, signalType, signalValue); } else if (level == Level.SEVERE) { - log.error(line, args); + log.error(line, signalType, signalValue); } } - @Override - @Nullable - public Consumer onSubscribeCall() { - if ((options & ON_SUBSCRIBE) == ON_SUBSCRIBE && (level != Level.INFO || log.isInfoEnabled())) { - return s -> log(SignalType.ON_SUBSCRIBE, subscriptionAsString(s), source); + /** + * Structured logging with level adaptation and operator ascii graph if required + + * protection against loggers that detect objects like {@link Fuseable.QueueSubscription} + * as {@link java.util.Collection} and attempt to use their iterator for logging. + * + * @see #log + */ + void safeLog(SignalType signalType, Object signalValue) { + try { + log(signalType, signalValue); } - return null; - } - - @Nullable - @Override - public Consumer onCurrentContextCall() { - if ((options & CONTEXT_PARENT) == CONTEXT_PARENT && (level != Level.INFO || log - .isInfoEnabled())) { - return c -> log(SignalType.ON_CONTEXT, c, source); + catch (UnsupportedOperationException uoe) { + log(signalType, String.valueOf(signalValue)); + if (log.isDebugEnabled()) { + log.debug("UnsupportedOperationException has been raised by the logging framework, does your log() placement make sense? " + + "eg. 'window(2).log()' instead of 'window(2).flatMap(w -> w.log())'", uoe); + } } - return null; } + static String subscriptionAsString(@Nullable Subscription s) { if (s == null) { return "null subscription"; @@ -230,11 +238,30 @@ else if (s instanceof Fuseable.QueueSubscription) { return asString.toString(); } + @Override + @Nullable + public Consumer onSubscribeCall() { + if ((options & ON_SUBSCRIBE) == ON_SUBSCRIBE && (level != Level.INFO || log.isInfoEnabled())) { + return s -> log(SignalType.ON_SUBSCRIBE, subscriptionAsString(s)); + } + return null; + } + + @Nullable + @Override + public Consumer onCurrentContextCall() { + if ((options & CONTEXT_PARENT) == CONTEXT_PARENT && (level != Level.INFO || log + .isInfoEnabled())) { + return c -> log(SignalType.ON_CONTEXT, c); + } + return null; + } + @Override @Nullable public Consumer onNextCall() { if ((options & ON_NEXT) == ON_NEXT && (level != Level.INFO || log.isInfoEnabled())) { - return d -> log(SignalType.ON_NEXT, d, source); + return d -> safeLog(SignalType.ON_NEXT, d); } return null; } @@ -278,7 +305,7 @@ else if (shouldLogAsDebug) { @Nullable public Runnable onCompleteCall() { if ((options & ON_COMPLETE) == ON_COMPLETE && (level != Level.INFO || log.isInfoEnabled())) { - return () -> log(SignalType.ON_COMPLETE, "", source); + return () -> log(SignalType.ON_COMPLETE, ""); } return null; } @@ -287,7 +314,7 @@ public Runnable onCompleteCall() { @Nullable public Runnable onAfterTerminateCall() { if ((options & AFTER_TERMINATE) == AFTER_TERMINATE && (level != Level.INFO || log.isInfoEnabled())) { - return () -> log(SignalType.AFTER_TERMINATE, "", source); + return () -> log(SignalType.AFTER_TERMINATE, ""); } return null; } @@ -297,8 +324,7 @@ public Runnable onAfterTerminateCall() { public LongConsumer onRequestCall() { if ((options & REQUEST) == REQUEST && (level != Level.INFO || log.isInfoEnabled())) { return n -> log(SignalType.REQUEST, - Long.MAX_VALUE == n ? "unbounded" : n, - source); + Long.MAX_VALUE == n ? "unbounded" : n); } return null; } @@ -307,7 +333,7 @@ public LongConsumer onRequestCall() { @Nullable public Runnable onCancelCall() { if ((options & CANCEL) == CANCEL && (level != Level.INFO || log.isInfoEnabled())) { - return () -> log(SignalType.CANCEL, "", source); + return () -> log(SignalType.CANCEL, ""); } return null; } diff --git a/reactor-core/src/test/java/reactor/core/publisher/SignalLoggerTests.java b/reactor-core/src/test/java/reactor/core/publisher/SignalLoggerTests.java index 2293c6a782..00b76bbcee 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/SignalLoggerTests.java +++ b/reactor-core/src/test/java/reactor/core/publisher/SignalLoggerTests.java @@ -17,12 +17,11 @@ package reactor.core.publisher; import java.util.Collection; +import java.util.Objects; import java.util.logging.Level; import org.assertj.core.api.Assertions; -import org.hamcrest.CoreMatchers; import org.junit.Test; -import org.mockito.Mock; import org.mockito.Mockito; import org.reactivestreams.Subscription; import reactor.core.Fuseable; @@ -48,7 +47,7 @@ public void testLogCollectionSubscription() { false, CollectionSpecialLogger::new, SignalType.ON_SUBSCRIBE); - StepVerifier.create(flux.doOnSubscribe(signalLogger.onSubscribeCall())) + StepVerifier.create(flux.doOnSubscribe(Objects.requireNonNull(signalLogger.onSubscribeCall()))) .expectSubscription() .expectNext(1, 2, 3) .expectComplete() @@ -56,9 +55,34 @@ public void testLogCollectionSubscription() { //verify that passing the subscription directly to logger would have considered // it a Collection and thus failed with this custom Logger. - StepVerifier.create(flux.doOnSubscribe(s -> signalLogger.log(s, ""))) + StepVerifier.create(flux.doOnSubscribe(s -> signalLogger.log(SignalType.ON_SUBSCRIBE, s))) .expectErrorMatches(t -> t instanceof UnsupportedOperationException && - t.getMessage().equals("Operators should not use this method!")) + t.getMessage().equals(Fuseable.QueueSubscription.NOT_SUPPORTED_MESSAGE)) + .verify(); + } + + @Test + public void testLogQueueSubscriptionValue() { + Flux> source = Flux.just(1, 2, 3) + .window(2); //windows happen to be UnicastProcessor, which implements QueueSubscription directly :o + + FluxPeek> flux = new FluxPeek<>(source, null, null, null, null, null, null, null); + SignalLogger> signalLogger = new SignalLogger<>(flux, + "test", + Level.INFO, + false, CollectionSpecialLogger::new, + SignalType.ON_NEXT); + + StepVerifier.create(flux.doOnNext(Objects.requireNonNull(signalLogger.onNextCall()))) + .expectNextCount(2) + .expectComplete() + .verify(); + + //verify that passing the QueueSubscription directly to logger would have considered + // it a Collection and thus failed with this custom Logger. + StepVerifier.create(flux.doOnNext(w -> signalLogger.log(SignalType.ON_NEXT, w))) + .expectErrorMatches(t -> t instanceof UnsupportedOperationException && + t.getMessage().equals(Fuseable.QueueSubscription.NOT_SUPPORTED_MESSAGE)) .verify(); } @@ -236,7 +260,7 @@ public void fluxLogWithGivenLogger() { .subscribe(); verify(mockLogger, only()).warn(anyString(), eq(SignalType.ON_NEXT), - eq("foo"), any()); + eq("foo")); verifyNoMoreInteractions(mockLogger); } @@ -251,7 +275,7 @@ public void monoLogWithGivenLogger() { .subscribe(); verify(mockLogger, only()).warn(anyString(), eq(SignalType.ON_NEXT), - eq("foo"), any()); + eq("foo")); verifyNoMoreInteractions(mockLogger); }