6060import org .springframework .kafka .support .ProducerListener ;
6161import org .springframework .kafka .support .converter .RecordMessageConverter ;
6262import org .springframework .kafka .transaction .KafkaTransactionManager ;
63- import org .springframework .retry .backoff .BackOffPolicyBuilder ;
64- import org .springframework .retry .backoff .SleepingBackOffPolicy ;
6563import org .springframework .util .StringUtils ;
64+ import org .springframework .util .backoff .BackOff ;
65+ import org .springframework .util .backoff .ExponentialBackOff ;
66+ import org .springframework .util .backoff .FixedBackOff ;
6667
6768/**
6869 * {@link EnableAutoConfiguration Auto-configuration} for Apache Kafka.
@@ -192,13 +193,13 @@ KafkaAdmin kafkaAdmin(KafkaConnectionDetails connectionDetails) {
192193 @ ConditionalOnSingleCandidate (KafkaTemplate .class )
193194 RetryTopicConfiguration kafkaRetryTopicConfiguration (KafkaTemplate <?, ?> kafkaTemplate ) {
194195 KafkaProperties .Retry .Topic retryTopic = this .properties .getRetry ().getTopic ();
195- RetryTopicConfigurationBuilder builder = RetryTopicConfigurationBuilder .newInstance ()
196+ return RetryTopicConfigurationBuilder .newInstance ()
196197 .maxAttempts (retryTopic .getAttempts ())
197198 .useSingleTopicForSameIntervals ()
198199 .suffixTopicsWithIndexValues ()
199- .doNotAutoCreateRetryTopics ();
200- setBackOffPolicy ( builder , retryTopic .getBackoff ());
201- return builder .create (kafkaTemplate );
200+ .doNotAutoCreateRetryTopics ()
201+ . customBackoff ( getBackOffPolicy ( retryTopic .getBackoff ()))
202+ .create (kafkaTemplate );
202203 }
203204
204205 private void applyKafkaConnectionDetailsForConsumer (Map <String , Object > properties ,
@@ -225,20 +226,28 @@ private void applyKafkaConnectionDetailsForAdmin(Map<String, Object> properties,
225226 applySslBundle (properties , admin .getSslBundle ());
226227 }
227228
228- private static void setBackOffPolicy (RetryTopicConfigurationBuilder builder , Backoff retryTopicBackoff ) {
229- long delay = (retryTopicBackoff .getDelay () != null ) ? retryTopicBackoff .getDelay ().toMillis () : 0 ;
230- if (delay > 0 ) {
231- PropertyMapper map = PropertyMapper .get ().alwaysApplyingWhenNonNull ();
232- BackOffPolicyBuilder backOffPolicy = BackOffPolicyBuilder .newBuilder ();
233- map .from (delay ).to (backOffPolicy ::delay );
234- map .from (retryTopicBackoff .getMaxDelay ()).as (Duration ::toMillis ).to (backOffPolicy ::maxDelay );
235- map .from (retryTopicBackoff .getMultiplier ()).to (backOffPolicy ::multiplier );
236- map .from (retryTopicBackoff .isRandom ()).to (backOffPolicy ::random );
237- builder .customBackoff ((SleepingBackOffPolicy <?>) backOffPolicy .build ());
229+ private BackOff getBackOffPolicy (Backoff properties ) {
230+ Duration delay = properties .getDelay ();
231+ Duration maxDelay = properties .getMaxDelay ();
232+ if (delay == null || Duration .ZERO .equals (delay )) {
233+ return new FixedBackOff (0 );
238234 }
239- else {
240- builder .noBackoff ();
235+ if (properties .getMultiplier () > 0 || (maxDelay != null && maxDelay .toMillis () > delay .toMillis ())) {
236+ long jitter = 0 ;
237+ if (properties .isRandom () && maxDelay != null ) {
238+ jitter = (maxDelay .toMillis () - delay .toMillis ()) / 2 ;
239+ }
240+ ExponentialBackOff backOff = new ExponentialBackOff ();
241+ backOff .setInitialInterval (delay .toMillis () + jitter );
242+ backOff .setJitter (jitter );
243+ backOff .setMultiplier (properties .getMultiplier ());
244+ if (maxDelay != null && maxDelay .toMillis () > delay .toMillis ()) {
245+ backOff .setMaxInterval (properties .getMaxDelay ().toMillis ());
246+ }
247+ return backOff ;
241248 }
249+ return new FixedBackOff (delay .toMillis ());
250+
242251 }
243252
244253 static void applySslBundle (Map <String , Object > properties , @ Nullable SslBundle sslBundle ) {
0 commit comments