log() after window leads to UnsupportedOperationException #1270

Closed
helmbold opened this issue Jun 28, 2018 · 4 comments

The following example with log() after window(3) leads to a java.lang.UnsupportedOperationException:

Flux.just(1, 2, 3, 4, 5, 6, 7, 8)
        .window(3)
        .log("demo")
        .subscribe()

It works if I move the log() method call before window(3):

Flux.just(1, 2, 3, 4, 5, 6, 7, 8)
        .log("demo")
        .window(3)
        .subscribe()

It looks a bit like a Log4j problem, but the Log4j config is actually very basic:

<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="WARN">
    <Appenders>
        <Console name="Console" target="SYSTEM_OUT">
            <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
        </Console>
    </Appenders>
    <Loggers>
        <Root level="info">
            <AppenderRef ref="Console"/>
        </Root>
    </Loggers>
</Configuration>

Expected behavior

Usual logging.

Actual behavior

The code throws an UnsupportedOperationException with this stack trace:

13:00:05.918 [main] INFO  demo - onSubscribe(FluxWindow.WindowExactSubscriber)
13:00:05.928 [main] INFO  demo - request(unbounded)
2018-06-28 13:00:05,934 main ERROR An exception occurred processing Appender Console java.lang.UnsupportedOperationException: Operators should not use this method!
	at reactor.core.Fuseable$QueueSubscription.iterator(Fuseable.java:140)
	at org.apache.logging.log4j.message.ParameterFormatter.appendCollection(ParameterFormatter.java:593)
	at org.apache.logging.log4j.message.ParameterFormatter.appendPotentiallyRecursiveValue(ParameterFormatter.java:501)
	at org.apache.logging.log4j.message.ParameterFormatter.recursiveDeepToString(ParameterFormatter.java:457)
	at org.apache.logging.log4j.message.ParameterFormatter.formatMessage2(ParameterFormatter.java:190)
	at org.apache.logging.log4j.message.ParameterizedMessage.formatTo(ParameterizedMessage.java:228)
	at org.apache.logging.log4j.core.pattern.MessagePatternConverter.format(MessagePatternConverter.java:123)
	at org.apache.logging.log4j.core.pattern.PatternFormatter.format(PatternFormatter.java:38)
	at org.apache.logging.log4j.core.layout.PatternLayout$PatternSerializer.toSerializable(PatternLayout.java:334)
	at org.apache.logging.log4j.core.layout.PatternLayout.toText(PatternLayout.java:233)
	at org.apache.logging.log4j.core.layout.PatternLayout.encode(PatternLayout.java:218)
	at org.apache.logging.log4j.core.layout.PatternLayout.encode(PatternLayout.java:58)
	at org.apache.logging.log4j.core.appender.AbstractOutputStreamAppender.directEncodeEvent(AbstractOutputStreamAppender.java:177)
	at org.apache.logging.log4j.core.appender.AbstractOutputStreamAppender.tryAppend(AbstractOutputStreamAppender.java:170)
	at org.apache.logging.log4j.core.appender.AbstractOutputStreamAppender.append(AbstractOutputStreamAppender.java:161)
	at org.apache.logging.log4j.core.config.AppenderControl.tryCallAppender(AppenderControl.java:156)
	at org.apache.logging.log4j.core.config.AppenderControl.callAppender0(AppenderControl.java:129)
	at org.apache.logging.log4j.core.config.AppenderControl.callAppenderPreventRecursion(AppenderControl.java:120)
	at org.apache.logging.log4j.core.config.AppenderControl.callAppender(AppenderControl.java:84)
	at org.apache.logging.log4j.core.config.LoggerConfig.callAppenders(LoggerConfig.java:448)
	at org.apache.logging.log4j.core.config.LoggerConfig.processLogEvent(LoggerConfig.java:433)
	at org.apache.logging.log4j.core.config.LoggerConfig.log(LoggerConfig.java:417)
	at org.apache.logging.log4j.core.config.LoggerConfig.log(LoggerConfig.java:403)
	at org.apache.logging.log4j.core.config.AwaitCompletionReliabilityStrategy.log(AwaitCompletionReliabilityStrategy.java:63)
	at org.apache.logging.log4j.core.Logger.logMessage(Logger.java:146)
	at org.apache.logging.log4j.spi.AbstractLogger.tryLogMessage(AbstractLogger.java:2163)
	at org.apache.logging.log4j.spi.AbstractLogger.logMessageTrackRecursion(AbstractLogger.java:2118)
	at org.apache.logging.log4j.spi.AbstractLogger.logMessageSafely(AbstractLogger.java:2101)
	at org.apache.logging.log4j.spi.AbstractLogger.logMessage(AbstractLogger.java:2006)
	at org.apache.logging.log4j.spi.AbstractLogger.logIfEnabled(AbstractLogger.java:1875)
	at org.apache.logging.slf4j.Log4jLogger.info(Log4jLogger.java:194)
	at reactor.util.Loggers$Slf4JLogger.info(Loggers.java:238)
	at reactor.core.publisher.SignalLogger.log(SignalLogger.java:180)
	at reactor.core.publisher.SignalLogger.lambda$onNextCall$2(SignalLogger.java:237)
	at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:177)
	at reactor.core.publisher.FluxWindow$WindowExactSubscriber.onNext(FluxWindow.java:164)
	at reactor.core.publisher.FluxArray$ArraySubscription.fastPath(FluxArray.java:171)
	at reactor.core.publisher.FluxArray$ArraySubscription.request(FluxArray.java:96)
	at reactor.core.publisher.FluxWindow$WindowExactSubscriber.request(FluxWindow.java:216)
	at reactor.core.publisher.FluxPeek$PeekSubscriber.request(FluxPeek.java:130)
	at reactor.core.publisher.LambdaSubscriber.onSubscribe(LambdaSubscriber.java:89)
	at reactor.core.publisher.FluxPeek$PeekSubscriber.onSubscribe(FluxPeek.java:163)
	at reactor.core.publisher.FluxWindow$WindowExactSubscriber.onSubscribe(FluxWindow.java:144)
	at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:54)
	at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:60)
	at reactor.core.publisher.FluxWindow.subscribe(FluxWindow.java:89)
	at reactor.core.publisher.FluxLog.subscribe(FluxLog.java:50)
	at reactor.core.publisher.Flux.subscribe(Flux.java:6877)
	at reactor.core.publisher.Flux.subscribeWith(Flux.java:7044)
	at reactor.core.publisher.Flux.subscribe(Flux.java:6870)
	at reactor.core.publisher.Flux.subscribe(Flux.java:6834)
	at reactor.core.publisher.Flux.subscribe(Flux.java:6752)
	at com.example.domain.operators.TimeBasedWindowOperatorTest.test(TimeBasedWindowOperatorTest.kt:51)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:513)
	at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:115)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:170)
	at org.junit.jupiter.engine.execution.ThrowableCollector.execute(ThrowableCollector.java:40)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:166)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:113)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:58)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor$NodeExecutor.lambda$executeRecursively$3(HierarchicalTestExecutor.java:113)
	at org.junit.platform.engine.support.hierarchical.SingleTestExecutor.executeSafely(SingleTestExecutor.java:66)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor$NodeExecutor.executeRecursively(HierarchicalTestExecutor.java:108)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor$NodeExecutor.execute(HierarchicalTestExecutor.java:79)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor$NodeExecutor.lambda$executeRecursively$2(HierarchicalTestExecutor.java:121)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
	at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
	at java.util.Iterator.forEachRemaining(Iterator.java:116)
	at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
	at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor$NodeExecutor.lambda$executeRecursively$3(HierarchicalTestExecutor.java:121)
	at org.junit.platform.engine.support.hierarchical.SingleTestExecutor.executeSafely(SingleTestExecutor.java:66)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor$NodeExecutor.executeRecursively(HierarchicalTestExecutor.java:108)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor$NodeExecutor.execute(HierarchicalTestExecutor.java:79)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor$NodeExecutor.lambda$executeRecursively$2(HierarchicalTestExecutor.java:121)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
	at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
	at java.util.Iterator.forEachRemaining(Iterator.java:116)
	at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
	at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor$NodeExecutor.lambda$executeRecursively$3(HierarchicalTestExecutor.java:121)
	at org.junit.platform.engine.support.hierarchical.SingleTestExecutor.executeSafely(SingleTestExecutor.java:66)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor$NodeExecutor.executeRecursively(HierarchicalTestExecutor.java:108)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor$NodeExecutor.execute(HierarchicalTestExecutor.java:79)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:55)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:43)
	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:170)
	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:154)
	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:90)
	at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:74)
	at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
	at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
	at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)

2018-06-28 13:00:05,943 main ERROR An exception occurred processing Appender Console java.lang.UnsupportedOperationException: Operators should not use this method!
	[stack trace identical to the first occurrence above, omitted]

2018-06-28 13:00:05,950 main ERROR An exception occurred processing Appender Console java.lang.UnsupportedOperationException: Operators should not use this method!
	[stack trace identical to the first occurrence above, omitted]

Steps to reproduce

Run this method:

import reactor.core.publisher.Flux

fun main(args: Array<String>) {
    Flux.just(1, 2, 3, 4, 5, 6, 7, 8)
            .window(3)
            .log("demo")
            .subscribe()
}

Reactor Core version

3.1.8.RELEASE

JVM version

1.8.0_172

simonbasle commented Jun 28, 2018

That's a case of Log4J2 (being a bit too smart and) attempting "deep toString" on a Collection by iterating through it.

The window operator returns a Flux<Flux<T>>, conceptually a Flux<WINDOW>. Each WINDOW happens to implement QueueSubscription internally, a dumbed-down Queue interface that doesn't support iteration (because most implementations focus on lock-free offer() and poll() at the expense of other methods). Since a Queue is also a Collection, Log4J2 tries to iterate over it and hits the unsupported iterator() method.
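
The mechanism described above can be reproduced without Reactor or Log4J2 on the classpath. The following is only a sketch under stated assumptions, not the actual code of either library: NoIterationQueue is a hypothetical stand-in for a QueueSubscription-style window, and deepToString is a hypothetical, much simplified imitation of a logger that renders any Collection by iterating it.

```java
import java.util.AbstractQueue;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;

public class DeepToStringDemo {

    // Hypothetical stand-in for a window: a Queue that supports offer/poll/peek
    // but refuses iteration, similar in spirit to QueueSubscription.
    static final class NoIterationQueue<T> extends AbstractQueue<T> {
        private final ArrayDeque<T> items = new ArrayDeque<>();

        @Override public boolean offer(T t) { return items.offer(t); }
        @Override public T poll() { return items.poll(); }
        @Override public T peek() { return items.peek(); }
        @Override public int size() { return items.size(); }

        @Override public Iterator<T> iterator() {
            throw new UnsupportedOperationException("Operators should not use this method!");
        }

        // Overridden because the inherited toString() would itself call iterator().
        @Override public String toString() { return "NoIterationQueue"; }
    }

    // Hypothetical "deep toString": anything that is a Collection gets iterated,
    // everything else is rendered with String.valueOf.
    public static String deepToString(Object o) {
        if (o instanceof Collection<?>) {
            StringBuilder sb = new StringBuilder("[");
            Iterator<?> it = ((Collection<?>) o).iterator();
            while (it.hasNext()) {
                sb.append(deepToString(it.next()));
                if (it.hasNext()) {
                    sb.append(", ");
                }
            }
            return sb.append(']').toString();
        }
        return String.valueOf(o);
    }

    // Returns true if formatting a window-like value throws, as in the report.
    public static boolean failsOnWindowLikeValue() {
        try {
            deepToString(new NoIterationQueue<Integer>());
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(deepToString(Arrays.asList(1, 2, 3)));
        System.out.println(failsOnWindowLikeValue());
    }
}
```

An ordinary list formats fine, while the window-like queue fails as soon as the formatter asks for an iterator, which matches the top frames of the reported stack trace.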

What you can do:

  • use Logback (haha)
  • consume the WINDOW, as you should always do anyway, and put the log() on each WINDOW consumption rather than on the main Flux<WINDOW>:
Flux.just(1, 2, 3, 4, 5)
    .window(2)
    .flatMap(w -> w.log())
    .subscribe();

What we could do:
Force Log4J's hand by detecting the QueueSubscription case in the log()'s onNext method and passing a toString() down to the actual logging framework.
This has the penalty of introducing an instanceof on the critical path, so we'd be happier if this could be avoided...
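
As a rough illustration of that idea (not the change that was eventually made in Reactor), the check could look like the sketch below. WindowLike is a hypothetical marker interface standing in for QueueSubscription, and safeArgument is a hypothetical helper that a logging operator might call before handing a value to the logging framework.

```java
import java.util.AbstractQueue;
import java.util.Iterator;

public class InstanceofGuardDemo {

    // Hypothetical marker standing in for QueueSubscription.
    interface WindowLike { }

    // Hypothetical window: always empty, and iteration is unsupported.
    static final class Window extends AbstractQueue<Integer> implements WindowLike {
        @Override public boolean offer(Integer value) { return false; }
        @Override public Integer poll() { return null; }
        @Override public Integer peek() { return null; }
        @Override public int size() { return 0; }

        @Override public Iterator<Integer> iterator() {
            throw new UnsupportedOperationException("Operators should not use this method!");
        }

        @Override public String toString() { return "Window"; }
    }

    // Convert window-like values to a String up front, so the logging framework
    // never sees a Collection it might try to iterate. Other values pass through.
    public static Object safeArgument(Object value) {
        return (value instanceof WindowLike) ? value.toString() : value;
    }

    public static void main(String[] args) {
        System.out.println(safeArgument(new Window()));
        System.out.println(safeArgument(42));
    }
}
```

The instanceof test runs for every logged element, which is the cost on the critical path mentioned above.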

simonbasle self-assigned this Jun 28, 2018
simonbasle added the status/need-decision, pending-confirm-resolved and status/has-workaround labels and removed the pending-confirm-resolved label Jun 28, 2018

helmbold commented Jun 28, 2018

Thanks for explaining this strange behavior. Maybe you could make the exception message a bit more self-explanatory, since "UnsupportedOperationException: Operators should not use this method!" is not that helpful in this case.

simonbasle commented

It is a very generic message, but it comes from a very general-purpose class (the UnicastProcessor), which happens to be used as the implementation for the windows in this case.

simonbasle commented Jun 28, 2018

We'll probably reword the message a bit (and maybe make it a static exception [1]).
We could also try/catch for UnsupportedOperationException during logging, to fall back to the toString() representation.

[1] https://shipilev.net/blog/2014/exceptional-performance/ for reference, but in the present case I think the stack trace has value (knowing who called the unsupported method, and which method it was)
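
The try/catch fallback mentioned in this comment could be sketched as follows. This is an assumption-laden illustration rather than Reactor's implementation: Window, renderByIteration and format are hypothetical names, and the fallback only helps if the value's own toString() does not iterate.

```java
import java.util.AbstractQueue;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.StringJoiner;

public class FallbackFormatDemo {

    // Hypothetical window: always empty, and iteration is unsupported.
    static final class Window extends AbstractQueue<Integer> {
        @Override public boolean offer(Integer value) { return false; }
        @Override public Integer poll() { return null; }
        @Override public Integer peek() { return null; }
        @Override public int size() { return 0; }

        @Override public Iterator<Integer> iterator() {
            throw new UnsupportedOperationException("Operators should not use this method!");
        }

        // Must not rely on the inherited toString(), which would call iterator().
        @Override public String toString() { return "Window"; }
    }

    // Stand-in for what a logging framework might do with a Collection.
    static String renderByIteration(Collection<?> c) {
        StringJoiner joiner = new StringJoiner(", ", "[", "]");
        for (Object element : c) {
            joiner.add(String.valueOf(element));
        }
        return joiner.toString();
    }

    // Try the iterating path first; if the value refuses iteration,
    // fall back to its plain toString() representation.
    public static String format(Object value) {
        try {
            if (value instanceof Collection<?>) {
                return renderByIteration((Collection<?>) value);
            }
            return String.valueOf(value);
        } catch (UnsupportedOperationException e) {
            return String.valueOf(value);
        }
    }

    public static void main(String[] args) {
        System.out.println(format(Arrays.asList(1, 2, 3)));
        System.out.println(format(new Window()));
    }
}
```

Compared with an instanceof check, this variant costs nothing for values that format normally and only pays for the exception when iteration is actually refused.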

simonbasle added the type/enhancement label and removed the status/need-decision label Jun 28, 2018
simonbasle added this to the 3.1.x Maintenance Backlog milestone Jun 28, 2018
simonbasle added the type/bug label Jun 29, 2018
simonbasle added a commit that referenced this issue Jul 2, 2018