Skip to content

Commit 0621c54

Browse files
committed
PulsarReader Infrastructure
- Introducing PulsarReader annotation - Annotation bean post processor for PulsarReader - PulsarReader container and registry - PulsarReader endpoint and registry - PulsarReaderConfigurer - PulsarReader based listener adapter - PulsarReader annotation driven Boot auto configuration Resolves #353
1 parent 5b0a69e commit 0621c54

File tree

38 files changed

+2330
-423
lines changed

38 files changed

+2330
-423
lines changed

settings.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ include 'spring-pulsar-sample-apps:sample-pulsar-functions:sample-signup-app'
3535
include 'spring-pulsar-sample-apps:sample-pulsar-functions:sample-signup-function'
3636
include 'spring-pulsar-sample-apps:sample-reactive'
3737
include 'spring-pulsar-sample-apps:sample-pulsar-binder'
38+
include 'spring-pulsar-sample-apps:sample-pulsar-reader'
3839
include 'spring-pulsar-docs'
3940
include 'spring-pulsar-spring-cloud-stream-binder'
4041
include 'spring-pulsar-test'

spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/config/annotation/ReactivePulsarBootstrapConfiguration.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import org.springframework.beans.factory.support.RootBeanDefinition;
2121
import org.springframework.context.annotation.ImportBeanDefinitionRegistrar;
2222
import org.springframework.core.type.AnnotationMetadata;
23-
import org.springframework.pulsar.config.PulsarListenerBeanNames;
23+
import org.springframework.pulsar.config.PulsarAnnotationSupportBeanNames;
2424
import org.springframework.pulsar.reactive.config.ReactivePulsarListenerEndpointRegistry;
2525

2626
/**
@@ -43,16 +43,16 @@ public class ReactivePulsarBootstrapConfiguration implements ImportBeanDefinitio
4343
@Override
4444
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
4545
if (!registry.containsBeanDefinition(
46-
PulsarListenerBeanNames.REACTIVE_PULSAR_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)) {
46+
PulsarAnnotationSupportBeanNames.REACTIVE_PULSAR_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)) {
4747
registry.registerBeanDefinition(
48-
PulsarListenerBeanNames.REACTIVE_PULSAR_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME,
48+
PulsarAnnotationSupportBeanNames.REACTIVE_PULSAR_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME,
4949
new RootBeanDefinition(ReactivePulsarListenerAnnotationBeanPostProcessor.class));
5050
}
5151

52-
if (!registry
53-
.containsBeanDefinition(PulsarListenerBeanNames.REACTIVE_PULSAR_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)) {
52+
if (!registry.containsBeanDefinition(
53+
PulsarAnnotationSupportBeanNames.REACTIVE_PULSAR_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)) {
5454
registry.registerBeanDefinition(
55-
PulsarListenerBeanNames.REACTIVE_PULSAR_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
55+
PulsarAnnotationSupportBeanNames.REACTIVE_PULSAR_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
5656
new RootBeanDefinition(ReactivePulsarListenerEndpointRegistry.class));
5757
}
5858
}

spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/config/annotation/ReactivePulsarListenerAnnotationBeanPostProcessor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@
7575
import org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory;
7676
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
7777
import org.springframework.pulsar.annotation.PulsarListenerConfigurer;
78-
import org.springframework.pulsar.config.PulsarListenerBeanNames;
78+
import org.springframework.pulsar.config.PulsarAnnotationSupportBeanNames;
7979
import org.springframework.pulsar.config.PulsarListenerEndpointRegistrar;
8080
import org.springframework.pulsar.reactive.config.MethodReactivePulsarListenerEndpoint;
8181
import org.springframework.pulsar.reactive.config.ReactivePulsarListenerContainerFactory;
@@ -211,7 +211,7 @@ public void afterSingletonsInstantiated() {
211211
Assert.state(this.beanFactory != null,
212212
"BeanFactory must be set to find endpoint registry by bean name");
213213
this.endpointRegistry = this.beanFactory.getBean(
214-
PulsarListenerBeanNames.REACTIVE_PULSAR_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
214+
PulsarAnnotationSupportBeanNames.REACTIVE_PULSAR_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
215215
ReactivePulsarListenerEndpointRegistry.class);
216216
}
217217
this.registrar.setEndpointRegistry(this.endpointRegistry);
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
plugins {
2+
id 'org.springframework.pulsar.spring-module'
3+
id 'org.springframework.boot' version '3.0.2'
4+
}
5+
6+
description = 'Spring Pulsar Sample Application (Send and Receive)'
7+
8+
dependencies {
9+
implementation project(':spring-pulsar-spring-boot-starter')
10+
implementation 'com.google.code.findbugs:jsr305'
11+
}
12+
13+
bootRun {
14+
jvmArgs = [
15+
"--add-opens", "java.base/java.lang=ALL-UNNAMED",
16+
"--add-opens", "java.base/java.util=ALL-UNNAMED",
17+
"--add-opens", "java.base/sun.net=ALL-UNNAMED"
18+
]
19+
}
20+
21+
project.afterEvaluate {
22+
project.tasks.publishArtifacts.enabled(false)
23+
project.tasks.artifactoryPublish.enabled(false)
24+
project.tasks.publishToOssrh.enabled(false)
25+
project.tasks.publishMavenJavaPublicationToOssrhRepository.enabled(false)
26+
project.tasks.publishAllPublicationsToOssrhRepository.enabled(false)
27+
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Copyright 2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package reader.app;
18+
19+
import org.slf4j.Logger;
20+
import org.slf4j.LoggerFactory;
21+
22+
import org.springframework.boot.ApplicationRunner;
23+
import org.springframework.boot.SpringApplication;
24+
import org.springframework.boot.autoconfigure.SpringBootApplication;
25+
import org.springframework.context.annotation.Bean;
26+
import org.springframework.pulsar.annotation.PulsarReader;
27+
import org.springframework.pulsar.core.PulsarTemplate;
28+
29+
@SpringBootApplication
30+
public class SpringPulsarReaderBootApp {
31+
32+
private static final Logger logger = LoggerFactory.getLogger(SpringPulsarReaderBootApp.class);
33+
34+
public static void main(String[] args) {
35+
SpringApplication.run(SpringPulsarReaderBootApp.class, args);
36+
}
37+
38+
/*
39+
* Basic publisher using PulsarTemplate<String> and a PulsarReader.
40+
*/
41+
@Bean
42+
ApplicationRunner runner1(PulsarTemplate<String> pulsarTemplate) {
43+
44+
String topic1 = "pulsar-reader-demo-topic";
45+
46+
return args -> {
47+
for (int i = 0; i < 10; i++) {
48+
pulsarTemplate.send(topic1, "This is message " + (i + 1));
49+
}
50+
};
51+
}
52+
53+
@PulsarReader(id = "my-id", subscriptionName = "pulsar-reader-demo-subscription",
54+
topics = "pulsar-reader-demo-topic")
55+
void read(String message) {
56+
logger.info(message);
57+
}
58+
59+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
/**
2+
* Package containing sample app for the framework.
3+
*/
4+
@NonNullApi
5+
@NonNullFields
6+
package reader.app;
7+
8+
import org.springframework.lang.NonNullApi;
9+
import org.springframework.lang.NonNullFields;
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+

