Skip to content

Commit 7aadccb

Browse files
garyrussellartembilan
authored andcommitted
GH-1919: Fix TopicPartitionOffset Matching
Resolves #1919 When looking for a retry configuration instance, matches only worked when `topics` is provided, not `topicPartition`. If there is no content in the `topics` property, pull the topic names from `topicPartition`s (if any).
1 parent e974323 commit 7aadccb

File tree

3 files changed

+156
-8
lines changed

3 files changed

+156
-8
lines changed

spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -466,14 +466,13 @@ private boolean processMainAndRetryListeners(KafkaListener kafkaListener, Object
466466
TopicPartitionOffset[] tps) {
467467

468468
String[] retryableCandidates = topics;
469-
// TODO - support retryable with manual assignment https://github.com/spring-projects/spring-kafka/issues/1919
470-
// if (retryableCandidates.length == 0 && tps.length > 0) {
471-
// retryableCandidates = Arrays.stream(tps)
472-
// .map(tp -> tp.getTopic())
473-
// .distinct()
474-
// .collect(Collectors.toList())
475-
// .toArray(new String[0]);
476-
// }
469+
if (retryableCandidates.length == 0 && tps.length > 0) {
470+
retryableCandidates = Arrays.stream(tps)
471+
.map(tp -> tp.getTopic())
472+
.distinct()
473+
.collect(Collectors.toList())
474+
.toArray(new String[0]);
475+
}
477476

478477
RetryTopicConfiguration retryTopicConfiguration = new RetryTopicConfigurationProvider(this.beanFactory,
479478
this.resolver, this.expressionContext)

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/EndpointCustomizerFactory.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ private Collection<String> getTopics(MethodKafkaListenerEndpoint<?, ?> endpoint)
9595
if (topicPartitionsToAssign != null && topicPartitionsToAssign.length > 0) {
9696
topics = Arrays.stream(topicPartitionsToAssign)
9797
.map(TopicPartitionOffset::getTopic)
98+
.distinct()
9899
.collect(Collectors.toList());
99100
}
100101
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
/*
2+
* Copyright 2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.retrytopic;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
21+
import java.util.List;
22+
import java.util.Map;
23+
import java.util.concurrent.CountDownLatch;
24+
import java.util.concurrent.TimeUnit;
25+
26+
import org.apache.kafka.clients.admin.AdminClientConfig;
27+
import org.apache.kafka.clients.consumer.Consumer;
28+
import org.apache.kafka.clients.consumer.ConsumerConfig;
29+
import org.apache.kafka.common.PartitionInfo;
30+
import org.junit.jupiter.api.Test;
31+
32+
import org.springframework.beans.factory.annotation.Autowired;
33+
import org.springframework.context.annotation.Bean;
34+
import org.springframework.context.annotation.Configuration;
35+
import org.springframework.kafka.annotation.EnableKafka;
36+
import org.springframework.kafka.annotation.KafkaListener;
37+
import org.springframework.kafka.annotation.TopicPartition;
38+
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
39+
import org.springframework.kafka.config.KafkaListenerContainerFactory;
40+
import org.springframework.kafka.core.ConsumerFactory;
41+
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
42+
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
43+
import org.springframework.kafka.core.KafkaAdmin;
44+
import org.springframework.kafka.core.KafkaTemplate;
45+
import org.springframework.kafka.core.ProducerFactory;
46+
import org.springframework.kafka.test.EmbeddedKafkaBroker;
47+
import org.springframework.kafka.test.context.EmbeddedKafka;
48+
import org.springframework.kafka.test.utils.KafkaTestUtils;
49+
import org.springframework.test.annotation.DirtiesContext;
50+
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
51+
52+
/**
53+
* @author Gary Russell
54+
* @since 2.7.7
55+
*
56+
*/
57+
@SpringJUnitConfig
58+
@DirtiesContext
59+
@EmbeddedKafka(topics = { RetryTopicConfigurationManualAssignmentIntegrationTests.TOPIC1,
60+
RetryTopicConfigurationManualAssignmentIntegrationTests.TOPIC2 }, partitions = 1)
61+
class RetryTopicConfigurationManualAssignmentIntegrationTests {
62+
63+
public static final String TOPIC1 = "RetryTopicConfigurationManualAssignmentIntegrationTests.1";
64+
65+
public static final String TOPIC2 = "RetryTopicConfigurationManualAssignmentIntegrationTests.2";
66+
67+
@Test
68+
void includeTopic(@Autowired EmbeddedKafkaBroker broker, @Autowired ConsumerFactory<Integer, String> cf,
69+
@Autowired KafkaTemplate<Integer, String> template, @Autowired Config config) throws InterruptedException {
70+
71+
Consumer<Integer, String> consumer = cf.createConsumer("grp2", "");
72+
Map<String, List<PartitionInfo>> topics = consumer.listTopics();
73+
assertThat(topics.keySet()).contains("RetryTopicConfigurationManualAssignmentIntegrationTests.1",
74+
"RetryTopicConfigurationManualAssignmentIntegrationTests.1-dlt",
75+
"RetryTopicConfigurationManualAssignmentIntegrationTests.1-retry-100",
76+
"RetryTopicConfigurationManualAssignmentIntegrationTests.1-retry-110",
77+
"RetryTopicConfigurationManualAssignmentIntegrationTests.2",
78+
"RetryTopicConfigurationManualAssignmentIntegrationTests.2-dlt",
79+
"RetryTopicConfigurationManualAssignmentIntegrationTests.2-retry-100",
80+
"RetryTopicConfigurationManualAssignmentIntegrationTests.2-retry-110");
81+
template.send(TOPIC1, "foo");
82+
assertThat(config.latch.await(120, TimeUnit.SECONDS)).isTrue();
83+
}
84+
85+
@Configuration(proxyBeanMethods = false)
86+
@EnableKafka
87+
static class Config {
88+
89+
private final CountDownLatch latch = new CountDownLatch(1);
90+
91+
@KafkaListener(id = TOPIC1, topicPartitions = {
92+
@TopicPartition(topic = TOPIC1, partitions = "0"),
93+
@TopicPartition(topic = TOPIC1, partitions = "1"),
94+
@TopicPartition(topic = TOPIC2, partitions = "0") })
95+
void listen1(String in) {
96+
throw new RuntimeException("test");
97+
}
98+
99+
void dlt(String in) {
100+
this.latch.countDown();
101+
}
102+
103+
@Bean
104+
KafkaListenerContainerFactory<?> kafkaListenerContainerFactory(KafkaTemplate<Integer, String> template,
105+
ConsumerFactory<Integer, String> consumerFactory) {
106+
107+
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
108+
new ConcurrentKafkaListenerContainerFactory<>();
109+
factory.setConsumerFactory(consumerFactory);
110+
factory.setReplyTemplate(template);
111+
return factory;
112+
}
113+
114+
@Bean
115+
ConsumerFactory<Integer, String> consumerFactory(EmbeddedKafkaBroker embeddedKafka) {
116+
Map<String, Object> props = KafkaTestUtils.consumerProps("retryConfig", "false", embeddedKafka);
117+
props.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 2000);
118+
return new DefaultKafkaConsumerFactory<>(
119+
props);
120+
}
121+
122+
@Bean
123+
KafkaTemplate<Integer, String> template(ProducerFactory<Integer, String> producerFactory) {
124+
return new KafkaTemplate<>(producerFactory);
125+
}
126+
127+
@Bean
128+
ProducerFactory<Integer, String> producerFactory(EmbeddedKafkaBroker embeddedKafka) {
129+
return new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(embeddedKafka));
130+
}
131+
132+
@Bean
133+
KafkaAdmin admin(EmbeddedKafkaBroker broker) {
134+
return new KafkaAdmin(Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString()));
135+
}
136+
137+
@Bean
138+
RetryTopicConfiguration retryTopicConfiguration1(KafkaTemplate<Integer, String> template) {
139+
return RetryTopicConfigurationBuilder.newInstance()
140+
.includeTopics(List.of(TOPIC1, TOPIC2))
141+
.exponentialBackoff(100, 1.1, 110)
142+
.dltHandlerMethod(getClass(), "dlt")
143+
.create(template);
144+
}
145+
146+
}
147+
148+
}

0 commit comments

Comments
 (0)