Skip to content

Commit 6e7b845

Browse files
onobcphilwebb
andcommitted
Add support for Apache Pulsar
Add support for Apache Pulsar using the Spring for Apache Pulsar project. See spring-projectsgh-34763 Co-authored-by: Phillip Webb <pwebb@vmware.com>
1 parent 8f78acd commit 6e7b845

File tree

56 files changed

+5261
-3
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

56 files changed

+5261
-3
lines changed

buildSrc/src/main/java/org/springframework/boot/build/context/properties/DocumentConfigurationProperties.java

+1
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,7 @@ private void integrationPrefixes(Config prefix) {
171171
prefix.accept("spring.integration");
172172
prefix.accept("spring.jms");
173173
prefix.accept("spring.kafka");
174+
prefix.accept("spring.pulsar");
174175
prefix.accept("spring.rabbitmq");
175176
prefix.accept("spring.hazelcast");
176177
prefix.accept("spring.webservices");

spring-boot-project/spring-boot-autoconfigure/build.gradle

+3
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,8 @@ dependencies {
179179
optional("org.springframework.data:spring-data-redis")
180180
optional("org.springframework.graphql:spring-graphql")
181181
optional("org.springframework.hateoas:spring-hateoas")
182+
optional("org.springframework.pulsar:spring-pulsar")
183+
optional("org.springframework.pulsar:spring-pulsar-reactive")
182184
optional("org.springframework.security:spring-security-acl")
183185
optional("org.springframework.security:spring-security-config")
184186
optional("org.springframework.security:spring-security-data") {
@@ -255,6 +257,7 @@ dependencies {
255257
testImplementation("org.testcontainers:junit-jupiter")
256258
testImplementation("org.testcontainers:mongodb")
257259
testImplementation("org.testcontainers:neo4j")
260+
testImplementation("org.testcontainers:pulsar")
258261
testImplementation("org.testcontainers:testcontainers")
259262
testImplementation("org.yaml:snakeyaml")
260263

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Copyright 2012-2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.boot.autoconfigure.pulsar;
18+
19+
import org.apache.pulsar.client.api.DeadLetterPolicy;
20+
import org.apache.pulsar.client.api.DeadLetterPolicy.DeadLetterPolicyBuilder;
21+
22+
import org.springframework.boot.context.properties.PropertyMapper;
23+
import org.springframework.util.Assert;
24+
25+
/**
26+
* Helper class used to map {@link PulsarProperties.Consumer.DeadLetterPolicy dead letter
27+
* policy properties}.
28+
*
29+
* @author Chris Bono
30+
* @author Phillip Webb
31+
*/
32+
final class DeadLetterPolicyMapper {
33+
34+
private DeadLetterPolicyMapper() {
35+
}
36+
37+
static DeadLetterPolicy map(PulsarProperties.Consumer.DeadLetterPolicy policy) {
38+
Assert.state(policy.getMaxRedeliverCount() > 0,
39+
"Pulsar DeadLetterPolicy must have a positive 'max-redelivery-count' property value");
40+
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
41+
DeadLetterPolicyBuilder builder = DeadLetterPolicy.builder();
42+
map.from(policy::getMaxRedeliverCount).to(builder::maxRedeliverCount);
43+
map.from(policy::getRetryLetterTopic).to(builder::retryLetterTopic);
44+
map.from(policy::getDeadLetterTopic).to(builder::deadLetterTopic);
45+
map.from(policy::getInitialSubscriptionName).to(builder::initialSubscriptionName);
46+
return builder.build();
47+
}
48+
49+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,196 @@
1+
/*
2+
* Copyright 2012-2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.boot.autoconfigure.pulsar;
18+
19+
import java.util.ArrayList;
20+
import java.util.List;
21+
22+
import org.apache.pulsar.client.api.ConsumerBuilder;
23+
import org.apache.pulsar.client.api.ProducerBuilder;
24+
import org.apache.pulsar.client.api.PulsarClient;
25+
import org.apache.pulsar.client.api.ReaderBuilder;
26+
import org.apache.pulsar.client.api.interceptor.ProducerInterceptor;
27+
28+
import org.springframework.beans.factory.ObjectProvider;
29+
import org.springframework.boot.autoconfigure.AutoConfiguration;
30+
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
31+
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
32+
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
33+
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
34+
import org.springframework.boot.util.LambdaSafe;
35+
import org.springframework.context.annotation.Bean;
36+
import org.springframework.context.annotation.Configuration;
37+
import org.springframework.context.annotation.Import;
38+
import org.springframework.pulsar.annotation.EnablePulsar;
39+
import org.springframework.pulsar.config.ConcurrentPulsarListenerContainerFactory;
40+
import org.springframework.pulsar.config.DefaultPulsarReaderContainerFactory;
41+
import org.springframework.pulsar.config.PulsarAnnotationSupportBeanNames;
42+
import org.springframework.pulsar.core.CachingPulsarProducerFactory;
43+
import org.springframework.pulsar.core.ConsumerBuilderCustomizer;
44+
import org.springframework.pulsar.core.DefaultPulsarConsumerFactory;
45+
import org.springframework.pulsar.core.DefaultPulsarProducerFactory;
46+
import org.springframework.pulsar.core.DefaultPulsarReaderFactory;
47+
import org.springframework.pulsar.core.ProducerBuilderCustomizer;
48+
import org.springframework.pulsar.core.PulsarConsumerFactory;
49+
import org.springframework.pulsar.core.PulsarProducerFactory;
50+
import org.springframework.pulsar.core.PulsarReaderFactory;
51+
import org.springframework.pulsar.core.PulsarTemplate;
52+
import org.springframework.pulsar.core.ReaderBuilderCustomizer;
53+
import org.springframework.pulsar.core.SchemaResolver;
54+
import org.springframework.pulsar.core.TopicResolver;
55+
import org.springframework.pulsar.listener.PulsarContainerProperties;
56+
import org.springframework.pulsar.reader.PulsarReaderContainerProperties;
57+
58+
/**
59+
* {@link EnableAutoConfiguration Auto-configuration} for Apache Pulsar.
60+
*
61+
* @author Chris Bono
62+
* @author Soby Chacko
63+
* @author Alexander Preuß
64+
* @author Phillip Webb
65+
* @since 3.2.0
66+
*/
67+
@AutoConfiguration
68+
@ConditionalOnClass({ PulsarClient.class, PulsarTemplate.class })
69+
@Import(PulsarConfiguration.class)
70+
public class PulsarAutoConfiguration {
71+
72+
private PulsarProperties properties;
73+
74+
private PulsarPropertiesMapper propertiesMapper;
75+
76+
PulsarAutoConfiguration(PulsarProperties properties) {
77+
this.properties = properties;
78+
this.propertiesMapper = new PulsarPropertiesMapper(properties);
79+
}
80+
81+
@Bean
82+
@ConditionalOnMissingBean(PulsarProducerFactory.class)
83+
@ConditionalOnProperty(name = "spring.pulsar.producer.cache.enabled", havingValue = "false")
84+
DefaultPulsarProducerFactory<?> pulsarProducerFactory(PulsarClient pulsarClient, TopicResolver topicResolver,
85+
ObjectProvider<ProducerBuilderCustomizer<?>> customizersProvider) {
86+
List<ProducerBuilderCustomizer<Object>> lambdaSafeCustomizers = lambdaSafeProducerBuilderCustomizers(
87+
customizersProvider);
88+
return new DefaultPulsarProducerFactory<>(pulsarClient, this.properties.getProducer().getTopicName(),
89+
lambdaSafeCustomizers, topicResolver);
90+
}
91+
92+
@Bean
93+
@ConditionalOnMissingBean(PulsarProducerFactory.class)
94+
@ConditionalOnProperty(name = "spring.pulsar.producer.cache.enabled", havingValue = "true", matchIfMissing = true)
95+
CachingPulsarProducerFactory<?> cachingPulsarProducerFactory(PulsarClient pulsarClient, TopicResolver topicResolver,
96+
ObjectProvider<ProducerBuilderCustomizer<?>> customizersProvider) {
97+
PulsarProperties.Producer.Cache cacheProperties = this.properties.getProducer().getCache();
98+
List<ProducerBuilderCustomizer<Object>> lambdaSafeCustomizers = lambdaSafeProducerBuilderCustomizers(
99+
customizersProvider);
100+
return new CachingPulsarProducerFactory<>(pulsarClient, this.properties.getProducer().getTopicName(),
101+
lambdaSafeCustomizers, topicResolver, cacheProperties.getExpireAfterAccess(),
102+
cacheProperties.getMaximumSize(), cacheProperties.getInitialCapacity());
103+
}
104+
105+
private List<ProducerBuilderCustomizer<Object>> lambdaSafeProducerBuilderCustomizers(
106+
ObjectProvider<ProducerBuilderCustomizer<?>> customizersProvider) {
107+
List<ProducerBuilderCustomizer<?>> customizers = new ArrayList<>();
108+
customizers.add(this.propertiesMapper::customizeProducerBuilder);
109+
customizers.addAll(customizersProvider.orderedStream().toList());
110+
return List.of((builder) -> applyProducerBuilderCustomizers(customizers, builder));
111+
}
112+
113+
@SuppressWarnings("unchecked")
114+
private void applyProducerBuilderCustomizers(List<ProducerBuilderCustomizer<?>> customizers,
115+
ProducerBuilder<?> builder) {
116+
LambdaSafe.callbacks(ProducerBuilderCustomizer.class, customizers, builder)
117+
.invoke((customizer) -> customizer.customize(builder));
118+
}
119+
120+
@Bean
121+
@ConditionalOnMissingBean
122+
PulsarTemplate<?> pulsarTemplate(PulsarProducerFactory<?> pulsarProducerFactory,
123+
ObjectProvider<ProducerInterceptor> producerInterceptors, SchemaResolver schemaResolver,
124+
TopicResolver topicResolver) {
125+
return new PulsarTemplate<>(pulsarProducerFactory, producerInterceptors.orderedStream().toList(),
126+
schemaResolver, topicResolver, this.properties.getTemplate().isObservationsEnabled());
127+
}
128+
129+
@Bean
130+
@ConditionalOnMissingBean(PulsarConsumerFactory.class)
131+
DefaultPulsarConsumerFactory<Object> pulsarConsumerFactory(PulsarClient pulsarClient,
132+
ObjectProvider<ConsumerBuilderCustomizer<?>> customizersProvider) {
133+
List<ConsumerBuilderCustomizer<?>> customizers = new ArrayList<>();
134+
customizers.add(this.propertiesMapper::customizeConsumerBuilder);
135+
customizers.addAll(customizersProvider.orderedStream().toList());
136+
List<ConsumerBuilderCustomizer<Object>> lambdaSafeCustomizers = List
137+
.of((builder) -> applyConsumerBuilderCustomizers(customizers, builder));
138+
return new DefaultPulsarConsumerFactory<>(pulsarClient, lambdaSafeCustomizers);
139+
}
140+
141+
@SuppressWarnings("unchecked")
142+
private void applyConsumerBuilderCustomizers(List<ConsumerBuilderCustomizer<?>> customizers,
143+
ConsumerBuilder<?> builder) {
144+
LambdaSafe.callbacks(ConsumerBuilderCustomizer.class, customizers, builder)
145+
.invoke((customizer) -> customizer.customize(builder));
146+
}
147+
148+
@Bean
149+
@ConditionalOnMissingBean(name = "pulsarListenerContainerFactory")
150+
ConcurrentPulsarListenerContainerFactory<Object> pulsarListenerContainerFactory(
151+
PulsarConsumerFactory<Object> pulsarConsumerFactory, SchemaResolver schemaResolver,
152+
TopicResolver topicResolver) {
153+
PulsarContainerProperties containerProperties = new PulsarContainerProperties();
154+
containerProperties.setSchemaResolver(schemaResolver);
155+
containerProperties.setTopicResolver(topicResolver);
156+
this.propertiesMapper.customizeContainerProperties(containerProperties);
157+
return new ConcurrentPulsarListenerContainerFactory<>(pulsarConsumerFactory, containerProperties);
158+
}
159+
160+
@Bean
161+
@ConditionalOnMissingBean(PulsarReaderFactory.class)
162+
DefaultPulsarReaderFactory<?> pulsarReaderFactory(PulsarClient pulsarClient,
163+
ObjectProvider<ReaderBuilderCustomizer<?>> customizersProvider) {
164+
List<ReaderBuilderCustomizer<?>> customizers = new ArrayList<>();
165+
customizers.add(this.propertiesMapper::customizeReaderBuilder);
166+
customizers.addAll(customizersProvider.orderedStream().toList());
167+
List<ReaderBuilderCustomizer<Object>> lambdaSafeCustomizers = List
168+
.of((builder) -> applyReaderBuilderCustomizers(customizers, builder));
169+
return new DefaultPulsarReaderFactory<>(pulsarClient, lambdaSafeCustomizers);
170+
}
171+
172+
@SuppressWarnings("unchecked")
173+
private void applyReaderBuilderCustomizers(List<ReaderBuilderCustomizer<?>> customizers, ReaderBuilder<?> builder) {
174+
LambdaSafe.callbacks(ReaderBuilderCustomizer.class, customizers, builder)
175+
.invoke((customizer) -> customizer.customize(builder));
176+
}
177+
178+
@Bean
179+
@ConditionalOnMissingBean(name = "pulsarReaderContainerFactory")
180+
DefaultPulsarReaderContainerFactory<?> pulsarReaderContainerFactory(PulsarReaderFactory<?> pulsarReaderFactory,
181+
SchemaResolver schemaResolver) {
182+
PulsarReaderContainerProperties readerContainerProperties = new PulsarReaderContainerProperties();
183+
readerContainerProperties.setSchemaResolver(schemaResolver);
184+
this.propertiesMapper.customizeReaderContainerProperties(readerContainerProperties);
185+
return new DefaultPulsarReaderContainerFactory<>(pulsarReaderFactory, readerContainerProperties);
186+
}
187+
188+
@Configuration(proxyBeanMethods = false)
189+
@EnablePulsar
190+
@ConditionalOnMissingBean(name = { PulsarAnnotationSupportBeanNames.PULSAR_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME,
191+
PulsarAnnotationSupportBeanNames.PULSAR_READER_ANNOTATION_PROCESSOR_BEAN_NAME })
192+
static class EnablePulsarConfiguration {
193+
194+
}
195+
196+
}

0 commit comments

Comments
 (0)