From 001d78b95bab63e897338e573f1770273624bfba Mon Sep 17 00:00:00 2001 From: Adrian Chlebosz Date: Tue, 5 Dec 2023 00:38:53 +0100 Subject: [PATCH] GH-2800: create topics used for exception based DLT routing --- .../kafka/annotation/RetryableTopic.java | 8 +++ .../kafka/retrytopic/DestinationTopic.java | 13 ++-- .../DestinationTopicPropertiesFactory.java | 55 ++++++++++++---- .../ExceptionBasedDestinationDlt.java | 6 ++ .../retrytopic/ExceptionBasedDltRouting.java | 5 ++ .../RetryTopicConfigurationBuilder.java | 13 +++- ...estinationTopicPropertiesFactoryTests.java | 65 +++++++++++++------ .../retrytopic/DestinationTopicTests.java | 8 +-- 8 files changed, 131 insertions(+), 42 deletions(-) create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ExceptionBasedDestinationDlt.java create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ExceptionBasedDltRouting.java diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopic.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopic.java index 31ed10f824..d937b202e2 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopic.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopic.java @@ -22,12 +22,18 @@ import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; +import org.springframework.kafka.retrytopic.ExceptionBasedDltRouting; import org.springframework.kafka.retrytopic.DltStrategy; import org.springframework.kafka.retrytopic.RetryTopicConstants; import org.springframework.kafka.retrytopic.SameIntervalTopicReuseStrategy; import org.springframework.kafka.retrytopic.TopicSuffixingStrategy; import org.springframework.retry.annotation.Backoff; + +// TODO +// 2. inject exception detection when sending to DLT (consider traversing causes) +// 3. route the message to the configured additional destination or to the default DLT + /** * * Annotation to create the retry and dlt topics for a {@link KafkaListener} annotated @@ -171,6 +177,8 @@ */ String dltTopicSuffix() default RetryTopicConstants.DEFAULT_DLT_SUFFIX; + ExceptionBasedDltRouting additionalExceptionBasedDltRouting() default @ExceptionBasedDltRouting; + /** * Whether the retry topics will be suffixed with the delay value for that topic or a * simple index. diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopic.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopic.java index c6895be55a..711da8195d 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopic.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopic.java @@ -16,7 +16,9 @@ package org.springframework.kafka.retrytopic; +import java.util.Collections; import java.util.Objects; +import java.util.Set; import java.util.function.BiPredicate; import org.springframework.kafka.core.KafkaOperations; @@ -137,9 +139,10 @@ public static class Properties { private final long timeout; + private final Set> usedForExceptions; + @Nullable private final Boolean autoStartDltHandler; - /** * Create an instance with the provided properties with the DLT container starting * automatically (if the container factory is so configured). @@ -160,7 +163,7 @@ public Properties(long delayMs, String suffix, Type type, BiPredicate shouldRetryOn, long timeout) { this(delayMs, suffix, type, maxAttempts, numPartitions, dltStrategy, kafkaOperations, shouldRetryOn, - timeout, null); + timeout, null, Collections.emptySet()); } /** @@ -173,7 +176,7 @@ public Properties(long delayMs, String suffix, Type type, public Properties(Properties sourceProperties, String suffix, Type type) { this(sourceProperties.delayMs, suffix, type, sourceProperties.maxAttempts, sourceProperties.numPartitions, sourceProperties.dltStrategy, sourceProperties.kafkaOperations, sourceProperties.shouldRetryOn, - sourceProperties.timeout, null); + sourceProperties.timeout, null, Collections.emptySet()); } /** @@ -194,7 +197,8 @@ public Properties(long delayMs, String suffix, Type type, int maxAttempts, int numPartitions, DltStrategy dltStrategy, KafkaOperations kafkaOperations, - BiPredicate shouldRetryOn, long timeout, @Nullable Boolean autoStartDltHandler) { + BiPredicate shouldRetryOn, long timeout, @Nullable Boolean autoStartDltHandler, + Set> usedForExceptions) { this.delayMs = delayMs; this.suffix = suffix; @@ -206,6 +210,7 @@ public Properties(long delayMs, String suffix, Type type, this.shouldRetryOn = shouldRetryOn; this.timeout = timeout; this.autoStartDltHandler = autoStartDltHandler; + this.usedForExceptions = usedForExceptions; } public boolean isDltTopic() { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactory.java index 462e63b40d..e8f3cb1ff5 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactory.java @@ -28,6 +28,11 @@ import org.springframework.lang.Nullable; import org.springframework.util.StringUtils; +import java.util.Collections; +import java.util.Map; +import java.util.Set; +import java.util.stream.Stream; + /** * * Creates a list of {@link DestinationTopic.Properties} based on the @@ -63,6 +68,8 @@ public class DestinationTopicPropertiesFactory { private final long timeout; + private final Map>> exceptionBasedRouting; + @Nullable private Boolean autoStartDltHandler; @@ -86,7 +93,8 @@ public DestinationTopicPropertiesFactory(String retryTopicSuffix, String dltSuff DltStrategy dltStrategy, TopicSuffixingStrategy topicSuffixingStrategy, SameIntervalTopicReuseStrategy sameIntervalTopicReuseStrategy, - long timeout) { + long timeout, + Map>> exceptionBasedRouting) { this.dltStrategy = dltStrategy; this.kafkaOperations = kafkaOperations; @@ -96,6 +104,7 @@ public DestinationTopicPropertiesFactory(String retryTopicSuffix, String dltSuff this.sameIntervalTopicReuseStrategy = sameIntervalTopicReuseStrategy; this.timeout = timeout; this.destinationTopicSuffixes = new DestinationTopicSuffixes(retryTopicSuffix, dltSuffix); + this.exceptionBasedRouting = exceptionBasedRouting; this.backOffValues = backOffValues; // Max Attempts include the initial try. this.maxAttempts = this.backOffValues.size() + 1; @@ -119,12 +128,18 @@ public List createProperties() { } private List createPropertiesForFixedDelaySingleTopic() { - return isNoDltStrategy() - ? Arrays.asList(createMainTopicProperties(), - createRetryProperties(1, getShouldRetryOn())) - : Arrays.asList(createMainTopicProperties(), - createRetryProperties(1, getShouldRetryOn()), - createDltProperties()); + DestinationTopic.Properties mainTopicProperties = createMainTopicProperties(); + DestinationTopic.Properties retryTopicProperties = createRetryProperties(1, getShouldRetryOn()); + if (isNoDltStrategy()) { + return Arrays.asList(mainTopicProperties, retryTopicProperties); + } else { + DestinationTopic.Properties dltProperties = createDltProperties(); + List customDltProperties = createCustomDltProperties(); + return Stream.concat( + Stream.of(mainTopicProperties, retryTopicProperties, dltProperties), + customDltProperties.stream()) + .toList(); + } } private boolean isSingleTopicFixedDelay() { @@ -136,14 +151,18 @@ private boolean isSingleTopicSameIntervalTopicReuseStrategy() { } private List createPropertiesForDefaultTopicStrategy() { - int retryTopicsAmount = retryTopicsAmount(); - return IntStream.rangeClosed(0, isNoDltStrategy() - ? retryTopicsAmount - : retryTopicsAmount + 1) - .mapToObj(this::createTopicProperties) - .collect(Collectors.toList()); + List basicProperties = IntStream.rangeClosed(0, isNoDltStrategy() + ? retryTopicsAmount + : retryTopicsAmount + 1) + .mapToObj(this::createTopicProperties) + .collect(Collectors.toList()); + if (!isNoDltStrategy()) { + basicProperties.addAll(createCustomDltProperties()); + } + + return basicProperties; } int retryTopicsAmount() { @@ -183,7 +202,15 @@ private DestinationTopic.Properties createMainTopicProperties() { private DestinationTopic.Properties createDltProperties() { return new DestinationTopic.Properties(0, this.destinationTopicSuffixes.getDltSuffix(), DestinationTopic.Type.DLT, this.maxAttempts, this.numPartitions, this.dltStrategy, - this.kafkaOperations, (a, e) -> false, this.timeout, this.autoStartDltHandler); + this.kafkaOperations, (a, e) -> false, this.timeout, this.autoStartDltHandler, Collections.emptySet()); + } + + private List createCustomDltProperties() { + return exceptionBasedRouting.entrySet().stream() + .map(entry -> new DestinationTopic.Properties(0, entry.getKey() + "-" + this.destinationTopicSuffixes.getDltSuffix(), + DestinationTopic.Type.DLT, this.maxAttempts, this.numPartitions, this.dltStrategy, + this.kafkaOperations, (a, e) -> false, this.timeout, this.autoStartDltHandler, entry.getValue())) + .toList(); } private BiPredicate getShouldRetryOn() { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ExceptionBasedDestinationDlt.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ExceptionBasedDestinationDlt.java new file mode 100644 index 0000000000..4ed6d46911 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ExceptionBasedDestinationDlt.java @@ -0,0 +1,6 @@ +package org.springframework.kafka.retrytopic; + +public @interface ExceptionBasedDestinationDlt { + String customSuffix(); + Class[] exceptions(); +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ExceptionBasedDltRouting.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ExceptionBasedDltRouting.java new file mode 100644 index 0000000000..e58cf71479 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ExceptionBasedDltRouting.java @@ -0,0 +1,5 @@ +package org.springframework.kafka.retrytopic; + +public @interface ExceptionBasedDltRouting { + ExceptionBasedDestinationDlt[] routing() default {}; +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationBuilder.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationBuilder.java index 4e24b1deb7..36bb131c7e 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationBuilder.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationBuilder.java @@ -17,7 +17,10 @@ package org.springframework.kafka.retrytopic; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.Set; import org.springframework.classify.BinaryExceptionClassifier; import org.springframework.classify.BinaryExceptionClassifierBuilder; @@ -66,6 +69,7 @@ public class RetryTopicConfigurationBuilder { private RetryTopicConfiguration.TopicCreation topicCreationConfiguration = new RetryTopicConfiguration.TopicCreation(); + private ConcurrentKafkaListenerContainerFactory listenerContainerFactory; @Nullable @@ -74,6 +78,8 @@ public class RetryTopicConfigurationBuilder { @Nullable private BinaryExceptionClassifierBuilder classifierBuilder; + private Map>> exceptionBasedDltRouting = new HashMap<>(); + private DltStrategy dltStrategy = DltStrategy.ALWAYS_RETRY_ON_ERROR; private long timeout = RetryTopicConstants.NOT_SET; @@ -522,6 +528,11 @@ private BinaryExceptionClassifierBuilder classifierBuilder() { return this.classifierBuilder; } + public RetryTopicConfigurationBuilder exceptionBasedDltRouting(Map>> exceptionBasedDltRouting) { + this.exceptionBasedDltRouting = exceptionBasedDltRouting; + return this; + } + /* ---------------- Configure KafkaListenerContainerFactory -------------- */ /** * Configure the container factory to use. @@ -567,7 +578,7 @@ public RetryTopicConfiguration create(KafkaOperations sendToTopicKafkaTemp new DestinationTopicPropertiesFactory(this.retryTopicSuffix, this.dltSuffix, backOffValues, buildClassifier(), this.topicCreationConfiguration.getNumPartitions(), sendToTopicKafkaTemplate, this.dltStrategy, - this.topicSuffixingStrategy, this.sameIntervalTopicReuseStrategy, this.timeout) + this.topicSuffixingStrategy, this.sameIntervalTopicReuseStrategy, this.timeout, this.exceptionBasedDltRouting) .autoStartDltHandler(this.autoStartDltHandler) .createProperties(); return new RetryTopicConfiguration(destinationTopicProperties, diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactoryTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactoryTests.java index e83bcec19f..1b9a2052c2 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactoryTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactoryTests.java @@ -27,14 +27,18 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; - import org.springframework.classify.BinaryExceptionClassifier; import org.springframework.classify.BinaryExceptionClassifierBuilder; import org.springframework.kafka.core.KafkaOperations; +import org.springframework.kafka.support.serializer.DeserializationException; import org.springframework.retry.backoff.BackOffPolicy; import org.springframework.retry.backoff.ExponentialBackOffPolicy; import org.springframework.retry.backoff.FixedBackOffPolicy; +import java.util.Collections; +import java.util.Map; +import java.util.Set; + /** * @author Tomaz Fernandes @@ -89,10 +93,10 @@ void shouldCreateMainAndDltProperties() { List backOffValues = new BackOffValuesGenerator(1, backOffPolicy).generateValues(); List propertiesList = - new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, + new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, classifier, numPartitions, kafkaOperations, dltStrategy, suffixWithDelayValueSuffixingStrategy, multipleTopicsSameIntervalReuseStrategy, - RetryTopicConstants.NOT_SET).createProperties(); + RetryTopicConstants.NOT_SET, Collections.emptyMap()).createProperties(); // then assertThat(propertiesList.size() == 2).isTrue(); @@ -112,6 +116,10 @@ void shouldCreateMainAndDltProperties() { } private void assertDltTopic(DestinationTopic.Properties dltProperties) { + assertDltTopic(dltProperties, this.dltSuffix); + } + + private void assertDltTopic(DestinationTopic.Properties dltProperties, String dltSuffix) { assertThat(dltProperties.suffix()).isEqualTo(dltSuffix); assertThat(dltProperties.isDltTopic()).isTrue(); assertThat(dltProperties.isRetryTopic()).isFalse(); @@ -135,10 +143,10 @@ void shouldCreateTwoRetryPropertiesForMultipleBackoffValues() { List backOffValues = new BackOffValuesGenerator(maxAttempts, backOffPolicy).generateValues(); List propertiesList = - new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, + new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, classifier, numPartitions, kafkaOperations, dltStrategy, TopicSuffixingStrategy.SUFFIX_WITH_DELAY_VALUE, - multipleTopicsSameIntervalReuseStrategy, RetryTopicConstants.NOT_SET).createProperties(); + multipleTopicsSameIntervalReuseStrategy, RetryTopicConstants.NOT_SET, Collections.emptyMap()).createProperties(); List destinationTopicList = propertiesList .stream() @@ -186,16 +194,35 @@ void shouldNotCreateDltProperties() { List backOffValues = new BackOffValuesGenerator(maxAttempts, backOffPolicy).generateValues(); List propertiesList = - new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, classifier, + new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, classifier, numPartitions, kafkaOperations, noDltStrategy, TopicSuffixingStrategy.SUFFIX_WITH_DELAY_VALUE, multipleTopicsSameIntervalReuseStrategy, - RetryTopicConstants.NOT_SET).createProperties(); + RetryTopicConstants.NOT_SET, Collections.emptyMap()).createProperties(); // then assertThat(propertiesList.size() == 3).isTrue(); assertThat(propertiesList.get(2).isDltTopic()).isFalse(); } + @Test + void shouldCreateDltPropertiesForCustomExceptionBasedRouting() { + // when + List backOffValues = new BackOffValuesGenerator(1, backOffPolicy).generateValues(); + + String desExcDltSuffix = "deserialization"; + List propertiesList = + new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, + classifier, numPartitions, kafkaOperations, + dltStrategy, suffixWithDelayValueSuffixingStrategy, multipleTopicsSameIntervalReuseStrategy, + RetryTopicConstants.NOT_SET, Map.of(desExcDltSuffix, Set.of(DeserializationException.class))).createProperties(); + + // then + assertThat(propertiesList.size()).isSameAs(3); + + assertDltTopic(propertiesList.get(1)); + assertDltTopic(propertiesList.get(2), desExcDltSuffix + "-" + this.dltSuffix); + } + @Test @SuppressWarnings("deprecation") void shouldCreateOneRetryPropertyForFixedBackoffWithSingleTopicSameIntervalReuseStrategy() { @@ -208,10 +235,10 @@ void shouldCreateOneRetryPropertyForFixedBackoffWithSingleTopicSameIntervalReuse List backOffValues = new BackOffValuesGenerator(maxAttempts, backOffPolicy).generateValues(); List propertiesList = - new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, + new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, classifier, numPartitions, kafkaOperations, dltStrategy, suffixWithDelayValueSuffixingStrategy, singleTopicSameIntervalReuseStrategy, - -1).createProperties(); + -1, Collections.emptyMap()).createProperties(); List destinationTopicList = propertiesList .stream() @@ -250,10 +277,10 @@ void shouldCreateRetryPropertiesForFixedBackoffWithMultiTopicStrategy() { List backOffValues = new BackOffValuesGenerator(maxAttempts, backOffPolicy).generateValues(); List propertiesList = - new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, + new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, classifier, numPartitions, kafkaOperations, dltStrategy, suffixWithDelayValueSuffixingStrategy, multipleTopicsSameIntervalReuseStrategy, - -1).createProperties(); + -1, Collections.emptyMap()).createProperties(); List destinationTopicList = propertiesList .stream() @@ -298,10 +325,10 @@ void shouldSuffixRetryTopicsWithIndexIfSuffixWithIndexStrategy() { // when List propertiesList = - new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, + new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, classifier, numPartitions, kafkaOperations, dltStrategy, suffixWithIndexTopicSuffixingStrategy, - multipleTopicsSameIntervalReuseStrategy, -1).createProperties(); + multipleTopicsSameIntervalReuseStrategy, -1, Collections.emptyMap()).createProperties(); // then IntStream.range(1, maxAttempts).forEach(index -> assertThat(propertiesList.get(index).suffix()) @@ -320,10 +347,10 @@ void shouldSuffixRetryTopicsWithIndexIfFixedDelayWithMultipleTopics() { // when List propertiesList = - new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, + new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, classifier, numPartitions, kafkaOperations, dltStrategy, suffixWithIndexTopicSuffixingStrategy, multipleTopicsSameIntervalReuseStrategy, - -1).createProperties(); + -1, Collections.emptyMap()).createProperties(); // then IntStream.range(1, maxAttempts) @@ -346,7 +373,7 @@ void shouldSuffixRetryTopicsWithMixedIfMaxDelayReached() { // when DestinationTopicPropertiesFactory factory = new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, classifier, numPartitions, kafkaOperations, - dltStrategy, suffixWithDelayValueSuffixingStrategy, multipleTopicsSameIntervalReuseStrategy, -1); + dltStrategy, suffixWithDelayValueSuffixingStrategy, multipleTopicsSameIntervalReuseStrategy, -1, Collections.emptyMap()); List propertiesList = factory.createProperties(); @@ -376,7 +403,7 @@ void shouldReuseRetryTopicsIfMaxDelayReachedWithDelayValueSuffixingStrategy() { // when DestinationTopicPropertiesFactory factory = new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, classifier, numPartitions, kafkaOperations, - dltStrategy, suffixWithDelayValueSuffixingStrategy, singleTopicSameIntervalReuseStrategy, -1); + dltStrategy, suffixWithDelayValueSuffixingStrategy, singleTopicSameIntervalReuseStrategy, -1, Collections.emptyMap()); List propertiesList = factory.createProperties(); @@ -405,7 +432,7 @@ void shouldReuseRetryTopicsIfMaxDelayReachedWithIndexValueSuffixingStrategy() { // when DestinationTopicPropertiesFactory factory = new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, classifier, numPartitions, kafkaOperations, - dltStrategy, suffixWithIndexTopicSuffixingStrategy, singleTopicSameIntervalReuseStrategy, -1); + dltStrategy, suffixWithIndexTopicSuffixingStrategy, singleTopicSameIntervalReuseStrategy, -1, Collections.emptyMap()); List propertiesList = factory.createProperties(); @@ -430,7 +457,7 @@ void shouldNotReuseRetryTopicsIfRepeatedIntervalsAreInTheMiddleOfChain() { // when DestinationTopicPropertiesFactory factory = new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, classifier, numPartitions, kafkaOperations, - dltStrategy, suffixWithDelayValueSuffixingStrategy, multipleTopicsSameIntervalReuseStrategy, -1); + dltStrategy, suffixWithDelayValueSuffixingStrategy, multipleTopicsSameIntervalReuseStrategy, -1, Collections.emptyMap()); List propertiesList = factory.createProperties(); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicTests.java index 1b381df46f..4a1574fd10 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicTests.java @@ -74,7 +74,7 @@ public class DestinationTopicTests { protected DestinationTopic.Properties dltTopicProps = new DestinationTopic.Properties(0, dltSuffix, DestinationTopic.Type.DLT, 4, 1, - DltStrategy.FAIL_ON_ERROR, kafkaOperations1, (a, e) -> false, noTimeout, null); + DltStrategy.FAIL_ON_ERROR, kafkaOperations1, (a, e) -> false, noTimeout, null, Collections.emptySet()); protected List allProps = Arrays .asList(mainTopicProps, firstRetryProps, secondRetryProps, dltTopicProps); @@ -93,7 +93,7 @@ public class DestinationTopicTests { protected DestinationTopic.Properties dltTopicProps2 = new DestinationTopic.Properties(0, dltSuffix, DestinationTopic.Type.DLT, 4, 1, - DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, (a, e) -> false, timeout, null); + DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, (a, e) -> false, timeout, null, Collections.emptySet()); protected List allProps2 = Arrays .asList(mainTopicProps2, firstRetryProps2, secondRetryProps2, dltTopicProps2); @@ -124,7 +124,7 @@ public class DestinationTopicTests { protected DestinationTopic.Properties dltTopicProps4 = new DestinationTopic.Properties(0, dltSuffix, DestinationTopic.Type.DLT, 4, 1, - DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, (a, e) -> false, timeout, null); + DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, (a, e) -> false, timeout, null, Collections.emptySet()); protected DestinationTopic.Properties mainTopicProps5 = new DestinationTopic.Properties(0, "", DestinationTopic.Type.MAIN, 4, 1, @@ -136,7 +136,7 @@ public class DestinationTopicTests { protected DestinationTopic.Properties dltTopicProps5 = new DestinationTopic.Properties(0, dltSuffix, DestinationTopic.Type.DLT, 4, 1, - DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, (a, e) -> false, timeout, null); + DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, (a, e) -> false, timeout, null, Collections.emptySet()); // Holders