Skip to content

Support @Scheduled on Reactive methods and Kotlin suspending functions #29924

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
polish and fix tests
  • Loading branch information
simonbasle committed May 5, 2023
commit 78a47c9cb498bbfce34a8ee7fd5bfef1fa7a8c3f
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ static boolean isReactive(Method method) {
Assert.isTrue(method.getParameterCount() == 1,"Kotlin suspending functions may only be"
+ " annotated with @Scheduled if declared without arguments");
Assert.isTrue(coroutinesReactorPresent, "Kotlin suspending functions may only be annotated with"
+ " @Scheduled if the Coroutine-Reactor bridge (kotlinx.coroutines.reactive) is present at runtime");
+ " @Scheduled if the Coroutine-Reactor bridge (kotlinx.coroutines.reactor) is present at runtime");
return true;
}
ReactiveAdapterRegistry registry = ReactiveAdapterRegistry.getSharedInstance();
Expand Down Expand Up @@ -165,8 +165,8 @@ public void onNext(Object o) {
}

@Override
public void onError(Throwable e) {
LOGGER.warn("Unexpected error occurred in scheduled reactive task", e);
public void onError(Throwable ex) {
LOGGER.warn("Unexpected error occurred in scheduled reactive task", ex);
latch.countDown();
}

Expand All @@ -178,8 +178,8 @@ public void onComplete() {
try {
latch.await();
}
catch (InterruptedException e) {
throw new RuntimeException(e);
catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
};
}
Expand All @@ -195,8 +195,8 @@ public void onNext(Object o) {
}

@Override
public void onError(Throwable e) {
LOGGER.warn("Unexpected error occurred in scheduled reactive task", e);
public void onError(Throwable ex) {
LOGGER.warn("Unexpected error occurred in scheduled reactive task", ex);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,15 @@ void rejectCantAccessMethod() {

@Test
void hasCheckpointToString() {
//FIXME test checkpointing
assertThat("FIXME").isEqualTo("@Scheduled 'mono()' in bean 'org.springframework.scheduling.annotation.ScheduledAnnotationReactiveSupportTests$ReactiveMethods'");
ReactiveMethods target = new ReactiveMethods();
Method m = ReflectionUtils.findMethod(ReactiveMethods.class, "mono");
Publisher<?> p = getPublisherFor(m, target);

assertThat(p.getClass().getName())
.as("checkpoint class")
.isEqualTo("reactor.core.publisher.FluxOnAssembly");

assertThat(p).hasToString("checkpoint(\"@Scheduled 'mono()' in bean 'org.springframework.scheduling.annotation.ScheduledAnnotationReactiveSupportTests$ReactiveMethods'\")");
}

static class ReactiveMethods {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,10 @@ import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import org.springframework.scheduling.annotation.ScheduledAnnotationReactiveSupport.ReactiveTask
import org.springframework.scheduling.annotation.ScheduledAnnotationReactiveSupport.getPublisherFor
import org.springframework.scheduling.annotation.ScheduledAnnotationReactiveSupport.isReactive
import org.springframework.util.ReflectionUtils
import reactor.core.publisher.Mono
import java.time.Duration
import java.util.concurrent.atomic.AtomicInteger
import kotlin.coroutines.Continuation

Expand Down Expand Up @@ -122,11 +120,6 @@ class KotlinScheduledAnnotationReactiveSupportTests {
Assertions.assertThatIllegalArgumentException().isThrownBy { isReactive(m) }
.withMessage("Kotlin suspending functions may only be annotated with @Scheduled if declared without arguments")
.withNoCause()

//constructor of task doesn't reject
Assertions.assertThatNoException().isThrownBy {
ReactiveTask(m, target!!, Duration.ZERO, Duration.ZERO, false)
}
}

@Test
Expand All @@ -137,13 +130,6 @@ class KotlinScheduledAnnotationReactiveSupportTests {
Assertions.assertThatIllegalArgumentException().isThrownBy { getPublisherFor(m!!, target!!) }
.withMessage("Cannot convert the @Scheduled reactive method return type to Publisher")
.withNoCause()

//constructor of task
Assertions.assertThatIllegalArgumentException().isThrownBy {
ReactiveTask(m!!, target!!, Duration.ZERO, Duration.ZERO, false)
}
.withMessage("Cannot convert the @Scheduled reactive method return type to Publisher")
.withNoCause()
}

@Test
Expand All @@ -155,11 +141,6 @@ class KotlinScheduledAnnotationReactiveSupportTests {
Assertions.assertThatIllegalStateException().isThrownBy { mono.block() }
.withMessage("expected")
.withNoCause()

//constructor of task doesn't throw
Assertions.assertThatNoException().isThrownBy {
ReactiveTask(m, target!!, Duration.ZERO, Duration.ZERO, false)
}
}

@Test
Expand All @@ -171,74 +152,4 @@ class KotlinScheduledAnnotationReactiveSupportTests {
mono.block()
assertThat(target!!.subscription).describedAs("after subscription").hasValue(1)
}

@Test
fun hasCheckpointToString() {
val m = ReflectionUtils.findMethod(SuspendingFunctions::class.java, "suspending", Continuation::class.java)
val reactiveTask = ReactiveTask(m!!, target!!, Duration.ZERO, Duration.ZERO, false)
assertThat(reactiveTask).hasToString("@Scheduled 'suspending()' in bean 'org.springframework.scheduling.annotation.KotlinScheduledAnnotationReactiveSupportTests\$SuspendingFunctions'")
}

@Test
fun cancelledEarlyPreventsSubscription() {
val m = ReflectionUtils.findMethod(SuspendingFunctions::class.java, "suspendingTracking", Continuation::class.java)
val reactiveTask = ReactiveTask(m!!, target!!, Duration.ZERO, Duration.ofSeconds(10), false)
reactiveTask.cancel()
reactiveTask.subscribe()
assertThat(target!!.subscription).hasValue(0)
}

@Test
fun multipleSubscriptionsTracked() {
val m = ReflectionUtils.findMethod(SuspendingFunctions::class.java, "suspendingTracking", Continuation::class.java)
val reactiveTask = ReactiveTask(m!!, target!!, Duration.ZERO, Duration.ofMillis(500), false)
reactiveTask.subscribe()
Thread.sleep(1500)
reactiveTask.cancel()
assertThat(target!!.subscription).hasValueGreaterThanOrEqualTo(3)
}

@Test
@Throws(InterruptedException::class)
fun noInitialDelayFixedDelay() {
val m = ReflectionUtils.findMethod(SuspendingFunctions::class.java, "suspendingTracking", Continuation::class.java)
val reactiveTask = ReactiveTask(m!!, target!!, Duration.ZERO, Duration.ofSeconds(10), false)
reactiveTask.subscribe()
Thread.sleep(500)
reactiveTask.cancel()
assertThat(target!!.subscription).hasValue(1)
}

@Test
@Throws(InterruptedException::class)
fun noInitialDelayFixedRate() {
val m = ReflectionUtils.findMethod(SuspendingFunctions::class.java, "suspendingTracking", Continuation::class.java)
val reactiveTask = ReactiveTask(m!!, target!!, Duration.ZERO, Duration.ofSeconds(10), true)
reactiveTask.subscribe()
Thread.sleep(500)
reactiveTask.cancel()
assertThat(target!!.subscription).hasValue(1)
}

@Test
@Throws(InterruptedException::class)
fun initialDelayFixedDelay() {
val m = ReflectionUtils.findMethod(SuspendingFunctions::class.java, "suspendingTracking", Continuation::class.java)
val reactiveTask = ReactiveTask(m!!, target!!, Duration.ofSeconds(10), Duration.ofMillis(500), false)
reactiveTask.subscribe()
Thread.sleep(500)
reactiveTask.cancel()
assertThat(target!!.subscription).hasValue(0)
}

@Test
@Throws(InterruptedException::class)
fun initialDelayFixedRate() {
val m = ReflectionUtils.findMethod(SuspendingFunctions::class.java, "suspendingTracking", Continuation::class.java)
val reactiveTask = ReactiveTask(m!!, target!!, Duration.ofSeconds(10), Duration.ofMillis(500), true)
reactiveTask.subscribe()
Thread.sleep(500)
reactiveTask.cancel()
assertThat(target!!.subscription).hasValue(0)
}
}