Skip to content

Commit ddee2e0

Browse files
authored
Merge pull request quarkusio#44224 from mkouba/issue-44048
Quartz: introduce Nonconcurrent
2 parents b3d5937 + 2ffe012 commit ddee2e0

File tree

16 files changed

+499
-81
lines changed

16 files changed

+499
-81
lines changed

extensions/quartz/deployment/src/main/java/io/quarkus/quartz/deployment/QuartzProcessor.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import java.sql.Connection;
66
import java.util.ArrayList;
77
import java.util.HashMap;
8+
import java.util.HashSet;
89
import java.util.List;
910
import java.util.Map;
1011
import java.util.Optional;
@@ -55,6 +56,7 @@
5556
import io.quarkus.deployment.builditem.nativeimage.NativeImageProxyDefinitionBuildItem;
5657
import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem;
5758
import io.quarkus.deployment.logging.LogCleanupFilterBuildItem;
59+
import io.quarkus.quartz.Nonconcurrent;
5860
import io.quarkus.quartz.runtime.QuarkusQuartzConnectionPoolProvider;
5961
import io.quarkus.quartz.runtime.QuartzBuildTimeConfig;
6062
import io.quarkus.quartz.runtime.QuartzExtensionPointConfig;
@@ -69,6 +71,7 @@
6971
import io.quarkus.quartz.runtime.jdbc.QuarkusStdJDBCDelegate;
7072
import io.quarkus.runtime.configuration.ConfigurationException;
7173
import io.quarkus.scheduler.Scheduled;
74+
import io.quarkus.scheduler.deployment.ScheduledBusinessMethodItem;
7275
import io.quarkus.scheduler.deployment.SchedulerImplementationBuildItem;
7376

