@@ -13,8 +13,7 @@ Licensed under MIT License Copyright (c) 2023 Raja Kolli.
13
13
import com .example .common .dtos .OrderDto ;
14
14
import com .example .orderservice .services .OrderManageService ;
15
15
import java .time .Duration ;
16
- import java .util .Map ;
17
- import org .apache .kafka .clients .consumer .ConsumerConfig ;
16
+ import java .util .Properties ;
18
17
import org .apache .kafka .common .TopicPartition ;
19
18
import org .apache .kafka .common .serialization .Serde ;
20
19
import org .apache .kafka .common .serialization .Serdes ;
@@ -31,19 +30,16 @@ Licensed under MIT License Copyright (c) 2023 Raja Kolli.
31
30
import org .apache .kafka .streams .state .Stores ;
32
31
import org .slf4j .Logger ;
33
32
import org .slf4j .LoggerFactory ;
34
- import org .springframework .boot .autoconfigure .kafka .KafkaConnectionDetails ;
35
- import org .springframework .boot .autoconfigure .kafka .KafkaProperties ;
36
33
import org .springframework .context .annotation .Bean ;
37
34
import org .springframework .context .annotation .Configuration ;
38
35
import org .springframework .kafka .annotation .EnableKafkaStreams ;
39
- import org .springframework .kafka .annotation .KafkaStreamsDefaultConfiguration ;
40
- import org .springframework .kafka .config .KafkaStreamsConfiguration ;
41
36
import org .springframework .kafka .config .StreamsBuilderFactoryBeanConfigurer ;
42
37
import org .springframework .kafka .core .KafkaTemplate ;
43
38
import org .springframework .kafka .core .ProducerFactory ;
44
39
import org .springframework .kafka .listener .DeadLetterPublishingRecoverer ;
45
40
import org .springframework .kafka .streams .RecoveringDeserializationExceptionHandler ;
46
41
import org .springframework .kafka .support .serializer .JsonSerde ;
42
+ import org .springframework .util .Assert ;
47
43
48
44
@ Configuration (proxyBeanMethods = false )
49
45
@ EnableKafkaStreams
@@ -58,32 +54,23 @@ class KafkaStreamsConfig {
58
54
}
59
55
60
56
@ Bean
61
- StreamsBuilderFactoryBeanConfigurer configurer () {
57
+ StreamsBuilderFactoryBeanConfigurer configurer (
58
+ DeadLetterPublishingRecoverer deadLetterPublishingRecoverer ) {
62
59
return factoryBean -> {
63
60
factoryBean .setStateListener (
64
61
(newState , oldState ) ->
65
62
log .info ("State transition from {} to {} " , oldState , newState ));
63
+ Properties streamsConfiguration = factoryBean .getStreamsConfiguration ();
64
+ Assert .notNull (streamsConfiguration , "streamsConfiguration must not be null" );
65
+ streamsConfiguration .put (
66
+ StreamsConfig .DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG ,
67
+ RecoveringDeserializationExceptionHandler .class );
68
+ streamsConfiguration .put (
69
+ RecoveringDeserializationExceptionHandler .KSTREAM_DESERIALIZATION_RECOVERER ,
70
+ deadLetterPublishingRecoverer );
66
71
};
67
72
}
68
73
69
- @ Bean (KafkaStreamsDefaultConfiguration .DEFAULT_STREAMS_CONFIG_BEAN_NAME )
70
- KafkaStreamsConfiguration defaultKafkaStreamsConfig (
71
- KafkaConnectionDetails connectionDetails ,
72
- KafkaProperties kafkaProperties ,
73
- DeadLetterPublishingRecoverer deadLetterPublishingRecoverer ) {
74
- Map <String , Object > properties = kafkaProperties .buildStreamsProperties (null );
75
- properties .put (
76
- ConsumerConfig .BOOTSTRAP_SERVERS_CONFIG ,
77
- connectionDetails .getStreamsBootstrapServers ());
78
- properties .put (
79
- StreamsConfig .DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG ,
80
- RecoveringDeserializationExceptionHandler .class );
81
- properties .put (
82
- RecoveringDeserializationExceptionHandler .KSTREAM_DESERIALIZATION_RECOVERER ,
83
- deadLetterPublishingRecoverer );
84
- return new KafkaStreamsConfiguration (properties );
85
- }
86
-
87
74
@ Bean
88
75
DeadLetterPublishingRecoverer deadLetterPublishingRecoverer (
89
76
ProducerFactory <byte [], byte []> producerFactory ) {
0 commit comments