Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix #895 Safer scan of ACTUAL and PARENT #896

Merged
merged 2 commits into from
Oct 18, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 66 additions & 10 deletions reactor-core/src/main/java/reactor/core/Scannable.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@

import java.util.Iterator;
import java.util.Objects;
import java.util.Set;
import java.util.Spliterators;
import java.util.function.Function;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

Expand Down Expand Up @@ -51,14 +51,23 @@ public interface Scannable {
*/
class Attr<T> {

/* IMPLEMENTATION NOTE
Note that some attributes define a object-to-T converter, which means their
private {@link #tryConvert(Object)} method can safely be used by
{@link Scannable#scan(Attr)}, making them resilient to class cast exceptions.
*/

/**
* The direct dependent component downstream reference if any. Operators in
* Flux/Mono for instance delegate to a target Subscriber, which is going to be
* the actual chain navigated with this reference key.
* the actual chain navigated with this reference key. Subscribers are not always
* {@link Scannable}, but this attribute will convert these raw results to an
* {@link Scannable#isScanAvailable() unavailable scan} object in this case.
* <p>
* A reference chain downstream can be navigated via {@link Scannable#actuals()}.
*/
public static final Attr<Scannable> ACTUAL = new Attr<>(null);
public static final Attr<Scannable> ACTUAL = new Attr<>(null,
Scannable::from);

/**
* A {@link Integer} attribute implemented by components with a backlog
Expand Down Expand Up @@ -126,11 +135,14 @@ class Attr<T> {
* Parent key exposes the direct upstream relationship of the scanned component.
* It can be a Publisher source to an operator, a Subscription to a Subscriber
* (main flow if ambiguous with inner Subscriptions like flatMap), a Scheduler to
* a Worker.
* a Worker. These types are not always {@link Scannable}, but this attribute
* will convert such raw results to an {@link Scannable#isScanAvailable() unavailable scan}
* object in this case.
* <p>
* {@link Scannable#parents()} can be used to navigate the parent chain.
*/
public static final Attr<Scannable> PARENT = new Attr<>(null);
public static final Attr<Scannable> PARENT = new Attr<>(null,
Scannable::from);

/**
* Prefetch is an {@link Integer} attribute defining the rate of processing in a
Expand Down Expand Up @@ -176,10 +188,50 @@ public T defaultValue(){
return defaultValue;
}

final T defaultValue;
/**
* Checks if this attribute is capable of safely converting any Object into
* a {@code T} via {@link #tryConvert(Object)} (potentially returning {@code null}
* or a Null Object for incompatible raw values).
* @return true if the attribute can safely convert any object, false if it can
* throw {@link ClassCastException}
*/
boolean isConversionSafe() {
return safeConverter != null;
}

/**
* Attempt to convert any {@link Object} instance o into a {@code T}. By default
* unsafe attributes will just try forcing a cast, which can lead to {@link ClassCastException}.
* However, attributes for which {@link #isConversionSafe()} returns true are
* required to not throw an exception (but rather return {@code null} or a Null
* Object).
*
* @param o the instance to attempt conversion on
* @return the converted instance
*/
@Nullable
T tryConvert(@Nullable Object o) {
if (o == null) {
return null;
}
if (safeConverter == null) {
//noinspection unchecked
return (T) o;
}
return safeConverter.apply(o);
}

final T defaultValue;
final Function<Object, ? extends T> safeConverter;

protected Attr(@Nullable T defaultValue){
this(defaultValue, null);
}

protected Attr(@Nullable T defaultValue,
@Nullable Function<Object, ? extends T> safeConverter) {
this.defaultValue = defaultValue;
this.safeConverter = safeConverter;
}

/**
Expand Down Expand Up @@ -362,8 +414,9 @@ default Stream<? extends Scannable> parents() {
*/
@Nullable
default <T> T scan(Attr<T> key) {
@SuppressWarnings("unchecked")
T value = (T) scanUnsafe(key);
//note tryConvert will just plain cast most of the time
//except e.g. for Attr<Scannable>
T value = key.tryConvert(scanUnsafe(key));
if (value == null)
return key.defaultValue();
return value;
Expand All @@ -380,8 +433,11 @@ default <T> T scan(Attr<T> key) {
* @return a value associated to the key or the provided default if unmatched or unresolved
*/
default <T> T scanOrDefault(Attr<T> key, T defaultValue) {
@SuppressWarnings("unchecked")
T v = (T) scanUnsafe(key);
T v;
//note tryConvert will just plain cast most of the time
//except e.g. for Attr<Scannable>
v = key.tryConvert(scanUnsafe(key));

if (v == null) {
return Objects.requireNonNull(defaultValue, "defaultValue");
}
Expand Down
52 changes: 52 additions & 0 deletions reactor-core/src/test/java/reactor/core/ScannableTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.Collections;
import java.util.List;

import org.assertj.core.api.Condition;
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand Down Expand Up @@ -453,5 +454,56 @@ public void taggedAppendedParallelFluxTest() {
.containsExactlyInAnyOrder(Tuples.of("1", "One"), Tuples.of( "2", "Two"));
}

@Test
public void scanForParentIsSafe() {
Scannable scannable = key -> "String";

assertThat(scannable.scan(Scannable.Attr.PARENT))
.isSameAs(Scannable.Attr.UNAVAILABLE_SCAN);
}

@Test
public void scanForActualIsSafe() {
Scannable scannable = key -> "String";

assertThat(scannable.scan(Scannable.Attr.ACTUAL))
.isSameAs(Scannable.Attr.UNAVAILABLE_SCAN);
}

@Test
public void scanForRawParentOrActual() {
Scannable scannable = key -> "String";

assertThat(scannable.scanUnsafe(Scannable.Attr.ACTUAL))
.isInstanceOf(String.class)
.isEqualTo("String");

assertThat(scannable.scan(Scannable.Attr.ACTUAL))
.isSameAs(Scannable.Attr.UNAVAILABLE_SCAN);

assertThat(scannable.scanUnsafe(Scannable.Attr.PARENT))
.isInstanceOf(String.class)
.isEqualTo("String");

assertThat(scannable.scan(Scannable.Attr.PARENT))
.isSameAs(Scannable.Attr.UNAVAILABLE_SCAN);
}

@Test
public void attributeIsConversionSafe() {
assertThat(Scannable.Attr.ACTUAL.isConversionSafe()).as("ACTUAL").isTrue();
assertThat(Scannable.Attr.PARENT.isConversionSafe()).as("PARENT").isTrue();

assertThat(Scannable.Attr.BUFFERED.isConversionSafe()).as("BUFFERED").isFalse();
assertThat(Scannable.Attr.CAPACITY.isConversionSafe()).as("CAPACITY").isFalse();
assertThat(Scannable.Attr.CANCELLED.isConversionSafe()).as("CANCELLED").isFalse();
assertThat(Scannable.Attr.DELAY_ERROR.isConversionSafe()).as("DELAY_ERROR").isFalse();
assertThat(Scannable.Attr.ERROR.isConversionSafe()).as("ERROR").isFalse();
assertThat(Scannable.Attr.LARGE_BUFFERED.isConversionSafe()).as("LARGE_BUFFERED").isFalse();
assertThat(Scannable.Attr.NAME.isConversionSafe()).as("NAME").isFalse();
assertThat(Scannable.Attr.PREFETCH.isConversionSafe()).as("PREFETCH").isFalse();
assertThat(Scannable.Attr.REQUESTED_FROM_DOWNSTREAM.isConversionSafe()).as("REQUESTED_FROM_DOWNSTREAM").isFalse();
assertThat(Scannable.Attr.TERMINATED.isConversionSafe()).as("TERMINATED").isFalse();
}

}