7477
public class QuartzProcessor {
@@ -79,6 +82,7 @@ public class QuartzProcessor {
7982
private static final DotName DELEGATE_HSQLDB = DotName.createSimple(QuarkusHSQLDBDelegate.class.getName());
8083
private static final DotName DELEGATE_MSSQL = DotName.createSimple(QuarkusMSSQLDelegate.class.getName());
8184
private static final DotName DELEGATE_STDJDBC = DotName.createSimple(QuarkusStdJDBCDelegate.class.getName());
85+
private static final DotName NONCONCURRENT = DotName.createSimple(Nonconcurrent.class);
8286

8387
@BuildStep
8488
FeatureBuildItem feature() {
@@ -313,12 +317,23 @@ public void start(BuildProducer<ServiceStartBuildItem> serviceStart,
313317
@Record(RUNTIME_INIT)
314318
public void quartzSupportBean(QuartzRuntimeConfig runtimeConfig, QuartzBuildTimeConfig buildTimeConfig,
315319
QuartzRecorder recorder,
316-
BuildProducer<SyntheticBeanBuildItem> syntheticBeanBuildItemBuildProducer,
317-
QuartzJDBCDriverDialectBuildItem driverDialect) {
320+
QuartzJDBCDriverDialectBuildItem driverDialect,
321+
List<ScheduledBusinessMethodItem> scheduledMethods,
322+
BuildProducer<SyntheticBeanBuildItem> syntheticBeanBuildItemBuildProducer) {
323+
324+
Set<String> nonconcurrentMethods = new HashSet<>();
325+
for (ScheduledBusinessMethodItem m : scheduledMethods) {
326+
if (m.getMethod().hasAnnotation(NONCONCURRENT)) {
327+
nonconcurrentMethods.add(m.getMethod().declaringClass().name() + "#" + m.getMethod().name());
328+
}
329+
}
318330

319331
syntheticBeanBuildItemBuildProducer.produce(SyntheticBeanBuildItem.configure(QuartzSupport.class)
320332
.scope(Singleton.class) // this should be @ApplicationScoped but it fails for some reason
321333
.setRuntimeInit()
322-
.supplier(recorder.quartzSupportSupplier(runtimeConfig, buildTimeConfig, driverDialect.getDriver())).done());
334+
.supplier(recorder.quartzSupportSupplier(runtimeConfig, buildTimeConfig, driverDialect.getDriver(),
335+
nonconcurrentMethods))
336+
.done());
323337
}
338+
324339
}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
package io.quarkus.quartz.test;
2+
3+
import static org.junit.jupiter.api.Assertions.assertTrue;
4+
5+
import java.util.concurrent.CountDownLatch;
6+
import java.util.concurrent.TimeUnit;
7+
import java.util.concurrent.atomic.AtomicInteger;
8+
9+
import jakarta.inject.Inject;
10+
11+
import org.junit.jupiter.api.Test;
12+
import org.junit.jupiter.api.extension.RegisterExtension;
13+
14+
import io.quarkus.quartz.QuartzScheduler;
15+
import io.quarkus.scheduler.Scheduled;
16+
import io.quarkus.test.QuarkusUnitTest;
17+
18+
public class NonconcurrentJobDefinitionTest {
19+
20+
@RegisterExtension
21+
static final QuarkusUnitTest test = new QuarkusUnitTest()
22+
.withApplicationRoot(root -> root.addClasses(Jobs.class))
23+
.overrideConfigKey("quarkus.scheduler.start-mode", "forced");
24+
25+
@Inject
26+
QuartzScheduler scheduler;
27+
28+
@Test
29+
public void testExecution() throws InterruptedException {
30+
scheduler.newJob("foo")
31+
.setTask(se -> {
32+
Jobs.NONCONCURRENT_COUNTER.incrementAndGet();
33+
try {
34+
if (!Jobs.CONCURRENT_LATCH.await(10, TimeUnit.SECONDS)) {
35+
throw new IllegalStateException("nonconcurrent() execution blocked too long...");
36+
}
37+
} catch (InterruptedException e) {
38+
throw new IllegalStateException(e);
39+
}
40+
if (Jobs.NONCONCURRENT_COUNTER.get() == 1) {
41+
// concurrent() executed >= 5x and nonconcurrent() 1x
42+
Jobs.NONCONCURRENT_LATCH.countDown();
43+
}
44+
})
45+
.setInterval("1s")
46+
.setNonconcurrent()
47+
.schedule();
48+
49+
assertTrue(Jobs.NONCONCURRENT_LATCH.await(10, TimeUnit.SECONDS),
50+
String.format("nonconcurrent() executed: %sx", Jobs.NONCONCURRENT_COUNTER.get()));
51+
}
52+
53+
static class Jobs {
54+
55+
static final CountDownLatch NONCONCURRENT_LATCH = new CountDownLatch(1);
56+
static final CountDownLatch CONCURRENT_LATCH = new CountDownLatch(5);
57+
58+
static final AtomicInteger NONCONCURRENT_COUNTER = new AtomicInteger(0);
59+
60+
@Scheduled(identity = "bar", every = "1s")
61+
void concurrent() throws InterruptedException {
62+
CONCURRENT_LATCH.countDown();
63+
}
64+
65+
}
66+
67+
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package io.quarkus.quartz.test;
2+
3+
import static org.junit.jupiter.api.Assertions.assertTrue;
4+
5+
import java.util.concurrent.CountDownLatch;
6+
import java.util.concurrent.TimeUnit;
7+
import java.util.concurrent.atomic.AtomicInteger;
8+
9+
import org.junit.jupiter.api.Test;
10+
import org.junit.jupiter.api.extension.RegisterExtension;
11+
12+
import io.quarkus.quartz.Nonconcurrent;
13+
import io.quarkus.scheduler.Scheduled;
14+
import io.quarkus.test.QuarkusUnitTest;
15+
16+
public class NonconcurrentOnQuartzThreadTest {
17+
18+
@RegisterExtension
19+
static final QuarkusUnitTest test = new QuarkusUnitTest()
20+
.withApplicationRoot(root -> root.addClasses(Jobs.class))
21+
.overrideConfigKey("quarkus.quartz.run-blocking-scheduled-method-on-quartz-thread",
22+
"true");
23+
24+
@Test
25+
public void testExecution() throws InterruptedException {
26+
assertTrue(Jobs.NONCONCURRENT_LATCH.await(10, TimeUnit.SECONDS),
27+
String.format("nonconcurrent() executed: %sx", Jobs.NONCONCURRENT_COUNTER.get()));
28+
}
29+
30+
static class Jobs {
31+
32+
static final CountDownLatch NONCONCURRENT_LATCH = new CountDownLatch(1);
33+
static final CountDownLatch CONCURRENT_LATCH = new CountDownLatch(5);
34+
35+
static final AtomicInteger NONCONCURRENT_COUNTER = new AtomicInteger(0);
36+
37+
@Nonconcurrent
38+
@Scheduled(identity = "foo", every = "1s")
39+
void nonconcurrent() throws InterruptedException {
40+
NONCONCURRENT_COUNTER.incrementAndGet();
41+
if (!CONCURRENT_LATCH.await(10, TimeUnit.SECONDS)) {
42+
throw new IllegalStateException("nonconcurrent() execution blocked too long...");
43+
}
44+
if (NONCONCURRENT_COUNTER.get() == 1) {
45+
// concurrent() executed >= 5x and nonconcurrent() 1x
46+
NONCONCURRENT_LATCH.countDown();
47+
}
48+
}
49+
50+
@Scheduled(identity = "bar", every = "1s")
51+
void concurrent() throws InterruptedException {
52+
CONCURRENT_LATCH.countDown();
53+
}
54+
55+
}
56+
}
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
package io.quarkus.quartz.test;
2+
3+
import static org.junit.jupiter.api.Assertions.assertTrue;
4+
5+
import java.util.concurrent.CountDownLatch;
6+
import java.util.concurrent.TimeUnit;
7+
import java.util.concurrent.atomic.AtomicInteger;
8+
9+
import jakarta.inject.Inject;
10+
11+
import org.junit.jupiter.api.Test;
12+
import org.junit.jupiter.api.extension.RegisterExtension;
13+
import org.quartz.DisallowConcurrentExecution;
14+
import org.quartz.Job;
15+
import org.quartz.JobBuilder;
16+
import org.quartz.JobDetail;
17+
import org.quartz.JobExecutionContext;
18+
import org.quartz.JobExecutionException;
19+
import org.quartz.SchedulerException;
20+
import org.quartz.SimpleScheduleBuilder;
21+
import org.quartz.Trigger;
22+
import org.quartz.TriggerBuilder;
23+
24+
import io.quarkus.quartz.QuartzScheduler;
25+
import io.quarkus.scheduler.Scheduled;
26+
import io.quarkus.scheduler.Scheduler;
27+
import io.quarkus.test.QuarkusUnitTest;
28+
29+
public class NonconcurrentProgrammaticTest {
30+
31+
@RegisterExtension
32+
static final QuarkusUnitTest test = new QuarkusUnitTest()
33+
.withApplicationRoot(root -> root
34+
.addClasses(Jobs.class))
35+
.overrideConfigKey("quarkus.scheduler.start-mode", "halted");
36+
37+
@Inject
38+
QuartzScheduler scheduler;
39+
40+
@Test
41+
public void testExecution() throws SchedulerException, InterruptedException {
42+
JobDetail job = JobBuilder.newJob(Jobs.class)
43+
.withIdentity("foo", Scheduler.class.getName())
44+
.build();
45+
Trigger trigger = TriggerBuilder.newTrigger()
46+
.withIdentity("foo", Scheduler.class.getName())
47+
.startNow()
48+
.withSchedule(SimpleScheduleBuilder.simpleSchedule()
49+
.withIntervalInSeconds(1)
50+
.repeatForever())
51+
.build();
52+
scheduler.getScheduler().scheduleJob(job, trigger);
53+
54+
scheduler.resume();
55+
56+
assertTrue(Jobs.NONCONCURRENT_LATCH.await(10, TimeUnit.SECONDS),
57+
String.format("nonconcurrent() executed: %sx", Jobs.NONCONCURRENT_COUNTER.get()));
58+
}
59+
60+
@DisallowConcurrentExecution
61+
static class Jobs implements Job {
62+
63+
static final CountDownLatch NONCONCURRENT_LATCH = new CountDownLatch(1);
64+
static final CountDownLatch CONCURRENT_LATCH = new CountDownLatch(5);
65+
66+
static final AtomicInteger NONCONCURRENT_COUNTER = new AtomicInteger(0);
67+
68+
@Scheduled(identity = "bar", every = "1s")
69+
void concurrent() throws InterruptedException {
70+
CONCURRENT_LATCH.countDown();
71+
}
72+
73+
@Override
74+
public void execute(JobExecutionContext context) throws JobExecutionException {
75+
Jobs.NONCONCURRENT_COUNTER.incrementAndGet();
76+
try {
77+
if (!Jobs.CONCURRENT_LATCH.await(10, TimeUnit.SECONDS)) {
78+
throw new IllegalStateException("nonconcurrent() execution blocked too long...");
79+
}
80+
} catch (InterruptedException e) {
81+
throw new IllegalStateException(e);
82+
}
83+
if (Jobs.NONCONCURRENT_COUNTER.get() == 1) {
84+
// concurrent() executed >= 5x and nonconcurrent() 1x
85+
Jobs.NONCONCURRENT_LATCH.countDown();
86+
}
87+
}
88+
89+
}
90+
91+
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package io.quarkus.quartz.test;
2+
3+
import static org.junit.jupiter.api.Assertions.assertTrue;
4+
5+
import java.util.concurrent.CountDownLatch;
6+
import java.util.concurrent.TimeUnit;
7+
import java.util.concurrent.atomic.AtomicInteger;
8+
9+
import org.junit.jupiter.api.Test;
10+
import org.junit.jupiter.api.extension.RegisterExtension;
11+
12+
import io.quarkus.quartz.Nonconcurrent;
13+
import io.quarkus.scheduler.Scheduled;
14+
import io.quarkus.test.QuarkusUnitTest;
15+
16+
public class NonconcurrentTest {
17+
18+
@RegisterExtension
19+
static final QuarkusUnitTest test = new QuarkusUnitTest()
20+
.withApplicationRoot(root -> root.addClasses(Jobs.class));
21+
22+
@Test
23+
public void testExecution() throws InterruptedException {
24+
assertTrue(Jobs.NONCONCURRENT_LATCH.await(10, TimeUnit.SECONDS),
25+
String.format("nonconcurrent() executed: %sx", Jobs.NONCONCURRENT_COUNTER.get()));
26+
}
27+
28+
static class Jobs {
29+
30+
static final CountDownLatch NONCONCURRENT_LATCH = new CountDownLatch(1);
31+
static final CountDownLatch CONCURRENT_LATCH = new CountDownLatch(5);
32+
33+
static final AtomicInteger NONCONCURRENT_COUNTER = new AtomicInteger(0);
34+
35+
@Nonconcurrent
36+
@Scheduled(identity = "foo", every = "1s")
37+
void nonconcurrent() throws InterruptedException {
38+
NONCONCURRENT_COUNTER.incrementAndGet();
39+
if (!CONCURRENT_LATCH.await(10, TimeUnit.SECONDS)) {
40+
throw new IllegalStateException("nonconcurrent() execution blocked too long...");
41+
}
42+
if (NONCONCURRENT_COUNTER.get() == 1) {
43+
// concurrent() executed >= 5x and nonconcurrent() 1x
44+
NONCONCURRENT_LATCH.countDown();
45+
}
46+
}
47+
48+
@Scheduled(identity = "bar", every = "1s")
49+
void concurrent() throws InterruptedException {
50+
CONCURRENT_LATCH.countDown();
51+
}
52+
53+
}
54+
}

extensions/quartz/deployment/src/test/java/io/quarkus/quartz/test/programmatic/ProgrammaticJobsTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public void testJobs() throws InterruptedException {
6969
.setSkipPredicate(AlwaysSkipPredicate.class)
7070
.schedule();
7171

72-
Scheduler.JobDefinition job1 = scheduler.newJob("foo")
72+
Scheduler.JobDefinition<?> job1 = scheduler.newJob("foo")
7373
.setInterval("1s")
7474
.setTask(ec -> {
7575
assertTrue(Arc.container().requestContext().isActive());
@@ -79,7 +79,7 @@ public void testJobs() throws InterruptedException {
7979
assertEquals("Sync task was already set",
8080
assertThrows(IllegalStateException.class, () -> job1.setAsyncTask(ec -> null)).getMessage());
8181

82-
Scheduler.JobDefinition job2 = scheduler.newJob("foo").setCron("0/5 * * * * ?");
82+
Scheduler.JobDefinition<?> job2 = scheduler.newJob("foo").setCron("0/5 * * * * ?");
8383
assertEquals("Either sync or async task must be set",
8484
assertThrows(IllegalStateException.class, () -> job2.schedule()).getMessage());
8585
job2.setTask(ec -> {
@@ -117,7 +117,7 @@ public void testJobs() throws InterruptedException {
117117
@Test
118118
public void testAsyncJob() throws InterruptedException, SchedulerException {
119119
String identity = "fooAsync";
120-
JobDefinition asyncJob = scheduler.newJob(identity)
120+
JobDefinition<?> asyncJob = scheduler.newJob(identity)
121121
.setInterval("1s")
122122
.setAsyncTask(ec -> {
123123
assertTrue(Context.isOnEventLoopThread() && VertxContext.isOnDuplicatedContext());
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package io.quarkus.quartz;
2+
3+
import static java.lang.annotation.ElementType.METHOD;
4+
import static java.lang.annotation.RetentionPolicy.RUNTIME;
5+
6+
import java.lang.annotation.Retention;
7+
import java.lang.annotation.Target;
8+
9+
import org.quartz.DisallowConcurrentExecution;
10+
import org.quartz.Job;
11+
12+
import io.quarkus.scheduler.Scheduled;
13+
import io.quarkus.scheduler.SkippedExecution;
14+
15+
/**
16+
* A scheduled method annotated with this annotation may not be executed concurrently. The behavior is identical to a
17+
* {@link Job} class annotated with {@link DisallowConcurrentExecution}.
18+
* <p>
19+
* If {@code quarkus.quartz.run-blocking-scheduled-method-on-quartz-thread} is set to
20+
* {@code false} the execution of a scheduled method is offloaded to a specific Quarkus thread pool but the triggering Quartz
21+
* thread is blocked until the execution is finished. Therefore, make sure the Quartz thread pool is configured appropriately.
22+
* <p>
23+
* If {@code quarkus.quartz.run-blocking-scheduled-method-on-quartz-thread} is set to {@code true} the scheduled method is
24+
* invoked on a thread managed by Quartz.
25+
* <p>
26+
* Unlike with {@link Scheduled.ConcurrentExecution#SKIP} the {@link SkippedExecution} event is never fired if a method
27+
* execution is skipped by Quartz.
28+
*
29+
* @see DisallowConcurrentExecution
30+
*/
31+
@Target(METHOD)
32+
@Retention(RUNTIME)
33+
public @interface Nonconcurrent {
34+
35+
}

0 commit comments

Comments
 (0)