Skip to content

Commit 05026d8

Browse files
committed
Quartz: introduce Nonconcurrent
1 parent 5100063 commit 05026d8

File tree

15 files changed

+415
-82
lines changed

15 files changed

+415
-82
lines changed

extensions/quartz/deployment/src/main/java/io/quarkus/quartz/deployment/QuartzProcessor.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import java.util.Optional;
1111
import java.util.Set;
1212
import java.util.logging.Level;
13+
import java.util.stream.Collectors;
1314

1415
import jakarta.inject.Singleton;
1516

@@ -55,6 +56,7 @@
5556
import io.quarkus.deployment.builditem.nativeimage.NativeImageProxyDefinitionBuildItem;
5657
import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem;
5758
import io.quarkus.deployment.logging.LogCleanupFilterBuildItem;
59+
import io.quarkus.quartz.Nonconcurrent;
5860
import io.quarkus.quartz.runtime.QuarkusQuartzConnectionPoolProvider;
5961
import io.quarkus.quartz.runtime.QuartzBuildTimeConfig;
6062
import io.quarkus.quartz.runtime.QuartzExtensionPointConfig;
@@ -69,6 +71,7 @@
6971
import io.quarkus.quartz.runtime.jdbc.QuarkusStdJDBCDelegate;
7072
import io.quarkus.runtime.configuration.ConfigurationException;
7173
import io.quarkus.scheduler.Scheduled;
74+
import io.quarkus.scheduler.deployment.ScheduledBusinessMethodItem;
7275
import io.quarkus.scheduler.deployment.SchedulerImplementationBuildItem;
7376

