Skip to content

Commit b4b0f7b

Browse files
omercelikcengsobychacko
authored andcommitted
Replace synchronized blocks with ReentrantLocks for virtual thread support
Fixes: #3652 Replace synchronized methods and blocks with ReentrantLocks in a few classes in Spring Kafka to improve compatibility with virtual threads. This changes the synchronization mechanism in: - KafkaListenerAnnotationBeanPostProcessor - KafkaListenerEndpointRegistrar - KafkaAdmin - DefaultDestinationTopicResolver - JsonDeserializer/JsonSerializer The change helps avoid blocking virtual threads when using Spring Kafka in Project Loom environments while maintaining thread safety.
1 parent 71b571c commit b4b0f7b

File tree

6 files changed

+184
-86
lines changed

6 files changed

+184
-86
lines changed

spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java

Lines changed: 51 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@
3737
import java.util.concurrent.ConcurrentHashMap;
3838
import java.util.concurrent.atomic.AtomicBoolean;
3939
import java.util.concurrent.atomic.AtomicInteger;
40+
import java.util.concurrent.locks.Lock;
41+
import java.util.concurrent.locks.ReentrantLock;
4042
import java.util.function.BiFunction;
4143
import java.util.regex.Pattern;
4244
import java.util.stream.Stream;
@@ -144,6 +146,7 @@
144146
* @author Wang Zhiyang
145147
* @author Sanghyeok An
146148
* @author Soby Chacko
149+
* @author Omer Celik
147150
*
148151
* @see KafkaListener
149152
* @see KafkaListenerErrorHandler
@@ -207,6 +210,8 @@ public class KafkaListenerAnnotationBeanPostProcessor<K, V>
207210
@Nullable
208211
private RetryTopicConfigurer retryTopicConfigurer;
209212

