Skip to content

Support @Scheduled on Reactive methods and Kotlin suspending functions #29924

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Polish: use a single check for reactive and kotlin methods
The support's `isReactive` method checks Kotlin suspending functions
first then reactive (Publisher-returning) methods second. It asserts
the relevant runtime, all in a single call.

Similarly, turning the Method into a Publisher is done via a single
common helper method `getPublisherFor(Method, Object)`.

All imports of reactive classes and other reactive-specific logic is
still outsourced to ScheduledAnnotationReactiveSupport in order to avoid
any classpath issue in the bean postprocessor.
  • Loading branch information
simonbasle committed May 5, 2023
commit c90595e4032ec681c9ffea1620ae024dbb873a40
Original file line number Diff line number Diff line change
Expand Up @@ -396,14 +396,10 @@ public Object postProcessAfterInitialization(Object bean, String beanName) {
}

protected void processScheduled(Scheduled scheduled, Method method, Object bean) {
// Check for Kotlin suspending functions. If one is found but reactor bridge isn't on the classpath, throw
if (ScheduledAnnotationReactiveSupport.checkKotlinRuntimeIfNeeded(method)) {
processScheduledReactive(scheduled, method, bean, true);
return;
}
// Check for Publisher-returning methods. If found but Reactor isn't on the classpath, throw.
if (ScheduledAnnotationReactiveSupport.checkReactorRuntimeIfNeeded(method)) {
processScheduledReactive(scheduled, method, bean, false);
// Is method a Kotlin suspending function? Throws if true but reactor bridge isn't on the classpath.
// Is method returning a Publisher instance? Throws if true but Reactor isn't on the classpath.
if (ScheduledAnnotationReactiveSupport.isReactive(method)) {
processScheduledReactive(scheduled, method, bean);
return;
}
processScheduledSync(scheduled, method, bean);
Expand Down Expand Up @@ -553,7 +549,7 @@ protected void processScheduledSync(Scheduled scheduled, Method method, Object b
* @param bean the target bean instance
* @see #createRunnable(Object, Method)
*/
protected void processScheduledReactive(Scheduled scheduled, Method method, Object bean, boolean isSuspendingFunction) {
protected void processScheduledReactive(Scheduled scheduled, Method method, Object bean) {
try {
boolean processedSchedule = false;
String errorMessage =
Expand Down Expand Up @@ -592,7 +588,7 @@ protected void processScheduledReactive(Scheduled scheduled, Method method, Obje
Duration fixedDelay = toDuration(scheduled.fixedDelay(), scheduled.timeUnit());
if (!fixedDelay.isNegative()) {
processedSchedule = true;
reactiveTasks.add(new ScheduledAnnotationReactiveSupport.ReactiveTask(method, bean, initialDelay, fixedDelay, false, isSuspendingFunction));
reactiveTasks.add(new ScheduledAnnotationReactiveSupport.ReactiveTask(method, bean, initialDelay, fixedDelay, false));
}

String fixedDelayString = scheduled.fixedDelayString();
Expand All @@ -610,7 +606,7 @@ protected void processScheduledReactive(Scheduled scheduled, Method method, Obje
throw new IllegalArgumentException(
"Invalid fixedDelayString value \"" + fixedDelayString + "\" - cannot parse into long");
}
reactiveTasks.add(new ScheduledAnnotationReactiveSupport.ReactiveTask(method, bean, initialDelay, fixedDelay, false, isSuspendingFunction));
reactiveTasks.add(new ScheduledAnnotationReactiveSupport.ReactiveTask(method, bean, initialDelay, fixedDelay, false));
}
}

Expand All @@ -619,7 +615,7 @@ protected void processScheduledReactive(Scheduled scheduled, Method method, Obje
if (!fixedRate.isNegative()) {
Assert.isTrue(!processedSchedule, errorMessage);
processedSchedule = true;
reactiveTasks.add(new ScheduledAnnotationReactiveSupport.ReactiveTask(method, bean, initialDelay, fixedRate, true, isSuspendingFunction));
reactiveTasks.add(new ScheduledAnnotationReactiveSupport.ReactiveTask(method, bean, initialDelay, fixedRate, true));
}
String fixedRateString = scheduled.fixedRateString();
if (StringUtils.hasText(fixedRateString)) {
Expand All @@ -636,7 +632,7 @@ protected void processScheduledReactive(Scheduled scheduled, Method method, Obje
throw new IllegalArgumentException(
"Invalid fixedRateString value \"" + fixedRateString + "\" - cannot parse into long");
}
reactiveTasks.add(new ScheduledAnnotationReactiveSupport.ReactiveTask(method, bean, initialDelay, fixedRate, true, isSuspendingFunction));
reactiveTasks.add(new ScheduledAnnotationReactiveSupport.ReactiveTask(method, bean, initialDelay, fixedRate, true));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
* Helper class for @{@link ScheduledAnnotationBeanPostProcessor} to support reactive cases
* without a dependency on optional classes.
* @author Simon Baslé
* @since 6.0.x //FIXME
* @since 6.1.0 //FIXME
*/
abstract class ScheduledAnnotationReactiveSupport {

Expand All @@ -53,17 +53,35 @@ abstract class ScheduledAnnotationReactiveSupport {
"kotlinx.coroutines.reactor.MonoKt", ScheduledAnnotationReactiveSupport.class.getClassLoader());

/**
* Checks that if the method is reactive, it can be scheduled. If it isn't reactive
* (Reactive Streams {@code Publisher} is not present at runtime or the method doesn't
* return a form of Publisher) then this check returns {@code false}. Otherwise, it is
* eligible for reactive scheduling and Reactor MUST also be present at runtime.
* Checks that if the method is reactive, it can be scheduled. Methods are considered
* eligible for reactive scheduling if they either return an instance of
* {@code Publisher} or are a Kotlin Suspending Function. If the method isn't matching
* these criteria then this check returns {@code false}.
* <p>For reactive scheduling, Reactor MUST be present at runtime. In the case of
* Kotlin, the Coroutine {@code kotlinx.coroutines.reactor} bridge MUST also be
* present at runtime (in order to invoke suspending functions as a {@code Mono}).
* Provided that is the case, this method returns {@code true}. Otherwise, it throws
* an {@code IllegalStateException}.
* @throws IllegalStateException if the method is reactive but Reactor and/or the
* Kotlin coroutines bridge are not present at runtime
*/
static boolean checkReactorRuntimeIfNeeded(Method method) {
if (!(publisherPresent && Publisher.class.isAssignableFrom(method.getReturnType()))) {
static boolean isReactive(Method method) {
if (KotlinDetector.isKotlinPresent() && KotlinDetector.isSuspendingFunction(method)) {
if (coroutinesReactorPresent) {
return true;
}
else {
throw new IllegalStateException("Kotlin suspending functions may only be annotated with @Scheduled"
+ "if Reactor and the Reactor-Coroutine bridge (kotlinx.coroutines.reactor) are present at runtime");
}
}
if (!publisherPresent) {
return false;
}
if (!Publisher.class.isAssignableFrom(method.getReturnType())) {
return false;
}

if (reactorPresent) {
return true;
}
Expand All @@ -72,31 +90,21 @@ static boolean checkReactorRuntimeIfNeeded(Method method) {
}

/**
* Checks that if the method is a Kotlin suspending function, it can be scheduled.
* If it isn't a suspending function (or Kotlin is not detected at runtime) then this
* check returns {@code false}. Otherwise, it is eligible for conversion to a
* {@code Mono} for reactive scheduling and both Reactor and the
* {@code kotlinx.coroutines.reactor} bridge MUST also be present at runtime.
* Provided that is the case, this method returns {@code true}. Otherwise, it throws
* an {@code IllegalStateException}.
* Turn the invocation of the provided {@code Method} into a {@code Publisher},
* either by reflectively invoking it and returning the resulting {@code Publisher}
* or by converting a Kotlin suspending function into a Publisher via
* {@link CoroutinesUtils}.
* The {@link #isReactive(Method)} check is a precondition to calling this method.
*/
static boolean checkKotlinRuntimeIfNeeded(Method method) {
if (!(KotlinDetector.isKotlinPresent() && KotlinDetector.isSuspendingFunction(method))) {
return false;
}
if (coroutinesReactorPresent) {
return true;
static Publisher<?> getPublisherFor(Method method, Object bean) {
if (KotlinDetector.isKotlinPresent() && KotlinDetector.isSuspendingFunction(method)) {
//Note that suspending functions declared without args have a single Continuation parameter in reflective inspection
Assert.isTrue(method.getParameterCount() == 1,"Kotlin suspending functions may only be annotated "
+ "with @Scheduled if declared without arguments");

return CoroutinesUtils.invokeSuspendingFunction(method, bean, (Object[]) method.getParameters());
}
throw new IllegalStateException("Kotlin suspending functions may only be annotated with @Scheduled"
+ "if Reactor and the Reactor-Coroutine bridge (kotlinx.coroutines.reactor) are present at runtime");
}

/**
* Reflectively invoke a reactive method and return the resulting {@code Publisher}.
* The {@link #checkReactorRuntimeIfNeeded(Method)} check is a precondition to calling
* this method.
*/
static Publisher<?> getPublisherForReactiveMethod(Method method, Object bean) {
Assert.isTrue(method.getParameterCount() == 0, "Reactive methods may only be annotated with "
+ "@Scheduled if declared without arguments");
Method invocableMethod = AopUtils.selectInvocableMethod(method, bean.getClass());
Expand All @@ -113,22 +121,10 @@ static Publisher<?> getPublisherForReactiveMethod(Method method, Object bean) {
}
}

/**
* Turn the provided suspending function method into a {@code Publisher} via
* {@link CoroutinesUtils} and return that Publisher.
* The {@link #checkKotlinRuntimeIfNeeded(Method)} check is a precondition to calling
* this method.
*/
static Publisher<?> getPublisherForSuspendingFunction(Method method, Object bean) {
//Note that suspending functions declared without args have a single Continuation parameter in reflective inspection
Assert.isTrue(method.getParameterCount() == 1,"Kotlin suspending functions may only be annotated "
+ "with @Scheduled if declared without arguments");

return CoroutinesUtils.invokeSuspendingFunction(method, bean, (Object[]) method.getParameters());
}

/**
* Encapsulates the logic of {@code @Scheduled} on reactive types, using Reactor.
* The {@link ScheduledAnnotationReactiveSupport#isReactive(Method)} check is a
* precondition to instantiating this class.
*/
static class ReactiveTask {

Expand All @@ -142,14 +138,8 @@ static class ReactiveTask {
private final Log logger = LogFactory.getLog(getClass());


protected ReactiveTask(Method method, Object bean, Duration initialDelay,
Duration otherDelay, boolean isFixedRate, boolean isSuspendingFunction) {
if (isSuspendingFunction) {
this.publisher = getPublisherForSuspendingFunction(method, bean);
}
else {
this.publisher = getPublisherForReactiveMethod(method, bean);
}
protected ReactiveTask(Method method, Object bean, Duration initialDelay, Duration otherDelay, boolean isFixedRate) {
this.publisher = getPublisherFor(method, bean);

this.initialDelay = initialDelay;
this.otherDelay = otherDelay;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
import static org.springframework.scheduling.annotation.ScheduledAnnotationReactiveSupport.getPublisherForReactiveMethod;
import static org.springframework.scheduling.annotation.ScheduledAnnotationReactiveSupport.getPublisherFor;

class ScheduledAnnotationReactiveSupportTests {

Expand All @@ -48,17 +48,17 @@ void ensureReactor() {
"publisherString", "monoThrows" }) //note: monoWithParams can't be found by this test
void checkIsReactive(String method) {
Method m = ReflectionUtils.findMethod(ReactiveMethods.class, method);
assertThat(ScheduledAnnotationReactiveSupport.checkReactorRuntimeIfNeeded(m)).as(m.getName()).isTrue();
assertThat(ScheduledAnnotationReactiveSupport.isReactive(m)).as(m.getName()).isTrue();
}

@Test
void checkNotReactive() {
Method string = ReflectionUtils.findMethod(ReactiveMethods.class, "oops");
Method future = ReflectionUtils.findMethod(ReactiveMethods.class, "future");

assertThat(ScheduledAnnotationReactiveSupport.checkReactorRuntimeIfNeeded(string))
assertThat(ScheduledAnnotationReactiveSupport.isReactive(string))
.as("String-returning").isFalse();
assertThat(ScheduledAnnotationReactiveSupport.checkReactorRuntimeIfNeeded(future))
assertThat(ScheduledAnnotationReactiveSupport.isReactive(future))
.as("Future-returning").isFalse();
}

Expand Down Expand Up @@ -137,16 +137,16 @@ void init() {
void rejectWithParams() {
Method m = ReflectionUtils.findMethod(ReactiveMethods.class, "monoWithParam", String.class);

assertThat(ScheduledAnnotationReactiveSupport.checkReactorRuntimeIfNeeded(m)).as("isReactive").isTrue();
assertThat(ScheduledAnnotationReactiveSupport.isReactive(m)).as("isReactive").isTrue();

//static helper method
assertThatIllegalArgumentException().isThrownBy(() -> getPublisherForReactiveMethod(m, target))
assertThatIllegalArgumentException().isThrownBy(() -> getPublisherFor(m, target))
.withMessage("Reactive methods may only be annotated with @Scheduled if declared without arguments")
.withNoCause();

//constructor of task
assertThatIllegalArgumentException().isThrownBy(() -> new ScheduledAnnotationReactiveSupport.ReactiveTask(
m, target, Duration.ZERO, Duration.ZERO, false, false))
m, target, Duration.ZERO, Duration.ZERO, false))
.withMessage("Reactive methods may only be annotated with @Scheduled if declared without arguments")
.withNoCause();
}
Expand All @@ -156,13 +156,13 @@ void rejectCantProducePublisher() {
Method m = ReflectionUtils.findMethod(ReactiveMethods.class, "monoThrows");

//static helper method
assertThatIllegalArgumentException().isThrownBy(() -> getPublisherForReactiveMethod(m, target))
assertThatIllegalArgumentException().isThrownBy(() -> getPublisherFor(m, target))
.withMessage("Cannot obtain a Publisher from the @Scheduled reactive method")
.withCause(new IllegalStateException("expected"));

//constructor of task
assertThatIllegalArgumentException().isThrownBy(() -> new ScheduledAnnotationReactiveSupport.ReactiveTask(
m, target, Duration.ZERO, Duration.ZERO, false, false))
m, target, Duration.ZERO, Duration.ZERO, false))
.withMessage("Cannot obtain a Publisher from the @Scheduled reactive method")
.withCause(new IllegalStateException("expected"));
}
Expand All @@ -172,13 +172,13 @@ void rejectCantAccessMethod() {
Method m = ReflectionUtils.findMethod(ReactiveMethods.class, "monoThrowsIllegalAccess");

//static helper method
assertThatIllegalArgumentException().isThrownBy(() -> getPublisherForReactiveMethod(m, target))
assertThatIllegalArgumentException().isThrownBy(() -> getPublisherFor(m, target))
.withMessage("Cannot obtain a Publisher from the @Scheduled reactive method")
.withCause(new IllegalAccessException("expected"));

//constructor of task
assertThatIllegalArgumentException().isThrownBy(() -> new ScheduledAnnotationReactiveSupport.ReactiveTask(
m, target, Duration.ZERO, Duration.ZERO, false, false))
m, target, Duration.ZERO, Duration.ZERO, false))
.withMessage("Cannot obtain a Publisher from the @Scheduled reactive method")
.withCause(new IllegalAccessException("expected"));
}
Expand All @@ -187,7 +187,7 @@ void rejectCantAccessMethod() {
void hasCheckpointToString() {
Method m = ReflectionUtils.findMethod(ReactiveMethods.class, "mono");
final ScheduledAnnotationReactiveSupport.ReactiveTask reactiveTask = new ScheduledAnnotationReactiveSupport.ReactiveTask(
m, target, Duration.ZERO, Duration.ZERO, false, false);
m, target, Duration.ZERO, Duration.ZERO, false);

assertThat(reactiveTask).hasToString("@Scheduled 'mono()' in bean 'org.springframework.scheduling.annotation.ScheduledAnnotationReactiveSupportTests$ReactiveMethods'");
}
Expand All @@ -196,7 +196,7 @@ void hasCheckpointToString() {
void cancelledEarlyPreventsSubscription() {
Method m = ReflectionUtils.findMethod(ReactiveMethods.class, "trackingMono");
final ScheduledAnnotationReactiveSupport.ReactiveTask reactiveTask = new ScheduledAnnotationReactiveSupport.ReactiveTask(
m, target, Duration.ZERO, Duration.ofSeconds(10), false, false);
m, target, Duration.ZERO, Duration.ofSeconds(10), false);
reactiveTask.cancel();
reactiveTask.subscribe();

Expand All @@ -207,7 +207,7 @@ void cancelledEarlyPreventsSubscription() {
void noInitialDelayFixedDelay() throws InterruptedException {
Method m = ReflectionUtils.findMethod(target.getClass(), "trackingMono");
final ScheduledAnnotationReactiveSupport.ReactiveTask reactiveTask = new ScheduledAnnotationReactiveSupport.ReactiveTask(
m, target, Duration.ZERO, Duration.ofSeconds(10), false, false);
m, target, Duration.ZERO, Duration.ofSeconds(10), false);
reactiveTask.subscribe();
Thread.sleep(500);
reactiveTask.cancel();
Expand All @@ -219,7 +219,7 @@ void noInitialDelayFixedDelay() throws InterruptedException {
void noInitialDelayFixedRate() throws InterruptedException {
Method m = ReflectionUtils.findMethod(target.getClass(), "trackingMono");
final ScheduledAnnotationReactiveSupport.ReactiveTask reactiveTask = new ScheduledAnnotationReactiveSupport.ReactiveTask(
m, target, Duration.ZERO, Duration.ofSeconds(10), true, false);
m, target, Duration.ZERO, Duration.ofSeconds(10), true);
reactiveTask.subscribe();
Thread.sleep(500);
reactiveTask.cancel();
Expand All @@ -231,7 +231,7 @@ void noInitialDelayFixedRate() throws InterruptedException {
void initialDelayFixedDelay() throws InterruptedException {
Method m = ReflectionUtils.findMethod(target.getClass(), "trackingMono");
final ScheduledAnnotationReactiveSupport.ReactiveTask reactiveTask = new ScheduledAnnotationReactiveSupport.ReactiveTask(
m, target, Duration.ofSeconds(10), Duration.ofMillis(500), false, false);
m, target, Duration.ofSeconds(10), Duration.ofMillis(500), false);
reactiveTask.subscribe();
Thread.sleep(500);
reactiveTask.cancel();
Expand All @@ -243,7 +243,7 @@ void initialDelayFixedDelay() throws InterruptedException {
void initialDelayFixedRate() throws InterruptedException {
Method m = ReflectionUtils.findMethod(target.getClass(), "trackingMono");
final ScheduledAnnotationReactiveSupport.ReactiveTask reactiveTask = new ScheduledAnnotationReactiveSupport.ReactiveTask(
m, target, Duration.ofSeconds(10), Duration.ofMillis(500), true, false);
m, target, Duration.ofSeconds(10), Duration.ofMillis(500), true);
reactiveTask.subscribe();
Thread.sleep(500);
reactiveTask.cancel();
Expand All @@ -255,7 +255,7 @@ void initialDelayFixedRate() throws InterruptedException {
void monoErrorHasCheckpoint() throws InterruptedException {
Method m = ReflectionUtils.findMethod(target.getClass(), "monoError");
final ScheduledAnnotationReactiveSupport.ReactiveTask reactiveTask = new ScheduledAnnotationReactiveSupport.ReactiveTask(
m, target, Duration.ZERO, Duration.ofSeconds(10), true, false);
m, target, Duration.ZERO, Duration.ofSeconds(10), true);

assertThat(reactiveTask.checkpoint).isEqualTo("@Scheduled 'monoError()' in bean "
+ "'org.springframework.scheduling.annotation.ScheduledAnnotationReactiveSupportTests$ReactiveMethods'");
Expand Down
Loading