spring-pulsar-spring-boot-autoconfigure/src/main/java/org/springframework/pulsar/autoconfigure/PulsarAnnotationDrivenConfiguration.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,15 @@
2626
import org.springframework.context.annotation.Configuration;
2727
import org.springframework.pulsar.annotation.EnablePulsar;
2828
import org.springframework.pulsar.config.ConcurrentPulsarListenerContainerFactory;
29-
import org.springframework.pulsar.config.PulsarListenerBeanNames;
29+
import org.springframework.pulsar.config.DefaultPulsarReaderContainerFactory;
30+
import org.springframework.pulsar.config.PulsarAnnotationSupportBeanNames;
3031
import org.springframework.pulsar.core.PulsarConsumerFactory;
32+
import org.springframework.pulsar.core.PulsarReaderFactory;
3133
import org.springframework.pulsar.core.SchemaResolver;
3234
import org.springframework.pulsar.core.TopicResolver;
3335
import org.springframework.pulsar.listener.PulsarContainerProperties;
3436
import org.springframework.pulsar.observation.PulsarListenerObservationConvention;
37+
import org.springframework.pulsar.reader.PulsarReaderContainerProperties;
3538
import org.springframework.util.unit.DataSize;
3639

3740
import io.micrometer.observation.ObservationRegistry;
@@ -80,9 +83,25 @@ ConcurrentPulsarListenerContainerFactory<?> pulsarListenerContainerFactory(
8083
? observationRegistryProvider.getIfUnique() : null);
8184
}
8285