213+
private final Lock globalLock = new ReentrantLock();
214+
210215
@Override
211216
public int getOrder() {
212217
return LOWEST_PRECEDENCE;
@@ -278,14 +283,20 @@ public void setApplicationContext(ApplicationContext applicationContext) throws
278283
* {@link #setEndpointRegistry endpoint registry} has to be explicitly configured.
279284
* @param beanFactory the {@link BeanFactory} to be used.
280285
*/
281-
public synchronized void setBeanFactory(BeanFactory beanFactory) {
282-
this.beanFactory = beanFactory;
283-
if (beanFactory instanceof ConfigurableListableBeanFactory clbf) {
284-
BeanExpressionResolver beanExpressionResolver = clbf.getBeanExpressionResolver();
285-
if (beanExpressionResolver != null) {
286-
this.resolver = beanExpressionResolver;
286+
public void setBeanFactory(BeanFactory beanFactory) {
287+
try {
288+
this.globalLock.lock();
289+
this.beanFactory = beanFactory;
290+
if (beanFactory instanceof ConfigurableListableBeanFactory clbf) {
291+
BeanExpressionResolver beanExpressionResolver = clbf.getBeanExpressionResolver();
292+
if (beanExpressionResolver != null) {
293+
this.resolver = beanExpressionResolver;
294+
}
295+
this.expressionContext = new BeanExpressionContext(clbf, this.listenerScope);
287296
}
288-
this.expressionContext = new BeanExpressionContext(clbf, this.listenerScope);
297+
}
298+
finally {
299+
this.globalLock.unlock();
289300
}
290301
}
291302

@@ -451,36 +462,48 @@ private KafkaListener enhance(AnnotatedElement element, KafkaListener ann) {
451462
}
452463
}
453464

454-
private synchronized void processMultiMethodListeners(Collection<KafkaListener> classLevelListeners,
465+
private void processMultiMethodListeners(Collection<KafkaListener> classLevelListeners,
455466
List<Method> multiMethods, Class<?> clazz, Object bean, String beanName) {
456467

457-
List<Method> checkedMethods = new ArrayList<>();
458-
Method defaultMethod = null;
459-
for (Method method : multiMethods) {
460-
Method checked = checkProxy(method, bean);
461-
KafkaHandler annotation = AnnotationUtils.findAnnotation(method, KafkaHandler.class);
462-
if (annotation != null && annotation.isDefault()) {
463-
Method toAssert = defaultMethod;
464-
Assert.state(toAssert == null, () -> "Only one @KafkaHandler can be marked 'isDefault', found: "
465-
+ toAssert.toString() + " and " + method);
466-
defaultMethod = checked;
468+
try {
469+
this.globalLock.lock();
470+
List<Method> checkedMethods = new ArrayList<>();
471+
Method defaultMethod = null;
472+
for (Method method : multiMethods) {
473+
Method checked = checkProxy(method, bean);
474+
KafkaHandler annotation = AnnotationUtils.findAnnotation(method, KafkaHandler.class);
475+
if (annotation != null && annotation.isDefault()) {
476+
Method toAssert = defaultMethod;
477+
Assert.state(toAssert == null, () -> "Only one @KafkaHandler can be marked 'isDefault', found: "
478+
+ toAssert.toString() + " and " + method);
479+
defaultMethod = checked;
480+
}
481+
checkedMethods.add(checked);
482+
}
483+
for (KafkaListener classLevelListener : classLevelListeners) {
484+
MultiMethodKafkaListenerEndpoint<K, V> endpoint =
485+
new MultiMethodKafkaListenerEndpoint<>(checkedMethods, defaultMethod, bean);
486+
processMainAndRetryListeners(classLevelListener, bean, beanName, endpoint, null, clazz);
467487
}
468-
checkedMethods.add(checked);
469488
}
470-
for (KafkaListener classLevelListener : classLevelListeners) {
471-
MultiMethodKafkaListenerEndpoint<K, V> endpoint =
472-
new MultiMethodKafkaListenerEndpoint<>(checkedMethods, defaultMethod, bean);
473-
processMainAndRetryListeners(classLevelListener, bean, beanName, endpoint, null, clazz);
489+
finally {
490+
this.globalLock.unlock();
474491
}
475492
}
476493

477-
protected synchronized void processKafkaListener(KafkaListener kafkaListener, Method method, Object bean,
494+
protected void processKafkaListener(KafkaListener kafkaListener, Method method, Object bean,
478495
String beanName) {
479496

480-
Method methodToUse = checkProxy(method, bean);
481-
MethodKafkaListenerEndpoint<K, V> endpoint = new MethodKafkaListenerEndpoint<>();
482-
endpoint.setMethod(methodToUse);
483-
processMainAndRetryListeners(kafkaListener, bean, beanName, endpoint, methodToUse, null);
497+
try {
498+
this.globalLock.lock();
499+
Method methodToUse = checkProxy(method, bean);
500+
MethodKafkaListenerEndpoint<K, V> endpoint = new MethodKafkaListenerEndpoint<>();
501+
endpoint.setMethod(methodToUse);
502+
processMainAndRetryListeners(kafkaListener, bean, beanName, endpoint, methodToUse, null);
503+
}
504+
finally {
505+
this.globalLock.unlock();
506+
}
484507
}
485508

486509
private void processMainAndRetryListeners(KafkaListener kafkaListener, Object bean, String beanName,

spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistrar.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import java.util.Arrays;
2121
import java.util.Collections;
2222
import java.util.List;
23+
import java.util.concurrent.locks.Lock;
24+
import java.util.concurrent.locks.ReentrantLock;
2325

2426
import org.springframework.beans.factory.BeanFactory;
2527
import org.springframework.beans.factory.BeanFactoryAware;
@@ -40,6 +42,7 @@
4042
* @author Gary Russell
4143
* @author Filip Halemba
4244
* @author Wang Zhiyang
45+
* @author Omer Celik
4346
*
4447
* @see org.springframework.kafka.annotation.KafkaListenerConfigurer
4548
*/
@@ -49,6 +52,8 @@ public class KafkaListenerEndpointRegistrar implements BeanFactoryAware, Initial
4952

5053
private List<HandlerMethodArgumentResolver> customMethodArgumentResolvers = new ArrayList<>();
5154

55+
private final Lock endpointsLock = new ReentrantLock();
56+
5257
private KafkaListenerEndpointRegistry endpointRegistry;
5358

5459
private MessageHandlerMethodFactory messageHandlerMethodFactory;
@@ -188,7 +193,8 @@ public void afterPropertiesSet() {
188193
}
189194

190195
protected void registerAllEndpoints() {
191-
synchronized (this.endpointDescriptors) {
196+
try {
197+
this.endpointsLock.lock();
192198
for (KafkaListenerEndpointDescriptor descriptor : this.endpointDescriptors) {
193199
if (descriptor.endpoint instanceof MultiMethodKafkaListenerEndpoint<?, ?> mmkle
194200
&& this.validator != null) {
@@ -199,6 +205,9 @@ protected void registerAllEndpoints() {
199205
}
200206
this.startImmediately = true; // trigger immediate startup
201207
}
208+
finally {
209+
this.endpointsLock.unlock();
210+
}
202211
}
203212

204213
private KafkaListenerContainerFactory<?> resolveContainerFactory(KafkaListenerEndpointDescriptor descriptor) {
@@ -234,7 +243,8 @@ public void registerEndpoint(KafkaListenerEndpoint endpoint, @Nullable KafkaList
234243
Assert.hasText(endpoint.getId(), "Endpoint id must be set");
235244
// Factory may be null, we defer the resolution right before actually creating the container
236245
KafkaListenerEndpointDescriptor descriptor = new KafkaListenerEndpointDescriptor(endpoint, factory);
237-
synchronized (this.endpointDescriptors) {
246+
try {
247+
this.endpointsLock.lock();
238248
if (this.startImmediately) { // Register and start immediately
239249
this.endpointRegistry.registerListenerContainer(descriptor.endpoint,
240250
resolveContainerFactory(descriptor), true);
@@ -243,6 +253,9 @@ public void registerEndpoint(KafkaListenerEndpoint endpoint, @Nullable KafkaList
243253
this.endpointDescriptors.add(descriptor);
244254
}
245255
}
256+
finally {
257+
this.endpointsLock.unlock();
258+
}
246259
}
247260

248261
/**

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdmin.java

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,13 @@
3232
import java.util.concurrent.TimeUnit;
3333
import java.util.concurrent.TimeoutException;
3434
import java.util.concurrent.atomic.AtomicInteger;
35+
import java.util.concurrent.locks.Lock;
36+
import java.util.concurrent.locks.ReentrantLock;
3537
import java.util.function.Predicate;
3638
import java.util.stream.Collectors;
3739

3840
import org.apache.commons.logging.LogFactory;
41+
import org.apache.kafka.clients.admin.Admin;
3942
import org.apache.kafka.clients.admin.AdminClient;
4043
import org.apache.kafka.clients.admin.AdminClientConfig;
4144
import org.apache.kafka.clients.admin.AlterConfigOp;
@@ -76,6 +79,8 @@
7679
* @author Adrian Gygax
7780
* @author Sanghyeok An
7881
* @author Valentina Armenise
82+
* @author Anders Swanson
83+
* @author Omer Celik
7984
*
8085
* @since 1.3
8186
*/
@@ -93,6 +98,8 @@ public class KafkaAdmin extends KafkaResourceFactory
9398

9499
private static final AtomicInteger CLIENT_ID_COUNTER = new AtomicInteger();
95100

101+
private final Lock clusterIdLock = new ReentrantLock();
102+
96103
private final Map<String, Object> configs;
97104

98105
private ApplicationContext applicationContext;
@@ -265,12 +272,7 @@ public final boolean initialize() {
265272
}
266273
if (adminClient != null) {
267274
try {
268-
synchronized (this) {
269-
if (this.clusterId != null) {
270-
this.clusterId = adminClient.describeCluster().clusterId().get(this.operationTimeout,
271-
TimeUnit.SECONDS);
272-
}
273-
}
275+
updateClusterId(adminClient);
274276
addOrModifyTopicsIfNeeded(adminClient, newTopics);
275277
return true;
276278
}
@@ -295,6 +297,19 @@ public final boolean initialize() {
295297
return false;
296298
}
297299

300+
private void updateClusterId(Admin adminClient) throws InterruptedException, ExecutionException, TimeoutException {
301+
try {
302+
this.clusterIdLock.lock();
303+
if (this.clusterId != null) {
304+
this.clusterId = adminClient.describeCluster().clusterId().get(this.operationTimeout,
305+
TimeUnit.SECONDS);
306+
}
307+
}
308+
finally {
309+
this.clusterIdLock.unlock();
310+
}
311+
}
312+
298313
/**
299314
* Return a collection of {@link NewTopic}s to create or modify. The default
300315
* implementation retrieves all {@link NewTopic} beans in the application context and

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolver.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import java.util.Map;
2525
import java.util.Objects;
2626
import java.util.concurrent.ConcurrentHashMap;
27+
import java.util.concurrent.locks.Lock;
28+
import java.util.concurrent.locks.ReentrantLock;
2729
import java.util.stream.Collectors;
2830
import java.util.stream.IntStream;
2931

@@ -49,6 +51,7 @@
4951
* @author Gary Russell
5052
* @author Yvette Quinby
5153
* @author Adrian Chlebosz
54+
* @author Omer Celik
5255
* @since 2.7
5356
*
5457
*/
@@ -62,6 +65,8 @@ public class DefaultDestinationTopicResolver extends ExceptionClassifier
6265

6366
private final Map<String, Map<String, DestinationTopicHolder>> sourceDestinationsHolderMap;
6467

68+
private final Lock sourceDestinationsHolderLock = new ReentrantLock();
69+
6570
private final Clock clock;
6671

6772
private ApplicationContext applicationContext;
@@ -210,9 +215,13 @@ private DestinationTopicHolder getDestinationHolderFor(String mainListenerId, St
210215
}
211216

212217
private DestinationTopicHolder getDestinationTopicSynchronized(String mainListenerId, String topic) {
213-
synchronized (this.sourceDestinationsHolderMap) {
218+
try {
219+
this.sourceDestinationsHolderLock.lock();
214220
return doGetDestinationFor(mainListenerId, topic);
215221
}
222+
finally {
223+
this.sourceDestinationsHolderLock.unlock();
224+
}
216225
}
217226

218227
private DestinationTopicHolder doGetDestinationFor(String mainListenerId, String topic) {
@@ -229,11 +238,15 @@ public void addDestinationTopics(String mainListenerId, List<DestinationTopic> d
229238
+ DefaultDestinationTopicResolver.class.getSimpleName() + " is already refreshed.");
230239
}
231240
validateDestinations(destinationsToAdd);
232-
synchronized (this.sourceDestinationsHolderMap) {
241+
try {
242+
this.sourceDestinationsHolderLock.lock();
233243
Map<String, DestinationTopicHolder> map = this.sourceDestinationsHolderMap.computeIfAbsent(mainListenerId,
234244
id -> new HashMap<>());
235245
map.putAll(correlatePairSourceAndDestinationValues(destinationsToAdd));
236246
}
247+
finally {
248+
this.sourceDestinationsHolderLock.unlock();
249+
}
237250
}
238251

239252
private void validateDestinations(List<DestinationTopic> destinationsToAdd) {

0 commit comments

Comments
 (0)