Skip to content

Commit 638ee01

Browse files
Get rid of proxy in DefaultKafkaConsumerFactory (#2822)
* Get rid of proxy in DefaultKafkaConsumerFactory The proxy in the `DefaultKafkaConsumerFactory` for `KafkaConsumer` is created only to intercept `close()` call to remove an instance from the `listeners`. There is no need in such a proxy since simple `KafkaConsumer` class extension can handle that scenario. The reason behind this change is to avoid a `Serializable` (`java.lang.reflect.Proxy`) header in the produced message for the `KafkaConsumer` where we still fail to serialize it because other properties of the proxy are not `Serializable` * Introduce `DefaultKafkaConsumerFactory.ExtendedKafkaConsumer` to handle `listeners` interaction * The `createRawConsumer()` might be considered as breaking change since now end-user must extend this `ExtendedKafkaConsumer` to be able to handle `listeners` same way as before * Adjust `DefaultKafkaConsumerFactoryTests.listener()` test for the current code state * * Log warn for ignored `listeners` and not an `ExtendedKafkaConsumer` * Log polishing. --------- Co-authored-by: Gary Russell <grussell@vmware.com>
1 parent e95f041 commit 638ee01

File tree

2 files changed

+56
-73
lines changed

2 files changed

+56
-73
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaConsumerFactory.java

Lines changed: 47 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2022 the original author or authors.
2+
* Copyright 2016-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,6 +16,7 @@
1616

1717
package org.springframework.kafka.core;
1818

19+
import java.time.Duration;
1920
import java.util.ArrayList;
2021
import java.util.Collections;
2122
import java.util.Enumeration;
@@ -28,19 +29,13 @@
2829
import java.util.concurrent.ConcurrentHashMap;
2930
import java.util.function.Supplier;
3031

31-
import org.aopalliance.aop.Advice;
32-
import org.aopalliance.intercept.MethodInterceptor;
33-
import org.aopalliance.intercept.MethodInvocation;
3432
import org.apache.commons.logging.LogFactory;
3533
import org.apache.kafka.clients.consumer.Consumer;
3634
import org.apache.kafka.clients.consumer.ConsumerConfig;
3735
import org.apache.kafka.clients.consumer.KafkaConsumer;
38-
import org.apache.kafka.common.Metric;
3936
import org.apache.kafka.common.MetricName;
4037
import org.apache.kafka.common.serialization.Deserializer;
4138

42-
import org.springframework.aop.framework.ProxyFactory;
43-
import org.springframework.aop.support.NameMatchMethodPointcutAdvisor;
4439
import org.springframework.beans.factory.BeanNameAware;
4540
import org.springframework.core.log.LogAccessor;
4641
import org.springframework.lang.Nullable;
@@ -445,68 +440,73 @@ private void checkInaccessible(Properties properties, Map<String, Object> modifi
445440
}
446441
}
447442

