Skip to content

Add more settings to Consumer in PulsarReactiveProperties #226

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,13 @@
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.HashingScheme;
import org.apache.pulsar.client.api.KeySharedMode;
import org.apache.pulsar.client.api.KeySharedPolicy;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.ProducerAccessMode;
import org.apache.pulsar.client.api.ProducerCryptoFailureAction;
import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.client.api.RegexSubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.reactive.client.api.ImmutableReactiveMessageConsumerSpec;
Expand Down Expand Up @@ -436,16 +435,16 @@ public static class Consumer {
*/
private SubscriptionType subscriptionType = SubscriptionType.Exclusive;

/*
* KeyShared mode of KeyShared subscription.
*/
private KeySharedMode keySharedMode;

/**
* Map of properties to add to the subscription.
*/
private SortedMap<String, String> subscriptionProperties = new TreeMap<>();

/**
* Subscription mode to be used when subscribing to the topic.
*/
private SubscriptionMode subscriptionMode = SubscriptionMode.Durable;

/**
* Number of messages that can be accumulated before the consumer calls "receive".
*/
Expand Down Expand Up @@ -482,7 +481,7 @@ public static class Consumer {
/**
* Whether the retry letter topic is enabled.
*/
private Boolean retryLetterTopicEnable;
private Boolean retryLetterTopicEnable = false;

/**
* Maximum number of messages that a consumer can be pushed at once from a broker
Expand Down Expand Up @@ -553,7 +552,7 @@ public static class Consumer {
*/
private Boolean autoUpdatePartitions = true;

private Duration autoUpdatePartitionsInterval;
private Duration autoUpdatePartitionsInterval = Duration.ofMinutes(1);

/**
* Whether to replicate subscription state.
Expand Down Expand Up @@ -609,14 +608,6 @@ public void setSubscriptionType(SubscriptionType subscriptionType) {
this.subscriptionType = subscriptionType;
}

public KeySharedMode getKeySharedMode() {
return this.keySharedMode;
}

public void setKeySharedMode(KeySharedMode keySharedMode) {
this.keySharedMode = keySharedMode;
}

public SortedMap<String, String> getSubscriptionProperties() {
return this.subscriptionProperties;
}
Expand All @@ -625,6 +616,14 @@ public void setSubscriptionProperties(SortedMap<String, String> subscriptionProp
this.subscriptionProperties = subscriptionProperties;
}

public SubscriptionMode getSubscriptionMode() {
return this.subscriptionMode;
}

public void setSubscriptionMode(SubscriptionMode subscriptionMode) {
this.subscriptionMode = subscriptionMode;
}

public Integer getReceiverQueueSize() {
return this.receiverQueueSize;
}
Expand Down Expand Up @@ -835,11 +834,8 @@ public ReactiveMessageConsumerSpec buildReactiveMessageConsumerSpec() {
map.from(this::getTopicsPattern).to(spec::setTopicsPattern);
map.from(this::getSubscriptionName).to(spec::setSubscriptionName);
map.from(this::getSubscriptionType).to(spec::setSubscriptionType);
map.from(this::getKeySharedMode).as((mode) -> switch (mode) {
case STICKY -> KeySharedPolicy.stickyHashRange();
case AUTO_SPLIT -> KeySharedPolicy.autoSplitHashRange();
}).to(spec::setKeySharedPolicy);
map.from(this::getSubscriptionProperties).to(spec::setSubscriptionProperties);
map.from(this::getSubscriptionMode).to(spec::setSubscriptionMode);
map.from(this::getReceiverQueueSize).to(spec::setReceiverQueueSize);
map.from(this::getAcknowledgementsGroupTime).to(spec::setAcknowledgementsGroupTime);
map.from(this::getAcknowledgeAsynchronously).to(spec::setAcknowledgeAsynchronously);
Expand All @@ -862,6 +858,7 @@ public ReactiveMessageConsumerSpec buildReactiveMessageConsumerSpec() {
map.from(this::getProperties).to(spec::setProperties);
map.from(this::getReadCompacted).to(spec::setReadCompacted);
map.from(this::getBatchIndexAckEnabled).to(spec::setBatchIndexAckEnabled);
map.from(this::getSubscriptionInitialPosition).to(spec::setSubscriptionInitialPosition);
map.from(this::getTopicsPatternAutoDiscoveryPeriod).to(spec::setTopicsPatternAutoDiscoveryPeriod);
map.from(this::getTopicsPatternSubscriptionMode).to(spec::setTopicsPatternSubscriptionMode);
map.from(this::getAutoUpdatePartitions).to(spec::setAutoUpdatePartitions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,12 @@
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.HashingScheme;
import org.apache.pulsar.client.api.KeySharedMode;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.ProducerAccessMode;
import org.apache.pulsar.client.api.ProducerCryptoFailureAction;
import org.apache.pulsar.client.api.RegexSubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumerSpec;
import org.apache.pulsar.reactive.client.api.ReactiveMessageSenderSpec;
Expand Down Expand Up @@ -118,6 +119,7 @@ void consumerPropsToConsumerSpec() {
props.put("spring.pulsar.reactive.consumer.topics-pattern", "my-pattern");
props.put("spring.pulsar.reactive.consumer.subscription-name", "my-subscription");
props.put("spring.pulsar.reactive.consumer.subscription-type", "Shared");
props.put("spring.pulsar.reactive.consumer.subscription-mode", "NonDurable");
props.put("spring.pulsar.reactive.consumer.subscription-properties[my-sub-prop]", "my-sub-prop-value");
props.put("spring.pulsar.reactive.consumer.receiver-queue-size", "1");
props.put("spring.pulsar.reactive.consumer.acknowledgements-group-time", "2s");
Expand All @@ -137,6 +139,7 @@ void consumerPropsToConsumerSpec() {
props.put("spring.pulsar.reactive.consumer.properties[my-prop]", "my-prop-value");
props.put("spring.pulsar.reactive.consumer.read-compacted", "true");
props.put("spring.pulsar.reactive.consumer.batch-index-ack-enabled", "true");
props.put("spring.pulsar.reactive.consumer.subscription-initial-position", "Earliest");
props.put("spring.pulsar.reactive.consumer.topics-pattern-auto-discovery-period", "9s");
props.put("spring.pulsar.reactive.consumer.topics-pattern-subscription-mode", "AllTopics");
props.put("spring.pulsar.reactive.consumer.auto-update-partitions", "false");
Expand All @@ -153,6 +156,7 @@ void consumerPropsToConsumerSpec() {
assertThat(consumerSpec.getTopicsPattern().toString()).isEqualTo("my-pattern");
assertThat(consumerSpec.getSubscriptionName()).isEqualTo("my-subscription");
assertThat(consumerSpec.getSubscriptionType()).isEqualTo(SubscriptionType.Shared);
assertThat(consumerSpec.getSubscriptionMode()).isEqualTo(SubscriptionMode.NonDurable);
assertThat(consumerSpec.getSubscriptionProperties()).hasSize(1).containsEntry("my-sub-prop",
"my-sub-prop-value");
assertThat(consumerSpec.getReceiverQueueSize()).isEqualTo(1);
Expand All @@ -173,6 +177,7 @@ void consumerPropsToConsumerSpec() {
assertThat(consumerSpec.getProperties()).hasSize(1).containsEntry("my-prop", "my-prop-value");
assertThat(consumerSpec.getReadCompacted()).isTrue();
assertThat(consumerSpec.getBatchIndexAckEnabled()).isTrue();
assertThat(consumerSpec.getSubscriptionInitialPosition()).isEqualTo(SubscriptionInitialPosition.Earliest);
assertThat(consumerSpec.getTopicsPatternAutoDiscoveryPeriod()).isEqualTo(Duration.ofSeconds(9));
assertThat(consumerSpec.getTopicsPatternSubscriptionMode()).isEqualTo(RegexSubscriptionMode.AllTopics);
assertThat(consumerSpec.getAutoUpdatePartitions()).isFalse();
Expand All @@ -183,15 +188,6 @@ void consumerPropsToConsumerSpec() {
assertThat(consumerSpec.getExpireTimeOfIncompleteChunkedMessage()).isEqualTo(Duration.ofSeconds(12));
}

@ParameterizedTest
@EnumSource(KeySharedMode.class)
void keySharedModeProperty(KeySharedMode keySharedMode) {
bind("spring.pulsar.reactive.consumer.key-shared-mode", keySharedMode.name());
ReactiveMessageConsumerSpec consumerSpec = properties.buildReactiveMessageConsumerSpec();

assertThat(consumerSpec.getKeySharedPolicy().getKeySharedMode()).isEqualTo(keySharedMode);
}

@ParameterizedTest
@EnumSource(value = SchedulerType.class, names = "immediate", mode = Mode.EXCLUDE)
void acknowledgeScheduler(SchedulerType acknowledgeSchedulerType) {
Expand Down