Skip to content

Commit 67b2f25

Browse files
authored
Add AutoCloseable shourtcut on Flux#using, Mono#using (#3704)
Make `Flux, Mono#using` to close `AutoCloseable` resource, so that users don't have to pass resourceCleanup consumer. Fixes #3333.
1 parent 70f5126 commit 67b2f25

File tree

6 files changed

+834
-119
lines changed

6 files changed

+834
-119
lines changed

reactor-core/build.gradle

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,10 @@ task japicmp(type: JapicmpTask) {
256256
classExcludes = [
257257
]
258258
methodExcludes = [
259+
"reactor.core.publisher.Flux#using(java.util.concurrent.Callable, java.util.function.Function)",
260+
"reactor.core.publisher.Flux#using(java.util.concurrent.Callable, java.util.function.Function, boolean)",
261+
"reactor.core.publisher.Mono#using(java.util.concurrent.Callable, java.util.function.Function)",
262+
"reactor.core.publisher.Mono#using(java.util.concurrent.Callable, java.util.function.Function, boolean)"
259263
]
260264
}
261265

reactor-core/src/main/java/reactor/core/Exceptions.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved.
2+
* Copyright (c) 2016-2024 VMware Inc. or its affiliates, All Rights Reserved.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -23,6 +23,7 @@
2323
import java.util.Objects;
2424
import java.util.concurrent.RejectedExecutionException;
2525
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
26+
import java.util.function.Consumer;
2627

2728
import reactor.core.publisher.Flux;
2829
import reactor.util.Logger;
@@ -34,6 +35,7 @@
3435
* Global Reactor Core Exception handling and utils to operate on.
3536
*
3637
* @author Stephane Maldini
38+
* @author Injae Kim
3739
* @see <a href="https://github.com/reactor/reactive-streams-commons">Reactive-Streams-Commons</a>
3840
*/
3941
public abstract class Exceptions {
@@ -861,4 +863,16 @@ static final class StaticThrowable extends Error {
861863
}
862864
}
863865

866+
/**
867+
* A general-purpose {@link Consumer} that closes {@link AutoCloseable} resource.
868+
* If exception is thrown during closing the resource, it will be propagated by {@link Exceptions#propagate(Throwable)}.
869+
*/
870+
public static final Consumer<? super AutoCloseable> AUTO_CLOSE = resource -> {
871+
try {
872+
resource.close();
873+
} catch (Throwable t) {
874+
throw Exceptions.propagate(t);
875+
}
876+
};
877+
864878
}

reactor-core/src/main/java/reactor/core/publisher/Flux.java

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
2+
* Copyright (c) 2016-2024 VMware Inc. or its affiliates, All Rights Reserved.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -119,6 +119,7 @@
119119
* @author Stephane Maldini
120120
* @author David Karnok
121121
* @author Simon Baslé
122+
* @author Injae Kim
122123
*
123124
* @see Mono
124125
*/
@@ -2132,6 +2133,64 @@ public static <T, D> Flux<T> using(Callable<? extends D> resourceSupplier, Funct
21322133
eager));
21332134
}
21342135

