Skip to content

Support @Scheduled on Reactive methods and Kotlin suspending functions #29924

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
improve testability of the subscribing runnables
  • Loading branch information
simonbasle committed May 15, 2023
commit ae3f20843cd8d67073844b26676e1e1dba69d9e5
Original file line number Diff line number Diff line change
Expand Up @@ -155,25 +155,45 @@ static Runnable createSubscriptionRunnable(Method method, Object targetBean, Sch
List<Runnable> subscriptionTrackerRegistry) {
boolean shouldBlock = scheduled.fixedDelay() > 0 || StringUtils.hasText(scheduled.fixedDelayString());
final Publisher<?> publisher = getPublisherFor(method, targetBean);
if (shouldBlock) {
return () -> {
return new SubscribingRunnable(publisher, shouldBlock, subscriptionTrackerRegistry);
}

/**
* Utility implementation of {@code Runnable} that subscribes to a {@code Publisher}
* or subscribes-then-blocks if {@code shouldBlock} is set to {@code true}.
*/
static final class SubscribingRunnable implements Runnable {

final Publisher<?> publisher;
final boolean shouldBlock;
final List<Runnable> subscriptionTrackerRegistry;

SubscribingRunnable(Publisher<?> publisher, boolean shouldBlock, List<Runnable> subscriptionTrackerRegistry) {
this.publisher = publisher;
this.shouldBlock = shouldBlock;
this.subscriptionTrackerRegistry = subscriptionTrackerRegistry;
}

@Override
public void run() {
if (this.shouldBlock) {
final CountDownLatch latch = new CountDownLatch(1);
TrackingSubscriber subscriber = new TrackingSubscriber(subscriptionTrackerRegistry, latch);
subscriptionTrackerRegistry.add(subscriber);
publisher.subscribe(subscriber);
TrackingSubscriber subscriber = new TrackingSubscriber(this.subscriptionTrackerRegistry, latch);
this.subscriptionTrackerRegistry.add(subscriber);
this.publisher.subscribe(subscriber);
try {
latch.await();
}
catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
};
}
else {
final TrackingSubscriber subscriber = new TrackingSubscriber(this.subscriptionTrackerRegistry);
this.subscriptionTrackerRegistry.add(subscriber);
this.publisher.subscribe(subscriber);
}
}
return () -> {
final TrackingSubscriber subscriber = new TrackingSubscriber(subscriptionTrackerRegistry);
subscriptionTrackerRegistry.add(subscriber);
publisher.subscribe(subscriber);
};
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
package org.springframework.scheduling.annotation;

import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

Expand All @@ -29,10 +32,12 @@
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.util.ReflectionUtils;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
import static org.springframework.scheduling.annotation.ScheduledAnnotationReactiveSupport.createSubscriptionRunnable;
import static org.springframework.scheduling.annotation.ScheduledAnnotationReactiveSupport.getPublisherFor;
import static org.springframework.scheduling.annotation.ScheduledAnnotationReactiveSupport.isReactive;

Expand Down Expand Up @@ -99,7 +104,56 @@ void rejectCantAccessMethod() {
.withCause(new IllegalAccessException("expected"));
}

//TODO find a way to test the case with fixedDelay effectively turning into a fixedRate ?
@Test
void fixedDelayIsBlocking() {
ReactiveMethods target = new ReactiveMethods();
Method m = ReflectionUtils.findMethod(ReactiveMethods.class, "mono");
Scheduled fixedDelayString = AnnotationUtils.synthesizeAnnotation(Map.of("fixedDelayString", "123"), Scheduled.class, null);
Scheduled fixedDelayLong = AnnotationUtils.synthesizeAnnotation(Map.of("fixedDelay", 123L), Scheduled.class, null);
List<Runnable> tracker = new ArrayList<>();

assertThat(createSubscriptionRunnable(m, target, fixedDelayString, tracker))
.isInstanceOfSatisfying(ScheduledAnnotationReactiveSupport.SubscribingRunnable.class, sr ->
assertThat(sr.shouldBlock).as("fixedDelayString.shouldBlock").isTrue()
);

assertThat(createSubscriptionRunnable(m, target, fixedDelayLong, tracker))
.isInstanceOfSatisfying(ScheduledAnnotationReactiveSupport.SubscribingRunnable.class, sr ->
assertThat(sr.shouldBlock).as("fixedDelayLong.shouldBlock").isTrue()
);
}

@Test
void fixedRateIsNotBlocking() {
ReactiveMethods target = new ReactiveMethods();
Method m = ReflectionUtils.findMethod(ReactiveMethods.class, "mono");
Scheduled fixedRateString = AnnotationUtils.synthesizeAnnotation(Map.of("fixedRateString", "123"), Scheduled.class, null);
Scheduled fixedRateLong = AnnotationUtils.synthesizeAnnotation(Map.of("fixedRate", 123L), Scheduled.class, null);
List<Runnable> tracker = new ArrayList<>();

assertThat(createSubscriptionRunnable(m, target, fixedRateString, tracker))
.isInstanceOfSatisfying(ScheduledAnnotationReactiveSupport.SubscribingRunnable.class, sr ->
assertThat(sr.shouldBlock).as("fixedRateString.shouldBlock").isFalse()
);

assertThat(createSubscriptionRunnable(m, target, fixedRateLong, tracker))
.isInstanceOfSatisfying(ScheduledAnnotationReactiveSupport.SubscribingRunnable.class, sr ->
assertThat(sr.shouldBlock).as("fixedRateLong.shouldBlock").isFalse()
);
}

@Test
void cronIsNotBlocking() {
ReactiveMethods target = new ReactiveMethods();
Method m = ReflectionUtils.findMethod(ReactiveMethods.class, "mono");
Scheduled cron = AnnotationUtils.synthesizeAnnotation(Map.of("cron", "-"), Scheduled.class, null);
List<Runnable> tracker = new ArrayList<>();

assertThat(createSubscriptionRunnable(m, target, cron, tracker))
.isInstanceOfSatisfying(ScheduledAnnotationReactiveSupport.SubscribingRunnable.class, sr ->
assertThat(sr.shouldBlock).as("cron.shouldBlock").isFalse()
);
}

@Test
void hasCheckpointToString() {
Expand Down