Skip to content

Commit

Permalink
Ensure default topics are fully qualified (#849)
Browse files Browse the repository at this point in the history
* Ensure default topics are fully qualified

Prior to this commit, only the topics specified on the `@PulsarListener`
were being fully qualified in the `DefaultPulsarConsumerFactory`. With
this change all topics (including the default topics driven by the
Spring Boot `spring.pulsar.consumer.topics` config prop) are
fully-qualified.
  • Loading branch information
onobc authored Sep 13, 2024
1 parent b16bfae commit bce10bd
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022-2023 the original author or authors.
* Copyright 2022-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -160,7 +160,8 @@ static class ConfigPropsDrivenListenerConfig {
public void listen(String ignored, Consumer<String> consumer) {
assertThat(consumer).extracting("conf", InstanceOfAssertFactories.type(ConsumerConfigurationData.class))
.satisfies((conf) -> {
assertThat(conf.getSingleTopic()).isEqualTo("plit-config-props-topic-dev");
assertThat(conf.getSingleTopic())
.isEqualTo("persistent://public/default/plit-config-props-topic-dev");
assertThat(conf.getSubscriptionType()).isEqualTo(SubscriptionType.Shared);
assertThat(conf.getSubscriptionName()).isEqualTo("plit-config-props-subs-dev");
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,11 @@ public Consumer<T> createConsumer(Schema<T> schema, @Nullable Collection<String>
Objects.requireNonNull(schema, "Schema must be specified");
ConsumerBuilder<T> consumerBuilder = this.pulsarClient.newConsumer(schema);

// Apply the default config customizer (preserve the topic)
// Apply the default config customizer
if (!CollectionUtils.isEmpty(this.defaultConfigCustomizers)) {
this.defaultConfigCustomizers.forEach((customizer -> customizer.customize(consumerBuilder)));
}
// Preserve the passed in topics (don't let default config customizer win)
if (topics != null) {
replaceTopicsOnBuilder(consumerBuilder, topics);
}
Expand All @@ -117,6 +118,9 @@ public Consumer<T> createConsumer(Schema<T> schema, @Nullable Collection<String>
if (!CollectionUtils.isEmpty(customizers)) {
customizers.forEach(customizer -> customizer.customize(consumerBuilder));
}
if (this.topicBuilder != null) {
this.ensureTopicNamesFullyQualified(consumerBuilder);
}
try {
return consumerBuilder.subscribe();
}
Expand All @@ -126,9 +130,6 @@ public Consumer<T> createConsumer(Schema<T> schema, @Nullable Collection<String>
}

private void replaceTopicsOnBuilder(ConsumerBuilder<T> builder, Collection<String> topics) {
if (this.topicBuilder != null) {
topics = topics.stream().map(this.topicBuilder::getFullyQualifiedNameForTopic).toList();
}
var builderImpl = (ConsumerBuilderImpl<T>) builder;
builderImpl.getConf().setTopicNames(new HashSet<>(topics));
}
Expand All @@ -139,4 +140,13 @@ private void replaceMetadataPropertiesOnBuilder(ConsumerBuilder<T> builder,
builderImpl.getConf().setProperties(new TreeMap<>(metadataProperties));
}

protected void ensureTopicNamesFullyQualified(ConsumerBuilder<T> builder) {
var builderImpl = (ConsumerBuilderImpl<T>) builder;
var topics = builderImpl.getConf().getTopicNames();
if (!CollectionUtils.isEmpty(topics)) {
var fullyQualifiedTopics = topics.stream().map(this.topicBuilder::getFullyQualifiedNameForTopic).toList();
builderImpl.getConf().setTopicNames(new HashSet<>(fullyQualifiedTopics));
}
}

}

0 comments on commit bce10bd

Please sign in to comment.