86+
@Bean
87+
@ConditionalOnMissingBean(name = "pulsarReaderContainerFactory")
88+
DefaultPulsarReaderContainerFactory<?> pulsarReaderContainerFactory(
89+
ObjectProvider<PulsarReaderFactory<Object>> readerFactoryProvider, SchemaResolver schemaResolver) {
90+
91+
PulsarReaderContainerProperties containerProperties = new PulsarReaderContainerProperties();
92+
containerProperties.setSchemaResolver(schemaResolver);
93+
94+
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
95+
PulsarProperties.Reader readerProperties = this.pulsarProperties.getReader();
96+
map.from(readerProperties::getTopicNames).to(containerProperties::setTopics);
97+
98+
return new DefaultPulsarReaderContainerFactory<>(readerFactoryProvider.getIfAvailable(), containerProperties);
99+
}
100+
83101
@Configuration(proxyBeanMethods = false)
84102
@EnablePulsar
85-
@ConditionalOnMissingBean(name = PulsarListenerBeanNames.PULSAR_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
103+
@ConditionalOnMissingBean(name = { PulsarAnnotationSupportBeanNames.PULSAR_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME,
104+
PulsarAnnotationSupportBeanNames.PULSAR_READER_ANNOTATION_PROCESSOR_BEAN_NAME })
86105
static class EnablePulsarConfiguration {
87106

88107
}

spring-pulsar-spring-boot-autoconfigure/src/main/java/org/springframework/pulsar/autoconfigure/PulsarProperties.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1216,7 +1216,7 @@ public static class Reader {
12161216
/**
12171217
* Topic names.
12181218
*/
1219-
private String[] topicNames;
1219+
private List<String> topicNames;
12201220

12211221
/**
12221222
* Size of a consumer's receiver queue.
@@ -1249,11 +1249,11 @@ public static class Reader {
12491249
*/
12501250
private Boolean resetIncludeHead;
12511251

1252-
public String[] getTopicNames() {
1252+
public List<String> getTopicNames() {
12531253
return this.topicNames;
12541254
}
12551255

1256-
public void setTopicNames(String[] topicNames) {
1256+
public void setTopicNames(List<String> topicNames) {
12571257
this.topicNames = topicNames;
12581258
}
12591259

spring-pulsar-spring-boot-autoconfigure/src/main/java/org/springframework/pulsar/autoconfigure/PulsarReactiveAnnotationDrivenConfiguration.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import org.springframework.boot.context.properties.PropertyMapper;
2323
import org.springframework.context.annotation.Bean;
2424
import org.springframework.context.annotation.Configuration;
25-
import org.springframework.pulsar.config.PulsarListenerBeanNames;
25+
import org.springframework.pulsar.config.PulsarAnnotationSupportBeanNames;
2626
import org.springframework.pulsar.core.SchemaResolver;
2727
import org.springframework.pulsar.core.TopicResolver;
2828
import org.springframework.pulsar.reactive.config.DefaultReactivePulsarListenerContainerFactory;
@@ -68,7 +68,8 @@ DefaultReactivePulsarListenerContainerFactory<?> reactivePulsarListenerContainer
6868

6969
@Configuration(proxyBeanMethods = false)
7070
@EnableReactivePulsar
71-
@ConditionalOnMissingBean(name = PulsarListenerBeanNames.REACTIVE_PULSAR_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
71+
@ConditionalOnMissingBean(
72+
name = PulsarAnnotationSupportBeanNames.REACTIVE_PULSAR_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
7273
static class EnableReactivePulsarConfiguration {
7374

7475
}

spring-pulsar-spring-boot-autoconfigure/src/test/java/org/springframework/pulsar/autoconfigure/PulsarAutoConfigurationTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static org.assertj.core.api.Assertions.entry;
2121
import static org.mockito.Mockito.mock;
2222

23+
import java.util.List;
2324
import java.util.concurrent.TimeUnit;
2425

2526
import org.apache.pulsar.client.api.PulsarClient;
@@ -496,8 +497,7 @@ void readerFactoryCanBeConfigured() {
496497
"spring.pulsar.reader.subscription-role-prefix=test-prefix",
497498
"spring.pulsar.reader.read-compacted=true", "spring.pulsar.reader.reset-include-head=true")
498499
.run((context -> assertThat(context).hasNotFailed().getBean(PulsarReaderFactory.class)
499-
.extracting("readerConfig")
500-
.hasFieldOrPropertyWithValue("topicNames", new String[] { "foo" })
500+
.extracting("readerConfig").hasFieldOrPropertyWithValue("topicNames", List.of("foo"))
501501
.hasFieldOrPropertyWithValue("receiverQueueSize", 200)
502502
.hasFieldOrPropertyWithValue("readerName", "test-reader")
503503
.hasFieldOrPropertyWithValue("subscriptionName", "test-subscription")

spring-pulsar-spring-boot-autoconfigure/src/test/java/org/springframework/pulsar/autoconfigure/PulsarPropertiesTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -529,7 +529,7 @@ void readerProperties() {
529529

530530
assertThat(readerProps)
531531
.hasEntrySatisfying("topicNames",
532-
topics -> assertThat(topics).asInstanceOf(InstanceOfAssertFactories.array(String[].class))
532+
topics -> assertThat(topics).asInstanceOf(InstanceOfAssertFactories.list(String.class))
533533
.containsExactly("my-topic"))
534534
.containsEntry("receiverQueueSize", 100).containsEntry("readerName", "my-reader")
535535
.containsEntry("subscriptionName", "my-subscription")

0 commit comments

Comments
 (0)