Skip to content

Commit 4944be6

Browse files
cbornetonobc
authored andcommitted
Add autoconfiguration for ReactivePulsarListener
1 parent 1edbe63 commit 4944be6

File tree

6 files changed

+281
-1
lines changed

6 files changed

+281
-1
lines changed

buildSrc/src/main/java/org/springframework/pulsar/gradle/docs/configprops/DocumentConfigurationProperties.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ void documentConfigurationProperties() throws IOException {
7979
});
8080
snippets.add("application-properties.pulsar-reactive-consumer", "Pulsar Reactive Consumer Properties", (c) -> {
8181
c.accept("spring.pulsar.reactive.consumer");
82+
c.accept("spring.pulsar.reactive.listener");
8283
});
8384
snippets.add("application-properties.pulsar-reactive-reader", "Pulsar Reactive Reader Properties", (c) -> {
8485
c.accept("spring.pulsar.reactive.reader");
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.pulsar.autoconfigure;
18+
19+
import org.springframework.beans.factory.ObjectProvider;
20+
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
21+
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
22+
import org.springframework.boot.context.properties.PropertyMapper;
23+
import org.springframework.context.annotation.Bean;
24+
import org.springframework.context.annotation.Configuration;
25+
import org.springframework.pulsar.annotation.EnablePulsar;
26+
import org.springframework.pulsar.config.PulsarListenerBeanNames;
27+
import org.springframework.pulsar.config.reactive.DefaultReactivePulsarListenerContainerFactory;
28+
import org.springframework.pulsar.core.reactive.ReactivePulsarConsumerFactory;
29+
import org.springframework.pulsar.listener.reactive.ReactivePulsarContainerProperties;
30+
31+
/**
32+
* Configuration for Reactive Pulsar annotation-driven support.
33+
*
34+
* @author Christophe Bornet
35+
*/
36+
@Configuration(proxyBeanMethods = false)
37+
@ConditionalOnClass(EnablePulsar.class)
38+
public class PulsarReactiveAnnotationDrivenConfiguration {
39+
40+
private final PulsarReactiveProperties properties;
41+
42+
public PulsarReactiveAnnotationDrivenConfiguration(PulsarReactiveProperties properties) {
43+
this.properties = properties;
44+
}
45+
46+
@Bean
47+
@ConditionalOnMissingBean(name = "reactivePulsarListenerContainerFactory")
48+
DefaultReactivePulsarListenerContainerFactory<?> reactivePulsarListenerContainerFactory(
49+
ObjectProvider<ReactivePulsarConsumerFactory<Object>> consumerFactoryProvider) {
50+
51+
ReactivePulsarContainerProperties<Object> containerProperties = new ReactivePulsarContainerProperties<>();
52+
containerProperties.setSubscriptionType(this.properties.getConsumer().getSubscriptionType());
53+
54+
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
55+
PulsarReactiveProperties.Listener listenerProperties = this.properties.getListener();
56+
map.from(listenerProperties::getSchemaType).to(containerProperties::setSchemaType);
57+
map.from(listenerProperties::getHandlingTimeout).to(containerProperties::setHandlingTimeout);
58+
59+
return new DefaultReactivePulsarListenerContainerFactory<>(consumerFactoryProvider.getIfAvailable(),
60+
containerProperties);
61+
}
62+
63+
@Configuration(proxyBeanMethods = false)
64+
@EnablePulsar
65+
@ConditionalOnMissingBean(name = PulsarListenerBeanNames.REACTIVE_PULSAR_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
66+
static class EnableReactivePulsarConfiguration {
67+
68+
}
69+
70+
}

spring-pulsar-spring-boot-autoconfigure/src/main/java/org/springframework/pulsar/autoconfigure/PulsarReactiveAutoConfiguration.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
3232
import org.springframework.boot.context.properties.EnableConfigurationProperties;
3333
import org.springframework.context.annotation.Bean;
34+
import org.springframework.context.annotation.Import;
3435
import org.springframework.pulsar.core.reactive.DefaultReactivePulsarConsumerFactory;
3536
import org.springframework.pulsar.core.reactive.DefaultReactivePulsarReaderFactory;
3637
import org.springframework.pulsar.core.reactive.DefaultReactivePulsarSenderFactory;
@@ -50,6 +51,7 @@
5051
@AutoConfiguration(after = PulsarAutoConfiguration.class)
5152
@ConditionalOnClass({ ReactivePulsarTemplate.class, ReactivePulsarClient.class })
5253
@EnableConfigurationProperties(PulsarReactiveProperties.class)
54+
@Import({ PulsarReactiveAnnotationDrivenConfiguration.class })
5355
public class PulsarReactiveAutoConfiguration {
5456

5557
private final PulsarReactiveProperties properties;

spring-pulsar-spring-boot-autoconfigure/src/main/java/org/springframework/pulsar/autoconfigure/PulsarReactiveProperties.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.apache.pulsar.client.api.RegexSubscriptionMode;
3636
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
3737
import org.apache.pulsar.client.api.SubscriptionType;
38+
import org.apache.pulsar.common.schema.SchemaType;
3839
import org.apache.pulsar.reactive.client.api.ImmutableReactiveMessageConsumerSpec;
3940
import org.apache.pulsar.reactive.client.api.ImmutableReactiveMessageReaderSpec;
4041
import org.apache.pulsar.reactive.client.api.ImmutableReactiveMessageSenderSpec;
@@ -67,6 +68,8 @@ public class PulsarReactiveProperties {
6768

6869
private final Reader reader = new Reader();
6970

71+
private final Listener listener = new Listener();
72+
7073
public Sender getSender() {
7174
return this.sender;
7275
}
@@ -79,6 +82,10 @@ public Reader getReader() {
7982
return this.reader;
8083
}
8184

85+
public Listener getListener() {
86+
return this.listener;
87+
}
88+
8289
public ReactiveMessageSenderSpec buildReactiveMessageSenderSpec() {
8390
return this.sender.buildReactiveMessageSenderSpec();
8491
}
@@ -944,4 +951,34 @@ public void setInitialCapacity(Integer initialCapacity) {
944951

945952
}
946953

954+
public static class Listener {
955+
956+
/**
957+
* SchemaType of the consumed messages.
958+
*/
959+
private SchemaType schemaType;
960+
961+
/**
962+
* Duration to wait before the message handling times out.
963+
*/
964+
private Duration handlingTimeout = Duration.ofMinutes(2);
965+
966+
public SchemaType getSchemaType() {
967+
return this.schemaType;
968+
}
969+
970+
public void setSchemaType(SchemaType schemaType) {
971+
this.schemaType = schemaType;
972+
}
973+
974+
public Duration getHandlingTimeout() {
975+
return this.handlingTimeout;
976+
}
977+
978+
public void setHandlingTimeout(Duration handlingTimeout) {
979+
this.handlingTimeout = handlingTimeout;
980+
}
981+
982+
}
983+
947984
}

spring-pulsar-spring-boot-autoconfigure/src/test/java/org/springframework/pulsar/autoconfigure/PulsarReactiveAutoConfigurationTests.java

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,13 @@
4444
import org.springframework.boot.test.context.FilteredClassLoader;
4545
import org.springframework.boot.test.context.assertj.AssertableApplicationContext;
4646
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
47+
import org.springframework.pulsar.annotation.EnablePulsar;
48+
import org.springframework.pulsar.annotation.ReactivePulsarBootstrapConfiguration;
49+
import org.springframework.pulsar.annotation.ReactivePulsarListenerAnnotationBeanPostProcessor;
4750
import org.springframework.pulsar.config.PulsarClientFactoryBean;
51+
import org.springframework.pulsar.config.reactive.DefaultReactivePulsarListenerContainerFactory;
52+
import org.springframework.pulsar.config.reactive.ReactivePulsarListenerContainerFactory;
53+
import org.springframework.pulsar.config.reactive.ReactivePulsarListenerEndpointRegistry;
4854
import org.springframework.pulsar.core.reactive.DefaultReactivePulsarConsumerFactory;
4955
import org.springframework.pulsar.core.reactive.DefaultReactivePulsarReaderFactory;
5056
import org.springframework.pulsar.core.reactive.DefaultReactivePulsarSenderFactory;
@@ -77,12 +83,31 @@ void autoConfigurationSkippedWhenReactivePulsarTemplateNotOnClasspath() {
7783
(context) -> assertThat(context).hasNotFailed().doesNotHaveBean(PulsarReactiveAutoConfiguration.class));
7884
}
7985

86+
@Test
87+
void annotationDrivenConfigurationSkippedWhenEnablePulsarAnnotationNotOnClasspath() {
88+
this.contextRunner.withClassLoader(new FilteredClassLoader(EnablePulsar.class))
89+
.run((context) -> assertThat(context).hasNotFailed()
90+
.doesNotHaveBean(PulsarReactiveAnnotationDrivenConfiguration.class));
91+
}
92+
93+
@Test
94+
void bootstrapConfigurationSkippedWhenCustomReactivePulsarListenerAnnotationProcessorDefined() {
95+
this.contextRunner
96+
.withBean("org.springframework.pulsar.config.internalReactivePulsarListenerAnnotationProcessor",
97+
String.class, () -> "someFauxBean")
98+
.run((context) -> assertThat(context).hasNotFailed()
99+
.doesNotHaveBean(ReactivePulsarBootstrapConfiguration.class));
100+
}
101+
80102
@Test
81103
void defaultBeansAreAutoConfigured() {
82104
this.contextRunner.run((context) -> assertThat(context).hasNotFailed()
83105
.hasSingleBean(ReactivePulsarTemplate.class).hasSingleBean(ReactivePulsarClient.class)
84106
.hasSingleBean(ProducerCacheProvider.class).hasSingleBean(ReactiveMessageSenderCache.class)
85-
.hasSingleBean(ReactivePulsarSenderFactory.class).getBean(ReactivePulsarTemplate.class));
107+
.hasSingleBean(ReactivePulsarSenderFactory.class).hasSingleBean(ReactivePulsarTemplate.class)
108+
.hasSingleBean(DefaultReactivePulsarListenerContainerFactory.class)
109+
.hasSingleBean(ReactivePulsarListenerAnnotationBeanPostProcessor.class)
110+
.hasSingleBean(ReactivePulsarListenerEndpointRegistry.class));
86111
}
87112

88113
@ParameterizedTest
@@ -95,6 +120,30 @@ <T> void customBeanIsRespected(Class<T> beanClass) {
95120
.run((context) -> assertThat(context).hasNotFailed().getBean(beanClass).isSameAs(bean));
96121
}
97122

123+
@Test
124+
void customReactivePulsarListenerContainerFactoryIsRespected() {
125+
ReactivePulsarListenerContainerFactory<String> listenerContainerFactory = mock(
126+
ReactivePulsarListenerContainerFactory.class);
127+
this.contextRunner
128+
.withBean("reactivePulsarListenerContainerFactory", ReactivePulsarListenerContainerFactory.class,
129+
() -> listenerContainerFactory)
130+
.run((context) -> assertThat(context).hasNotFailed()
131+
.getBean(ReactivePulsarListenerContainerFactory.class).isSameAs(listenerContainerFactory));
132+
}
133+
134+
@Test
135+
void customReactivePulsarListenerAnnotationBeanPostProcessorIsRespected() {
136+
ReactivePulsarListenerAnnotationBeanPostProcessor<String> listenerAnnotationBeanPostProcessor = mock(
137+
ReactivePulsarListenerAnnotationBeanPostProcessor.class);
138+
this.contextRunner
139+
.withBean("org.springframework.pulsar.config.internalReactivePulsarListenerAnnotationProcessor",
140+
ReactivePulsarListenerAnnotationBeanPostProcessor.class,
141+
() -> listenerAnnotationBeanPostProcessor)
142+
.run((context) -> assertThat(context).hasNotFailed()
143+
.getBean(ReactivePulsarListenerAnnotationBeanPostProcessor.class)
144+
.isSameAs(listenerAnnotationBeanPostProcessor));
145+
}
146+
98147
@Test
99148
void beansAreInjectedInReactivePulsarTemplate() {
100149
ReactivePulsarSenderFactory<?> senderFactory = mock(ReactivePulsarSenderFactory.class);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.pulsar.autoconfigure;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
21+
import java.util.concurrent.CountDownLatch;
22+
import java.util.concurrent.TimeUnit;
23+
24+
import org.apache.pulsar.client.api.Message;
25+
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
26+
import org.apache.pulsar.reactive.client.api.MessageResult;
27+
import org.junit.jupiter.api.Test;
28+
29+
import org.springframework.boot.SpringApplication;
30+
import org.springframework.boot.WebApplicationType;
31+
import org.springframework.context.ConfigurableApplicationContext;
32+
import org.springframework.context.annotation.Bean;
33+
import org.springframework.context.annotation.Configuration;
34+
import org.springframework.context.annotation.Import;
35+
import org.springframework.pulsar.annotation.ReactivePulsarListener;
36+
import org.springframework.pulsar.core.PulsarTemplate;
37+
import org.springframework.pulsar.core.reactive.ReactiveMessageConsumerBuilderCustomizer;
38+
39+
import reactor.core.publisher.Flux;
40+
import reactor.core.publisher.Mono;
41+
42+
/**
43+
* Tests for {@link ReactivePulsarListener}.
44+
*
45+
* @author Christophe Bornet
46+
*/
47+
class ReactivePulsarListenerTests implements PulsarTestContainerSupport {
48+
49+
static CountDownLatch latch1 = new CountDownLatch(1);
50+
static CountDownLatch latch2 = new CountDownLatch(10);
51+
52+
@Test
53+
void testBasicListener() throws Exception {
54+
SpringApplication app = new SpringApplication(BasicListenerConfig.class);
55+
app.setWebApplicationType(WebApplicationType.NONE);
56+
app.setAllowCircularReferences(true);
57+
58+
try (ConfigurableApplicationContext context = app
59+
.run("--spring.pulsar.client.serviceUrl=" + PulsarTestContainerSupport.getPulsarBrokerUrl())) {
60+
@SuppressWarnings("unchecked")
61+
final PulsarTemplate<String> pulsarTemplate = context.getBean(PulsarTemplate.class);
62+
pulsarTemplate.send("hello-pulsar-exclusive", "John Doe");
63+
final boolean await = latch1.await(20, TimeUnit.SECONDS);
64+
assertThat(await).isTrue();
65+
}
66+
}
67+
68+
@Test
69+
void testFluxListener() throws Exception {
70+
SpringApplication app = new SpringApplication(FluxListenerConfig.class);
71+
app.setWebApplicationType(WebApplicationType.NONE);
72+
app.setAllowCircularReferences(true);
73+
74+
try (ConfigurableApplicationContext context = app
75+
.run("--spring.pulsar.client.serviceUrl=" + PulsarTestContainerSupport.getPulsarBrokerUrl())) {
76+
@SuppressWarnings("unchecked")
77+
final PulsarTemplate<String> pulsarTemplate = context.getBean(PulsarTemplate.class);
78+
for (int i = 0; i < 10; i++) {
79+
pulsarTemplate.send("hello-pulsar-exclusive", "John Doe");
80+
}
81+
final boolean await = latch2.await(10, TimeUnit.SECONDS);
82+
assertThat(await).isTrue();
83+
}
84+
}
85+
86+
@Configuration
87+
@Import({ PulsarAutoConfiguration.class, PulsarReactiveAutoConfiguration.class })
88+
static class BasicListenerConfig {
89+
90+
@ReactivePulsarListener(subscriptionName = "test-exclusive-sub-1", topics = "hello-pulsar-exclusive",
91+
consumerCustomizer = "consumerCustomizer")
92+
public Mono<Void> listen(String foo) {
93+
latch1.countDown();
94+
return Mono.empty();
95+
}
96+
97+
@Bean
98+
ReactiveMessageConsumerBuilderCustomizer<String> consumerCustomizer() {
99+
return b -> b.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
100+
}
101+
102+
}
103+
104+
@Configuration
105+
@Import({ PulsarAutoConfiguration.class, PulsarReactiveAutoConfiguration.class })
106+
static class FluxListenerConfig {
107+
108+
@ReactivePulsarListener(subscriptionName = "test-exclusive-sub-2", topics = "hello-pulsar-exclusive",
109+
stream = true, consumerCustomizer = "consumerCustomizer")
110+
public Flux<MessageResult<Void>> listen(Flux<Message<String>> messages) {
111+
return messages.doOnNext(t -> latch2.countDown()).map(m -> MessageResult.acknowledge(m.getMessageId()));
112+
}
113+
114+
@Bean
115+
ReactiveMessageConsumerBuilderCustomizer<String> consumerCustomizer() {
116+
return b -> b.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
117+
}
118+
119+
}
120+
121+
}

0 commit comments

Comments
 (0)