448-
@SuppressWarnings("resource")
449443
protected Consumer<K, V> createKafkaConsumer(Map<String, Object> configProps) {
450444
checkBootstrap(configProps);
451445
Consumer<K, V> kafkaConsumer = createRawConsumer(configProps);
452-
453-
if (this.listeners.size() > 0) {
454-
Map<MetricName, ? extends Metric> metrics = kafkaConsumer.metrics();
455-
Iterator<MetricName> metricIterator = metrics.keySet().iterator();
456-
String clientId;
457-
if (metricIterator.hasNext()) {
458-
clientId = metricIterator.next().tags().get("client-id");
459-
}
460-
else {
461-
clientId = "unknown";
462-
}
463-
String id = this.beanName + "." + clientId;
464-
kafkaConsumer = createProxy(kafkaConsumer, id);
465-
for (Listener<K, V> listener : this.listeners) {
466-
listener.consumerAdded(id, kafkaConsumer);
467-
}
446+
if (!this.listeners.isEmpty() && !(kafkaConsumer instanceof ExtendedKafkaConsumer)) {
447+
LOGGER.warn("The 'ConsumerFactory.Listener' configuration is ignored " +
448+
"because the consumer is not an instance of 'ExtendedKafkaConsumer'." +
449+
"Consider extending 'ExtendedKafkaConsumer' or implement your own 'ConsumerFactory'.");
468450
}
451+
469452
for (ConsumerPostProcessor<K, V> pp : this.postProcessors) {
470453
kafkaConsumer = pp.apply(kafkaConsumer);
471454
}
472455
return kafkaConsumer;
473456
}
474457

475458
/**
476-
* Create a Consumer.
459+
* Create a {@link Consumer}.
460+
* By default, this method returns an internal {@link ExtendedKafkaConsumer}
461+
* which is aware of provided into this {@link #listeners}, therefore it is recommended
462+
* to extend that class if {@link #listeners} are still involved for a custom {@link Consumer}.
477463
* @param configProps the configuration properties.
478464
* @return the consumer.
479465
* @since 2.5
480466
*/
481467
protected Consumer<K, V> createRawConsumer(Map<String, Object> configProps) {
482-
return new KafkaConsumer<>(configProps, this.keyDeserializerSupplier.get(),
483-
this.valueDeserializerSupplier.get());
468+
return new ExtendedKafkaConsumer(configProps);
484469
}
485470

486-
@SuppressWarnings("unchecked")
487-
private Consumer<K, V> createProxy(Consumer<K, V> kafkaConsumer, String id) {
488-
ProxyFactory pf = new ProxyFactory(kafkaConsumer);
489-
Advice advice = new MethodInterceptor() {
471+
@Override
472+
public boolean isAutoCommit() {
473+
Object auto = this.configs.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
474+
return auto instanceof Boolean
475+
? (Boolean) auto
476+
: !(auto instanceof String) || Boolean.parseBoolean((String) auto);
477+
}
478+
479+
protected class ExtendedKafkaConsumer extends KafkaConsumer<K, V> {
480+
481+
private String idForListeners;
490482

491-
@Override
492-
public Object invoke(MethodInvocation invocation) throws Throwable {
493-
DefaultKafkaConsumerFactory.this.listeners.forEach(listener ->
494-
listener.consumerRemoved(id, kafkaConsumer));
495-
return invocation.proceed();
483+
protected ExtendedKafkaConsumer(Map<String, Object> configProps) {
484+
super(configProps,
485+
DefaultKafkaConsumerFactory.this.keyDeserializerSupplier.get(),
486+
DefaultKafkaConsumerFactory.this.valueDeserializerSupplier.get());
487+
488+
if (!DefaultKafkaConsumerFactory.this.listeners.isEmpty()) {
489+
Iterator<MetricName> metricIterator = metrics().keySet().iterator();
490+
String clientId = "unknown";
491+
if (metricIterator.hasNext()) {
492+
clientId = metricIterator.next().tags().get("client-id");
493+
}
494+
this.idForListeners = DefaultKafkaConsumerFactory.this.beanName + "." + clientId;
495+
for (Listener<K, V> listener : DefaultKafkaConsumerFactory.this.listeners) {
496+
listener.consumerAdded(this.idForListeners, this);
497+
}
496498
}
499+
}
497500

498-
};
499-
NameMatchMethodPointcutAdvisor advisor = new NameMatchMethodPointcutAdvisor(advice);
500-
advisor.addMethodName("close");
501-
pf.addAdvisor(advisor);
502-
return (Consumer<K, V>) pf.getProxy();
503-
}
501+
@Override
502+
public void close(Duration timeout) {
503+
super.close(timeout);
504+
505+
for (Listener<K, V> listener : DefaultKafkaConsumerFactory.this.listeners) {
506+
listener.consumerRemoved(this.idForListeners, this);
507+
}
508+
}
504509

505-
@Override
506-
public boolean isAutoCommit() {
507-
Object auto = this.configs.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
508-
return auto instanceof Boolean ? (Boolean) auto
509-
: auto instanceof String ? Boolean.valueOf((String) auto) : true;
510510
}
511511

512512
}

spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java

Lines changed: 9 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2022 the original author or authors.
2+
* Copyright 2017-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,7 +17,6 @@
1717
package org.springframework.kafka.core;
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
20-
import static org.mockito.BDDMockito.given;
2120
import static org.mockito.Mockito.mock;
2221
import static org.mockito.Mockito.verify;
2322

@@ -38,14 +37,11 @@
3837
import org.apache.kafka.clients.consumer.Consumer;
3938
import org.apache.kafka.clients.consumer.ConsumerConfig;
4039
import org.apache.kafka.clients.consumer.KafkaConsumer;
41-
import org.apache.kafka.common.Metric;
42-
import org.apache.kafka.common.MetricName;
4340
import org.apache.kafka.common.serialization.Deserializer;
4441
import org.apache.kafka.common.serialization.StringDeserializer;
4542
import org.junit.jupiter.api.Test;
4643

4744
import org.springframework.aop.framework.ProxyFactory;
48-
import org.springframework.aop.support.AopUtils;
4945
import org.springframework.beans.factory.annotation.Autowired;
5046
import org.springframework.context.annotation.Configuration;
5147
import org.springframework.kafka.core.ConsumerFactory.Listener;
@@ -63,6 +59,8 @@
6359
/**
6460
* @author Gary Russell
6561
* @author Chris Gilbert
62+
* @author Artem Bilan
63+
*
6664
* @since 1.0.6
6765
*/
6866
@EmbeddedKafka(topics = { "txCache1", "txCache2", "txCacheSendFromListener" },
@@ -345,7 +343,7 @@ public void testNestedTxProducerIsCached() throws Exception {
345343
ProxyFactory prox = new ProxyFactory();
346344
prox.setTarget(consumer);
347345
@SuppressWarnings("unchecked")
348-
Consumer<Integer, String> proxy = (Consumer<Integer, String>) prox.getProxy();
346+
Consumer<Integer, String> proxy = (Consumer<Integer, String>) prox.getProxy();
349347
wrapped.set(proxy);
350348
return proxy;
351349
});
@@ -381,25 +379,12 @@ public void testNestedTxProducerIsCached() throws Exception {
381379
@SuppressWarnings({ "rawtypes", "unchecked" })
382380
@Test
383381
void listener() {
384-
Consumer consumer = mock(Consumer.class);
385-
Map<MetricName, ? extends Metric> metrics = new HashMap<>();
386-
metrics.put(new MetricName("test", "group", "desc", Collections.singletonMap("client-id", "foo-0")), null);
387-
given(consumer.metrics()).willReturn(metrics);
388-
DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory(Collections.emptyMap()) {
389-
390-
@Override
391-
protected Consumer createRawConsumer(Map configProps) {
392-
return consumer;
393-
}
394-
395-
};
382+
Map<String, Object> consumerConfig = KafkaTestUtils.consumerProps("txCache1Group", "false", this.embeddedKafka);
383+
consumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG, "foo-0");
384+
DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory(consumerConfig);
396385
List<String> adds = new ArrayList<>();
397386
List<String> removals = new ArrayList<>();
398387

399-
Consumer consum = cf.createConsumer();
400-
assertThat(AopUtils.isAopProxy(consum)).isFalse();
401-
assertThat(adds).hasSize(0);
402-
403388
cf.addListener(new Listener() {
404389

405390
@Override
@@ -415,13 +400,11 @@ public void consumerRemoved(String id, Consumer consumer) {
415400
});
416401
cf.setBeanName("cf");
417402

418-
consum = cf.createConsumer();
419-
assertThat(AopUtils.isAopProxy(consum)).isTrue();
420-
assertThat(AopUtils.isJdkDynamicProxy(consum)).isTrue();
403+
Consumer consumer = cf.createConsumer();
421404
assertThat(adds).hasSize(1);
422405
assertThat(adds.get(0)).isEqualTo("cf.foo-0");
423406
assertThat(removals).hasSize(0);
424-
consum.close();
407+
consumer.close();
425408
assertThat(removals).hasSize(1);
426409
}
427410

0 commit comments

Comments
 (0)