Skip to content

Commit 70f5126

Browse files
committed
Merge #3677 into 3.6.3
2 parents d10724d + 5304ecf commit 70f5126

File tree

1 file changed

+48
-16
lines changed

1 file changed

+48
-16
lines changed

reactor-core-micrometer/src/test/java/reactor/core/observability/micrometer/TimedSchedulerTest.java

Lines changed: 48 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2022-2023 VMware Inc. or its affiliates, All Rights Reserved.
2+
* Copyright (c) 2022-2024 VMware Inc. or its affiliates, All Rights Reserved.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -192,20 +192,25 @@ void timingOfActiveAndPendingTasks() throws InterruptedException {
192192
void schedulePeriodicallyTimesOneRunInActiveAndAllRunsInCompleted() throws InterruptedException {
193193
MockClock virtualClock = new MockClock();
194194
SimpleMeterRegistry registryWithVirtualClock = new SimpleMeterRegistry(SimpleConfig.DEFAULT, virtualClock);
195-
TimedScheduler test = new TimedScheduler(Schedulers.single(), registryWithVirtualClock, "test", Tags.empty());
195+
TimedScheduler test = new TimedScheduler(Schedulers.single(), registryWithVirtualClock, "test",
196+
Tags.empty());
196197

197198
//schedule a periodic task for which one run takes 500ms. we cancel after 3 runs
198199
CountDownLatch latch = new CountDownLatch(3);
199-
Disposable d = test.schedulePeriodically(
200-
() -> {
201-
try {
202-
virtualClock.add(Duration.ofMillis(500));
203-
}
204-
finally {
205-
latch.countDown();
206-
}
207-
},
208-
100, 100, TimeUnit.MILLISECONDS);
200+
201+
//decrement latch after all task & wrapper actions are performed
202+
Schedulers.onScheduleHook("test", task -> () -> {
203+
try {
204+
task.run();
205+
}
206+
finally {
207+
latch.countDown();
208+
}
209+
});
210+
211+
Disposable d = test.schedulePeriodically(() -> virtualClock.add(Duration.ofMillis(500)),
212+
100, 100, TimeUnit.MILLISECONDS);
213+
209214
latch.await(1, TimeUnit.SECONDS);
210215
d.dispose();
211216

@@ -219,6 +224,9 @@ void schedulePeriodicallyTimesOneRunInActiveAndAllRunsInCompleted() throws Inter
219224
assertThat(test.completedTasks.totalTime(TimeUnit.MILLISECONDS))
220225
.as("total duration of tasks")
221226
.isEqualTo(1500);
227+
228+
Schedulers.resetOnScheduleHook("test");
229+
test.disposeGracefully().block(Duration.ofSeconds(1));
222230
}
223231

224232
@Test
@@ -250,7 +258,16 @@ void schedulePeriodicallyIsCorrectlyMetered() throws InterruptedException {
250258
CountDownLatch latch = new CountDownLatch(5);
251259
TimedScheduler test = new TimedScheduler(Schedulers.single(), registry, "test", Tags.empty());
252260

253-
Disposable d = test.schedulePeriodically(latch::countDown, 100, 100, TimeUnit.MILLISECONDS);
261+
Schedulers.onScheduleHook("test", task -> () -> {
262+
try {
263+
task.run();
264+
}
265+
finally {
266+
latch.countDown();
267+
}
268+
});
269+
270+
Disposable d = test.schedulePeriodically(() -> {}, 100, 100, TimeUnit.MILLISECONDS);
254271

255272
latch.await(10, TimeUnit.SECONDS);
256273
d.dispose();
@@ -264,6 +281,9 @@ void schedulePeriodicallyIsCorrectlyMetered() throws InterruptedException {
264281
.isEqualTo(5)
265282
.matches(l -> l == test.submittedDirect.count() + test.submittedDelayed.count() + test.submittedPeriodicInitial.count()
266283
+ test.submittedPeriodicIteration.count(), "completed tasks == sum of all timer counts");
284+
285+
Schedulers.resetOnScheduleHook("test");
286+
test.disposeGracefully().block(Duration.ofSeconds(1));
267287
}
268288

269289
@Test
@@ -307,12 +327,20 @@ void workerScheduleDelayIncrementsDelayedCounter() throws InterruptedException {
307327

308328
@Test
309329
void workerSchedulePeriodicallyIsCorrectlyMetered() throws InterruptedException {
310-
Scheduler original = Schedulers.single();
311330
CountDownLatch latch = new CountDownLatch(5);
312-
TimedScheduler testScheduler = new TimedScheduler(original, registry, "test", Tags.empty());
331+
TimedScheduler testScheduler = new TimedScheduler(Schedulers.single(), registry, "test", Tags.empty());
313332
Scheduler.Worker test = testScheduler.createWorker();
314333

315-
Disposable d = test.schedulePeriodically(latch::countDown, 100, 100, TimeUnit.MILLISECONDS);
334+
Schedulers.onScheduleHook("test", task -> () -> {
335+
try {
336+
task.run();
337+
}
338+
finally {
339+
latch.countDown();
340+
}
341+
});
342+
343+
Disposable d = test.schedulePeriodically(() -> {}, 100, 100, TimeUnit.MILLISECONDS);
316344

317345
latch.await(10, TimeUnit.SECONDS);
318346
d.dispose();
@@ -328,6 +356,10 @@ void workerSchedulePeriodicallyIsCorrectlyMetered() throws InterruptedException
328356
+ testScheduler.submittedDelayed.count()
329357
+ testScheduler.submittedPeriodicInitial.count()
330358
+ testScheduler.submittedPeriodicIteration.count(), "completed tasks == sum of all timer counts");
359+
360+
test.dispose();
361+
Schedulers.resetOnScheduleHook("test");
362+
testScheduler.disposeGracefully().block(Duration.ofSeconds(1));
331363
}
332364

333365
@Test

0 commit comments

Comments
 (0)