Skip to content

Commit 062e79e

Browse files
authored
Add boot property support to Pulsar Reader (imperative) (#327)
1 parent e67f22b commit 062e79e

File tree

3 files changed

+163
-0
lines changed

3 files changed

+163
-0
lines changed

buildSrc/src/main/java/org/springframework/pulsar/gradle/docs/configprops/DocumentConfigurationProperties.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
* @author Phillip Webb
3636
* @author Chris Bono
3737
* @author Alexander Preuß
38+
* @author Soby Chacko
3839
*/
3940
public class DocumentConfigurationProperties extends DefaultTask {
4041

@@ -73,6 +74,9 @@ void documentConfigurationProperties() throws IOException {
7374
c.accept("spring.pulsar.consumer");
7475
c.accept("spring.pulsar.listener");
7576
});
77+
snippets.add("application-properties.pulsar-reader", "Pulsar Reader Properties", (c) -> {
78+
c.accept("spring.pulsar.reader");
79+
});
7680
snippets.add("application-properties.pulsar-defaults", "Pulsar Defaults Properties", (c) -> c.accept("spring.pulsar.defaults"));
7781
snippets.add("application-properties.pulsar-function", "Pulsar Function Properties", (c) -> c.accept("spring.pulsar.function"));
7882
snippets.add("application-properties.pulsar-administration", "Pulsar Administration Properties", (c) -> c.accept("spring.pulsar.administration"));

spring-pulsar-spring-boot-autoconfigure/src/main/java/org/springframework/pulsar/autoconfigure/PulsarProperties.java

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,8 @@ public class PulsarProperties {
7979

8080
private final Admin admin = new Admin();
8181

82+
private final Reader reader = new Reader();
83+
8284
private final Defaults defaults = new Defaults();
8385

8486
public Consumer getConsumer() {
@@ -109,6 +111,10 @@ public Admin getAdministration() {
109111
return this.admin;
110112
}
111113

114+
public Reader getReader() {
115+
return this.reader;
116+
}
117+
112118
public Defaults getDefaults() {
113119
return this.defaults;
114120
}
@@ -129,6 +135,10 @@ public Map<String, Object> buildAdminProperties() {
129135
return new HashMap<>(this.admin.buildProperties());
130136
}
131137

138+
public Map<String, Object> buildReaderProperties() {
139+
return new HashMap<>(this.reader.buildProperties());
140+
}
141+
132142
public static class Consumer {
133143

134144
/**
@@ -2075,6 +2085,119 @@ public Map<String, Object> buildProperties() {
20752085

20762086
}
20772087

2088+
public static class Reader {
2089+
2090+
/**
2091+
* Topic names.
2092+
*/
2093+
private String[] topicNames;
2094+
2095+
/**
2096+
* Size of a consumer's receiver queue.
2097+
*/
2098+
private Integer receiverQueueSize;
2099+
2100+
/**
2101+
* Reader name.
2102+
*/
2103+
private String readerName;
2104+
2105+
/**
2106+
* Subscription name.
2107+
*/
2108+
private String subscriptionName;
2109+
2110+
/**
2111+
* Prefix of subscription role.
2112+
*/
2113+
private String subscriptionRolePrefix;
2114+
2115+
/**
2116+
* Whether to read messages from a compacted topic rather than a full message
2117+
* backlog of a topic.
2118+
*/
2119+
private Boolean readCompacted;
2120+
2121+
/**
2122+
* Whether the first message to be returned is the one specified by messageId.
2123+
*/
2124+
private Boolean resetIncludeHead;
2125+
2126+
public String[] getTopicNames() {
2127+
return this.topicNames;
2128+
}
2129+
2130+
public void setTopicNames(String[] topicNames) {
2131+
this.topicNames = topicNames;
2132+
}
2133+
2134+
public Integer getReceiverQueueSize() {
2135+
return this.receiverQueueSize;
2136+
}
2137+
2138+
public void setReceiverQueueSize(Integer receiverQueueSize) {
2139+
this.receiverQueueSize = receiverQueueSize;
2140+
}
2141+
2142+
public String getReaderName() {
2143+
return this.readerName;
2144+
}
2145+
2146+
public void setReaderName(String readerName) {
2147+
this.readerName = readerName;
2148+
}
2149+
2150+
public String getSubscriptionName() {
2151+
return this.subscriptionName;
2152+
}
2153+
2154+
public void setSubscriptionName(String subscriptionName) {
2155+
this.subscriptionName = subscriptionName;
2156+
}
2157+
2158+
public String getSubscriptionRolePrefix() {
2159+
return this.subscriptionRolePrefix;
2160+
}
2161+
2162+
public void setSubscriptionRolePrefix(String subscriptionRolePrefix) {
2163+
this.subscriptionRolePrefix = subscriptionRolePrefix;
2164+
}
2165+
2166+
public Boolean getReadCompacted() {
2167+
return this.readCompacted;
2168+
}
2169+
2170+
public void setReadCompacted(Boolean readCompacted) {
2171+
this.readCompacted = readCompacted;
2172+
}
2173+
2174+
public Boolean getResetIncludeHead() {
2175+
return this.resetIncludeHead;
2176+
}
2177+
2178+
public void setResetIncludeHead(Boolean resetIncludeHead) {
2179+
this.resetIncludeHead = resetIncludeHead;
2180+
}
2181+
2182+
public Map<String, Object> buildProperties() {
2183+
2184+
PulsarProperties.Properties properties = new Properties();
2185+
2186+
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
2187+
2188+
map.from(this::getTopicNames).to(properties.in("topicName"));
2189+
map.from(this::getReceiverQueueSize).to(properties.in("receiverQueueSize"));
2190+
map.from(this::getReaderName).to(properties.in("readerName"));
2191+
map.from(this::getSubscriptionName).to(properties.in("subscriptionName"));
2192+
map.from(this::getSubscriptionRolePrefix).to(properties.in("subscriptionRolePrefix"));
2193+
map.from(this::getReadCompacted).to(properties.in("readCompacted"));
2194+
map.from(this::getResetIncludeHead).to(properties.in("resetIncludeHead"));
2195+
2196+
return properties;
2197+
}
2198+
2199+
}
2200+
20782201
public static class Defaults {
20792202

20802203
/**

spring-pulsar-spring-boot-autoconfigure/src/test/java/org/springframework/pulsar/autoconfigure/PulsarPropertiesTests.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;
4242
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
4343
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
44+
import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
4445
import org.assertj.core.api.InstanceOfAssertFactories;
4546
import org.junit.jupiter.api.Nested;
4647
import org.junit.jupiter.api.Test;
@@ -56,6 +57,7 @@
5657
*
5758
* @author Chris Bono
5859
* @author Christophe Bornet
60+
* @author Soby Chacko
5961
*/
6062
public class PulsarPropertiesTests {
6163

@@ -503,4 +505,38 @@ void functionProperties() {
503505

504506
}
505507

508+
@Nested
509+
class ReaderPropertiesTests {
510+
511+
@Test
512+
void readerProperties() {
513+
Map<String, String> props = new HashMap<>();
514+
515+
props.put("spring.pulsar.reader.topic-names", "my-topic");
516+
props.put("spring.pulsar.reader.receiver-queue-size", "100");
517+
props.put("spring.pulsar.reader.reader-name", "my-reader");
518+
props.put("spring.pulsar.reader.subscription-name", "my-subscription");
519+
props.put("spring.pulsar.reader.subscription-role-prefix", "sub-role");
520+
props.put("spring.pulsar.reader.read-compacted", "true");
521+
props.put("spring.pulsar.reader.reset-include-head", "true");
522+
bind(props);
523+
524+
Map<String, Object> readerProps = properties.buildReaderProperties();
525+
526+
// Verify that the props can be loaded in a ReaderBuilder
527+
assertThatNoException().isThrownBy(() -> ConfigurationDataUtils.loadData(readerProps,
528+
new ReaderConfigurationData<>(), ReaderConfigurationData.class));
529+
530+
assertThat(readerProps)
531+
.hasEntrySatisfying("topicName",
532+
topics -> assertThat(topics).asInstanceOf(InstanceOfAssertFactories.array(String[].class))
533+
.containsExactly("my-topic"))
534+
.containsEntry("receiverQueueSize", 100).containsEntry("readerName", "my-reader")
535+
.containsEntry("subscriptionName", "my-subscription")
536+
.containsEntry("subscriptionRolePrefix", "sub-role").containsEntry("readCompacted", true)
537+
.containsEntry("resetIncludeHead", true);
538+
}
539+
540+
}
541+
506542
}

0 commit comments

Comments
 (0)