Skip to content

Commit

Permalink
Include classname of null-returning map function in NPE msg (#2984)
Browse files Browse the repository at this point in the history
This commit logs the class name of the mapper provided to FluxMap,
FluxMapFuseable and FluxMapSignal operators in the 

NB: The later is exposed in the API as `flatMap(Function,Function,Function)`.

Fixes #2982.

Co-authored-by: Simon Baslé <sbasle@vmware.com>
  • Loading branch information
ismailalammar and simonbasle authored May 5, 2022
1 parent ced9a12 commit e33211c
Show file tree
Hide file tree
Showing 5 changed files with 164 additions and 33 deletions.
20 changes: 13 additions & 7 deletions reactor-core/src/main/java/reactor/core/publisher/FluxMap.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -103,8 +103,10 @@ public void onNext(T t) {
R v;

try {
v = Objects.requireNonNull(mapper.apply(t),
"The mapper returned a null value.");
v = mapper.apply(t);
if (v == null) {
throw new NullPointerException("The mapper [" + mapper.getClass().getName() + "] returned a null value.");
}
}
catch (Throwable e) {
Throwable e_ = Operators.onNextError(t, e, actual.currentContext(), s);
Expand Down Expand Up @@ -203,8 +205,10 @@ public void onNext(T t) {
R v;

try {
v = Objects.requireNonNull(mapper.apply(t),
"The mapper returned a null value.");
v = mapper.apply(t);
if (v == null) {
throw new NullPointerException("The mapper [" + mapper.getClass().getName() + "] returned a null value.");
}
}
catch (Throwable e) {
Throwable e_ = Operators.onNextError(t, e, actual.currentContext(), s);
Expand All @@ -230,8 +234,10 @@ public boolean tryOnNext(T t) {
R v;

try {
v = Objects.requireNonNull(mapper.apply(t),
"The mapper returned a null value.");
v = mapper.apply(t);
if (v == null) {
throw new NullPointerException("The mapper [" + mapper.getClass().getName() + "] returned a null value.");
}
return actual.tryOnNext(v);
}
catch (Throwable e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -110,8 +110,10 @@ public void onNext(T t) {
R v;

try {
v = Objects.requireNonNull(mapper.apply(t),
"The mapper returned a null value.");
v = mapper.apply(t);
if (v == null) {
throw new NullPointerException("The mapper [" + mapper.getClass().getName() + "] returned a null value.");
}
}
catch (Throwable e) {
Throwable e_ = Operators.onNextError(t, e, actual.currentContext(), s);
Expand Down Expand Up @@ -278,8 +280,10 @@ public void onNext(T t) {
R v;

try {
v = Objects.requireNonNull(mapper.apply(t),
"The mapper returned a null value.");
v = mapper.apply(t);
if (v == null) {
throw new NullPointerException("The mapper [" + mapper.getClass().getName() + "] returned a null value.");
}
}
catch (Throwable e) {
Throwable e_ = Operators.onNextError(t, e, actual.currentContext(), s);
Expand All @@ -306,8 +310,10 @@ public boolean tryOnNext(T t) {
R v;

try {
v = Objects.requireNonNull(mapper.apply(t),
"The mapper returned a null value.");
v = mapper.apply(t);
if (v == null) {
throw new NullPointerException("The mapper [" + mapper.getClass().getName() + "] returned a null value.");
}
return actual.tryOnNext(v);
}
catch (Throwable e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -141,8 +141,10 @@ public void onNext(T t) {
R v;

try {
v = Objects.requireNonNull(mapperNext.apply(t),
"The mapper returned a null value.");
v = mapperNext.apply(t);
if (v == null) {
throw new NullPointerException("The mapper [" + mapperNext.getClass().getName() + "] returned a null value.");
}
}
catch (Throwable e) {
done = true;
Expand Down Expand Up @@ -171,8 +173,10 @@ public void onError(Throwable t) {
R v;

try {
v = Objects.requireNonNull(mapperError.apply(t),
"The mapper returned a null value.");
v = mapperError.apply(t);
if (v == null) {
throw new NullPointerException("The mapper [" + mapperError.getClass().getName() + "] returned a null value.");
}
}
catch (Throwable e) {
done = true;
Expand Down Expand Up @@ -203,8 +207,10 @@ public void onComplete() {
R v;

try {
v = Objects.requireNonNull(mapperComplete.get(),
"The mapper returned a null value.");
v = mapperComplete.get();
if (v == null) {
throw new NullPointerException("The mapper [" + mapperComplete.getClass().getName() + "] returned a null value.");
}
}
catch (Throwable e) {
done = true;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2015-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2015-2022 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -135,6 +135,44 @@ public void flatMapSignal2() {
.verifyComplete();
}

@Test
void mapOnNextRejectsNull() {
FluxMapTest.NullFunction<String, Mono<Integer>> mapper = new FluxMapTest.NullFunction<>();
Flux.just("test")
.flatMap(mapper, null, null)
.as(StepVerifier::create)
.verifyErrorSatisfies(err -> assertThat(err)
.isInstanceOf(NullPointerException.class)
.hasMessage("The mapper [reactor.core.publisher.FluxMapTest$NullFunction] returned a null value.")
);
}

@Test
void mapOnErrorRejectsNull() {
final IllegalStateException originalException = new IllegalStateException("expected");
FluxMapTest.NullFunction<Throwable, Mono<Integer>> mapper = new FluxMapTest.NullFunction<>();
Flux.error(originalException)
.flatMap(null, mapper, null)
.as(StepVerifier::create)
.verifyErrorSatisfies(err -> assertThat(err)
.isInstanceOf(NullPointerException.class)
.hasMessage("The mapper [reactor.core.publisher.FluxMapTest$NullFunction] returned a null value.")
.hasSuppressedException(originalException)
);
}

@Test
void mapOnCompleteRejectsNull() {
FluxMapTest.NullSupplier<Mono<Integer>> mapper = new FluxMapTest.NullSupplier<>();
Flux.just("test")
.flatMap(null, null, mapper)
.as(StepVerifier::create)
.verifyErrorSatisfies(err -> assertThat(err)
.isInstanceOf(NullPointerException.class)
.hasMessage("The mapper [reactor.core.publisher.FluxMapTest$NullSupplier] returned a null value.")
);
}

@Test
public void scanOperator(){
Flux<Integer> parent = Flux.just(1);
Expand Down
97 changes: 86 additions & 11 deletions reactor-core/src/test/java/reactor/core/publisher/FluxMapTest.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -21,8 +21,14 @@
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Stream;

import org.junit.jupiter.api.DynamicTest;
import org.junit.jupiter.api.Named;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestFactory;
import org.mockito.Mockito;
import org.reactivestreams.Subscription;

Expand All @@ -33,9 +39,11 @@
import reactor.test.StepVerifier;
import reactor.test.publisher.FluxOperatorTest;
import reactor.test.subscriber.AssertSubscriber;
import reactor.test.subscriber.ConditionalTestSubscriber;
import reactor.test.subscriber.TestSubscriber;
import reactor.util.annotation.Nullable;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.assertj.core.api.Assertions.*;
import static reactor.core.publisher.Sinks.EmitFailureHandler.FAIL_FAST;

public class FluxMapTest extends FluxOperatorTest<String, String> {
Expand Down Expand Up @@ -125,16 +133,65 @@ public void mapperThrows() {
.assertNotComplete();
}

@Test
public void mapperReturnsNull() {
AssertSubscriber<Object> ts = AssertSubscriber.create();
@TestFactory
Stream<DynamicTest> mapperReturnsNull() {
Function<Integer, Integer> mapper = new NullFunction<>();

just.map(v -> null)
.subscribe(ts);
Stream<Named<Flux<Integer>>> sources = Stream.of(
Named.named("normal", Flux.just(1).hide()),
Named.named("fused", Flux.just(1))
);

ts.assertError(NullPointerException.class)
.assertNoValues()
.assertNotComplete();
return DynamicTest.stream(sources, src -> {
Flux<Integer> underTest = src.map(mapper);

underTest
.as(StepVerifier::create)
.verifyErrorSatisfies(err -> assertThat(err)
.isInstanceOf(NullPointerException.class)
.hasMessage("The mapper [reactor.core.publisher.FluxMapTest$NullFunction] returned a null value.")
);
});
}

@TestFactory
Stream<DynamicTest> mapperReturnsNullConditional() {
Function<Integer, Integer> mapper = new NullFunction<>();

Stream<Named<Boolean>> fusionModes = Stream.of(
Named.named("normal", false),
Named.named("fused", true)
);

return DynamicTest.stream(fusionModes, fused -> {
//the error will terminate the downstream, so we need two pairs: normal path and tryOnNext path
ConditionalTestSubscriber<Integer> testSubscriberTryPath = TestSubscriber.builder().buildConditional(i -> true);
ConditionalTestSubscriber<Integer> testSubscriberNormalPath = TestSubscriber.builder().buildConditional(i -> true);
Fuseable.ConditionalSubscriber<Integer> conditionalSubscriberNormalPath;
Fuseable.ConditionalSubscriber<Integer> conditionalSubscriberTryPath;
if (fused) {
conditionalSubscriberNormalPath = new FluxMapFuseable.MapFuseableConditionalSubscriber<>(testSubscriberNormalPath, mapper);
conditionalSubscriberTryPath = new FluxMapFuseable.MapFuseableConditionalSubscriber<>(testSubscriberTryPath, mapper);
}
else {
conditionalSubscriberNormalPath = new FluxMap.MapConditionalSubscriber<>(testSubscriberNormalPath, mapper);
conditionalSubscriberTryPath = new FluxMap.MapConditionalSubscriber<>(testSubscriberTryPath, mapper);
}

//test the non-conditional path
conditionalSubscriberNormalPath.onNext(1);
assertThat(testSubscriberNormalPath.expectTerminalError())
.as("normal path")
.isInstanceOf(NullPointerException.class)
.hasMessage("The mapper [reactor.core.publisher.FluxMapTest$NullFunction] returned a null value.");

//test the conditional path
conditionalSubscriberTryPath.tryOnNext(2);
assertThat(testSubscriberTryPath.expectTerminalError())
.as("try path")
.isInstanceOf(NullPointerException.class)
.hasMessage("The mapper [reactor.core.publisher.FluxMapTest$NullFunction] returned a null value.");
});
}

@Test
Expand Down Expand Up @@ -514,4 +571,22 @@ public void mapFuseableTryOnNextFailureStrategyResume() {
Hooks.resetOnNextError();
}
}

static class NullFunction<T, R> implements Function<T, R> {

@Nullable
@Override
public R apply(T t) {
return null;
}
}

static class NullSupplier<T> implements Supplier<T> {

@Nullable
@Override
public T get() {
return null;
}
}
}

0 comments on commit e33211c

Please sign in to comment.