2136+
/**
2137+
* Uses an {@link AutoCloseable} resource, generated by a supplier for each individual Subscriber,
2138+
* while streaming the values from a Publisher derived from the same resource and makes sure
2139+
* the resource is released if the sequence terminates or the Subscriber cancels.
2140+
* <p>
2141+
* Eager {@link AutoCloseable} resource cleanup happens just before the source termination and exceptions raised
2142+
* by the cleanup Consumer may override the terminal event.
2143+
* <p>
2144+
* <img class="marble" src="doc-files/marbles/usingForFlux.svg" alt="">
2145+
* <p>
2146+
* For an asynchronous version of the cleanup, with distinct path for onComplete, onError
2147+
* and cancel terminations, see {@link #usingWhen(Publisher, Function, Function, BiFunction, Function)}.
2148+
*
2149+
* @param resourceSupplier a {@link Callable} that is called on subscribe to generate the resource
2150+
* @param sourceSupplier a factory to derive a {@link Publisher} from the supplied resource
2151+
* @param <T> emitted type
2152+
* @param <D> resource type
2153+
*
2154+
* @return a new {@link Flux} built around a disposable resource
2155+
* @see #usingWhen(Publisher, Function, Function, BiFunction, Function)
2156+
* @see #usingWhen(Publisher, Function, Function)
2157+
*/
2158+
public static <T, D extends AutoCloseable> Flux<T> using(Callable<? extends D> resourceSupplier,
2159+
Function<? super D, ? extends Publisher<? extends T>> sourceSupplier) {
2160+
return using(resourceSupplier, sourceSupplier, true);
2161+
}
2162+
2163+
/**
2164+
* Uses an {@link AutoCloseable} resource, generated by a supplier for each individual Subscriber,
2165+
* while streaming the values from a Publisher derived from the same resource and makes sure
2166+
* the resource is released if the sequence terminates or the Subscriber cancels.
2167+
* <p>
2168+
* <ul>
2169+
* <li>Eager {@link AutoCloseable} resource cleanup happens just before the source termination and exceptions raised
2170+
* by the cleanup Consumer may override the terminal event.</li>
2171+
* <li>Non-eager cleanup will drop any exception.</li>
2172+
* </ul>
2173+
* <p>
2174+
* <img class="marble" src="doc-files/marbles/usingForFlux.svg" alt="">
2175+
* <p>
2176+
* For an asynchronous version of the cleanup, with distinct path for onComplete, onError
2177+
* and cancel terminations, see {@link #usingWhen(Publisher, Function, Function, BiFunction, Function)}.
2178+
*
2179+
* @param resourceSupplier a {@link Callable} that is called on subscribe to generate the resource
2180+
* @param sourceSupplier a factory to derive a {@link Publisher} from the supplied resource
2181+
* @param eager true to clean before terminating downstream subscribers
2182+
* @param <T> emitted type
2183+
* @param <D> resource type
2184+
*
2185+
* @return a new {@link Flux} built around a disposable resource
2186+
* @see #usingWhen(Publisher, Function, Function, BiFunction, Function)
2187+
* @see #usingWhen(Publisher, Function, Function)
2188+
*/
2189+
public static <T, D extends AutoCloseable> Flux<T> using(Callable<? extends D> resourceSupplier,
2190+
Function<? super D, ? extends Publisher<? extends T>> sourceSupplier, boolean eager) {
2191+
return using(resourceSupplier, sourceSupplier, Exceptions.AUTO_CLOSE, eager);
2192+
}
2193+
21352194
/**
21362195
* Uses a resource, generated by a {@link Publisher} for each individual {@link Subscriber},
21372196
* while streaming the values from a {@link Publisher} derived from the same resource.

reactor-core/src/main/java/reactor/core/publisher/Mono.java

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
2+
* Copyright (c) 2016-2024 VMware Inc. or its affiliates, All Rights Reserved.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -115,6 +115,7 @@
115115
* @author Stephane Maldini
116116
* @author David Karnok
117117
* @author Simon Baslé
118+
* @author Injae Kim
118119
* @see Flux
119120
*/
120121
public abstract class Mono<T> implements CorePublisher<T> {
@@ -912,6 +913,56 @@ public static <T, D> Mono<T> using(Callable<? extends D> resourceSupplier,
912913
return using(resourceSupplier, sourceSupplier, resourceCleanup, true);
913914
}
914915

916+
/**
917+
* Uses an {@link AutoCloseable} resource, generated by a supplier for each individual Subscriber,
918+
* while streaming the value from a Mono derived from the same resource and makes sure
919+
* the resource is released if the sequence terminates or the Subscriber cancels.
920+
* <p>
921+
* Unlike in {@link Flux#using(Callable, Function, Consumer) Flux}, in the case of a valued {@link Mono} the cleanup
922+
* happens just before passing the value to downstream. In all cases, exceptions raised by the cleanup
923+
* {@link Consumer} may override the terminal event, discarding the element if the derived {@link Mono} was valued.
924+
* <p>
925+
* <img class="marble" src="doc-files/marbles/usingForMono.svg" alt="">
926+
*
927+
* @param resourceSupplier a {@link Callable} that is called on subscribe to create the resource
928+
* @param sourceSupplier a {@link Mono} factory to create the Mono depending on the created resource
929+
* @param <T> emitted type
930+
* @param <D> resource type
931+
*
932+
* @return new {@link Mono}
933+
*/
934+
public static <T, D extends AutoCloseable> Mono<T> using(Callable<? extends D> resourceSupplier,
935+
Function<? super D, ? extends Mono<? extends T>> sourceSupplier) {
936+
return using(resourceSupplier, sourceSupplier, true);
937+
}
938+
939+
/**
940+
* Uses an {@link AutoCloseable} resource, generated by a supplier for each individual Subscriber,
941+
* while streaming the value from a Mono derived from the same resource and makes sure
942+
* the resource is released if the sequence terminates or the Subscriber cancels.
943+
* <p>
944+
* <ul>
945+
* <li>For eager cleanup, Unlike in {@link Flux#using(Callable, Function, Consumer) Flux},
946+
* in the case of a valued {@link Mono} the cleanup happens just before passing the value to downstream.
947+
* In all cases, exceptions raised by the cleanup {@link Consumer} may override the terminal event,
948+
* discarding the element if the derived {@link Mono} was valued.</li>
949+
* <li>Non-eager cleanup will drop any exception.</li>
950+
* </ul>
951+
* <p>
952+
* <img class="marble" src="doc-files/marbles/usingForMono.svg" alt="">
953+
*
954+
* @param resourceSupplier a {@link Callable} that is called on subscribe to create the resource
955+
* @param sourceSupplier a {@link Mono} factory to create the Mono depending on the created resource
956+
* @param eager set to true to clean before any signal (including onNext) is passed downstream
957+
* @param <T> emitted type
958+
* @param <D> resource type
959+
*
960+
* @return new {@link Mono}
961+
*/
962+
public static <T, D extends AutoCloseable> Mono<T> using(Callable<? extends D> resourceSupplier,
963+
Function<? super D, ? extends Mono<? extends T>> sourceSupplier, boolean eager) {
964+
return using(resourceSupplier, sourceSupplier, Exceptions.AUTO_CLOSE, eager);
965+
}
915966

916967
/**
917968
* Uses a resource, generated by a {@link Publisher} for each individual {@link Subscriber},

0 commit comments

Comments
 (0)