7477
public class QuartzProcessor {
@@ -79,6 +82,7 @@ public class QuartzProcessor {
7982
private static final DotName DELEGATE_HSQLDB = DotName.createSimple(QuarkusHSQLDBDelegate.class.getName());
8083
private static final DotName DELEGATE_MSSQL = DotName.createSimple(QuarkusMSSQLDelegate.class.getName());
8184
private static final DotName DELEGATE_STDJDBC = DotName.createSimple(QuarkusStdJDBCDelegate.class.getName());
85+
private static final DotName NONCONCURRENT = DotName.createSimple(Nonconcurrent.class);
8286

8387
@BuildStep
8488
FeatureBuildItem feature() {
@@ -313,12 +317,17 @@ public void start(BuildProducer<ServiceStartBuildItem> serviceStart,
313317
@Record(RUNTIME_INIT)
314318
public void quartzSupportBean(QuartzRuntimeConfig runtimeConfig, QuartzBuildTimeConfig buildTimeConfig,
315319
QuartzRecorder recorder,
316-
BuildProducer<SyntheticBeanBuildItem> syntheticBeanBuildItemBuildProducer,
317-
QuartzJDBCDriverDialectBuildItem driverDialect) {
320+
QuartzJDBCDriverDialectBuildItem driverDialect,
321+
List<ScheduledBusinessMethodItem> scheduledMethods,
322+
BuildProducer<SyntheticBeanBuildItem> syntheticBeanBuildItemBuildProducer) {
318323

319324
syntheticBeanBuildItemBuildProducer.produce(SyntheticBeanBuildItem.configure(QuartzSupport.class)
320325
.scope(Singleton.class) // this should be @ApplicationScoped but it fails for some reason
321326
.setRuntimeInit()
322-
.supplier(recorder.quartzSupportSupplier(runtimeConfig, buildTimeConfig, driverDialect.getDriver())).done());
327+
.supplier(recorder.quartzSupportSupplier(runtimeConfig, buildTimeConfig, driverDialect.getDriver(),
328+
scheduledMethods.stream().filter(m -> m.getMethod().hasAnnotation(NONCONCURRENT))
329+
.map(m -> m.getMethod().declaringClass().name().toString() + "_" + m.getMethod().name())
330+
.collect(Collectors.toSet())))
331+
.done());
323332
}
324333
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package io.quarkus.quartz.test;
2+
3+
import static org.junit.jupiter.api.Assertions.assertTrue;
4+
5+
import java.util.concurrent.CountDownLatch;
6+
import java.util.concurrent.TimeUnit;
7+
import java.util.concurrent.atomic.AtomicInteger;
8+
9+
import jakarta.inject.Inject;
10+
11+
import org.junit.jupiter.api.Test;
12+
import org.junit.jupiter.api.extension.RegisterExtension;
13+
14+
import io.quarkus.quartz.QuartzScheduler;
15+
import io.quarkus.scheduler.Scheduled;
16+
import io.quarkus.test.QuarkusUnitTest;
17+
18+
public class NonconcurrentJobDefinitionTest {
19+
20+
@RegisterExtension
21+
static final QuarkusUnitTest test = new QuarkusUnitTest()
22+
.withApplicationRoot(root -> root
23+
.addClasses(Jobs.class))
24+
.overrideConfigKey("quarkus.scheduler.start-mode", "forced")
25+
.overrideConfigKey("quarkus.quartz.run-blocking-scheduled-method-on-quartz-thread",
26+
"true");
27+
28+
@Inject
29+
QuartzScheduler scheduler;
30+
31+
@Test
32+
public void testExecution() throws InterruptedException {
33+
scheduler.newJob("foo")
34+
.setTask(se -> {
35+
Jobs.NONCONCURRENT_COUNTER.incrementAndGet();
36+
try {
37+
if (!Jobs.CONCURRENT_LATCH.await(10, TimeUnit.SECONDS)) {
38+
throw new IllegalStateException("nonconcurrent() execution blocked too long...");
39+
}
40+
} catch (InterruptedException e) {
41+
throw new IllegalStateException(e);
42+
}
43+
if (Jobs.NONCONCURRENT_COUNTER.get() == 1) {
44+
// concurrent() executed >= 5x and nonconcurrent() 1x
45+
Jobs.NONCONCURRENT_LATCH.countDown();
46+
}
47+
})
48+
.setInterval("1s")
49+
.setNonconcurrent()
50+
.schedule();
51+
52+
assertTrue(Jobs.NONCONCURRENT_LATCH.await(10, TimeUnit.SECONDS),
53+
String.format("nonconcurrent() executed: %sx", Jobs.NONCONCURRENT_COUNTER.get()));
54+
}
55+
56+
static class Jobs {
57+
58+
static final CountDownLatch NONCONCURRENT_LATCH = new CountDownLatch(1);
59+
static final CountDownLatch CONCURRENT_LATCH = new CountDownLatch(5);
60+
61+
static final AtomicInteger NONCONCURRENT_COUNTER = new AtomicInteger(0);
62+
63+
@Scheduled(identity = "bar", every = "1s")
64+
void concurrent() throws InterruptedException {
65+
CONCURRENT_LATCH.countDown();
66+
}
67+
68+
}
69+
70+
}
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
package io.quarkus.quartz.test;
2+
3+
import static org.junit.jupiter.api.Assertions.assertTrue;
4+
5+
import java.util.concurrent.CountDownLatch;
6+
import java.util.concurrent.TimeUnit;
7+
import java.util.concurrent.atomic.AtomicInteger;
8+
9+
import jakarta.inject.Inject;
10+
11+
import org.junit.jupiter.api.Test;
12+
import org.junit.jupiter.api.extension.RegisterExtension;
13+
import org.quartz.DisallowConcurrentExecution;
14+
import org.quartz.Job;
15+
import org.quartz.JobBuilder;
16+
import org.quartz.JobDetail;
17+
import org.quartz.JobExecutionContext;
18+
import org.quartz.JobExecutionException;
19+
import org.quartz.SchedulerException;
20+
import org.quartz.SimpleScheduleBuilder;
21+
import org.quartz.Trigger;
22+
import org.quartz.TriggerBuilder;
23+
24+
import io.quarkus.quartz.QuartzScheduler;
25+
import io.quarkus.scheduler.Scheduled;
26+
import io.quarkus.scheduler.Scheduler;
27+
import io.quarkus.test.QuarkusUnitTest;
28+
29+
public class NonconcurrentProgrammaticTest {
30+
31+
@RegisterExtension
32+
static final QuarkusUnitTest test = new QuarkusUnitTest()
33+
.withApplicationRoot(root -> root
34+
.addClasses(Jobs.class))
35+
.overrideConfigKey("quarkus.scheduler.start-mode", "halted");
36+
37+
@Inject
38+
QuartzScheduler scheduler;
39+
40+
@Test
41+
public void testExecution() throws SchedulerException, InterruptedException {
42+
JobDetail job = JobBuilder.newJob(Jobs.class)
43+
.withIdentity("foo", Scheduler.class.getName())
44+
.build();
45+
Trigger trigger = TriggerBuilder.newTrigger()
46+
.withIdentity("foo", Scheduler.class.getName())
47+
.startNow()
48+
.withSchedule(SimpleScheduleBuilder.simpleSchedule()
49+
.withIntervalInSeconds(1)
50+
.repeatForever())
51+
.build();
52+
scheduler.getScheduler().scheduleJob(job, trigger);
53+
54+
scheduler.resume();
55+
56+
assertTrue(Jobs.NONCONCURRENT_LATCH.await(10, TimeUnit.SECONDS),
57+
String.format("nonconcurrent() executed: %sx", Jobs.NONCONCURRENT_COUNTER.get()));
58+
}
59+
60+
@DisallowConcurrentExecution
61+
static class Jobs implements Job {
62+
63+
static final CountDownLatch NONCONCURRENT_LATCH = new CountDownLatch(1);
64+
static final CountDownLatch CONCURRENT_LATCH = new CountDownLatch(5);
65+
66+
static final AtomicInteger NONCONCURRENT_COUNTER = new AtomicInteger(0);
67+
68+
@Scheduled(identity = "bar", every = "1s")
69+
void concurrent() throws InterruptedException {
70+
CONCURRENT_LATCH.countDown();
71+
}
72+
73+
@Override
74+
public void execute(JobExecutionContext context) throws JobExecutionException {
75+
Jobs.NONCONCURRENT_COUNTER.incrementAndGet();
76+
try {
77+
if (!Jobs.CONCURRENT_LATCH.await(10, TimeUnit.SECONDS)) {
78+
throw new IllegalStateException("nonconcurrent() execution blocked too long...");
79+
}
80+
} catch (InterruptedException e) {
81+
throw new IllegalStateException(e);
82+
}
83+
if (Jobs.NONCONCURRENT_COUNTER.get() == 1) {
84+
// concurrent() executed >= 5x and nonconcurrent() 1x
85+
Jobs.NONCONCURRENT_LATCH.countDown();
86+
}
87+
}
88+
89+
}
90+
91+
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package io.quarkus.quartz.test;
2+
3+
import static org.junit.jupiter.api.Assertions.assertTrue;
4+
5+
import java.util.concurrent.CountDownLatch;
6+
import java.util.concurrent.TimeUnit;
7+
import java.util.concurrent.atomic.AtomicInteger;
8+
9+
import org.junit.jupiter.api.Test;
10+
import org.junit.jupiter.api.extension.RegisterExtension;
11+
12+
import io.quarkus.quartz.Nonconcurrent;
13+
import io.quarkus.scheduler.Scheduled;
14+
import io.quarkus.test.QuarkusUnitTest;
15+
16+
public class NonconcurrentTest {
17+
18+
@RegisterExtension
19+
static final QuarkusUnitTest test = new QuarkusUnitTest()
20+
.withApplicationRoot(root -> root
21+
.addClasses(Jobs.class))
22+
.overrideConfigKey("quarkus.quartz.run-blocking-scheduled-method-on-quartz-thread",
23+
"true");
24+
25+
@Test
26+
public void testExecution() throws InterruptedException {
27+
assertTrue(Jobs.NONCONCURRENT_LATCH.await(10, TimeUnit.SECONDS),
28+
String.format("nonconcurrent() executed: %sx", Jobs.NONCONCURRENT_COUNTER.get()));
29+
}
30+
31+
static class Jobs {
32+
33+
static final CountDownLatch NONCONCURRENT_LATCH = new CountDownLatch(1);
34+
static final CountDownLatch CONCURRENT_LATCH = new CountDownLatch(5);
35+
36+
static final AtomicInteger NONCONCURRENT_COUNTER = new AtomicInteger(0);
37+
38+
@Nonconcurrent
39+
@Scheduled(identity = "foo", every = "1s")
40+
void nonconcurrent() throws InterruptedException {
41+
NONCONCURRENT_COUNTER.incrementAndGet();
42+
if (!CONCURRENT_LATCH.await(10, TimeUnit.SECONDS)) {
43+
throw new IllegalStateException("nonconcurrent() execution blocked too long...");
44+
}
45+
if (NONCONCURRENT_COUNTER.get() == 1) {
46+
// concurrent() executed >= 5x and nonconcurrent() 1x
47+
NONCONCURRENT_LATCH.countDown();
48+
}
49+
}
50+
51+
@Scheduled(identity = "bar", every = "1s")
52+
void concurrent() throws InterruptedException {
53+
CONCURRENT_LATCH.countDown();
54+
}
55+
56+
}
57+
}

extensions/quartz/deployment/src/test/java/io/quarkus/quartz/test/programmatic/ProgrammaticJobsTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public void testJobs() throws InterruptedException {
6969
.setSkipPredicate(AlwaysSkipPredicate.class)
7070
.schedule();
7171

72-
Scheduler.JobDefinition job1 = scheduler.newJob("foo")
72+
Scheduler.JobDefinition<?> job1 = scheduler.newJob("foo")
7373
.setInterval("1s")
7474
.setTask(ec -> {
7575
assertTrue(Arc.container().requestContext().isActive());
@@ -79,7 +79,7 @@ public void testJobs() throws InterruptedException {
7979
assertEquals("Sync task was already set",
8080
assertThrows(IllegalStateException.class, () -> job1.setAsyncTask(ec -> null)).getMessage());
8181

82-
Scheduler.JobDefinition job2 = scheduler.newJob("foo").setCron("0/5 * * * * ?");
82+
Scheduler.JobDefinition<?> job2 = scheduler.newJob("foo").setCron("0/5 * * * * ?");
8383
assertEquals("Either sync or async task must be set",
8484
assertThrows(IllegalStateException.class, () -> job2.schedule()).getMessage());
8585
job2.setTask(ec -> {
@@ -117,7 +117,7 @@ public void testJobs() throws InterruptedException {
117117
@Test
118118
public void testAsyncJob() throws InterruptedException, SchedulerException {
119119
String identity = "fooAsync";
120-
JobDefinition asyncJob = scheduler.newJob(identity)
120+
JobDefinition<?> asyncJob = scheduler.newJob(identity)
121121
.setInterval("1s")
122122
.setAsyncTask(ec -> {
123123
assertTrue(Context.isOnEventLoopThread() && VertxContext.isOnDuplicatedContext());
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package io.quarkus.quartz;
2+
3+
import static java.lang.annotation.ElementType.METHOD;
4+
import static java.lang.annotation.RetentionPolicy.RUNTIME;
5+
6+
import java.lang.annotation.Retention;
7+
import java.lang.annotation.Target;
8+
9+
import org.quartz.DisallowConcurrentExecution;
10+
import org.quartz.Job;
11+
12+
import io.quarkus.scheduler.Scheduled;
13+
import io.quarkus.scheduler.SkippedExecution;
14+
15+
/**
16+
* Annotated scheduled method may not be executed concurrently. The behavior is identical to a {@link Job} class annotated with
17+
* {@link DisallowConcurrentExecution}. Keep in mind that this annotation can be only used if
18+
* {@code quarkus.quartz.run-blocking-scheduled-method-on-quartz-thread} is set to {@code true}.
19+
* <p>
20+
* Unlike with {@link Scheduled.ConcurrentExecution#SKIP} the {@link SkippedExecution} event is never fired if a method
21+
* execution is skipped by Quartz.
22+
*
23+
* @see DisallowConcurrentExecution
24+
*/
25+
@Target(METHOD)
26+
@Retention(RUNTIME)
27+
public @interface Nonconcurrent {
28+
29+
}

extensions/quartz/runtime/src/main/java/io/quarkus/quartz/QuartzScheduler.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,18 @@ public interface QuartzScheduler extends Scheduler {
1313
*/
1414
org.quartz.Scheduler getScheduler();
1515

16+
@Override
17+
QuartzJobDefinition newJob(String identity);
18+
19+
interface QuartzJobDefinition extends JobDefinition<QuartzJobDefinition> {
20+
21+
/**
22+
*
23+
* @return self
24+
* @see Nonconcurrent
25+
*/
26+
QuartzJobDefinition setNonconcurrent();
27+
28+
}
29+
1630
}

extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/QuartzRecorder.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package io.quarkus.quartz.runtime;
22

33
import java.util.Optional;
4+
import java.util.Set;
45
import java.util.function.Supplier;
56

67
import io.quarkus.runtime.annotations.Recorder;
@@ -9,11 +10,11 @@
910
public class QuartzRecorder {
1011

1112
public Supplier<QuartzSupport> quartzSupportSupplier(QuartzRuntimeConfig runtimeConfig,
12-
QuartzBuildTimeConfig buildTimeConfig, Optional<String> driverDialect) {
13+
QuartzBuildTimeConfig buildTimeConfig, Optional<String> driverDialect, Set<String> nonconcurrentMethods) {
1314
return new Supplier<QuartzSupport>() {
1415
@Override
1516
public QuartzSupport get() {
16-
return new QuartzSupport(runtimeConfig, buildTimeConfig, driverDialect);
17+
return new QuartzSupport(runtimeConfig, buildTimeConfig, driverDialect, nonconcurrentMethods);
1718
}
1819
};
1920
}

0 commit comments

Comments
 (0)