Skip to content

Commit 3bbbef7

Browse files
committed
Polish "Add support for Pulsar default tenant/namespace"
See gh-41851
1 parent 98da3aa commit 3bbbef7

File tree

5 files changed

+96
-36
lines changed

5 files changed

+96
-36
lines changed

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfiguration.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -89,28 +89,30 @@ public class PulsarAutoConfiguration {
8989
@ConditionalOnMissingBean(PulsarProducerFactory.class)
9090
@ConditionalOnProperty(name = "spring.pulsar.producer.cache.enabled", havingValue = "false")
9191
DefaultPulsarProducerFactory<?> pulsarProducerFactory(PulsarClient pulsarClient, TopicResolver topicResolver,
92-
ObjectProvider<ProducerBuilderCustomizer<?>> customizersProvider, PulsarTopicBuilder topicBuilder) {
92+
ObjectProvider<ProducerBuilderCustomizer<?>> customizersProvider,
93+
ObjectProvider<PulsarTopicBuilder> topicBuilderProvider) {
9394
List<ProducerBuilderCustomizer<Object>> lambdaSafeCustomizers = lambdaSafeProducerBuilderCustomizers(
9495
customizersProvider);
9596
DefaultPulsarProducerFactory<?> producerFactory = new DefaultPulsarProducerFactory<>(pulsarClient,
9697
this.properties.getProducer().getTopicName(), lambdaSafeCustomizers, topicResolver);
97-
producerFactory.setTopicBuilder(topicBuilder);
98+
topicBuilderProvider.ifAvailable(producerFactory::setTopicBuilder);
9899
return producerFactory;
99100
}
100101

101102
@Bean
102103
@ConditionalOnMissingBean(PulsarProducerFactory.class)
103104
@ConditionalOnProperty(name = "spring.pulsar.producer.cache.enabled", havingValue = "true", matchIfMissing = true)
104105
CachingPulsarProducerFactory<?> cachingPulsarProducerFactory(PulsarClient pulsarClient, TopicResolver topicResolver,
105-
ObjectProvider<ProducerBuilderCustomizer<?>> customizersProvider, PulsarTopicBuilder topicBuilder) {
106+
ObjectProvider<ProducerBuilderCustomizer<?>> customizersProvider,
107+
ObjectProvider<PulsarTopicBuilder> topicBuilderProvider) {
106108
PulsarProperties.Producer.Cache cacheProperties = this.properties.getProducer().getCache();
107109
List<ProducerBuilderCustomizer<Object>> lambdaSafeCustomizers = lambdaSafeProducerBuilderCustomizers(
108110
customizersProvider);
109111
CachingPulsarProducerFactory<?> producerFactory = new CachingPulsarProducerFactory<>(pulsarClient,
110112
this.properties.getProducer().getTopicName(), lambdaSafeCustomizers, topicResolver,
111113
cacheProperties.getExpireAfterAccess(), cacheProperties.getMaximumSize(),
112114
cacheProperties.getInitialCapacity());
113-
producerFactory.setTopicBuilder(topicBuilder);
115+
topicBuilderProvider.ifAvailable(producerFactory::setTopicBuilder);
114116
return producerFactory;
115117
}
116118

@@ -144,15 +146,16 @@ PulsarTemplate<?> pulsarTemplate(PulsarProducerFactory<?> pulsarProducerFactory,
144146
@Bean
145147
@ConditionalOnMissingBean(PulsarConsumerFactory.class)
146148
DefaultPulsarConsumerFactory<?> pulsarConsumerFactory(PulsarClient pulsarClient,
147-
ObjectProvider<ConsumerBuilderCustomizer<?>> customizersProvider, PulsarTopicBuilder topicBuilder) {
149+
ObjectProvider<ConsumerBuilderCustomizer<?>> customizersProvider,
150+
ObjectProvider<PulsarTopicBuilder> topicBuilderProvider) {
148151
List<ConsumerBuilderCustomizer<?>> customizers = new ArrayList<>();
149152
customizers.add(this.propertiesMapper::customizeConsumerBuilder);
150153
customizers.addAll(customizersProvider.orderedStream().toList());
151154
List<ConsumerBuilderCustomizer<Object>> lambdaSafeCustomizers = List
152155
.of((builder) -> applyConsumerBuilderCustomizers(customizers, builder));
153156
DefaultPulsarConsumerFactory<?> consumerFactory = new DefaultPulsarConsumerFactory<>(pulsarClient,
154157
lambdaSafeCustomizers);
155-
consumerFactory.setTopicBuilder(topicBuilder);
158+
topicBuilderProvider.ifAvailable(consumerFactory::setTopicBuilder);
156159
return consumerFactory;
157160
}
158161

@@ -190,15 +193,16 @@ ConcurrentPulsarListenerContainerFactory<?> pulsarListenerContainerFactory(
190193
@Bean
191194
@ConditionalOnMissingBean(PulsarReaderFactory.class)
192195
DefaultPulsarReaderFactory<?> pulsarReaderFactory(PulsarClient pulsarClient,
193-
ObjectProvider<ReaderBuilderCustomizer<?>> customizersProvider, PulsarTopicBuilder topicBuilder) {
196+
ObjectProvider<ReaderBuilderCustomizer<?>> customizersProvider,
197+
ObjectProvider<PulsarTopicBuilder> topicBuilderProvider) {
194198
List<ReaderBuilderCustomizer<?>> customizers = new ArrayList<>();
195199
customizers.add(this.propertiesMapper::customizeReaderBuilder);
196200
customizers.addAll(customizersProvider.orderedStream().toList());
197201
List<ReaderBuilderCustomizer<Object>> lambdaSafeCustomizers = List
198202
.of((builder) -> applyReaderBuilderCustomizers(customizers, builder));
199203
DefaultPulsarReaderFactory<?> readerFactory = new DefaultPulsarReaderFactory<>(pulsarClient,
200204
lambdaSafeCustomizers);
201-
readerFactory.setTopicBuilder(topicBuilder);
205+
topicBuilderProvider.ifAvailable(readerFactory::setTopicBuilder);
202206
return readerFactory;
203207
}
204208

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarProperties.java

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -257,15 +257,7 @@ public static class Defaults {
257257
*/
258258
private List<TypeMapping> typeMappings = new ArrayList<>();
259259

260-
private Topic topic = new Topic();
261-
262-
public Topic getTopic() {
263-
return this.topic;
264-
}
265-
266-
public void setTopic(Topic topic) {
267-
this.topic = topic;
268-
}
260+
private final Topic topic = new Topic();
269261

270262
public List<TypeMapping> getTypeMappings() {
271263
return this.typeMappings;
@@ -275,6 +267,10 @@ public void setTypeMappings(List<TypeMapping> typeMappings) {
275267
this.typeMappings = typeMappings;
276268
}
277269

270+
public Topic getTopic() {
271+
return this.topic;
272+
}
273+
278274
/**
279275
* A mapping from message type to topic and/or schema info to use (at least one of
280276
* {@code topicName} or {@code schemaInfo} must be specified.

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactiveAutoConfiguration.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2012-2023 the original author or authors.
2+
* Copyright 2012-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -49,6 +49,7 @@
4949
import org.springframework.pulsar.reactive.core.DefaultReactivePulsarConsumerFactory;
5050
import org.springframework.pulsar.reactive.core.DefaultReactivePulsarReaderFactory;
5151
import org.springframework.pulsar.reactive.core.DefaultReactivePulsarSenderFactory;
52+
import org.springframework.pulsar.reactive.core.DefaultReactivePulsarSenderFactory.Builder;
5253
import org.springframework.pulsar.reactive.core.ReactiveMessageConsumerBuilderCustomizer;
5354
import org.springframework.pulsar.reactive.core.ReactiveMessageReaderBuilderCustomizer;
5455
import org.springframework.pulsar.reactive.core.ReactiveMessageSenderBuilderCustomizer;
@@ -114,18 +115,18 @@ private ReactiveMessageSenderCache reactivePulsarMessageSenderCache(ProducerCach
114115
DefaultReactivePulsarSenderFactory<?> reactivePulsarSenderFactory(ReactivePulsarClient reactivePulsarClient,
115116
ObjectProvider<ReactiveMessageSenderCache> reactiveMessageSenderCache, TopicResolver topicResolver,
116117
ObjectProvider<ReactiveMessageSenderBuilderCustomizer<?>> customizersProvider,
117-
PulsarTopicBuilder topicBuilder) {
118+
ObjectProvider<PulsarTopicBuilder> topicBuilderProvider) {
118119
List<ReactiveMessageSenderBuilderCustomizer<?>> customizers = new ArrayList<>();
119120
customizers.add(this.propertiesMapper::customizeMessageSenderBuilder);
120121
customizers.addAll(customizersProvider.orderedStream().toList());
121122
List<ReactiveMessageSenderBuilderCustomizer<Object>> lambdaSafeCustomizers = List
122123
.of((builder) -> applyMessageSenderBuilderCustomizers(customizers, builder));
123-
return DefaultReactivePulsarSenderFactory.builderFor(reactivePulsarClient)
124+
Builder<Object> senderFactoryBuilder = DefaultReactivePulsarSenderFactory.builderFor(reactivePulsarClient)
124125
.withDefaultConfigCustomizers(lambdaSafeCustomizers)
125126
.withMessageSenderCache(reactiveMessageSenderCache.getIfAvailable())
126-
.withTopicResolver(topicResolver)
127-
.withTopicBuilder(topicBuilder)
128-
.build();
127+
.withTopicResolver(topicResolver);
128+
topicBuilderProvider.ifAvailable(senderFactoryBuilder::withTopicBuilder);
129+
return senderFactoryBuilder.build();
129130
}
130131

131132
@SuppressWarnings("unchecked")
@@ -140,15 +141,15 @@ private void applyMessageSenderBuilderCustomizers(List<ReactiveMessageSenderBuil
140141
DefaultReactivePulsarConsumerFactory<?> reactivePulsarConsumerFactory(
141142
ReactivePulsarClient pulsarReactivePulsarClient,
142143
ObjectProvider<ReactiveMessageConsumerBuilderCustomizer<?>> customizersProvider,
143-
PulsarTopicBuilder topicBuilder) {
144+
ObjectProvider<PulsarTopicBuilder> topicBuilderProvider) {
144145
List<ReactiveMessageConsumerBuilderCustomizer<?>> customizers = new ArrayList<>();
145146
customizers.add(this.propertiesMapper::customizeMessageConsumerBuilder);
146147
customizers.addAll(customizersProvider.orderedStream().toList());
147148
List<ReactiveMessageConsumerBuilderCustomizer<Object>> lambdaSafeCustomizers = List
148149
.of((builder) -> applyMessageConsumerBuilderCustomizers(customizers, builder));
149150
DefaultReactivePulsarConsumerFactory<?> consumerFactory = new DefaultReactivePulsarConsumerFactory<>(
150151
pulsarReactivePulsarClient, lambdaSafeCustomizers);
151-
consumerFactory.setTopicBuilder(topicBuilder);
152+
topicBuilderProvider.ifAvailable(consumerFactory::setTopicBuilder);
152153
return consumerFactory;
153154
}
154155

@@ -175,15 +176,15 @@ DefaultReactivePulsarListenerContainerFactory<?> reactivePulsarListenerContainer
175176
@ConditionalOnMissingBean(ReactivePulsarReaderFactory.class)
176177
DefaultReactivePulsarReaderFactory<?> reactivePulsarReaderFactory(ReactivePulsarClient reactivePulsarClient,
177178
ObjectProvider<ReactiveMessageReaderBuilderCustomizer<?>> customizersProvider,
178-
PulsarTopicBuilder topicBuilder) {
179+
ObjectProvider<PulsarTopicBuilder> topicBuilderProvider) {
179180
List<ReactiveMessageReaderBuilderCustomizer<?>> customizers = new ArrayList<>();
180181
customizers.add(this.propertiesMapper::customizeMessageReaderBuilder);
181182
customizers.addAll(customizersProvider.orderedStream().toList());
182183
List<ReactiveMessageReaderBuilderCustomizer<Object>> lambdaSafeCustomizers = List
183184
.of((builder) -> applyMessageReaderBuilderCustomizers(customizers, builder));
184185
DefaultReactivePulsarReaderFactory<?> readerFactory = new DefaultReactivePulsarReaderFactory<>(
185186
reactivePulsarClient, lambdaSafeCustomizers);
186-
readerFactory.setTopicBuilder(topicBuilder);
187+
topicBuilderProvider.ifAvailable(readerFactory::setTopicBuilder);
187188
return readerFactory;
188189
}
189190

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfigurationTests.java

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
import org.springframework.pulsar.core.PulsarProducerFactory;
6868
import org.springframework.pulsar.core.PulsarReaderFactory;
6969
import org.springframework.pulsar.core.PulsarTemplate;
70+
import org.springframework.pulsar.core.PulsarTopicBuilder;
7071
import org.springframework.pulsar.core.ReaderBuilderCustomizer;
7172
import org.springframework.pulsar.core.SchemaResolver;
7273
import org.springframework.pulsar.core.TopicResolver;
@@ -126,6 +127,7 @@ void autoConfiguresBeans() {
126127
.hasSingleBean(PulsarConnectionDetails.class)
127128
.hasSingleBean(DefaultPulsarClientFactory.class)
128129
.hasSingleBean(PulsarClient.class)
130+
.hasSingleBean(PulsarTopicBuilder.class)
129131
.hasSingleBean(PulsarAdministration.class)
130132
.hasSingleBean(DefaultSchemaResolver.class)
131133
.hasSingleBean(DefaultTopicResolver.class)
@@ -141,6 +143,12 @@ void autoConfiguresBeans() {
141143
.hasSingleBean(PulsarReaderEndpointRegistry.class));
142144
}
143145

146+
@Test
147+
void topicDefaultsCanBeDisabled() {
148+
this.contextRunner.withPropertyValues("spring.pulsar.defaults.topic.enabled=false")
149+
.run((context) -> assertThat(context).doesNotHaveBean(PulsarTopicBuilder.class));
150+
}
151+
144152
@Nested
145153
class ProducerFactoryTests {
146154

@@ -221,7 +229,15 @@ void injectsExpectedBeans() {
221229
.hasFieldOrPropertyWithValue("pulsarClient", context.getBean(PulsarClient.class))
222230
.hasFieldOrPropertyWithValue("topicResolver", context.getBean(TopicResolver.class))
223231
.extracting("topicBuilder")
224-
.isNotNull()); // prototype so only check not-null
232+
.isNotNull());
233+
}
234+
235+
@Test
236+
void hasNoTopicBuilderWhenTopicDefaultsAreDisabled() {
237+
this.contextRunner.withPropertyValues("spring.pulsar.defaults.topic.enabled=false")
238+
.run((context) -> assertThat(context).getBean(DefaultPulsarProducerFactory.class)
239+
.extracting("topicBuilder")
240+
.isNull());
225241
}
226242

227243
@ParameterizedTest
@@ -379,7 +395,16 @@ void injectsExpectedBeans() {
379395
this.contextRunner.run((context) -> assertThat(context).getBean(DefaultPulsarConsumerFactory.class)
380396
.hasFieldOrPropertyWithValue("pulsarClient", context.getBean(PulsarClient.class))
381397
.extracting("topicBuilder")
382-
.isNotNull()); // prototype so only check not-null
398+
.isNotNull());
399+
}
400+
401+
@Test
402+
void hasNoTopicBuilderWhenTopicDefaultsAreDisabled() {
403+
this.contextRunner.withPropertyValues("spring.pulsar.defaults.topic.enabled=false")
404+
.run((context) -> assertThat(context).getBean(DefaultPulsarConsumerFactory.class)
405+
.hasFieldOrPropertyWithValue("pulsarClient", context.getBean(PulsarClient.class))
406+
.extracting("topicBuilder")
407+
.isNull());
383408
}
384409

385410
@Test
@@ -580,7 +605,15 @@ void injectsExpectedBeans() {
580605
this.contextRunner.run((context) -> assertThat(context).getBean(DefaultPulsarReaderFactory.class)
581606
.hasFieldOrPropertyWithValue("pulsarClient", context.getBean(PulsarClient.class))
582607
.extracting("topicBuilder")
583-
.isNotNull()); // prototype so only check not-null
608+
.isNotNull());
609+
}
610+
611+
@Test
612+
void hasNoTopicBuilderWhenTopicDefaultsAreDisabled() {
613+
this.contextRunner.withPropertyValues("spring.pulsar.defaults.topic.enabled=false")
614+
.run((context) -> assertThat(context).getBean(DefaultPulsarReaderFactory.class)
615+
.extracting("topicBuilder")
616+
.isNull());
584617
}
585618

586619
@Test

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactiveAutoConfigurationTests.java

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2012-2023 the original author or authors.
2+
* Copyright 2012-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -115,6 +115,7 @@ void whenCustomPulsarListenerAnnotationProcessorDefinedAutoConfigurationIsSkippe
115115
void autoConfiguresBeans() {
116116
this.contextRunner.run((context) -> assertThat(context).hasSingleBean(PulsarConfiguration.class)
117117
.hasSingleBean(PulsarClient.class)
118+
.hasSingleBean(PulsarTopicBuilder.class)
118119
.hasSingleBean(PulsarAdministration.class)
119120
.hasSingleBean(DefaultSchemaResolver.class)
120121
.hasSingleBean(DefaultTopicResolver.class)
@@ -129,6 +130,12 @@ void autoConfiguresBeans() {
129130
.hasSingleBean(ReactivePulsarListenerEndpointRegistry.class));
130131
}
131132

133+
@Test
134+
void topicDefaultsCanBeDisabled() {
135+
this.contextRunner.withPropertyValues("spring.pulsar.defaults.topic.enabled=false")
136+
.run((context) -> assertThat(context).doesNotHaveBean(PulsarTopicBuilder.class));
137+
}
138+
132139
@Test
133140
@SuppressWarnings("rawtypes")
134141
void injectsExpectedBeansIntoReactivePulsarClient() {
@@ -178,14 +185,17 @@ void injectsExpectedBeans() {
178185
assertThat(senderFactory)
179186
.extracting("topicResolver", InstanceOfAssertFactories.type(TopicResolver.class))
180187
.isSameAs(context.getBean(TopicResolver.class));
181-
assertThat(senderFactory).extracting("topicBuilder").isNotNull(); // prototype
182-
// so
183-
// only
184-
// check
185-
// not-null
188+
assertThat(senderFactory).extracting("topicBuilder").isNotNull();
186189
});
187190
}
188191

192+
@Test
193+
void hasNoTopicBuilderWhenTopicDefaultsAreDisabled() {
194+
this.contextRunner.withPropertyValues("spring.pulsar.defaults.topic.enabled=false")
195+
.run((context) -> assertThat((DefaultReactivePulsarSenderFactory<?>) context
196+
.getBean(DefaultReactivePulsarSenderFactory.class)).extracting("topicBuilder").isNull());
197+
}
198+
189199
@Test
190200
void injectsExpectedBeansIntoReactiveMessageSenderCache() {
191201
ProducerCacheProvider provider = mock(ProducerCacheProvider.class);
@@ -273,6 +283,15 @@ void injectsExpectedBeans() {
273283
});
274284
}
275285

286+
@Test
287+
void hasNoTopicBuilderWhenTopicDefaultsAreDisabled() {
288+
this.contextRunner.withPropertyValues("spring.pulsar.defaults.topic.enabled=false")
289+
.run((context) -> assertThat(
290+
(ReactivePulsarConsumerFactory<?>) context.getBean(DefaultReactivePulsarConsumerFactory.class))
291+
.extracting("topicBuilder")
292+
.isNull());
293+
}
294+
276295
@Test
277296
<T> void whenHasUserDefinedCustomizersAppliesInCorrectOrder() {
278297
this.contextRunner.withPropertyValues("spring.pulsar.consumer.name=fromPropsCustomizer")
@@ -389,6 +408,13 @@ void injectsExpectedBeans() {
389408
});
390409
}
391410

411+
@Test
412+
void hasNoTopicBuilderWhenTopicDefaultsAreDisabled() {
413+
this.contextRunner.withPropertyValues("spring.pulsar.defaults.topic.enabled=false")
414+
.run((context) -> assertThat((DefaultReactivePulsarReaderFactory<?>) context
415+
.getBean(DefaultReactivePulsarReaderFactory.class)).extracting("topicBuilder").isNull());
416+
}
417+
392418
@Test
393419
<T> void whenHasUserDefinedCustomizersAppliesInCorrectOrder() {
394420
this.contextRunner.withPropertyValues("spring.pulsar.reader.name=fromPropsCustomizer")

0 commit comments

Comments
 (0)