Skip to content

Commit c668097

Browse files
committed
Pulsar Reader auto config (imperative)
Resolves #328
1 parent 9fb8879 commit c668097

File tree

4 files changed

+41
-2
lines changed

4 files changed

+41
-2
lines changed

spring-pulsar-spring-boot-autoconfigure/src/main/java/org/springframework/pulsar/autoconfigure/PulsarAutoConfiguration.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,13 @@
3535
import org.springframework.pulsar.core.CachingPulsarProducerFactory;
3636
import org.springframework.pulsar.core.DefaultPulsarConsumerFactory;
3737
import org.springframework.pulsar.core.DefaultPulsarProducerFactory;
38+
import org.springframework.pulsar.core.DefaultPulsarReaderFactory;
3839
import org.springframework.pulsar.core.DefaultSchemaResolver;
3940
import org.springframework.pulsar.core.DefaultTopicResolver;
4041
import org.springframework.pulsar.core.PulsarAdministration;
4142
import org.springframework.pulsar.core.PulsarConsumerFactory;
4243
import org.springframework.pulsar.core.PulsarProducerFactory;
44+
import org.springframework.pulsar.core.PulsarReaderFactory;
4345
import org.springframework.pulsar.core.PulsarTemplate;
4446
import org.springframework.pulsar.core.SchemaResolver;
4547
import org.springframework.pulsar.core.SchemaResolver.SchemaResolverCustomizer;
@@ -157,4 +159,10 @@ public PulsarFunctionAdministration pulsarFunctionAdministration(PulsarAdministr
157159
this.properties.getFunction().getPropagateStopFailures());
158160
}
159161

162+
@Bean
163+
@ConditionalOnMissingBean
164+
public PulsarReaderFactory<?> pulsarReaderFactory(PulsarClient pulsarClient) {
165+
return new DefaultPulsarReaderFactory<>(pulsarClient, this.properties.buildReaderProperties());
166+
}
167+
160168
}

spring-pulsar-spring-boot-autoconfigure/src/main/java/org/springframework/pulsar/autoconfigure/PulsarProperties.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2185,7 +2185,7 @@ public Map<String, Object> buildProperties() {
21852185

21862186
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
21872187

2188-
map.from(this::getTopicNames).to(properties.in("topicName"));
2188+
map.from(this::getTopicNames).to(properties.in("topicNames"));
21892189
map.from(this::getReceiverQueueSize).to(properties.in("receiverQueueSize"));
21902190
map.from(this::getReaderName).to(properties.in("readerName"));
21912191
map.from(this::getSubscriptionName).to(properties.in("subscriptionName"));

spring-pulsar-spring-boot-autoconfigure/src/test/java/org/springframework/pulsar/autoconfigure/PulsarAutoConfigurationTests.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,11 +50,13 @@
5050
import org.springframework.pulsar.config.PulsarListenerEndpointRegistry;
5151
import org.springframework.pulsar.core.CachingPulsarProducerFactory;
5252
import org.springframework.pulsar.core.DefaultPulsarProducerFactory;
53+
import org.springframework.pulsar.core.DefaultPulsarReaderFactory;
5354
import org.springframework.pulsar.core.DefaultSchemaResolver;
5455
import org.springframework.pulsar.core.DefaultTopicResolver;
5556
import org.springframework.pulsar.core.PulsarAdministration;
5657
import org.springframework.pulsar.core.PulsarConsumerFactory;
5758
import org.springframework.pulsar.core.PulsarProducerFactory;
59+
import org.springframework.pulsar.core.PulsarReaderFactory;
5860
import org.springframework.pulsar.core.PulsarTemplate;
5961
import org.springframework.pulsar.core.SchemaResolver;
6062
import org.springframework.pulsar.core.SchemaResolver.SchemaResolverCustomizer;
@@ -490,6 +492,35 @@ private void assertHasProducerFactoryOfType(Class<?> producerFactoryType,
490492

491493
}
492494

495+
@Nested
496+
class ReaderFactoryAutoConfigurationTests {
497+
498+
@Test
499+
void readerFactoryIsAutoConfiguredByDefault() {
500+
contextRunner.run((context) -> assertThat(context).hasNotFailed().hasSingleBean(PulsarReaderFactory.class)
501+
.getBean(PulsarReaderFactory.class).isExactlyInstanceOf(DefaultPulsarReaderFactory.class));
502+
}
503+
504+
@Test
505+
void readerFactoryCanBeConfigured() {
506+
contextRunner.withPropertyValues("spring.pulsar.reader.topic-names=foo",
507+
"spring.pulsar.reader.receiver-queue-size=200", "spring.pulsar.reader.reader-name=test-reader",
508+
"spring.pulsar.reader.subscription-name=test-subscription",
509+
"spring.pulsar.reader.subscription-role-prefix=test-prefix",
510+
"spring.pulsar.reader.read-compacted=true", "spring.pulsar.reader.reset-include-head=true")
511+
.run((context -> assertThat(context).hasNotFailed().getBean(PulsarReaderFactory.class)
512+
.extracting("readerConfig")
513+
.hasFieldOrPropertyWithValue("topicNames", new String[] { "foo" })
514+
.hasFieldOrPropertyWithValue("receiverQueueSize", 200)
515+
.hasFieldOrPropertyWithValue("readerName", "test-reader")
516+
.hasFieldOrPropertyWithValue("subscriptionName", "test-subscription")
517+
.hasFieldOrPropertyWithValue("subscriptionRolePrefix", "test-prefix")
518+
.hasFieldOrPropertyWithValue("readCompacted", true)
519+
.hasFieldOrPropertyWithValue("resetIncludeHead", true)));
520+
}
521+
522+
}
523+
493524
@Configuration(proxyBeanMethods = false)
494525
static class InterceptorTestConfiguration {
495526

spring-pulsar-spring-boot-autoconfigure/src/test/java/org/springframework/pulsar/autoconfigure/PulsarPropertiesTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -528,7 +528,7 @@ void readerProperties() {
528528
new ReaderConfigurationData<>(), ReaderConfigurationData.class));
529529

530530
assertThat(readerProps)
531-
.hasEntrySatisfying("topicName",
531+
.hasEntrySatisfying("topicNames",
532532
topics -> assertThat(topics).asInstanceOf(InstanceOfAssertFactories.array(String[].class))
533533
.containsExactly("my-topic"))
534534
.containsEntry("receiverQueueSize", 100).containsEntry("readerName", "my-reader")

0 commit comments

Comments
 (0)